Enterprise Application: Basic Pub/Sub (Publish/Subscribe) in C#, Java, and Golang using AWS SNS

Estimated read time 4 min read

Certainly! Implementing a basic Pub/Sub (Publish/Subscribe) system involves creating a publisher that sends messages and one or more subscribers that receive and process those messages. Below are simplified examples of Pub/Sub implementations for C#, Java, and Golang using AWS SNS (Simple Notification Service) for publishing and AWS SQS (Simple Queue Service) for subscribing.

C# Pub/Sub with AWS SNS and SQS:

using Amazon;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using Amazon.SQS;
using Amazon.SQS.Model;
using System;

class Program
{
    static void Main()
    {
        string accessKey = "Your_AWS_Access_Key";
        string secretKey = "Your_AWS_Secret_Key";
        string topicArn = "Your_SNS_Topic_ARN";
        string queueUrl = "Your_SQS_Queue_URL";

        var snsClient = new AmazonSimpleNotificationServiceClient(accessKey, secretKey, RegionEndpoint.USWest2);
        var sqsClient = new AmazonSQSClient(accessKey, secretKey, RegionEndpoint.USWest2);

        // Create an SNS topic
        var createTopicResponse = snsClient.CreateTopic(new CreateTopicRequest { Name = "MyTopic" });
        var topicArn = createTopicResponse.TopicArn;

        // Create an SQS queue
        var createQueueResponse = sqsClient.CreateQueue(new CreateQueueRequest { QueueName = "MyQueue" });
        var queueUrl = createQueueResponse.QueueUrl;

        // Subscribe the queue to the topic
        snsClient.Subscribe(new SubscribeRequest { TopicArn = topicArn, Protocol = "sqs", Endpoint = queueUrl });

        // Publish a message to the topic
        snsClient.Publish(new PublishRequest { TopicArn = topicArn, Message = "Hello from C# Pub/Sub" });

        // Receive and process messages from the queue (subscriber)
        var receiveMessageResponse = sqsClient.ReceiveMessage(new ReceiveMessageRequest { QueueUrl = queueUrl });

        foreach (var message in receiveMessageResponse.Messages)
        {
            Console.WriteLine("Received message: " + message.Body);
        }

        // Delete the topic and queue when done (cleanup)
        snsClient.DeleteTopic(new DeleteTopicRequest { TopicArn = topicArn });
        sqsClient.DeleteQueue(new DeleteQueueRequest { QueueUrl = queueUrl });
    }
}

Java Pub/Sub with AWS SNS and SQS:

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.CreateTopicRequest;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageRequest;

public class PubSubExample {
    public static void main(String[] args) {
        String accessKey = "Your_AWS_Access_Key";
        String secretKey = "Your_AWS_Secret_Key";
        String region = "us-west-2";
        String topicName = "MyTopic";
        String queueName = "MyQueue";

        BasicAWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);

        AmazonSNS snsClient = AmazonSNSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(credentials))
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("sns." + region + ".amazonaws.com", region))
                .build();

        AmazonSQS sqsClient = AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(credentials))
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("sqs." + region + ".amazonaws.com", region))
                .build();

        // Create an SNS topic
        String topicArn = snsClient.createTopic(new CreateTopicRequest(topicName)).getTopicArn();

        // Create an SQS queue
        String queueUrl = sqsClient.createQueue(new CreateQueueRequest(queueName)).getQueueUrl();

        // Subscribe the queue to the topic
        snsClient.subscribe(topicArn, "sqs", queueUrl);

        // Publish a message to the topic
        snsClient.publish(new PublishRequest(topicArn, "Hello from Java Pub/Sub"));

        // Receive and process messages from the queue (subscriber)
        String receiptHandle = sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl)).getMessages().get(0).getReceiptHandle();
        System.out.println("Received message: " + sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl)).getMessages().get(0).getBody());

        // Delete the topic and queue when done (cleanup)
        snsClient.deleteTopic(topicArn);
        sqsClient.deleteQueue(queueUrl);
    }
}

Golang (Go) Pub/Sub with AWS SNS and SQS:

package main

import (
	"fmt"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sns"
	"github.com/aws/aws-sdk-go/service/sqs"
)

func main() {
	accessKey := "Your_AWS_Access_Key"
	secretKey := "Your_AWS_Secret_Key"
	region := "us-west-2"
	topicName := "MyTopic"
	queueName := "MyQueue"

	creds := credentials.NewStaticCredentials(accessKey, secretKey, "")

	sess := session.Must(session.NewSession(&aws.Config{
		Region:      aws.String(region),
		Credentials: creds,
	}))

	snsClient := sns.New(sess)
	sqsClient := sqs.New(sess)

	// Create an SNS topic
	createTopicOutput, err := snsClient.CreateTopic(&sns.CreateTopicInput{Name: &topicName})
	if err != nil {
		fmt.Println("Error creating topic:", err)
		return
	}
	topicArn := *createTopicOutput.TopicArn

	// Create an SQS queue
	createQueueOutput, err := sqsClient.CreateQueue(&sqs.CreateQueueInput{QueueName: &queueName})
	if err != nil {
		fmt.Println("Error creating queue:", err)
		return
	}
	queueUrl := *createQueueOutput.QueueUrl

	// Subscribe the queue to the topic
	_, err = snsClient.Subscribe(&sns.SubscribeInput{
		Protocol: aws.String("sqs"),
		Endpoint: &queueUrl,
		TopicArn: &topicArn,
	})
	if err != nil {
		fmt.Println("Error subscribing queue to topic:", err)
		return
	}

	// Publish a message to the topic
	_, err = snsClient.Publish(&sns.PublishInput{
		TopicArn: &topicArn,
		Message:  aws.String("Hello from Golang Pub/Sub"),
	})
	if err != nil {
		fmt.Println("Error publishing message:", err)
		return
	}

	// Receive and process messages from the queue (subscriber)
	receiveMessageOutput, err := sqsClient.ReceiveMessage(&sqs.ReceiveMessageInput{QueueUrl: &queue

Related Articles