Poor Man's Publish-Subscribe WCF Service

Tags: pubsub, Architecture, WCF, ChannelFactory, netTCPBinding

I'm going to describe a WCF implementation of a publish-subscribe pattern that is used here at a medium-sized organization where I'm a lead developer. For reasons I won't go into MSMQ and the cloud were not options for us. Yet we needed durability, reliability, and all of the goodness that comes with a service bus. So we rolled our own "poor man's" pub sub (or observer pattern) service a simplified version of which I'll describe here.

First a big shout out to Juval Loewy whose book Programming WCF Services describes a publish-subscribe framework. It's well thought out and I highly recommend it. My implementation is inspired by his framework. I'm going to assume you already know a good deal about WCF but if you don't get Loewy's book.

This diagram explains the pub sub architecture. A message or event is raised and sent to the publisher. The publisher in turn is responsible for notifying all of the subscribers of that event. In this way the originator of the message is decoupled from the subscribers.

I organized the WCF publisher service into three projects: 

  • Publisher
  • Publisher.Contracts
  • Publisher.Domain

Publisher is the main WCF service application with two files: Publisher.svc and Web.config. 

Publisher.Contracts contains the service and operation contracts for the Publisher service. And following Miguel Castro's advice I put all domain functionality, the models, repositories, and Entity Framework wiring into the separate Publisher.Domain project. This project is ignorant of WCF and can be reused in other solutions. I won't go into all of the EF plumbing but let's look at the Subscription model:

public class Subscription
{
    public int ID { get; set; }
    public string Name { get; set; }
    public string EventCode { get; set; }
    public string Address { get; set; }
    public string EndpointIdentity { get; set; }
}

By way of example if you wanted to simulate a SubscriptionRepository class to serve up test subscriptions here is one:

public class SubscriptionRepository
{
    public static IList<Subscription> GetSubscriptions(string eventCode)
    {
        return GetMockSubscriptions()
            .Where(s => s.EventCode == eventCode.Trim().ToUpper())
            .ToList();
    }

    private static IEnumerable<Subscription> GetMockSubscriptions()
    {
        return new[]
        {
            new Subscription()
            {
                ID = 1,
                Name = "Subscriber 1",
                EventCode = "EVENT_A",
                Address = "net.tcp://foo.com/Subscriber1/Subscriber.svc",
                EndpointIdentity = "serviceaccountuser@domain.com"
            },
            new Subscription()
            {
                ID = 2,
                Name = "Subscriber 1",
                EventCode = "EVENT_B",
                Address = "net.tcp://foo.com/Subscriber1/Subscriber.svc",
                EndpointIdentity = "serviceaccountuser@domain.com"
            },
            new Subscription()
            {
                ID = 3,
                Name = "Subscriber 2",
                EventCode = "EVENT_B",
                Address = "net.tcp://bar.com/Subscriber2/Subscriber.svc",
                EndpointIdentity = "serviceaccountuser@domain.com"
            },
        };
    }
}

So as you can see we have subscriptions and a repository to return them based on the event code passed in. The idea with the publish-subscribe pattern is that every publisher will have zero or more subscribers. In this example there is only one subscriber who wants to be notified when event code A is published. But there are two subscribers who want to be notified when event code B is published. Notice the Address and EndpointIdentity properties of a Subscription. That will come in handy later because given those two pieces of information we can create a channel to call the subscriber.

Let's look at the Publisher.Contracts project and the brains behind it all. The service contract does one thing and and one thing only:

[ServiceContract]
public interface INotification
{
    [OperationContract]
    void Notify(string eventCode);
}

Pretty straightforward. Before I describe the service itself let's look at a base class from which the service will derive. This base class has one job: given a Subscription build and return a communication channel that the publisher can use to communicate with the subscriber service.

public abstract class PublisherBase<T> where T : class
{
    internal static T BuildSubscriberClientChannel(Subscription subscriber)
    {
        var binding = CreateNetTcpBinding();
        var endpoint = CreateEndpointAddress(subscriber);
        return ChannelFactory<T>.CreateChannel(binding, endpoint);
    }

    internal static NetTcpBinding CreateNetTcpBinding()
    {
        var binding = new NetTcpBinding
        {
            ReliableSession = { Enabled = true },
            TransactionFlow = false,
            Security = { Mode = SecurityMode.Transport }
        };

        binding.Security.Transport.ClientCredentialType = TcpClientCredentialType.Windows;
        binding.Security.Transport.ProtectionLevel = ProtectionLevel.EncryptAndSign;
        binding.Security.Message.ClientCredentialType = MessageCredentialType.Windows;
        return binding;
    }

