.NET / Unity向けの新たなメッセージングライブラリ「ZeroMessenger」をリリースしました!今回もOSSとしてGithubで公開しています。
これは何かというと、インメモリでのPublish/SubscribeパターンやMessengerパターンなどの実装に特化したライブラリです。MessagePipeやUniRxのMessageBroker<T>のようなものといえばわかりやすいでしょうか。
細かい使い方はREADMEの方に載っていますが、基本的にはC#erにとって馴染みのあるAPIになっているので、以下のコードで雰囲気は十分理解できるんじゃないでしょうか。
using System;
using ZeroMessenger;
// メッセージを購読
var subscription = MessageBroker<Message>.Default.Subscribe(x =>
{
Console.WriteLine(x.Text);
});
// メッセージを発行
MessageBroker<Message>.Default.Publish(new Message("Hello!"));
// 購読を解除
subscription.Dispose();
// メッセージに使用する型
public record struct Message(string Text) { }
UniRxを使ったことがある方にはわかりやすいと思います。というのも、ZeroMessengerのMessageBroker<T>の基本的なAPIはUniRx.MessageBroker<T>そのまんまだからです。UniRxの後継ライブラリであるR3にはMessageBroker<T>に当たる機能が存在しないため、ZeroMessengerは代替として十分選択肢に挙げられるでしょう。また、MessageBroker<T>はインスタンス化して使うこともできるため、RxのSubject<T>のようなbetter eventとしての用途にも利用可能です。
もちろん、MessagePipeのようなDIコンテナ上でのPub/Subもサポートしています。以下はGeneric Hostと組み合わせて使用するサンプルです。(AddZeroMessenger()を利用するにはZeroMessenger.DependencyInjectionパッケージが必要です)
using ZeroMessenger;
using ZeroMessenger.DependencyInjection;
Host.CreateDefaultBuilder()
.ConfigureServices((context, services) =>
{
// Zero Messengerを追加する
services.AddZeroMessenger();
services.AddSingleton<ServiceA>();
services.AddSingleton<ServiceB>();
})
.Build()
.Run();
public record struct Message(string Text) { }
public class ServiceA
{
IMessagePublisher<Message> publisher;
public ServiceA(IMessagePublisher<Message> publisher)
{
this.publisher = publisher;
}
public Task SendAsync(CancellationToken cancellationToken = default)
{
publisher.Publish(new Message("Hello!"));
}
}
public class ServiceB : IDisposable
{
IDisposable subscription;
public ServiceB(IMessageSubscriber<Message> subscriber)
{
subscription = subscriber.Subscribe(x =>
{
Console.WriteLine(x);
});
}
public void Dispose()
{
subscription.Dispose();
}
}
こちらではIMessagePublisher<T> / IMessageSubscriber<T>を用いてPublishとSubscribeを行います。PublishAsync/SubscribeAwaitといった非同期APIも用意されていて、そちらも同じインターフェースで扱うことが可能です。
MessageBroker<T>はこの両方を実装しているため、event的な使い方をする場合はIMessageSubscriber<T>として公開すると良いでしょう。
class Foo
{
MessageBroker<int> onFoo = new();
// 購読機能だけを公開
public IMessageSubscriber<int> OnFoo => onFoo;
...
}
ベンチマーク・最適化手法
パフォーマンスについても見ていきましょう。以下は8つのSubscriberに対するPublish呼び出しのベンチマークです。
見ての通り、MessagePipeやR3よりもさらに高速です。当然、Publish呼び出しにおける余計なアロケーションは一切ありません。(“Zero”Messengerですから…)
さらに、Publishの速度だけでなくSubscribeについても最速・省アロケーションとなるように最適化されています。
SubscribeのパフォーマンスはPublishほど重要ではありませんが、アプリケーション起動時のパフォーマンスに関わるので速いに越したことはないでしょう。また、最小限のアロケーションとなるように調整されているので、メモリ消費量も少なく済みます。
実装のポイントはIMessageSubscriberとMessageHandler/AsyncMessageHandlerの定義です。
public interface IMessageSubscriber<T>
{
IDisposable Subscribe(MessageHandler<T> handler);
IDisposable SubscribeAwait(AsyncMessageHandler<T> handler, AsyncSubscribeStrategy subscribeStrategy = AsyncSubscribeStrategy.Sequential);
}
public abstract class MessageHandler<T> : MessageHandlerNode<T>
{
public void Handle(T message) { ... }
}
public abstract class AsyncMessageHandler<T> : MessageHandlerNode<T>
{
public ValueTask HandleAsync(T message, CancellationToken cancellationToken) { ... }
}
MessageHandler/AsyncMessageHandlerは抽象クラスであり、ハンドラ自身がSubscriberの連結リストのノードとして動作するようになっています。ハンドラを保持する配列が必要ないため、メモリ消費量は最小になります。列挙の性能が若干犠牲になっていますが、Subscriberの抱えるハンドラは多くでも10個程度であることがほとんどなので、大きな差にはならないはずです。
さらにMessageHandlerNode<T>はIDisposableであり、購読管理を同時に行えるようになっています。これによりSubscriptionのアロケーションが不要になるため、さらにメモリ消費量を削減できます。
このあたりの最適化はR3のReactiveProperty<T>の実装をパクt…参考にしています。ただし、ZeroMessengerはRxとは異なりオペレータ等を実装する必要はないため、拡張性を犠牲に連結リストと購読管理をHandler自体に統合したよりアグレッシブな最適化を実現しています。
そのため同一のハンドラを別のSubscriberに渡すことができないという制約があります(例外が発生します)が、実際Subscribeにはラムダ式によるAnonymousMessageHandlerを渡すケースが大半を占めるため、実用上問題となることはないでしょう。
Filter (middleware)
ZeroMessengerはメッセージの前後をフックするための機能としてFilterをサポートしています。これはASP.NET CoreのミドルウェアやMessagePipe・MagicOnionのFilterに当たる機能で、async decoratorパターンによる柔軟なカスタマイズを可能にします。
// 前後にログを挟むFilter
public class LoggingFilter<T> : IMessageFilter<T>
{
public async ValueTask InvokeAsync(T message, CancellationToken cancellationToken, Func<T, CancellationToken, ValueTask> next)
{
Console.WriteLine("Before");
await next(message, cancellationToken);
Console.WriteLine("After");
}
}
作成したFilterはMessageBroker<T>に追加したり、DIでグローバルに追加したりできるほか、Subscribe毎の追加も可能です。Filterの適用順は追加順と同じになります。
// MessageBroker<T>に追加
broker.AddFilter<LoggingFilter<T>>();
// DIコンテナに追加
Host.CreateDefaultBuilder()
.ConfigureServices((context, services) =>
{
services.AddZeroMessenger(messenger =>
{
// 型を指定して追加
messenger.AddFilter<LoggingFilter<Message>>();
// open genericsで追加
messenger.AddFilter(typeof(LoggingFilter<>));
});
})
.Build()
.Run();
// Subscribe時に追加
subscriber
.WithFilter<LoggingFilter<Message>>()
.Subscribe(x => { });
Unity対応
ZeroMessengerはUnityにも対応しています。R3などと同様、NuGetForUnity経由でNuGetパッケージをインストールしてもらえればOKです。
さらに、ZeroMessengeer.DependenyInjectionと同様の拡張をVContainer向けにも用意しています。こちらはUnityのPackage Manager経由でインストールが可能です。
using VContainer;
using VContainer.Unity;
using ZeroMessenger.VContainer;
public class ExampleLifetimeScope : LifetimeScope
{
protected override void Configure(IContainerBuilder builder)
{
// ZeroMessengerを追加
builder.AddZeroMessenger();
}
}
vs MessagePipe
とまあ色々紹介してきましたが、.NET / Unity向けのメッセージングライブラリには既に有名なものとしてCysharpさんのMessagePipeがあります。パフォーマンスはともかく、わざわざ新しいライブラリを出すことに意味はあるのか?というと、あります。
MessagePipeは機能も豊富でパフォーマンスにも優れた素晴らしいライブラリですが、根幹のAPI設計がやや古く、インメモリでの単純なPub/Subを行う上では微妙に不便な部分があります。個人的に扱いづらいと感じているのがインターフェースの定義。同期/非同期やキー付き、Bufferedが全て別のインターフェースとして分割されており、実装の際にはこれらの使い分けを考えなければなりません。実際にはこれらを区別せず、Publisher/Subscriberという括りで統一的に扱いたい場面の方が多いでしょう。
そこでZeroMessengerではインターフェースをIMessagePublisher<T> / IMessageSubscriber<T>の2つに完全に統合し、同期/非同期のAPIを統一的に扱えるようにしました。また、R3のSubscribeAwaitと同様のメソッドを用意することで、購読における非同期処理の扱いやすさが向上しています。キーによるフィルタリングについてはSubscriberのWithFilterで代替可能です。
また、リリースされたのがR3以前ということもあり、R3と併用する際の噛み合わせが良くないという問題もあります。(R3対応のAsObservable拡張がない、DisposableBagが重複しているなど)
ZeroMessengerではR3向けの拡張パッケージを標準で提供しており、より効率的なToObservableやSubscribeToPublish/SubscribeAwaitToPublishなどのオペレータも用意しています。プロジェクトでR3と併用したい場面ではこちらの方が強力でしょう。
vs Reactive Extensions
ZeroMessengerにはMessageBroker<T>による”better event”、Observerパターンの実装としての側面もありますが、その用途としては既にReactive Extensionsが存在します。Rxと比較すると機能は限定されていますが、Push型の非同期シーケンスである点や、購読の扱い(Subscribeによる購読、IDisposable.Disposeによる購読解除)などに関しては概ね同じです。
また、MessageBroker<T>は完了の概念が存在しないSubject<T>とみなすことができます。要するにRxRelayと同じようなものであり、OnErrorやOnCompletedで完了しないことが保証されるため、ステートレスで扱いやすいeventとして利用できます。またベンチマークからもわかる通り、余計な機能が含まれないぶんパフォーマンス的にも有利です。
ただまあ実際のところは、イベントとしての用途に関してはやはりR3を利用するという方針で良いと思います。Rxが備えた豊富なオペレータは非常に強力ですし、Rx.NETはパフォーマンスやasync/awaitとの連携に難がありますが、R3はasync/awaitに対応したオペレータが用意されており、パフォーマンスも十分です。
そのため、通常のイベントにはR3のSubjectやReactiveProperty等を利用し、よりスコープの広いPub/Subの実装にはZeroMessengerを利用するというように使い分けると良いでしょう。必要があればZeroMessenger.R3の機能を用いてObservableに変換することも可能です。
まとめ
Pub/Subによる広いスコープでのメッセージングはRxだけではやや不足気味で、やっぱりこういった専用のライブラリがあった方が、特にDIを多用するプロジェクトでは綺麗に書けるんじゃないかなあという気はしています。PublishAsyncによる購読処理の待機やグローバルなFilterの実現はRxでは不可能であり、それでいてPub/Subを利用する上では欲しい機能でしょう。
というわけでZeroMessenger、かなりいい感じになっていると思うので、ぜひ使ってみてください…!