Real-World PubSub Messaging with RabbitMQ

Tags: pubsub, Architecture, WebAPI

In my previous post I discussed using RabbitMQ with ASP.NET Web API subscribers. Now I'd like to expand on that by using a more interesting real-world message, along with a message processor on the subscriber side.

But first, let me explain why I went with Web API instead of Windows Services. I know that Windows Services are the best practice for backend processing of messages on the network. However, I've got one eye on the cloud. I'd have to port a Windows Service over to an Azure WebJob, whereas a Web API can go straight up to Azure. More importantly, I want to use RESTful web service endpoints in a message-driven architecture, and I want to be able to call those same endpoints from another process if need be. This fits perfectly with the DRY principle and a microservices approach.

In the RabbitPublisher of the previous post the message body was nothing more than the current time. Now I want to replace that with a complex type as my message payload. So I'll add this Message class:

Code:
namespace RabbitPublisher
{
    public class Message
    {
        public string EventCode { get; set; }
        public string Key { get; set; }
        public string Value { get; set; }
    }
}

This message type is pretty generic. It describes an event in the system and a key-value pair that allows the subscriber to call a service and fetch additional data if necessary. Next, in the Program.cs file, I'll add a little message generator method to simulate a random message coming out of the system. For this simple example, each message describes one of three states a widget can be in: spinning up, spinning down, or malfunctioning.

Code:
/// <summary>
/// Simulate a random message from the system
/// </summary>
/// <returns>The JSON-serialized message, encoded as UTF-8 bytes</returns>
private static byte[] BuildMessage()
{
    var message = new Message();
    // The upper bound of Next is exclusive, so (0, 3) yields 0, 1, or 2
    var r = new Random().Next(0, 3);
    switch (r)
    {
        case 0:
            message.EventCode = "WIDGET-SPINUP";
            message.Key = "ID";
            message.Value = "111";
            break;

        case 1:
            message.EventCode = "WIDGET-SPINDOWN";
            message.Key = "ID";
            message.Value = "333";
            break;

        default:
            message.EventCode = "WIDGET-MALFUNCTION";
            message.Key = "ID";
            message.Value = "222";
            break;
    }

    var serializedMessage = JsonConvert.SerializeObject(message);
    return Encoding.UTF8.GetBytes(serializedMessage);
}

I had to add the Json.NET NuGet package to use the JsonConvert class. Finally, I call BuildMessage from within Main in order to get a random message each time I run:

Code:
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.ExchangeDeclare(exchange: "test-exchange", 
            type: "fanout", 
            durable: true);

        //var message = "Current time: " + DateTime.Now.ToLongTimeString(); 
        //var body = Encoding.UTF8.GetBytes(message); 
        var body = BuildMessage();

        channel.BasicPublish(exchange: "test-exchange",
            routingKey: "",
            basicProperties: null,
            body: body);
    }
}
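
To make the wire format concrete, here is a minimal sketch of the round trip the message body goes through, assuming the Json.NET (Newtonsoft.Json) package is referenced. The PayloadSketch namespace, the PayloadDemo class, and its Encode and Decode helpers are hypothetical names used only for illustration; the Message class is repeated so the sketch compiles on its own.

```csharp
using System;
using System.Text;
using Newtonsoft.Json;

namespace PayloadSketch
{
    // Same shape as the Message class above, repeated so this sketch is self-contained.
    public class Message
    {
        public string EventCode { get; set; }
        public string Key { get; set; }
        public string Value { get; set; }
    }

    // Hypothetical helper class, not part of the original projects.
    public static class PayloadDemo
    {
        // Publisher side: object -> JSON text -> UTF-8 bytes.
        public static byte[] Encode(Message message)
        {
            return Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
        }

        // Subscriber side: UTF-8 bytes -> JSON text -> object.
        public static Message Decode(byte[] body)
        {
            return JsonConvert.DeserializeObject<Message>(Encoding.UTF8.GetString(body));
        }

        public static void Main()
        {
            var original = new Message { EventCode = "WIDGET-SPINUP", Key = "ID", Value = "111" };
            var body = Encode(original);

            // Show the JSON text that travels as the message body.
            Console.WriteLine(Encoding.UTF8.GetString(body));

            // Decode it again, the way a subscriber would.
            var copy = Decode(body);
            Console.WriteLine(copy.EventCode + " / " + copy.Key + " / " + copy.Value);
        }
    }
}
```

With Json.NET's default settings the body should be compact JSON along the lines of {"EventCode":"WIDGET-SPINUP","Key":"ID","Value":"111"}, which is easy to read in the RabbitMQ management UI while debugging.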

That's it for the publisher. Each time I run the publisher example I generate one of the three random messages and publish it to RabbitMQ. Now let's turn to the RabbitSubscriber. I want to process the message differently depending upon the event that was raised. For example, if my widget is spinning up I might want to monitor its rotational speed. If my widget is spinning down I might want to lock its axle when it comes to a full stop. And of course if it is malfunctioning I might have a whole series of other things I'd like to do. Yes, I'm totally making this up.

So I need a different processor depending upon the event received. This is where the factory method pattern comes into play. I created a MessageHandlers folder (with a ConcreteHandlers subfolder) in the RabbitSubscriber project to hold the following classes.

The IMessageHandlerProcessor interface is very straightforward:

Code:
using RabbitSubscriber.Models;

namespace RabbitSubscriber.MessageHandlers
{
    public interface IMessageHandlerProcessor
    {
        string Process(Message message);
    }
}

Each concrete handler might take the message and process it very differently. For this simple example each one will merely return a custom message to show the message that it received:

Code:
using RabbitSubscriber.Models;

namespace RabbitSubscriber.MessageHandlers.ConcreteHandlers
{
    public class WidgetSpinupMessageHandler : IMessageHandlerProcessor
    {
        public string Process(Message message)
        {
            return string.Format("Received spinup message for ID {0}", message.Value);
        }
    }
}

using RabbitSubscriber.Models;

namespace RabbitSubscriber.MessageHandlers.ConcreteHandlers
{
    public class WidgetSpindownMessageHandler : IMessageHandlerProcessor
    {
        public string Process(Message message)
        {
            return string.Format("Received spindown message for ID {0}", message.Value);
        }
    }
}

using RabbitSubscriber.Models;

namespace RabbitSubscriber.MessageHandlers.ConcreteHandlers
{
    public class WidgetMalfunctionMessageHandler : IMessageHandlerProcessor
    {
        public string Process(Message message)
        {
            return string.Format("Received malfunction message for ID {0}", message.Value);
        }
    }
}

For the MessageHandlerProcessor (which I could easily have called MessageHandlerFactory, I suppose) I will use a basic switch statement to return an IMessageHandlerProcessor implementation based on the event code in the message itself:

Code:
using System;
using RabbitSubscriber.MessageHandlers.ConcreteHandlers;
using RabbitSubscriber.Models;

namespace RabbitSubscriber.MessageHandlers
{
    public static class MessageHandlerProcessor
    {
        public static IMessageHandlerProcessor Create(Message message)
        {
            switch (message.EventCode)
            {
                case "WIDGET-SPINUP":
                    return new WidgetSpinupMessageHandler();
                    
                case "WIDGET-SPINDOWN":
                    return new WidgetSpindownMessageHandler();

                case "WIDGET-MALFUNCTION":
                    return new WidgetMalfunctionMessageHandler();

                default:
                    throw new Exception("Unknown message event code!");
            }
        }
    }
}
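
The factory can be exercised without RabbitMQ in the loop. The following is a minimal sketch, assuming a plain console program with the types inlined so it compiles on its own. The FactorySketch namespace, the Dispatch helper, and the WIDGET-REBOOT event code are hypothetical and exist only for this illustration; only two of the three handlers are repeated to keep it short.

```csharp
using System;

namespace FactorySketch
{
    public class Message
    {
        public string EventCode { get; set; }
        public string Key { get; set; }
        public string Value { get; set; }
    }

    public interface IMessageHandlerProcessor
    {
        string Process(Message message);
    }

    public class WidgetSpinupMessageHandler : IMessageHandlerProcessor
    {
        public string Process(Message message)
        {
            return string.Format("Received spinup message for ID {0}", message.Value);
        }
    }

    public class WidgetMalfunctionMessageHandler : IMessageHandlerProcessor
    {
        public string Process(Message message)
        {
            return string.Format("Received malfunction message for ID {0}", message.Value);
        }
    }

    // Trimmed copy of the factory above.
    public static class MessageHandlerProcessor
    {
        public static IMessageHandlerProcessor Create(Message message)
        {
            switch (message.EventCode)
            {
                case "WIDGET-SPINUP":
                    return new WidgetSpinupMessageHandler();

                case "WIDGET-MALFUNCTION":
                    return new WidgetMalfunctionMessageHandler();

                default:
                    throw new Exception("Unknown message event code!");
            }
        }
    }

    public static class Program
    {
        // Hypothetical helper: builds a message, asks the factory for a handler,
        // and reports the exception text if the event code is not recognized.
        public static string Dispatch(string eventCode, string id)
        {
            var message = new Message { EventCode = eventCode, Key = "ID", Value = id };
            try
            {
                return MessageHandlerProcessor.Create(message).Process(message);
            }
            catch (Exception ex)
            {
                return "Rejected: " + ex.Message;
            }
        }

        public static void Main()
        {
            Console.WriteLine(Dispatch("WIDGET-SPINUP", "111"));  // prints "Received spinup message for ID 111"
            Console.WriteLine(Dispatch("WIDGET-REBOOT", "444"));  // prints "Rejected: Unknown message event code!"
        }
    }
}
```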

Yes, I'm one of those paranoid types who throws exceptions in the default case. You never know. I have one last change to the RabbitSubscriber. Remember the ConsumerOnReceived event handler from the previous blog example? We're receiving more than just a timestamp now, so I need to change it to call the handler factory and get back a concrete processor for the message. I'll still write out to the Debug output window as before:

Code:
private static void ConsumerOnReceived(object sender, BasicDeliverEventArgs ea)
{
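    // ea.Body is a byte[] in RabbitMQ.Client 5.x and earlier; from 6.0 on it is
    // a ReadOnlyMemory<byte>, so pass ea.Body.ToArray() instead
    var serializedMessage = Encoding.UTF8.GetString(ea.Body);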
    var message = JsonConvert.DeserializeObject<Message>(serializedMessage);
    var handler = MessageHandlerProcessor.Create(message);
    var result = handler.Process(message);
    Debug.WriteLine(result);
}
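
Since the factory throws on unknown event codes, and a malformed body would make the deserializer throw as well, it may be worth keeping those exceptions out of the consumer callback. Here is a minimal sketch of one way to do that, assuming Json.NET is referenced. The ConsumerSketch namespace, the SafeConsumer class, and its Handle method are hypothetical names, and the dispatch parameter stands in for the MessageHandlerProcessor.Create(message).Process(message) call.

```csharp
using System;
using System.Text;
using Newtonsoft.Json;

namespace ConsumerSketch
{
    public class Message
    {
        public string EventCode { get; set; }
        public string Key { get; set; }
        public string Value { get; set; }
    }

    // Hypothetical wrapper, not part of the original RabbitSubscriber.
    public static class SafeConsumer
    {
        // Turns a raw message body into a result string and never throws.
        public static string Handle(byte[] body, Func<Message, string> dispatch)
        {
            try
            {
                var json = Encoding.UTF8.GetString(body);
                var message = JsonConvert.DeserializeObject<Message>(json);

                // Json.NET returns null for an empty body or a literal "null".
                if (message == null)
                {
                    return "Skipped: empty message body";
                }

                return dispatch(message);
            }
            catch (JsonException ex)
            {
                // The body was not valid JSON or did not fit the Message shape.
                return "Skipped: malformed JSON (" + ex.Message + ")";
            }
            catch (Exception ex)
            {
                // Anything else, including the factory's unknown event code exception.
                return "Skipped: " + ex.Message;
            }
        }

        public static void Main()
        {
            Func<Message, string> echo = m => "Handled " + m.EventCode + " for ID " + m.Value;

            var good = Encoding.UTF8.GetBytes("{\"EventCode\":\"WIDGET-SPINUP\",\"Key\":\"ID\",\"Value\":\"111\"}");
            var bad = Encoding.UTF8.GetBytes("not json");

            Console.WriteLine(Handle(good, echo));
            Console.WriteLine(Handle(bad, echo));
        }
    }
}
```

Inside ConsumerOnReceived this would boil down to a single Debug.WriteLine of the value returned by Handle. Whether a bad message should be logged and dropped like this, or rejected back to the broker, is a design decision the sketch does not try to settle.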

When I publish random messages from the RabbitPublisher, each one shows up in the subscriber's Debug output as a line such as "Received spinup message for ID 111".

Simple and maintainable code.