コンテンツへスキップ

【UniRx】UnityではじめるReactive Extensions

  • UniRx

今回の記事はReactive Extensions (Rx)について。

Unityで開発をする上で、UniRxは必要不可欠と言っても過言ではないほど強力なライブラリです。が、UniRx、というよりRxそのものが持つ複雑さ故に、C#初心者には勧めにくいライブラリでもあります。とりあえず便利だからと勧められて、よくわからないまま何となくSubjectやReactivePropertyを使っている人も結構いるんじゃないでしょうか…?

というわけで今回の記事では「Rxとは何か?」という基礎的な部分の解説から始め、「Rxの仕組み・利用方法」についても解説していきたいと思います。

また、この記事ではLINQについてを理解している前提で話を進めていくので、「LINQなんもわからん!」という方は下の記事を先に読むことをお勧めします。

Reactive Extensions(Rx)とは

Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators.

(Reactive Extensions (Rx) は、監視可能なシーケンスと LINQ スタイルのクエリ演算子を使用して、非同期およびイベントベースのプログラムを作成するためのライブラリである。)

Reactive Extensions(Rx)とは、Microsoftの開発した「非同期/イベントなどの処理をLINQ形式で宣言的に記述する」ことを可能にするライブラリです。このRxを用いることで、手続き型では書きにくい複雑なイベント処理や非同期処理を宣言的に記述することができます。

Rxという概念が非常に優れていたため、ReactiveXとして多数の言語・プラットフォームに移植され、多くの注目を集めました。今回挙げたUniRxはneuecc氏によるUnity向けのRxであり、AOT対応のほか、UnityでRxを扱うための様々な機能が豊富に用意されています。

LINQとRx

とまあRxについてサラッと紹介しましたが、正直この説明では「なんか凄そうだけど、結局Rxってなんなん?」という感じなので、より詳しく説明していきましょう。

Rxにおける最大の飛躍は「イベントを時間軸に乗るストリームとみなし、それをLINQで表現する」ところにあります。この考え方が革命的であったために、ここまでRxが普及したと言っても過言ではないでしょう。そして、同時にこの概念がRxの基本であるため、ここを理解することが最初の関門になります。

LINQ to Objects

Rxの前に、まずは普通のLINQ to Objectsについて考えてみます・
例えば以下のような配列があったとしましょう。

// 何の変哲もないint配列
var array = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

この中から「奇数だけを抽出してstringに変換し、それをforeachで回す」場合、LINQを用いて以下のように記述できます。

// Whereで奇数のみを抽出し、Selectでstringに変換
var query = array.Where(x => x % 2 == 1).Select(x => x.ToString());

foreach (var str in query)
{
   Debug.Log(str);
}

これを図で表すと、以下のようになります。

オペレータによって新たなIEnumerable<T>を生成し、foreachで回しています。ここまでは普通のLINQなので問題ないでしょう。

LINQ to Events

では、これがRxの場合はどうなるのでしょうか。今度はUnityのUpdate関数を例に考えてみます。

上の図は、Updateが呼ばれていく様子を先程のような形で表したものです。横軸が「長さ」から「時間」に変わったのが見て取れるでしょうか。

このようにRxでは、時間軸を横軸に取り、イベントをストリームと見なします。これはUpdateなどに限った話ではなく、ボタンのクリックなども同様に…

このような形で表現できます。イベントを時間軸上に乗せるだけなので「あらゆるものをRxとして表現する」ことが可能です。

さて、それではもう一度上の図を、今度は先ほどのLINQ to Objectsの図と見比べてみてください。横軸がLengthからTimeに変わっただけで、他はそっくりそのままじゃないでしょうか…?

そう。なぜこのような形でイベントを捉え直したかというと、このようにストリームとしてみなせばLINQが適用可能になるからです。コレクションに対して強力な効果を発揮してきたLINQが、Rxを使えばイベントに対しても利用できるようになります。

では、実際にコードで書いてみましょう。例えばUpdate中に「スペースキーが押された時」だけ処理をしたい場合、普通に書くとこんな感じになります。

