Dec 11 Keith | C#, Dev, Rx

Rx Event Aggregator

Ah, the Event Aggregator: you either love it or you hate it. I tend to hate it 95% of the time and like it only on rare occasions. One such occasion was a port of some old Prism code; another was building a status bar. At least Rx can spice it up.

The interface I use:

using System;

namespace GeoTagger.Core
{
    public interface IEventAggregator
    {
        IObservable<T> GetEventStream<T>();
        void Publish<T>(T payload);
    }
}

The implementation:

using System;
using System.Collections.Generic;
using System.Disposables;
using System.Linq;
using GeoTagger.Core.Extension;

namespace GeoTagger.Core
{
    public class EventAggregator : IEventAggregator
    {
        private readonly object _observablesByTypeKeyLock = new object();

        private readonly Dictionary<string, Tuple<object, object>> _observablesByTypeKey = new Dictionary<string, Tuple<object, object>>();

        public IObservable<T> GetEventStream<T>()
        {
            IObservable<T> stream;
            var key = typeof(T).ToString();

            lock (_observablesByTypeKeyLock)
            {
                if (_observablesByTypeKey.ContainsKey(key))
                {
                    Tuple<object, object> tuple = _observablesByTypeKey[key];
                    stream = (IObservable<T>)tuple.Item2;
                }
                else
                {
                    // T is known at compile time, so the subject can be constructed directly without reflection.
                    var subject = new Subject<T>();

                    var removeEventStreamFromCache = Disposable.Create(
                        () =>
                        {
                            lock (_observablesByTypeKeyLock)
                            {
                                _observablesByTypeKey.Remove(key);
                            }
                        }
                    );

                    stream = subject.AddDisposable(removeEventStreamFromCache).Publish().RefCount();

                    var tuple = new Tuple<object, object>(subject, stream);
                    _observablesByTypeKey.Add(key, tuple);
                }
            }
            return stream;
        }

        public void Publish<T>(T payload)
        {
            var key = typeof(T).ToString();
            
            Tuple<object, object> tuple;
            
            lock (_observablesByTypeKeyLock)
                _observablesByTypeKey.TryGetValue(key, out tuple);
            
            if (tuple != null)
            {
                ((Subject<T>)tuple.Item1).OnNext(payload);
            }
        }
    }
}

Tests, I hear you say? Well, I did write a couple, and they are a handy way to see how it’s used.

using System;
using GeoTagger.Core;
using NUnit.Framework;

namespace GeoTagger.Tests.Core
{
    [TestFixture]
    public class EventAggregatorTests
    {
        [Test]
        public void Publish_PublishesWhenThereAreSubscribers()
        {
            var expected = "Keith";
            string actual = null;

            var eventAggregator = new EventAggregator();

            eventAggregator.GetEventStream<string>()
                .Subscribe(
                    payload =>
                    {
                        actual = payload;
                    }
                );

            eventAggregator.Publish(expected);

            Assert.AreEqual(expected, actual);
        }

        [Test]
        public void Publish_DoesNotThrowWithNoSubscribers()
        {
            var eventAggregator = new EventAggregator();
            eventAggregator.Publish("Boo");
        }

        [Test]
        public void Publish_WithMultipleSubscribersPayloadDeliveredToAll()
        {
            var expected = new object();
            object actual1 = null, actual2 = null;

            var eventAggregator = new EventAggregator();

            eventAggregator.GetEventStream<Object>()
                .Subscribe(
                    payload =>
                    {
                        actual1 = payload;
                    }
                );

            eventAggregator.GetEventStream<Object>()
                .Subscribe(
                    payload =>
                    {
                        actual2 = payload;
                    }
                );


            eventAggregator.Publish(expected);

            Assert.AreEqual(expected, actual1);
            Assert.AreEqual(expected, actual2);
        }

        [Test]
        public void PublishAgainstDisposedSubscriber_ExpectNoPayloadDelivered()
        {
            int expected = 5;
            int actual1 = 0, actual2 = 0;

            var eventAggregator = new EventAggregator();

            var disposable1 = eventAggregator.GetEventStream<int>()
                .Subscribe(
                    payload =>
                    {
                        actual1 = payload;
                    }
                );

            // note we won't dispose this one 
            eventAggregator.GetEventStream<int>()
                .Subscribe(
                    payload =>
                    {
                        actual2 = payload;
                    }
                );

            eventAggregator.Publish(expected);

            Assert.AreEqual(expected, actual1);
            Assert.AreEqual(expected, actual2);

            disposable1.Dispose();

            eventAggregator.Publish(6);

            Assert.AreEqual(expected, actual1);

            // the second subscription should remain
            Assert.AreEqual(6, actual2);
        }
    }
}

Under the covers, the EventAggregator manages observable streams. Streams are keyed by payload type, so every caller of GetEventStream<T>() for a given T receives the same underlying stream. Publishing and ref counting these streams ensures there is only one subscription to the underlying subject per payload type. Once all subscribers dispose their subscriptions, the implementation removes the stream from its cache. If a caller calls Publish() when there are no subscribers, nothing happens.
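To make that lifecycle concrete, here is a rough sketch that uses only the base class library, so it runs without the Rx assemblies. It is not the implementation above: it swaps Publish().RefCount() for a hand-rolled list of handlers per payload type, and the names TypeKeyedHub, CachedTypeCount and TypeKeyedHubDemo are made up for illustration. It only aims to show the same three behaviours: one cache entry per payload type, removal of the entry when the last subscriber disposes, and Publish() doing nothing when nobody is listening.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical BCL-only stand-in for the Rx-based aggregator; illustration only.
public sealed class TypeKeyedHub
{
    private readonly object _gate = new object();

    // One cache entry per payload type, holding the handlers subscribed for it.
    private readonly Dictionary<Type, List<Delegate>> _handlersByType =
        new Dictionary<Type, List<Delegate>>();

    // Number of payload types that currently have at least one subscriber.
    public int CachedTypeCount
    {
        get { lock (_gate) { return _handlersByType.Count; } }
    }

    public IDisposable Subscribe<T>(Action<T> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        lock (_gate)
        {
            List<Delegate> handlers;
            if (!_handlersByType.TryGetValue(typeof(T), out handlers))
            {
                handlers = new List<Delegate>();
                _handlersByType.Add(typeof(T), handlers);
            }
            handlers.Add(handler);
        }
        return new Unsubscriber(() => Remove(typeof(T), handler));
    }

    public void Publish<T>(T payload)
    {
        Delegate[] snapshot;
        lock (_gate)
        {
            List<Delegate> handlers;
            // No entry means no subscribers, so there is nothing to do.
            if (!_handlersByType.TryGetValue(typeof(T), out handlers)) return;
            snapshot = handlers.ToArray();
        }
        // Invoke outside the lock so handlers can subscribe or dispose safely.
        foreach (Delegate handler in snapshot) ((Action<T>)handler)(payload);
    }

    private void Remove(Type key, Delegate handler)
    {
        lock (_gate)
        {
            List<Delegate> handlers;
            if (!_handlersByType.TryGetValue(key, out handlers)) return;
            handlers.Remove(handler);
            // Last subscriber gone: drop the cache entry for this payload type.
            if (handlers.Count == 0) _handlersByType.Remove(key);
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action _onDispose;
        public Unsubscriber(Action onDispose) { _onDispose = onDispose; }

        public void Dispose()
        {
            // Exchange makes repeated Dispose() calls harmless.
            Action action = Interlocked.Exchange(ref _onDispose, null);
            if (action != null) action();
        }
    }
}

public static class TypeKeyedHubDemo
{
    // Returns { first, second, entries while subscribed, entries after all disposed }.
    public static int[] Run()
    {
        var hub = new TypeKeyedHub();
        int first = 0, second = 0;

        IDisposable s1 = hub.Subscribe<int>(x => first = x);
        IDisposable s2 = hub.Subscribe<int>(x => second = x);

        hub.Publish(5);                       // both handlers see 5
        s1.Dispose();
        hub.Publish(6);                       // only the second handler sees 6
        int whileSubscribed = hub.CachedTypeCount;

        s2.Dispose();                         // last subscriber gone, entry removed
        int afterAllDisposed = hub.CachedTypeCount;
        hub.Publish(7);                       // no subscribers, nothing happens

        return new[] { first, second, whileSubscribed, afterAllDisposed };
    }
}
```

TypeKeyedHubDemo.Run() mirrors the last test above: the disposed handler keeps its old value, the remaining one gets the new payload, and the cache entry goes away once both subscriptions are disposed. The Rx version gets the same effect more compactly because RefCount() does the counting for you.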

2 blog posts in 1 day, personal record lol.

Comments (2) -

Mar 24 John Metta

Where's System.Disposables? I thought it was in System.Reactive, but I don't seem to be able to compile your code because System.Disposables is missing.

Mar 26 Keith Woods

It's in System.CoreEx.dll, which comes with the Rx distro.

About the author

Keith Woods

Keith Woods works for Lab49, a consulting firm that builds advanced solutions for the financial services industry.