    internal static EndpointAddress CreateEndpointAddress(Subscription subscriber)
    {
        var uri = new Uri(subscriber.Address);
        var identity = EndpointIdentity.CreateUpnIdentity(subscriber.EndpointIdentity);
        return new EndpointAddress(uri, identity, new AddressHeaderCollection());
    }
}

For the sake of simplicity in this post, I'm making some design-time decisions. All of our subscribers must use the net.tcp protocol, authenticate with Windows Kerberos security, and communicate over an encrypted transport. The identity used to authenticate the call (serviceaccountuser@domain.com) is a user principal name that your sysadmin will need to configure on the IIS server:

    setspn -a HTTP/domain.com DOMAIN\serviceaccountuser
    setspn -a HTTP/server DOMAIN\serviceaccountuser

Like I said earlier I'm going to assume you're up to speed on WCF security. See Loewy's book or MSDN for selecting transport and credential types in your particular situation. I  can tell you that the security here is perfect for an intranet environment. By choosing transport security as our security mode the channel over which the messages are sent is both encrypted and signed. I have not seen performance issues with this configuration. And it's way better than the old days when an ASMX service had to send and receive SOAP packets over a much slower SSL. Other than the decision to use NetTcpBinding in an intranet scenario I'm using the ChannelFactory class to generate the communication channel we'll need to call the subscriber. 

Last, we need to create a PublisherService that implements our INotification service contract (the Notify operation). But I want to do it in such a way that the caller is not hanging out waiting a long time for the request-reply operation to complete. If there are 12 subscribers for an event then I don't want the caller blocking and taking up that connection while our publisher service generates 12 channels and calls each subscriber before a TimeoutException occurs. I could have configured a one-way operation and perhaps you'll want to do that. But for our purposes here I'm using a simple delegate function to pass off the work to another thread so I can respond back to the client right away.

[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
public class PublisherService : PublisherBase<INotification>, INotification
{
    public delegate void PublishEventDelegate(string eventCode);

    public void Notify(string eventCode)
    {
        var eventDelegate = new PublishEventDelegate(PublishEvent);
        eventDelegate.BeginInvoke(eventCode, null, null);
    }

    private static void PublishEvent(string eventCode)
    {
        var subscribers = SubscriptionRepository.GetSubscriptions(eventCode);
        foreach (var subscriber in subscribers)
        {
            try
            {
                var channel = BuildSubscriberClientChannel(subscriber);
                using (channel as IDisposable)
                {
                    channel.Notify(eventCode);
                }
            }
            catch
            {
                Debug.WriteLine("Unable to publish event to subscriber");
            }
        }
    }
}

That's pretty much it. The PublisherService receives an event code (a very generic payload in this case, perhaps a more robust data contract in your situation) and for each subscription to that event creates a communication channel and calls the Notify operation. Notice the catch block. What I did in my implementation was build in some durability there instead of just throwing it away. Perhaps I'll describe that in another blog post. 

Here's my service configuration in Web.config:

<service
    name="Publisher.Contracts.PublisherService"
    behaviorConfiguration="publisherServiceBehavior">
    <endpoint
      address="net.tcp://localhost/Publisher/Publisher.svc"
      binding="netTcpBinding"
      contract="Publisher.Contracts.INotification"
      bindingConfiguration="reliableTCPBinding" />
    <endpoint
      address="mex"
      binding="mexTcpBinding"
      contract="IMetadataExchange"
    />
</service>

There's not much to the binding because the security was done programmatically above in the PublisherBase class:

<bindings>
  <netTcpBinding>
    <binding name="reliableTCPBinding">
      <reliableSession enabled="true" />
    </binding>
  </netTcpBinding>
</bindings>

The service behavior is straight out of the box:

<behaviors>
  <serviceBehaviors>
    <behavior name="publisherServiceBehavior">
      <serviceMetadata 
          httpGetEnabled="true" 
          policyVersion="Policy12" />
      <serviceDebug 
          httpHelpPageEnabled="true" 
          includeExceptionDetailInFaults="true" />
    </behavior>
  </serviceBehaviors>
</behaviors>

We now have a bare-bones secure publish-subscribe service. There's room for improvement. For instance Loewy's implementation supports concurrent publishing of the events with a WaitCallback delegate function. So if it's important for all of the subscribers to get notifications in near real time then you'll want to take a look at that. Message durability is also missing in this example. What I did was to save the failed message for that subscriber in a local store. Then I have a process to check for failed messages at regular intervals and to try to resend them. That's the "poor man" part of this implementation because you get that for free with MSMQ. But it works like a champ so I'm not going to complain.

4 Comments

Comments have been disabled for this content.