// Update関数があって...
void Update()
{
    // もしSpaceキーが押されたなら...
    if (Input.GetKeyDown(KeyCode.Space))
    {
        // 何らかの処理を行う
        Debug.Log("スペースキーが押された!");
    }
}

これをUniRxを用いて書くとこんな感じ。

void Start()
{
    // Observable.EveryUpdate(UniRxの機能)でUpdateのストリームを取得し...
    // Whereでフィルタリングして...
    // Subscribeで処理を購読 (後ほど説明します)
   
    Observable.EveryUpdate()
        .Where(_ => Input.GetKeyDown(KeyCode.Space))
        .Subscribe(_ => 
        {
            // 何らかの処理
            Debug.Log("スペースキーが押された!");
        });
}

Update関数で手続き的に書いていた処理が、LINQを用いた宣言的なものに変わっているのがわかるかと思います。

これを図にすると以下のような感じ。

イベントをストリーム化することで、LINQ to Objectsと同様に、処理をWhereでフィルタリングすることが可能になりました。

ただ、このコードだけだとRxの威力を体感しづらいので、さらに複雑にします。「スペースキーが押されてから2秒後に何らかの処理を、最初の3回だけ行う」といったように変更してみましょう。

これを普通に書こうとするとかなり大変です。あえて書くなら以下のような感じでしょうか。

// 回数のカウント用の変数を用意
int count = 0;

void Update()
{
    // もしスペースキーが押され、かつ回数が3未満なら...
    if (Input.GetKeyDown(KeyCode.Space) && count < 3)
    {
        // カウントを加算
        count++;

        // コルーチンを起動
        StartCoroutine(HogeCoroutine());
    }
}

IEnumerator HogeCoroutine()
{
    // コルーチンで2秒待つ
    yield return new WaitForSeconds(2f);

    // 何らかの処理
    Debug.Log("Hello!");
}

一気にコードが複雑化しましたね。無駄なメソッドや変数が追加され、非常に読みづらいコードになってしまいました。

ではこれをUniRxで書いてみると…

void Start()
{
    // Observable.EveryUpdateでUpdateのストリームを取得し...
    // Whereでフィルタリングして...
    // Takeで最初の3回のみを取得し...
    // Delayで2秒間遅延させ...
    // Subscribeで処理を購読
   
    Observable.EveryUpdate()
        .Where(_ => Input.GetKeyDown(KeyCode.Space))
     .Take(3)
        .Delay(TimeSpan.FromSeconds(2f))
        .Subscribe(_ => 
        {
            // 何らかの処理
            Debug.Log("Hello!");
        });
}

このように、ここまで複雑な処理をほんの数行で記述することができます。その上、オペレータを見ればどのような処理が行われているかが一目瞭然であり、可読性という観点からも非常にわかりやすいコードとなっています。(あと書いてて楽しい。IntelliSense最高)

Rxの基本的な考え方、そしてその威力がわかっていただけたでしょうか。

Subscribe

続いては、Rxの根幹を担うSubscribe(購読)の概念について。この辺りもLINQと非常に似ているので、比較して見ていくとわかりやすいかと思います。

LINQでは、IEnumerable<T>の連鎖で各操作を表現し、それをforeachで回すことで動かしました。コードにするとこんな感じ。

var collection = Enumerable.Range(1, 40);

// 各オペレータの戻り値はIEnumerable<T>
var query = collection
    .Where(x => x % 3 == 0)
    .Select(x => x.ToString())
    .Take(5);

// foreachで回すことで実行される
foreach (var str in query)
{
    // MoveNext時の処理
    Debug.Log(str);
}

foreachを書くことで、MoveNext時にCurrentの値を取得することができます。

これがRxの場合だと、IObservable<T>というインターフェースの連鎖で各操作を表現し、それをSubscribe(購読)することで処理を実行します。

var updateStream = Observable.EveryUpdate();

// 各オペレータの戻り値はIObservable<T>
var stream = updateStream
    .Where(x => Input.GetKeyDown(KeyCode.Space))
    .Take(5);

// Subscribeすることで処理を実行する
stream.Subscribe(x => 
{
    // OnNext時の処理
    Debug.Log(x + "Frame");
});

