Building distributed apps in .NET with MassTransit

• 13 minutes read C#.NETDistributed appsMicroservicesMassTransitRabbitMQ

#

Introduction

So many apps that are being built today are part of distributed systems made up of two or more services that communicate with each other. They do so by sending messages, or publishing events. They are so called "Event-driven architectures".

Learning everything that enables this, the technologies and the patterns, might be cumbersome, in particular if you don't know where to start.

The goal of this article is to give you a starting point from which you can continue exploring building distributed applications in C#/.NET.

In this article will be talking about MassTransit, what it is about, and viewing some examples of how to use it.

We will be using RabbitMQ as our transport. It is easy to get started with using Docker. The contents of the Docker Compose files will be provided.

Contents

Here is what is covered in this article:

  1. What is a distributed app?
  2. What is asynchronous messaging?
  3. What is MassTransit?
  4. Getting started
    1. The app
  5. Basic concepts
  6. Publishing and consuming a message
    1. Message
    2. Consumer
    3. Publisher
    4. Running the app
  7. Publishing message from consumer
  8. Remote procedure call (RPC)
  9. Configuring endpoints manually
  10. Error handling and retry
  11. Consuming faults
  12. State machines and Sagas
  13. Transactional Outbox pattern
  14. Scaling vertically
  15. Testing in MassTransit
  16. Distributed tracing
  17. Using another transport
  18. Conclusion

What is a distributed app?

A distributed app is an application that consists of multiple services that work together. Hence the app is distributed across those multiple services.

Does it sound familiar? Yep, Microservices.

We need ways for these services to communicate - should we use HTTP or gRPC? But those create coupling between services? There is another option: Asynchronous messaging with brokers and queues. But there are so many types of brokers and APIs to learn to master. It takes time to learn the specifics of each.

That is where MassTransit comes in.

What is asynchronous messaging?

First we should explain what asynchronous communication is about.

Asynchronous messaging is a communication method where participants on both sides are allowed to pause and resume conversations on their own terms. The party that is sending a message does not have to wait for a connection and a response.

This is something entirely different from asynchronous code execution in await in C#.

Asynchronous messaging enables us the implement event-driven patterns, including event sourcing, within a system of multiple services that depend on each other.

There are multiple technologies that each allow for asynchronous messaging. Each with its own concepts. MassTransit supports many of the popular ones, both those that can run on-premise and those that are exclusively hosted in the cloud.

We will use RabbitMQ, which is a queue-based system. It can be explained as we are sending messages to exchanges in a post office, that then distribute the messages (copies) across dedicated postboxes, or queues, that receivers consume.

MassTransit will abstract all these details related to a "transport" away, but you should know about them.

What is MassTransit?

MassTransit is a modern open-source framework that facilitates asynchronous messaging between services. Some would call it a service bus. It certainly can be used that way, but is not limited to that. The main selling point is that you can build loosely coupled systems with little effort.

The framework provides a solid unified abstraction of producers and consumers, on top of transports such as RabbitMQ, Azure ServiceBus, Amazon SQS etc. In addition to that, it also come with functionality for error handling, retries, and scaling. There even is an experimental mode for using SQL Server for sending messages, instead of a message bus.

MassTransit is easy to get started with, since it requires just a few lines of code. And it is based on dependency injection.

I recommend you to look into their excellent documentation, as well as awesome videos produced by its creator and maintainer, Chris Patterson (@PhatBoyG).

Some of the samples have been put together specifically for this article, while some have been taken from MassTransit's own docs.

Getting started

Run this, to add the latest version of MassTransit to your application: (or an empty ASP.NET Core Web app: dotnet new web)

dotnet add package MassTransit.RabbitMQ

This will add MassTransit with RabbitMQ support to your project.

The app

using MassTransit;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();

    x.AddConsumers(typeof(Program).Assembly);

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.ConfigureEndpoints(context);
    });
});

var app = builder.Build();

app.MapGet("/hello", () => "Hello, World!");

app.Run();

Now we need a RabbitMQ instance to run against.

Here is a Docker Compose file that runs a container in Docker:

version: '3.4'
name: masstransit-test

services:
  rabbitmq:
    image: rabbitmq:management
    container_name: rabbitmq
    hostname: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      - RABBITMQ_ERLANG_COOKIE=SomeRandomStringHere
    volumes:
      - "./data/rabbitmq:/var/lib/rabbitmq"

