Implementing messaging via MassTransit

Sooner or later, many projects run into the problem of messaging: components of a distributed system, often built with different approaches and technologies, need to exchange messages. Modern systems usually solve this with a message bus, which abstracts message handling away from the individual components. So when we had to implement this approach from scratch, the main task was choosing a suitable solution. In this article we share our basic implementation and describe how to get started with a message bus on the .NET Core platform.

The starting conditions were simple: the solution had to run on .NET and work with RabbitMQ, since these are the tools we use on most of our projects.

One of the most popular solutions for .NET is MassTransit, and that is what we chose. The library is a service bus that provides an abstraction over popular message brokers (Azure Service Bus, RabbitMQ, Kafka, etc.), so developers can focus on functionality instead of spending time on broker-specific implementation details.

First, let's look at how MassTransit works by implementing the simplest possible messaging with it.

In our case, the task was to receive technical objects via a message bus. Let’s see how we can implement this using MassTransit.

For this to work, our application needs three elements:

  1. Message. First of all, we need the message itself, which we want to deliver to the desired address.

Messages in MassTransit come in two basic types: Commands and Events, and are implemented using .NET reference types: records, interfaces, or classes.

Commands are messages that tell a service to do something. Typically, each command has a single recipient (consumer).

Events are messages that notify that something has happened and that can be sent to multiple subscribed recipients at once.

  2. Producer. Someone has to send the message. MassTransit provides two main ways to do this: Send and Publish.

A message sent using Send is delivered to a specific address. Typically, commands are sent using Send.

A message published using Publish is delivered to every consumer subscribed to that message type. Typically, events are published this way.

  3. Consumer. Finally, the message has to reach someone. Otherwise, what is it all for? In MassTransit, a consumer can subscribe to one or more message types. To do this, the consumer class must implement the IConsumer<TMessage> interface and its Task Consume(ConsumeContext<TMessage> context) method.
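The message and producer elements from the list above could be sketched roughly as follows. The contract names and properties, the TechnicalObjectProducer class, and the queue address "queue:technical-objects" are hypothetical and only serve as an illustration; ISendEndpointProvider and IPublishEndpoint are the MassTransit interfaces for Send and Publish.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical command: tells a service to do something.
public record UploadTechnicalObject
{
    public Guid Id { get; init; }
    public string Name { get; init; } = string.Empty;
}

// Hypothetical event: notifies that something has already happened.
public record TechnicalObjectUploaded
{
    public Guid Id { get; init; }
    public string Name { get; init; } = string.Empty;
}

// Hypothetical producer that shows both sending methods side by side.
public class TechnicalObjectProducer
{
    private readonly ISendEndpointProvider _sendEndpointProvider;
    private readonly IPublishEndpoint _publishEndpoint;

    public TechnicalObjectProducer(
        ISendEndpointProvider sendEndpointProvider,
        IPublishEndpoint publishEndpoint)
    {
        _sendEndpointProvider = sendEndpointProvider;
        _publishEndpoint = publishEndpoint;
    }

    // Send: the command goes to one specific address (assumed queue name).
    public async Task SendCommandAsync(UploadTechnicalObject command)
    {
        var endpoint = await _sendEndpointProvider
            .GetSendEndpoint(new Uri("queue:technical-objects"));
        await endpoint.Send(command);
    }

    // Publish: the event goes to every consumer subscribed to its type.
    public Task PublishEventAsync(TechnicalObjectUploaded uploaded)
    {
        return _publishEndpoint.Publish(uploaded);
    }
}
```

Both interfaces are resolved from the dependency injection container once MassTransit is registered in the application.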

We connect MassTransit in the Program.cs file. For this example, we use the in-memory transport, which suits a demo because it does not require a message broker. When we launch the application, we see that MessageConsumer successfully receives the messages sent via Publish and writes them to the console.
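A minimal sketch of such a Program.cs, assuming an ASP.NET Core application with the MassTransit package installed. The TechnicalObjectUploaded message, its properties, and the /publish test endpoint are assumptions made for the demo, not the original code.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMassTransit(x =>
{
    // Register the consumer so MassTransit can create an endpoint for it.
    x.AddConsumer<MessageConsumer>();

    // In-memory transport: no broker required, but limited to this process.
    x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
});

var app = builder.Build();

// Hypothetical test endpoint: each POST publishes one message.
app.MapPost("/publish", async (IPublishEndpoint publishEndpoint) =>
{
    await publishEndpoint.Publish(new TechnicalObjectUploaded
    {
        Id = Guid.NewGuid(),
        Name = "demo"
    });
    return Results.Accepted();
});

app.Run();

// Assumed message contract for the demo.
public record TechnicalObjectUploaded
{
    public Guid Id { get; init; }
    public string Name { get; init; } = string.Empty;
}

// Receives published messages and writes them to the console.
public class MessageConsumer : IConsumer<TechnicalObjectUploaded>
{
    public Task Consume(ConsumeContext<TechnicalObjectUploaded> context)
    {
        Console.WriteLine(
            $"Received technical object {context.Message.Id}: {context.Message.Name}");
        return Task.CompletedTask;
    }
}
```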

Looks good: MassTransit works. But to solve our actual problem (as with most real development problems), we still have to change a few things.

The main problem with this implementation is that the in-memory transport in MassTransit only works within a single process. First, messages live only as long as the process does. Second, if we want to split sending and receiving between processes (and we do), this tool will not suit us. This greatly limits where the approach can be used.

Therefore, in our project we need to add a real message broker and make some minor improvements. We will use RabbitMQ as the broker, since we already work with it on our other projects. Let's see what needs to change.

Let’s implement a messaging mechanism via MassTransit with RabbitMQ.

Working with RabbitMQ via MassTransit is done using the MassTransit.RabbitMQ package.

To connect RabbitMQ via MassTransit, let’s slightly change Startup.cs:
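The changed registration might look roughly like this. The host name, virtual host, and guest credentials are assumptions for a local RabbitMQ instance; only the ConfigureServices part of Startup is shown.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            // UsingRabbitMq comes from the MassTransit.RabbitMQ package
            // and replaces the in-memory transport.
            x.UsingRabbitMq((context, cfg) =>
            {
                // Assumed local broker with default credentials.
                cfg.Host("localhost", "/", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });
            });
        });
    }
}
```

In a real project the connection settings would normally come from configuration rather than being hard-coded.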

With a message broker in place, sending and receiving messages are separated and handled by different applications. We will keep message contracts and consumers in separate projects.

Let's move our message contracts to the Contracts project, which is a class library:

Next, let's add message publishing to our main project, using a controller that works with technical objects:
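A contract in that library could be as small as the sketch below. The Contracts namespace and the properties are assumptions; only the TechnicalObjectUploaded name comes from the article.

```csharp
using System;

namespace Contracts
{
    // Event published after a technical object has been uploaded.
    // The properties are hypothetical.
    public record TechnicalObjectUploaded
    {
        public Guid Id { get; init; }
        public string Name { get; init; } = string.Empty;
        public DateTime UploadedAt { get; init; }
    }
}
```

MassTransit identifies a message by its full type name, including the namespace, so the publishing and consuming applications should both reference this same library rather than declare their own copies of the type.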
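A controller along these lines could do the publishing. The route, the request type, and the message properties are hypothetical. The contract is repeated here as a stand-in only so that the sketch compiles on its own; in the real solution it lives in the Contracts project.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.AspNetCore.Mvc;

// Stand-in for the contract from the Contracts project (assumed shape).
public record TechnicalObjectUploaded
{
    public Guid Id { get; init; }
    public string Name { get; init; } = string.Empty;
}

// Hypothetical request body.
public record CreateTechnicalObjectRequest(string Name);

[ApiController]
[Route("api/technical-objects")]
public class TechnicalObjectsController : ControllerBase
{
    private readonly IPublishEndpoint _publishEndpoint;

    public TechnicalObjectsController(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    [HttpPost]
    public async Task<IActionResult> CreateTechnicalObject(
        [FromBody] CreateTechnicalObjectRequest request)
    {
        // Creating and storing the technical object itself is omitted.
        var id = Guid.NewGuid();

        // Notify all subscribers that a technical object was uploaded.
        await _publishEndpoint.Publish(new TechnicalObjectUploaded
        {
            Id = id,
            Name = request.Name
        });

        return Accepted();
    }
}
```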

RabbitMQ supports several exchange types, which determine how messages are routed from the sender to queues. MassTransit uses fanout exchanges by default. A fanout exchange delivers every message it receives to all queues bound to it, that is, to all queues subscribed to this message type. The MassTransit documentation states that this exchange type was chosen because it is the most performant for RabbitMQ.

This option suits our task. All that remains is to create a message consumer and subscribe it to messages of type TechnicalObjectUploaded. Accordingly, the final stage of messaging through RabbitMQ is to implement message receiving.

To do this, let's turn to the separate project that holds our consumers and add a class that subscribes to messages of type TechnicalObjectUploaded:
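Such a consumer might look like the sketch below. The class name, the logging, and the message properties are assumptions; the contract is again a stand-in for the type from the Contracts project.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Logging;

// Stand-in for the contract from the Contracts project (assumed shape).
public record TechnicalObjectUploaded
{
    public Guid Id { get; init; }
    public string Name { get; init; } = string.Empty;
}

public class TechnicalObjectUploadedConsumer : IConsumer<TechnicalObjectUploaded>
{
    private readonly ILogger<TechnicalObjectUploadedConsumer> _logger;

    public TechnicalObjectUploadedConsumer(
        ILogger<TechnicalObjectUploadedConsumer> logger)
    {
        _logger = logger;
    }

    // Called by MassTransit for every TechnicalObjectUploaded message
    // that arrives on this consumer's queue.
    public Task Consume(ConsumeContext<TechnicalObjectUploaded> context)
    {
        _logger.LogInformation(
            "Technical object {Id} uploaded: {Name}",
            context.Message.Id,
            context.Message.Name);
        return Task.CompletedTask;
    }
}
```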

We also need to add a RabbitMQ connection to Startup.cs, as in the publisher project, except that here we must also register the project's consumers:

Done, you (and we!) are amazing! When we launch both projects and trigger the CreateTechnicalObject request, we see that the messages are placed in the corresponding RabbitMQ queue, from which the consumer in the Consumers project then receives them.
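One possible shape of that registration is sketched below. It assumes the same local broker settings as on the publishing side and registers every consumer found in the project's assembly, which is one option among several.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            // Register all IConsumer<T> implementations in this assembly.
            x.AddConsumers(typeof(Startup).Assembly);

            x.UsingRabbitMq((context, cfg) =>
            {
                // Assumed local broker with default credentials.
                cfg.Host("localhost", "/", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                // Create a receive endpoint (queue) for each registered consumer.
                cfg.ConfigureEndpoints(context);
            });
        });
    }
}
```

The ConfigureEndpoints call is the part that differs from the publisher: without it, or an explicit receive endpoint, the registered consumers would not be attached to any queue.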

What if something goes wrong?

But what happens if an exception occurs in the consumer while receiving a message?

In this case, MassTransit produces a Fault<T> message, a generic contract that wraps the original message and includes information about the error. If the headers of the original message specify a FaultAddress, the fault is sent directly to that address. If there is no FaultAddress but a ResponseAddress is specified, the fault is sent to the response address. Otherwise, it is published to all consumers subscribed to Fault<T> for that message type.

A fault can be received and processed like any regular message. Let's do this for our technical objects, using a separate consumer:
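A fault consumer for our message could be sketched as follows. The class name, the logging, and the message properties are assumptions; Fault<T> exposes the original message through its Message property and the error details through Exceptions.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Logging;

// Stand-in for the contract from the Contracts project (assumed shape).
public record TechnicalObjectUploaded
{
    public Guid Id { get; init; }
    public string Name { get; init; } = string.Empty;
}

public class TechnicalObjectUploadedFaultConsumer
    : IConsumer<Fault<TechnicalObjectUploaded>>
{
    private readonly ILogger<TechnicalObjectUploadedFaultConsumer> _logger;

    public TechnicalObjectUploadedFaultConsumer(
        ILogger<TechnicalObjectUploadedFaultConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<Fault<TechnicalObjectUploaded>> context)
    {
        // The message whose processing failed.
        var original = context.Message.Message;

        // One entry per exception thrown while consuming it.
        foreach (var exception in context.Message.Exceptions)
        {
            _logger.LogError(
                "Processing of technical object {Id} failed: {ExceptionType}: {ExceptionMessage}",
                original.Id,
                exception.ExceptionType,
                exception.Message);
        }

        return Task.CompletedTask;
    }
}
```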

By default, when a consumer throws an exception, the failed message is moved to a separate queue with the _error suffix. That is, a message that fails on my-queue ends up in the my-queue_error queue. If desired, this behavior can be customized for each specific case.
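As one example of such customization, a receive endpoint can be told to discard failed messages instead of moving them to the error queue. This is a sketch under assumptions: the extension class and method names are hypothetical, the broker settings are the local defaults, and my-queue is the queue name used in the text.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical helper that groups the registration in one place.
public static class ErrorHandlingConfiguration
{
    public static void AddMessagingWithoutErrorQueue(this IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, cfg) =>
            {
                // Assumed local broker with default credentials.
                cfg.Host("localhost", "/", h =>
                {
                    h.Username("guest");
                    h.Password("guest");
                });

                cfg.ReceiveEndpoint("my-queue", e =>
                {
                    // Drop failed messages instead of moving them
                    // to the my-queue_error queue.
                    e.DiscardFaultedMessages();

                    // Consumers for this endpoint would be configured here.
                });
            });
        });
    }
}
```

Discarding is only reasonable when losing a failed message is acceptable. Otherwise the default error queue keeps it available for inspection.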

To summarize, MassTransit let us implement two messaging approaches, in-memory and RabbitMQ, with minimal changes when switching between them. This basic implementation gives us the core messaging mechanism, to which we can add as many contracts and consumers as future tasks require.