While working on a microservice architecture you will sooner or later need to communicate with other services. You can call these services either:
- Synchronously: calling the REST API of the other service and exchanging data through JSON responses.
- Asynchronously: using a third-party message queue service (like RabbitMQ, Apache Kafka, or AWS Simple Queue Service) and handling the data when a message is received.
The asynchronous path is usually the better choice, because synchronous communication has several disadvantages, such as:
- If a service's URL changes, you need to update every service that calls it with the new URL.
- If your services make chained calls, the response times add up, which can make your application unresponsive.
Today I'm going to show you how to implement asynchronous communication in your .NET 6 service applications. We are going to use Amazon Simple Queue Service (SQS), a fully managed message queuing service for microservices, distributed systems, and serverless applications. First, we need a queue. You can create one by following this guide: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/step-create-queue.html
I have created a queue called neuron-blast-queue. Next, let's open Visual Studio 2022 and create a new Web API project.
Now, let's create a Services folder with an interface IMessageHandler.cs and an implementation class MessageHandler.cs. We also need a DTO class MessageRequest.cs, which will hold our request information. Declare a method Task HandleMessage(MessageRequest request) in the IMessageHandler interface. Our code will look like this:
// MessageRequest.cs
namespace AwsSqsDemo.Services
{
    public class MessageRequest
    {
        public RequestTypes RequestType { get; set; }
        public string RequestData { get; set; } = default!;
    }

    public enum RequestTypes
    {
        // Used by AwsSqsClient.ReceiveAsync when no message was received
        Empty = 0,
        UserCreation = 1
    }
}

// IMessageHandler.cs
namespace AwsSqsDemo.Services
{
    public interface IMessageHandler
    {
        Task HandleMessage(MessageRequest request);
    }
}

// MessageHandler.cs
namespace AwsSqsDemo.Services
{
    public class MessageHandler : IMessageHandler
    {
        // Placeholder; the real implementation is added further down
        public Task HandleMessage(MessageRequest request)
        {
            throw new NotImplementedException();
        }
    }
}
Okay, now let's add the AWSSDK.SQS NuGet package to our project (for example, by running dotnet add package AWSSDK.SQS in the project folder).
Let's create a class AwsSqsClient.cs which will handle all our SQS-related tasks, like sending and receiving messages:
using Amazon;
using Amazon.Runtime;
using Amazon.SQS;
using Amazon.SQS.Model;
using AwsSqsDemo.Services;
using System.Text.Json;

namespace AwsSqsDemo.Utils
{
    public class AwsSqsClient
    {
        private readonly AmazonSQSClient _client;

        public AwsSqsClient()
        {
            // These values, like other configuration values, should be retrieved from appsettings.json.
            // I've hardcoded them for demo purposes.
            var awsCreds = new BasicAWSCredentials("AccessKey", "SecretKey");
            _client = new AmazonSQSClient(awsCreds, RegionEndpoint.APSoutheast1);
        }

        // Serializes the request to JSON and sends it to the queue
        public async Task PublishAsync(MessageRequest messageBody)
        {
            try
            {
                var request = new SendMessageRequest()
                {
                    QueueUrl = "neuron-blast-queue-url",
                    MessageBody = JsonSerializer.Serialize(messageBody)
                };

                var response = await _client.SendMessageAsync(request);
                if (response.HttpStatusCode == System.Net.HttpStatusCode.OK)
                {
                    Console.WriteLine($"Message added to queue: {response.MessageId}");
                }
                else
                {
                    Console.WriteLine($"Message {response.MessageId} was not sent to queue. Please check");
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        // Receives one message, deletes it from the queue, and returns its content.
        // Returns a request of type RequestTypes.Empty when nothing was received.
        public async Task<MessageRequest> ReceiveAsync(CancellationToken cancellationToken)
        {
            var responseObject = new MessageRequest() { RequestType = RequestTypes.Empty };
            try
            {
                var request = new ReceiveMessageRequest()
                {
                    QueueUrl = "neuron-blast-queue-url",
                };

                var response = await _client.ReceiveMessageAsync(request, cancellationToken);
                if (response.HttpStatusCode == System.Net.HttpStatusCode.OK)
                {
                    var message = response.Messages.FirstOrDefault();
                    if (message != null)
                    {
                        if (!string.IsNullOrEmpty(message.Body))
                            responseObject = JsonSerializer.Deserialize<MessageRequest>(message.Body) ?? responseObject;

                        await _client.DeleteMessageAsync("neuron-blast-queue-url", message.ReceiptHandle, cancellationToken);
                    }
                }
                else
                {
                    Console.WriteLine("Message could not be received. Please check");
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }

            return responseObject;
        }
    }
}
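The comment in the constructor suggests moving these values into appsettings.json. Here is a minimal sketch of one way to read them, assuming a hypothetical "Sqs" configuration section with QueueUrl and Region keys. The section name, the SqsSettings class, and the SqsSettingsLoader helper are illustrative and not part of the code above:

```csharp
using System;
using Microsoft.Extensions.Configuration;

// Hypothetical settings class; the property names are assumptions
public class SqsSettings
{
    public string QueueUrl { get; set; } = default!;
    public string Region { get; set; } = default!;
}

public static class SqsSettingsLoader
{
    // Binds the assumed "Sqs" section and fails fast when it is missing or incomplete
    public static SqsSettings Load(IConfiguration configuration)
    {
        var settings = configuration.GetSection("Sqs").Get<SqsSettings>();

        if (settings is null
            || string.IsNullOrWhiteSpace(settings.QueueUrl)
            || string.IsNullOrWhiteSpace(settings.Region))
        {
            throw new InvalidOperationException("The Sqs section must define QueueUrl and Region.");
        }

        return settings;
    }
}
```

With something like this in place, AwsSqsClient could take an SqsSettings instance instead of the hardcoded strings, and RegionEndpoint.GetBySystemName(settings.Region) can turn a region name such as ap-southeast-1 into the endpoint.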
Now we need a background job processor which will listen to the SQS queue and handle our requests whenever a message is available. I'm going to create a class Worker.cs which inherits from the framework class BackgroundService (an IHostedService implementation). This is our code:
using AwsSqsDemo.Services;
using AwsSqsDemo.Utils;

namespace AwsSqsDemo
{
    public class Worker : BackgroundService
    {
        private readonly AwsSqsClient _client;
        private readonly IServiceScopeFactory _scopeFactory;

        public Worker(IServiceScopeFactory scopeFactory)
        {
            _client = new AwsSqsClient();
            _scopeFactory = scopeFactory;
        }

        protected override async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Returns a request of type RequestTypes.Empty when the queue had no message
                var response = await _client.ReceiveAsync(cancellationToken);
                try
                {
                    // Create a scope per message so scoped services can be resolved
                    using (var scope = _scopeFactory.CreateScope())
                    {
                        var messageHandler = scope.ServiceProvider.GetRequiredService<IMessageHandler>();
                        await messageHandler.HandleMessage(response);
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }
    }
}
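The loop above calls ReceiveAsync again as soon as the previous call returns. Unless the queue itself is configured with a receive wait time, SQS answers an empty receive immediately (short polling), so an idle queue means a steady stream of empty requests. One option is long polling, where the request sets WaitTimeSeconds (up to 20 seconds) and SQS holds the call open until a message arrives or the wait expires. A small sketch, in which ReceiveRequestFactory is a hypothetical helper and not part of AwsSqsClient:

```csharp
using Amazon.SQS.Model;

// Hypothetical helper that builds a long-polling receive request
public static class ReceiveRequestFactory
{
    public static ReceiveMessageRequest CreateLongPolling(string queueUrl)
    {
        return new ReceiveMessageRequest
        {
            QueueUrl = queueUrl,
            // Wait up to 20 seconds (the SQS maximum) for a message to arrive
            WaitTimeSeconds = 20,
            // Matches the article's code, which processes one message per call
            MaxNumberOfMessages = 1
        };
    }
}
```

ReceiveAsync could build its request this way instead of using the bare ReceiveMessageRequest, which keeps the worker loop unchanged while cutting down on empty responses.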
And this is the code for the HandleMessage method:
namespace AwsSqsDemo.Services
{
    public class MessageHandler : IMessageHandler
    {
        public async Task HandleMessage(MessageRequest request)
        {
            try
            {
                switch (request.RequestType)
                {
                    case RequestTypes.UserCreation:
                        // Handle user creation logic here
                        break;
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }
    }
}
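Inside the UserCreation case, RequestData is still a JSON string, so the handler has to turn it back into an object before doing anything with it. A minimal sketch of that step, assuming the payload only carries a Name property. UserCreationData and RequestDataParser are hypothetical names introduced for illustration:

```csharp
using System.Text.Json;

// Hypothetical payload type; assumes the message data only has a Name property
public record UserCreationData(string Name);

public static class RequestDataParser
{
    // Returns the parsed payload, or null when the string is not valid JSON
    public static UserCreationData? ParseUserCreation(string requestData)
    {
        try
        {
            return JsonSerializer.Deserialize<UserCreationData>(requestData);
        }
        catch (JsonException)
        {
            return null;
        }
    }
}
```

The handler could then call RequestDataParser.ParseUserCreation(request.RequestData) and skip or log the message when the result is null.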
Now, let's send a message to the queue. For testing purposes I'm sending a message from UserController.cs:
using AwsSqsDemo.Services;
using AwsSqsDemo.Utils;
using Microsoft.AspNetCore.Mvc;
using System.Text.Json;

namespace AwsSqsDemo.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class UserController : ControllerBase
    {
        [HttpPost]
        public async Task<IActionResult> CreateUser()
        {
            try
            {
                var client = new AwsSqsClient();

                // Publish a user creation request; the payload is stored as a JSON string
                await client.PublishAsync(new MessageRequest()
                {
                    RequestType = RequestTypes.UserCreation,
                    RequestData = JsonSerializer.Serialize(new { Name = "Bishal Sarker" })
                });

                return Ok();
            }
            catch (Exception ex)
            {
                return BadRequest(ex.Message);
            }
        }
    }
}
Finally, register all the services in Program.cs so that dependency injection can resolve the message handler and the host starts the worker.
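A minimal sketch of what that registration might look like, assuming the default .NET 6 Web API template with top-level statements. The scoped lifetime for MessageHandler is an assumption, chosen because Worker resolves the handler from a scope it creates per message:

```csharp
using AwsSqsDemo;
using AwsSqsDemo.Services;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();

// Assumed lifetime: scoped, since Worker resolves the handler inside a new scope
builder.Services.AddScoped<IMessageHandler, MessageHandler>();

// Starts Worker.ExecuteAsync together with the web host
builder.Services.AddHostedService<Worker>();

var app = builder.Build();

app.MapControllers();

app.Run();
```

AwsSqsClient is not registered here because both Worker and UserController create their own instance with new in the code above.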
That's all, folks! Let me know your thoughts in the comments. Thanks.