Provided that you have Docker Desktop installed, run this to start RabbitMQ:

docker compose up -d

But if you run the app, it doesn't do anything yet. We need to add some producers and consumers.

Basic concepts

There are three central concepts to know about:

  • Message - The message (or data) being sent. We also talk about a contract.
  • Producer - Produces messages, i.e. publishes, or sends them.
  • Consumer - Consumes messages by performing some logic. It's a message handler.

Then there is the concept of a Transport, which is the means by which messages are being sent. In our case it is RabbitMQ, but there are others as mentioned, including Azure Service Bus.

A message do often take on the meaning of an event that is signifying that something has happened - as reflected in the name of the message, for example, OrderPlaced.

An event can either be "thin" - meaning it just contains details regarding what the event was concerned, or it might also contain state. The latter is referred to as Event-carried state transfer. There are many opinions on what road to go when designing events.

Publishing and consuming a message

The most basic thing you can do is to publish a message which can be consumed by one or more consumers. So let's add some things to our app that enable us to do just that.

Message

We first need to define a contract (message type) for our message.

The message contract is a record with properties that have explicit init setters. This enables for the properties to be deserialized. (Records don't have public setters by default)

public record SendMessage
{
    public required string Text { get; init; }
}

It is worth noting that messages, by convention, are identified by the full name of the type (namespace + type). That is unless you configure the binding manually. Coincidentally, you can redefine a message in another assembly as long as all instances have the same name, and are in the same namespace.

Message contracts are usually shared via common class libraries. But you may re-declare them if you prefer. Just make sure that they follow the rules mentioned above,

Consumer

The consumer that consumes the message looks like this:

namespace WebApp;

public sealed class SendMessageConsumer : IConsumer<SendMessage>
{
    public async Task Consume(ConsumeContext<SendMessage> context)
    {
        Console.WriteLine($"The message is: {context.Message.Text}");
    }
}

So how does MassTransit know about this consumer, and the message?

If we look at the code for our Web app, x.AddConsumers(typeof(Program).Assembly); will discover and register the consumers in the executing assembly - looking for classes that implement IConsumer<T>. And cfg.ConfigureEndpoints(context); will configure the RabbitMQ endpoints for us.

Publisher

What is now needed is somewhere to publish the message from, so we do it from an API endpoint. We inject and use the IPublishEndpoint to publish messages:

app.MapPost("/test", static async (string text, IPublishEndpoint publishEndpoint) => await publishEndpoint.Publish(new SendMessage { Text = text }))
    .WithName("WebApp_Test")
    .WithTags("WebApp");

The IPublishEndpoint is a scoped service.

Running the app

Here is a .http file that can be used to test the endpoint in Visual Studio, and VS Code:

@WebApp_HostAddress = http://localhost:<YOUR-PORT>

POST {{WebApp_HostAddress}}/test
Accept: application/json

"Hello, World!"

###

You can use this unless you want to setup Open API and test it in Swagger UI.

When you send that POST request to the endpoint, the message will be published to a queue in RabbitMQ. The consumer in the same app will receive the message, and write "The message is: Hello, World!" to the console.

RabbitMQ Management plugin

This is article is not about RabbitMQ, but you should know about this.

RabbitMQ has a Management plugin that provides a useful Web UI. In it you can view your exchanges and queues in real-time.

In your browser, navigate to: http://localhost:15672/

Provided that you are running the app, the UI will display at least one connection, an exchange, and a queue. Available to you is the current state of each queue. You can publish messages manually from here. You can also view statistics showing how many messages are being processed to predict throughput.

On a side note: MassTransit will clean-up resources in RabbitMQ when an app stops.

Where are the other services?

But... wait! Isn't this supposed to show how to build a distributed app? We need another service.

That's easy to fix! Just copy the project! 😊

Publishing message from consumer

When publishing a message from a consumer, you don't need to inject any extra service. The ConsumeContext itself provides the functionality for communicating with the bus.

This is what a modified version of our consumer would look like:

using MassTransit;

namespace WebApp;

public sealed class SendMessageConsumer : IConsumer<SendMessage>
{
    public async Task Consume(ConsumeContext<SendMessage> context)
    {
        Console.WriteLine($"The message is: {context.Message.Text}");

        await context.Publish(new MessageSent {
            Date = DateTimeOffset.UtcNow
        });
    }
}

In the other project you will define this consumer for the MessageSent message:

using MassTransit;

namespace WebApp;

public sealed class MessageSentConsumer : IConsumer<MessageSent>
{
    public async Task Consume(ConsumeContext<MessageSent> context)
    {
        Console.WriteLine($"The time was: {context.Message.Date}");
    }
}

Btw, here is the contract:

public record MessageSent
{
    public required DateTimeOffset Date { get; init; }
}

Remote procedure call (RPC)

You can do remote procedure calls (RPC), meaning that you send a request, and await a response back. Not unlike HTTP requests and responses, but still loosely coupled because of the inherent asynchrony of RabbitMQ as a transport.

You can obtain a client by injecting the IRequestClient<T> interface, where T is the message type. Then use the async GetResponse<TResponse> method to send a request and await the response.

using MassTransit;

namespace WebApp;

public sealed class GetTodosConsumer(ITodoService todoService) : IConsumer<GetTodos>
{
    public async Task Consume(ConsumeContext<GetTodos> context)
    {
        var todos = await todoService.GetTodosAsync(context.CancellationToken);

        consumeContext.RespondAsync<GetTodosResponse>(new GetTodosResponse {
            Todos = todos
        });
    }
}

This sample is not complete. The ITodoService class has been omitted as it is not relevant for this example.

The contracts:

public record Todo
{
    public required string Text { get; init; }

    public required bool Done { get; init; }
}

public record GetTodos();

public record GetTodosResponse
{
    public required IEnumerable<Todo> Todos { get; init; }
}

Here is the how you use the request client:

app.MapGet("/todos", static async (IRequestClient<GetTodos> requestClient) => {
    var response = await requestClient.GetResponse<GetTodosResponse>(new GetTodos());
    return response.Todos;
}
.WithName("WebApp_GetTodos")
.WithTags("WebApp");

Handling multiple response types

A RPC request might have multiple responds with different response types. This is how you would handle these responses from the client:

var response = await client.GetResponse<OrderStatusResult, OrderNotFound>(new { OrderId = id});

if (response.Is(out Response<OrderStatusResult> responseA))
{
    // do something with the order
}
else if (response.Is(out Response<OrderNotFound> responseB))
{
    // the order was not found
}

This is not a complete sample though.

Configuring endpoints manually

In the samples so far, endpoints have been configured automatically. But you can do it manually for the chosen transport.

Here we have declare a ReceiveEndpoint which maps to the queue "submit-order". (Spoilers) It has retries configured. The endpoint is handled by the consumer SubmitOrderConsumer.

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ReceiveEndpoint("submit-order", e =>
        {
            e.UseMessageRetry(r => r.Immediate(5));

            e.ConfigureConsumer<SubmitOrderConsumer>(context);
        });
    });