このようにSubscribeすることで、OnNext時に通知される値を取得することができます。
また、RxのイベントストリームにはOnNext / OnError / OnCompletedという3つのイベントがあり、

  1. OnNext (値の発行) が0回以上呼ばれる
  2. OnCompleted (イベントの完了) または OnError (エラー) が1回呼ばれる

という流れでイベントが発行されます。

これらのイベントもSubscribeで同時に受け取ることが可能です。

stream.Subscribe(
    x => Debug.Log(x + "Frame"), // OnNext
    ex => Debug.Log("Exception " + ex), // OnError
    () => Debug.Log("Complete") // OnCompleted
); 

購読を解除したい場合には、戻り値のIDisposableをDisposeするだけです。この性質により、従来のevent構文では難しいイベントの解除をRxでは簡単に行うことが可能です。

// Subscribeの戻り値はIDisposable
var disposable = stream.Subscribe(x => 
{
    Debug.Log(x + "Frame");
});

// Disposeを呼ぶことで簡単に購読を解除できる
diposable.Dispose();

Observerパターン

先ほどはRxではSubscribe(購読)で処理を行うと説明しましたが、ここからはSubscribeによる動作が具体的にどのような処理を通じて行われるのかを見ていきたいと思います。

Rxの実装は「Observerパターン」という設計が核になっています。Observerパターンとは観察対象(Observable)の状態が変化した際に、値を観察者(Observer)に通知するデザインパターンのことで、何らかの状態を監視したい時に有効です。

C#のObserverパターンにおける登場人物は、IObserver<T>IObservable<T>という2つのインターフェースです。早速中身を見ていきましょう。

IObserver<T>

public interface IObserver<in T>
{
    void OnNext(T value);
    void OnError(Exception error);
    void OnCompleted();
}

IObserver<T>の中身はこんな感じ。

観察者の役割であるIObserver側では、観察対象(Observable)の状態変化に応じて呼ばれる処理の内容を実装します。メソッドの内容はSubscribeのところでも説明しましたが、

  • 「OnNext」で観察対象(Observable)の変化を値で通知
  • 「OnError」で観察対象(Observable)の変化時に起こったエラー(例外)を通知
  • 「OnCompleted」で観察対象(Observable)の変化が完了したことを通知

という感じになっています。

IObservable<T>

public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

それに対して、IObservable<T>の中身はこんな感じ。

観察対象側であるIObservableの持つメソッドはSubscribeのみ。SubscribeでIObserverを受け取り、状態変化時に全てのIObserverへ値の発行(OnNextの呼び出し)を行います。

そのため、IObservableを実装する際には、購読者であるIObserverを記憶する仕組みを実装する必要があります。ま、基本的にはListでまとめておけばOKです。

IDisposable

public interface IDisposable
{
    void Dispose();
}

Observerパターン独自のインターフェースは上の2つのみですが、もう1つ、忘れてはいけないのがIDisposable。

IDisposableは基本的にリソースの破棄のために実装しますが、Rxにおいては購読の解除を担当するインターフェースとして機能します。

Subscribeの項でも説明した通り、イベントの解除(購読解除)はDisposeの呼び出しで行います。そのため、IObservableを実装する際には、合わせて購読解除を行うIDisposableを実装したクラスを作成する必要があります。

Observerパターンの処理の流れ

これらのインターフェースを用いてObserverパターンを実装すると、処理の流れは以下のようになります。

  1. SubscribeでObservableにObserverを渡す
  2. 状態変更時にObservableが保持するObserverに値を発行(OnNextを呼ぶ)
  3. Subscribeで取得したDisposableをDisposeして購読を終了

言葉だけでは分かりづらいので、実際に簡単なObserverパターンを実装してみましょう。IObserver<T>とIObservable<T>を実装したクラスを用意します。

// IObserverを実装したクラス
public class MyObserver : IObserver<string>
{
    private string _name;
    public MyObserver(string name)
    {
        _name = name;
    }

   // 完了時の処理
    public void OnCompleted()
    {
        Debug.Log("Complete!");
    }

   // エラー時の処理
    public void OnError(Exception error)
    {
        Debug.Log("Error!");
    }

