Implementing messaging via MassTransit
Implementing messaging via MassTransit
Sooner or later, many projects are faced with the problem of messaging. Often messages must be exchanged between elements of distributed systems using different approaches and technologies. In modern systems, to solve this problem, as a rule, message buses are used, which make it possible to abstract the work with messages for various system components. Therefore, when we were faced with the task of implementing such an approach from scratch, all we had to do was choose the appropriate solution. We decided to share this basic implementation and describe how to get started with the message bus on the .NET Core platform.
The initial data was simple: working under .NET and working with RabbitMQ, since these are the tools we use on most of our projects.
One of the most popular solutions for .NET is MassTransit. This is exactly what we chose. The library is a message service bus, which is an abstraction over most popular message brokers (Azure Service Bus, RabbitMQ, Kafka, etc.), which allows developers not to spend a lot of time on a specific implementation, but rather focus on functionality tasks.
First, let’s look at what MassTransit is and how it generally works.
To do this, let’s try to implement the simplest messaging using the library.
In our case, the task was to receive technical objects via a message bus. Let’s see how we can implement this using MassTransit.
To work, our application must contain several required elements:
- Message. Definitely, we will need the message itself, which we want to send and deliver to the desired address.
Messages in MassTransit come in two basic types: Commands and Events, and are implemented using .NET reference types: records, interfaces, or classes.
Commands are messages that tell a service to do something and typically there is one recipient (Consumer) for each command.
1 2 3 4 5 6 7 8 9 |
// Example of a command message for evaluating a technical object public record CreateTechnicalObjectAssessment( string RiskCategoryName, uint SortingOrder, bool IsApplicable, string Justification); |
Events are messages that notify that something has happened and that can be sent to multiple subscribed recipients at once.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
// Message - event about receiving a technical object public record TechnicalObjectUploaded( Guid Id, string Name, string ManufacturerName, string ManufacturerModel, string Code, Guid? ParentId, DateTime UploadedAt); |
- Someone has to send the message. And MassTransit provides two main sending methods: Send and Publish.
A message sent using Send is delivered to a specific address. Typically, commands are sent using Send.
A message published through Publish is distributed to all subscribers who subscribed to that message type. For example, as an event notification
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
public class MessagePublisher : BackgroundService { // Connecting the IBus interface to publish messages private readonly IBus _bus; public MessagePublisher(IBus bus) { _bus = bus; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { while (!stoppingToken.IsCancellationRequested) { // Publish a message await _bus.Publish(new TechnicalObjectUploaded( Guid.NewGuid(), "Установка Л-24/6", "Цех №1", "КМ-24521-Т", "25м", Guid.NewGuid(), DateTime.Now), stoppingToken); await Task.Delay(10000); } } } |
- Consumer. And finally, it would be nice for the message to reach someone. Otherwise, what is it all for? In MassTransit, you can subscribe to one or more types of messages. To do this, the subscriber class must inherit the IConsumer<TMessage> interface and implement its Task Consume(ConsumeContext<TMessage> context) method.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
// Subscriber to messages of type TechnicalObjectUploaded public class TechnicalObjectsConsumer : IConsumer<TechnicalObjectUploaded> { private readonly ILogger<TechnicalObjectsConsumer> _logger; public TechnicalObjectsConsumer(ILogger<TechnicalObjectsConsumer> logger) { _logger = logger; } // When receiving a message of type TechnicalObjectUploaded, display information about the technical object public Task Consume(ConsumeContext<TechnicalObjectUploaded> context) { _logger.LogInformation("New technical object {Id} was uploaded at {Time}", context.Message.Id, context.Message.UploadedAt); return Task.CompletedTask; } } |
We connect MassTransit in the Program.cs file. For this example, we use the In-Memory transfer mechanism, which is quite suitable for the demo version, since it does not require the use of a specific message broker. And, when we launch the application, we see that MessageConsumer and output to the console successfully receive messages sent via Publish.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
// Add MassTransit builder.Services.AddMassTransit(x => { // We indicate that all subscribers of this assembly are suitable for us x.AddConsumers(typeof(Program).Assembly); // In this example we use the InMemory implementation x.UsingInMemory((context, cfg) => { cfg.ConfigureEndpoints(context); }); }); builder.Services.AddHostedService<MessagePublisher>(); |
Looks good, MassTransit works. But to solve our problem (as, perhaps, most real development problems), we will still have to modify something.
The main problem with this implementation is that In-Memory in MassTransit can only run within a single process. That is, firstly, the lifetime of messages is limited by the work of the process, and, secondly, if we want to divide the sending and receiving of messages between processes (and we want to), this tool will not suit us. This greatly limits the scope of this approach.
Therefore, in our project we will need to spice up the implementation with a real message broker and minor improvements. We will use RabbitMq as a broker, since we are already working with it on our other projects. Let’s see what needs to be changed to work with it
Let’s implement a messaging mechanism via MassTransit with RabbitMQ.
Working with RabbitMQ via MassTransit is done using the MassTransit.RabbitMQ package.
To connect RabbitMQ via MassTransit, let’s slightly change Startup.cs:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
// Add MassTransit builder.Services.AddMassTransit(x => { // By default MassTransit uses Pascal case. // Since in most cases Kebab case will be used in endpoints, // let's point this out explicitly. x.SetKebabCaseEndpointNameFormatter(); // We change InMemory to RabbitMq and specify the data for connecting to the server. // Otherwise, the connection is similar to InMemory, so its implementation is in MassTransit // нdeliberately replicates the behavior of RabbitMq x.UsingRabbitMq ((context, cfg) => { // Here are the standard settings for connecting to RabbitMqcfg.Host("localhost", "/", h => { h.Username("guest"); h.Password("guest"); }); cfg.ConfigureEndpoints(context); }); }); |
When implementing a message broker, sending and receiving messages will be separated and processed by different applications. In these applications, we will place contracts for messages and consumers separately.
Let’s transfer our prepared messages to the Contracts project, which is a class library:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
// A message notifying us that a new technical object has been received public record TechnicalObjectUploaded( Guid Id, string Name, string ManufacturerName, string ManufacturerModel, string Code, Guid? ParentId, DateTime UploadedAt); |
Let’s add a message publishing mechanism to our main project. Let’s do this using a controller that works with technical objects:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
[ApiController] [Authorize] public class TechnicalObjectsController : Controller { // Add a message publishing mechanism // Note that this time we are using the IPublishEndpoint interface // Unlike IBus used in the previous article, IPublishEndpoint // Allows controller request scope to be processed // At the same time, IBus is implemented as a Singleton, which is not suitable for a controller request private readonly IPublishEndpoint _publishEndpoint; private readonly IMediator _mediator; private readonly IMapper _mapper; public OrdersController(IPublishEndpoint publishEndpoint, IMediator mediator, IMapper mapper) { _publishEndpoint = publishEndpoint; _mediator = mediator; _mapper = mapper; } /// <summary> /// Create a technical object. /// </summary> [HttpPost("create-technical-object")] public async Task CreateTechnicalObject([FromBody] CreateTechnicalObjectCommandInfo commandInfo, CancellationToken cancellationToken) { var command = _mapper.Map<CreateTechnicalObjectCommand>(commandInfo); var createdObject = await _mediator.Send(command, cancellationToken); // We publish a message about creation using the method Publish await _publishEndpoint.Publish<TechnicalObjectUploaded>(new { Id = createdObject.Id, Name = createdObject.Name, ManufacturerName = createdObject.ManufacturerName, ManufacturerModel = createdObject.ManufacturerModel, Code = createdObject.Code, ParentId = createdObject.ParentId, UploadedAt = DateTime.Now }); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
[ApiController] [Authorize] public class TechnicalObjectsController : Controller { // Add a message publishing mechanism // Note that this time we are using the IPublishEndpoint interface // Unlike IBus used in the previous article, IPublishEndpoint // Allows controller request scope to be processed // At the same time, IBus is implemented as a Singleton, which is not suitable for a controller request private readonly IPublishEndpoint _publishEndpoint; private readonly IMediator _mediator; private readonly IMapper _mapper; public OrdersController(IPublishEndpoint publishEndpoint, IMediator mediator, IMapper mapper) { _publishEndpoint = publishEndpoint; _mediator = mediator; _mapper = mapper; } /// <summary> /// Create a technical object. /// </summary> [HttpPost("create-technical-object")] public async Task CreateTechnicalObject([FromBody] CreateTechnicalObjectCommandInfo commandInfo, CancellationToken cancellationToken) { var command = _mapper.Map<CreateTechnicalObjectCommand>(commandInfo); var createdObject = await _mediator.Send(command, cancellationToken); // We publish a message about creation using the method Publish await _publishEndpoint.Publish<TechnicalObjectUploaded>(new { Id = createdObject.Id, Name = createdObject.Name, ManufacturerName = createdObject.ManufacturerName, ManufacturerModel = createdObject.ManufacturerModel, Code = createdObject.Code, ParentId = createdObject.ParentId, UploadedAt = DateTime.Now }); } } |
RabbitMQ supports several types of distribution mechanisms (Exchange) between the queue and the sender. MassTransit uses Fanout Exchange by default. This type of Exchange distributes all received messages among all queues that are subscribed to this message type. The MassTransit documentation states that this type of messaging was chosen because it is the most performant for RabbitMQ.
This option is quite suitable for our task, all that remains is to create a message subscriber and subscribe to messages of type TechnicalObjectUploaded. Accordingly, the final stage of messaging through RabbitMQ will be to implement message receiving.
To do this, let’s turn to a separately created project that stores our Consumers. Let’s place in it a class that subscribes to messages of type TechnicalObjectUploaded:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
// Subscriber to messages like TechnicalObjectUploaded public class TechnicalObjectUploadedConsumer : IConsumer<TechnicalObjectUploaded> { private readonly ILogger<TechnicalObjectUploadedConsumer> _logger; public MessageConsumer(ILogger<TechnicalObjectUploadedConsumer> logger) { _logger = logger; } // When receiving a message like TechnicalObjectUploadedConsumer, display data about a new technical object public Task Consume(ConsumeContext<TechnicalObjectUploaded> context) { _logger.LogInformation("New technical object {Id} was uploaded at {Time}", context.Message.Id, context.Message.UploadedAt); return Task.CompletedTask; } } |
Also, we need to add a RabbitMQ connection to Startup.cs similar to the project with message publishers, with the difference that in this case we need to indicate that the project has consumers:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
// Add MassTransit builder.Services.AddMassTransit(x => { x.SetKebabCaseEndpointNameFormatter(); // Adding consumers var assembly = typeof(Program).Assembly; x.AddConsumers(assembly); x.UsingRabbitMq((context, cfg) => { //Here are the standard settings for connecting to RabbitMq cfg.Host("localhost", "/", h => { h.Username("guest"); h.Password("guest"); }); cfg.ConfigureEndpoints(context); }); }); |
Done, you (and we!) are amazing! When we launch projects and send messages upon the CreateTechnicalObject request, we will see that the messages are placed in the corresponding RabbitMQ queue, from where they will later be received by the consumer from the Consumers project.
What if something goes wrong?
But what happens if an exception occurs in the consumer while receiving a message?
In this case, MassTransit provides a Fault<T> message, which is a generic contract and includes information about the error. If a FaultAddress is defined in Fault, then the error is sent directly to this address, otherwise, it is published to all consumers subscribed to the message.
Like a regular message, we can receive and process it. Let’s do this for our technical objects, selecting a separate consumer for this case:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
// We create a separate consumer to receive error messages about technical creation object public class TechnicalObjectsFaultConsumer : IConsumer<Fault<TechnicalObjectUploaded>> { private readonly ILogger<TechnicalObjectsFaultConsumer> _logger; public TechnicalObjectsFaultConsumer(ILogger<TechnicalObjectsFaultConsumer> logger) { _logger = logger; } // When the Fault<T> event occurs, we intercept the error message public Task Consume(ConsumeContext<Fault<TechnicalObjectUploaded>> context) { var error = context.Message.Exceptions[0]; _logger.LogError(error.Message); return Task.CompletedTask; } } |
By default, when an exception is caught, such messages are put into a separate queue, which has the _error postfix. That is, when sending a message to my-queue, the error will be added to the my-queue_error queue. If desired, this behavior can be customized for each specific case.
To summarize, MassTransit allowed us to create an implementation of two messaging approaches with minimal changes when changing approaches. This basic implementation will allow us to use the main messaging mechanism, adding the number of contracts and recipients of these contracts necessary for further tasks.
Related Posts
Leave a Reply Cancel reply
Service
Categories
- DEVELOPMENT (102)
- DEVOPS (53)
- FRAMEWORKS (25)
- IT (24)
- QA (14)
- SECURITY (13)
- SOFTWARE (13)
- UI/UX (6)
- Uncategorized (8)