Error handling and Retry

It sometimes happens that a consumer fails for some reason, an exception is being thrown. For instance, because of the database failing. In that case you would want to retry running the consumer.

public class SubmitOrderConsumer :
    IConsumer<SubmitOrder>
{
    ISessionFactory _sessionFactory;

    public async Task Consume(ConsumeContext<SubmitOrder> context)
    {
        using(var session = _sessionFactory.OpenSession())
        using(var transaction = session.BeginTransaction())
        {
            var customer = session.Get<Customer>(context.Message.CustomerId);

            // continue with order processing

            transaction.Commit();
        }
    }
}

Retries can be configured like so:

services.AddMassTransit(x =>
{
    x.AddConsumer<SubmitOrderConsumer>();

    x.UsingRabbitMq((context,cfg) =>
    {
        cfg.UseMessageRetry(r => r.Immediate(5));

        cfg.ConfigureEndpoints(context);
    });
});

This will apply to all consumers. But you can apply them to specific ones as well.

You can even use exception filters to make MassTransit just handle certain exception types.

e.UseMessageRetry(r => 
{
    r.Handle<ArgumentNullException>();
    r.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException));
    r.Ignore<ArgumentException>(t => t.ParamName == "orderTotal");
});

Consuming faults

When a consumer has truly failed, you perhaps want to handle that fault. You do that by creating a consumer that consumes a message of Fault<T> where T is the message type.

public class DashboardFaultConsumer :
    IConsumer<Fault<SubmitOrder>>
{
    public async Task Consume(ConsumeContext<Fault<SubmitOrder>> context)
    {
        // update the dashboard
    }
}

For RabbitMQ, MassTransit publishes faulted messages to consumer specific error queues. In the process, a message gets wrapped by the fault information as represented by Fault<T>. Imagine something similar happening when using another transport.