   // 値が通知された際の処理
    public void OnNext(string value)
    {
        Debug.Log(value);
    }
}
public class MyObservable : IObservable<string>
{
    // 購読されたIObserverのリスト
    private List<IObserver<string>> _observers = new List<IObserver<string>>();
 
    // 購読時の処理
    public IDisposable Subscribe(IObserver<string> observer)
    {
        // 内部のリストに渡されたobserverを追加
        if(!_observers.Contains(observer))
            _observers.Add(observer);

     // 購読解除用のdisposableを返す
        return new Unsubscriber(_observers, observer);
    }

    // 全ての発行先に"Hello, world!"という文字列を通知する
    public void SendHello()
    {
        foreach (var observer in _observers)
        {
            observer.OnNext("Hello, world!");
        }
    }
    
    // 購読解除用の内部クラス
    class Unsubscriber : IDisposable
    {
        // 発行先リスト
        private List<IObserver<string>> _list;
        // DisposeされたときにRemoveするIObserver<string>
        private IObserver<string> _observer;

        public Unsubscriber(List<IObserver<string>> list, IObserver<string> observer)
        {
            _list = list;
            _observer = observer;
        }

        public void Dispose()
        {
            // Dispose時にListから対象の発行先を削除する
            _list.Remove(_observer);
        }
    }
}

では、これらのクラスを使ってコードを書いてみます。

// 値を受け取るクラスを作成
MyObserver observer = new MyObserver("name");
// 値を発行するクラスを作成
MyObservable observable = new MyObservable();

// Subscribeでobserverを渡す
observable.Subscribe(observer);
// 値を通知
observable.SendHello();

このように、Observerパターンを実装することで、Subscribeによって購読したObservableの変更を受け取ることができるようになりました。

Observerの隠蔽

ここまで説明してきて「あれ?」と思う方もいるかもしれません。「さっきのSubscribeの項だとIObserver<T>なんて使ってなくね?」と。

// あれ?Observerは?
stream.Subscribe(_ => 
{
    Debug.Log("HELLO!");
});

これはRxライブラリ側がIObservableに対する拡張メソッドを用意しているためで、ラムダ式で処理を渡すことで自動的にObserverを生成してくれます。そのため、利用側はIObserverの存在を意識することなく購読を行うことが可能です。

Observableの生成

また、RxライブラリにはObservableを生成するためのファクトリメソッドが事前に用意されています。

// 一定時間後に値を発行するObservable
Observable.Timer(TimeSpan.FromSeconds(5))
    .Subscribe(_ => Debug.Log("5秒経過"));

// Updateのタイミングで値を発行するObservable
Observable.EveryUpdate()
    .Subscribe(_ => Debug.Log("Update!")));

UniRxの場合はMonoBehaviourのイベントをObservable化する拡張メソッドが用意されているほか、CreateやFromEventを使えば独自のObservableも簡単に作成でき、さらにはコルーチンからObservableに変換することも可能です。

以上のような機能が存在するため、Rxを使う場合にはIObserver<T>やIObservable<T>のようなインターフェースを自力で実装する場面はほとんどありません。
とはいえ、ObserverパターンはRxの核となるデザインパターンですから、それらの仕組みについてはしっかり覚えておくようにしましょう。

Subjectって何者?

さて、ここでUniRxを触ったことがある人ならもう1つ、疑問に思うことがあるかもしれません。そう、Subjectです。

UniRxにおいてストリームソースとして使われるSubjectですが、このSubjectに対してはOnNext / Subscribeの両方を呼び出すことができます。これは「SubjectがIObserver<T>とIObservable<T>の両方を実装している」ためです。

var subject = new Subject<int>();

// 内部にIObserverを追加する
subject.Subscribe(x => 
{
    Debug.Log(x);
});

// 内部のIObserverに対して値を発行する
subject.OnNext(1);

ObserverとしてもObservableとしても機能するため、単独で値の購読/発行を行うことが可能です。機能としてもevent構文の上位互換に近い形なので、非常に便利です。

Pull型とPush型

IEnumerable<T>とIObservable<T>について理解したところで、Pull型とPush型の違いにも触れておきましょう。

IEnumerable<T>とPull型

まずは以下のコードを見てみます。

var query = Enumerable.Range(1, 10)
    .Where(x % 2 == 1)
    .Select(x => x * x);

foreach (var i in query)
{
    Debug.Log(i);
}

何の変哲もないLINQのコードです。この処理の様子を図示してみると、以下のようになります。

少々…というかものすごく分かりづらいですが、注目して欲しいのは処理の流れの方向です。

foreachでループを回す際、IEnumeratorはMoveNextで次の要素に進み、Currentで値を取得します。IEnumeratorが能動的に値を取得しに動くため、矢印が上向きになっています。

このように必要な情報を必要に応じて引っ張ってくるような処理の流れを「Pull型」と呼びます。

IObservable<T>とPush型

今度はRxの場合を見てきましょう。まずはコードから。

Observable.Range(1, 10)
    .Where(x => x % 2 == 1)
    .Select(x => x * x)
    .Subscribe(i => 
    { 
        Debug.Log(i);
    });

さっきのコードのRxバージョンです。結果はLINQの方と全く変わりませんが、処理の流れは大きく異なります。

こちらの処理では、元のObservableからOnNextで値が次々に伝達されていきます。IEnumeratorと比べると、処理の流れが真逆ですね。

このように、発信元から値が通知されてくる(押し出されてくる)ような処理の流れのことを「Push型」と呼びます。

IEnumerable<T>とIObservable<T>は一見似たような動作をしますが、処理の流れに注目すると真逆であることがわかります。LINQやRxの処理を追う際には、このことに気を付けておきましょう。

Hot / Cold

続いてはRxにおける鬼門、IObservable<T>におけるHot / Coldの性質について。

RxにおけるObservableは、その性質によりHotとColdに分類されます。これを知らずにRxを使っていくと、意図しない動作を引き起こす可能性がありますので、是非とも覚えておきましょう。

Hot Observable

まずはHotの方から説明していきましょう。Hot Observableは能動的に値を発行するObservableで、後続のIObservable<T>に対して同一の値を発行します。

Hot Observableの代表的なものとしては、Observable.FromEventが挙げられます。

event Action<int> onUpdate;

void Example()
{
  // FromEventでonUpdateをObservableに変換
   var stream = Observable.FromEvent<int>(
       h => onUpdate += h,
       h => onUpdate -= h
   );

   // 1度Subscribeする
   stream.Subscribe(x => 
   {
       Debug.Log(x);
   });

   // Consoleに1が表示される
   onUpdate?.Invoke(1);

   // もう1度Subscribeする
   stream.Subscribe(y => 
   {
       Debug.Log(y);
   });

   // Consoleに2が2回表示される
   onUpdate?.Invoke(2);
}

FromEventによって作成されたObservableはeventがInvokeされたタイミングで値を流すため、いつSubscribeしても同一の値が流れてきます。

その性質上、Hot Observableは枝分かれさせることが可能です。そのため、1つのストリームを複数回Subscribeする場合にはHot Observableを利用します。

Cold Observable

対してCold Observableは、Subscribeされる(or Hot変換される)まで動作しない性質を持つObservableです。Hotとは異なり、Subscribeされない限り動作そのものが一切行われません。(LINQの遅延評価のようなもの、と言ったら分かりやすいでしょうか)

基本的にWhere、Selectなどのオペレータで生成されるObservableは全てColdになります。

Cold Observableの特徴として、Subscribeされた時点で動作を開始する(新規のストリームを生成する)ため、Subscribeするタイミングによって挙動が変化します。

var subject = new Subject<int>();

// subjectから生成されたObservableは「Hot」になる
var sourceObservable = subject.AsObservable();

// Scanオペレータから生成されたObservableは「Cold」になる
var scanObservable = sourceObservable.Scan((a, b) => a + b);

// Subscribeしていない状態で値を流す
subject.OnNext(1);
subject.OnNext(2);

// ストリームに値を流した後にSubscribe
scanObservable.Subscribe(x => Debug.Log(x));

// Subscribe後にストリームに値を流す
subject.OnNext(3);

Scanで前回の値と今回の値を足したものを流しているはずなので、1+2+3で「6」が表示されそうな気がしますが、実際にこのコードを実行するとConsoleには「3」が表示されます。