State machines and Sagas

MassTransit has its own state machine API, called Automatonymous, which allows you to define the state machines, with states and transitions in between, driven by the messages. The library can even be used standalone outside of MassTransit.

Automatonymous makes it easier to implement the Saga design pattern. A saga is a way to ensure data consistency across multiple services within a transaction.

There are two approaches to implementing sagas: choreography and orchestration. Both patterns are used to coordinate messages. Choreography is about each service knowing how to handle those states from the messages it receives. On the other hand, the orchestration pattern has a centralized point which orchestrates all messages sent between services. This could imply that a separate orchestrator service is running the state machine. Which approach to choose depends on the case.

This is an incomplete example (source) of what a state machine for an orchestrator looks like in MassTransit. It demonstrates the concepts of events and states within that state machine.

public interface SubmitOrder
{
    Guid OrderId { get; }

    DateTime OrderDate { get; }
}

public interface OrderAccepted
{
    Guid OrderId { get; }    
}

public class OrderState :
    SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }

    public DateTime? OrderDate { get; set; }
}

public interface OrderSubmitted
{
    Guid OrderId { get; }    
}

public class OrderSubmittedEvent :
    OrderSubmitted
{
    public OrderSubmittedEvent(Guid orderId)
    {
        OrderId = orderId;
    }

    public Guid OrderId { get; }    
}

public class OrderStateMachine :
    MassTransitStateMachine<OrderState>
{
    public OrderStateMachine()
    {
        Event(() => SubmitOrder, x => x.CorrelateById(context => context.Message.OrderId));

        Event(() => OrderAccepted, x => x.CorrelateById(context => context.Message.OrderId));

        Initially(
            When(SubmitOrder)
                .Then(x => x.Saga.OrderDate = x.Message.OrderDate)
                .Publish(context => (OrderSubmitted)new OrderSubmittedEvent(context.Saga.CorrelationId))
                .TransitionTo(Submitted),
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Submitted,
            When(OrderAccepted)
                .TransitionTo(Accepted));

        During(Accepted,
            When(SubmitOrder)
                .Then(x => x.Saga.OrderDate = x.Message.OrderDate));
    }

    public State Submitted { get; private set; }
    
    public State Accepted { get; private set; }

    public Event<SubmitOrder> SubmitOrder { get; private set; }

    public Event<OrderAccepted> OrderAccepted { get; private set; }
}

Each service could have its own state machine, if your decide to do choreography. In the end, it is about where the state machine logic exists. Whether, or not, you desire to centralize the logic in an orchestrator.

Transactional Outbox pattern

The transactional outbox pattern solves the issue of consistency when both saving data and emitting events. You might want data to be saved to the database before publishing the events.

It stores the messages in an outbox (in a database) in the same transaction as the data. If the save succeeds, the messages will be published by a periodic process that is checking the outbox in a specific interval.

If you have domain events then this patterns is pretty much a requirement.

Scaling vertically

Using MassTransit you can easily scale services vertically.

Using a queue-based transport, having multiple instances (or replicas) of the same app, consuming the same queues, will cause the instances to compete for messages. This is how you get both scaling, and load balancing for free.

This will work for other transports, even if they are not necessarily using message queues.

Testing in MassTransit

MassTransit provides a TestHarness that is an in-memory implementation of a transport specifically built for testing. Using that you can verify that messages have been published and consumed.

Consider this unit test case (NUnit):

[Test] 
public async Task An_example_unit_test() 
{
    await using var provider = new ServiceCollection()
        .AddYourBusinessServices() // register all of your normal business services
        .AddMassTransitTestHarness(x =>
        {
            x.AddConsumer<SubmitOrderConsumer>();
        })
        .BuildServiceProvider(true);

    var harness = provider.GetRequiredService<ITestHarness>();

    await harness.Start();

    var client = harness.GetRequestClient<SubmitOrder>();

    var response = await client.GetResponse<OrderSubmitted>(new
    {
        OrderId = InVar.Id,
        OrderNumber = "123"
    });

    Assert.IsTrue(await harness.Sent.Any<OrderSubmitted>());

    Assert.IsTrue(await harness.Consumed.Any<SubmitOrder>());

    var consumerHarness = harness.GetConsumerHarness<SubmitOrderConsumer>();

    Assert.That(await consumerHarness.Consumed.Any<SubmitOrder>());

    // test side effects of the SubmitOrderConsumer here
}