なぜ意図した動作にならないか、というと、Scanで生成されるObservableがColdであるため、実際の処理が「scanObservableをSubscribeした時点で開始している」からです。

これがHotとColdの決定的な違いであり、場合によっては意図しない動作を引き起こす原因にもなります。(また、Subscribeの度にストリームを生成するためメモリ効率が良くないという問題もあります)

ColdからHotに変換する

では、これを意図通り「6」と表示させたい場合にはどうすれば良いのでしょうか。

このコードが上手く動作しないのは、ScanがColdであるために、Subscribeされる前の値が反映されていないからです。そのため、これをHotに変換してSubscribeより前にストリームを起動させてやれば、Subscribe前の値が反映され、意図通りの動作をするようになります。

ColdをHotに変換するには、Publishというオペレータを利用します。先ほどのコードを修正し、Cold ObservableをHotに変換してみましょう。

var subject = new Subject<int>();

// subjectから生成されたObservableは「Hot」になる
var sourceObservable = subject.AsObservable();

// Scanオペレータから生成されたObservableは「Cold」になる...
// が、Publishを呼び出すことで「Hot」に変換する
var scanObservable = sourceObservable.Scan((a, b) => a + b).Publish();

// Connectを呼び出してストリームを起動する (正確にはこの時点でHot変換が完了する)
scanObservable.Connect();

// Subscribeしていない状態で値を流す
subject.OnNext(1);
subject.OnNext(2);

// ストリームに値を流した後にSubscribe
scanObservable.Subscribe(x => Debug.Log(x));

// Subscribe後にストリームに値を流す
subject.OnNext(3);

PublishとConnectを挟むことによってHot変換が行われ、Consoleに「6」が表示されるようになりました。

基本的には「Publish→Connect」でHot変換が完了する、という捉え方で構いませんが、より詳しく理解したい場合には【UniRx】Cold→Hot変換の代表、「Publishオペレータ」を完全に理解するという記事が参考になります。

Scheduler

最後に、Schedulerについて軽くまとめておきます。

Schedulerとは、処理がいつ・どこで行われるのかを振り分ける役割を持ったものです。Rxにおいて処理が行われるスレッド・タイミングはSchedulerによって決定されます。

そのため、Rxの動作には欠かせない存在ではあるのですが、SchedulerはRxライブラリ側が事前に用意している上、UniRxでは最適なSchedulerを自動で選んでくれるので、利用側がSchedulerを意識する必要はほとんどありません。

明示的にSchedulerを設定する

UniRxでは基本的にMainThreadSchedulerがデフォルトで設定されていますが、別のSchedulerを明示的に指定すること可能です。

// 明示的にMainThreadSchedulerを指定する
Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.MainThread)
    .Subscribe();

// MainThreadIgnoreTimeScaleSchedulerを指定する
Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.MainThreadIgnoreTimeScale)
    .Subscribe();

// ThreadPoolSchedulerを指定する
Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.ThreadPool)
    .Subscribe();

MainThreadSchedulerではコルーチンを用いて時間の計測を行うため、WaitForSecondsと同じ動作になります。そのため、どんなに細かい時間を指定してもフレーム単位で丸められます。また、Time.timeScaleの影響を受けます。

MainThreadIgnoreTimeScaleSchedulerはその名の通り、timeScaleの影響を受けないという点以外はMainThreadSchedulerと同じです。

ThreadPoolSchedulerも名前の通り、ThreadPool上で処理を実行するSchedulerです。
メインスレッドで処理中の内容を途中でThreadPoolに逃したい場合などに用いることができます。

他にも色々なSchedulerが用意されていますが、基本的にはデフォルトのMainThreadSchedulerで問題ありません。ただ、他のSchedulerを指定できる、ということは覚えておくと良いでしょう。

終わりに

いかがだったでしょうか。UniRxは使いこなせるようになれば極めて強力ですが、Rxの概念の難解さから、習得するのは結構大変です。また、ある程度理解していたつもりでも、Cold / Hotなどの落とし穴もあります。

しかし、Rxは革命的に便利な機能であり、それがUnity上で使えるのであれば使わない手はありません。是非ともRxについて理解した上で、UniRxを使いこなしていきましょう。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です