Distributed tracing

Building distributed apps involves challenges when doing tracing and debugging communication between services - what services are being hit during the course of a request.

Distributed tracing alleviates that headache by collecting and putting this information together, so that it can be easily visualized in a tool like Jaeger, or Zipkin. You can easily see latencies, and get information about specific errors.

MassTransit has built in support for tracing with the OpenTelemetry standard.

To add tracing to your project, add these packages:

<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.3.2" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.0.0-rc9.6" />
<PackageReference Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.0.0-rc9.6" />
<PackageReference Include="OpenTelemetry.Instrumentation.Http" Version="1.0.0-rc9.6" />
<PackageReference Include="OpenTelemetry.Exporter.Zipkin" Version="1.4.0-alpha.2" />
<PackageReference Include="OpenTelemetry.Instrumentation.MassTransit" Version="1.0.0-beta.3" /

Add tracing to the service container:

app.Services.AddOpenTelemetry()
    .WithTracing(tracing =>
    {
        tracing.SetResourceBuilder(resource)
                .AddAspNetCoreInstrumentation()
                .AddHttpClientInstrumentation()
                .AddMassTransitInstrumentation()
                .AddSource("MassTransit")
                .AddZipkinExporter(zipkin =>
                {
                    var zipkinUrl = "http://localhost:9411";
                    zipkin.Endpoint = new Uri($"{zipkinUrl}/api/v2/spans");
                });
        //.AddConsoleExporter();
    });

Here is the Docker Compose:

version: '3.4'
name: masstransit-test

services:
  rabbitmq:
    image: rabbitmq:management
    container_name: rabbitmq
    hostname: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      - RABBITMQ_ERLANG_COOKIE=SomeRandomStringHere
    volumes:
      - "./data/rabbitmq:/var/lib/rabbitmq"

  zipkin:
    image: openzipkin/zipkin
    container_name: zipkin
    ports:
      - 9411:9411

Run it:

docker compose up -d

You can now open your browser, and navigate to: http://localhost:9411/zipkin/

There you can query the requests and follow the flow of operations, including where messages are being passed.

Beyond this, you can add additional packages that provide metrics for your applications, which can be viewed in tools like Grafana. But that is not in scope for this article.

Using another transport

So what if you want to use another transport? You might want to run your app in the cloud using Azure Service Bus.

It is easy to just switch out RabbitMQ for something else, as it is just a matter of installing the provider and declaring what provider to use. Then MassTransit will configure the rest for you.

For Azure Service Bus you add this to your project file

<PackageReference Include="MassTransit.Azure.ServiceBus.Core" Version="8.1.1" />

Then configure like so:

builder.Services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();

    x.AddConsumers(typeof(Program).Assembly);

    x.UsingAzureServiceBus((context, cfg) =>
    {
        cfg.Host($"sb://{builder.Configuration["Azure:ServiceBus:Namespace"]}.servicebus.windows.net");

        cfg.ConfigureEndpoints(context);
    });
});

This article is not about how you configure Azure Service Bus.

Using different transports depending on environment

This piece of code shows you how to configure MassTransit to use RabbitMQ locally, but Azure Service Bus when in production:

builder.Services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();

    x.AddConsumers(typeof(Program).Assembly);

    if (builder.Environment.IsProduction())
    {
        x.UsingAzureServiceBus((context, cfg) =>
        {
            cfg.Host($"sb://{builder.Configuration["Azure:ServiceBus:Namespace"]}.servicebus.windows.net");

            cfg.ConfigureEndpoints(context);
        });
    }
    else
    {
        x.UsingRabbitMq((context, cfg) =>
        {
            var rabbitmqHost = builder.Configuration["RABBITMQ_HOST"] ?? "localhost";

            cfg.Host(rabbitmqHost, "/", h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            cfg.ConfigureEndpoints(context);
        });
    }
});

Conclusion

We have seen how to use MassTransit to build distributed apps based on asynchronous messaging. You have been introduced to the basic concepts, as well as as how to handle errors. We also covered sagas that are implemented using MassTransits own library for building state machines.

If you liked this article, please leave a like. And comment below if you have any questions.

Marina Sundström

  • A personal website and blog about life and software development
  • Code is provided with no warranty.
Social

Back to top

© 2024 Marina Sundström • Built with .NET & Blazor - Hosted by GitHub Pages

An unhandled error has occurred. Reload 🗙