vaguely

和歌山に戻りました。ふらふらと色々なものに手を出す毎日。

UniRxでObservableを作る・つなげる

はじめに

Observableは作れる!ということで(完全に思いつきで発言しています)、CreateやSelectManyを使ってみます。

なお、本来はRxJavaとUniRxの対応表を作ってみたい 1の続編にするつもりでしたが、今回はUniRxのみ扱います。

Observableを作る

例えば時間のかかる処理を行う関数で、戻り値としてIObservable<T>を返すことで、処理の完了を呼び出し側で受け取れるようにします。

private IObservable ExecuteSomething()
{
        return Observable.Create(observer =>
        {
            // 何らかの処理.
            observer.OnNext(0);
            observer.OnCompleted();
            return new CompositeDisposable();
        });
}

Observable.Create<T>{何らかの処理}でObservableを作ります。

  • <T>の型はintだけでなくstringやTexture2Dなども使用可能ですが、関数の戻り値IObservable<T>と、Observable.Create<T>、observer.OnNext(T)の型を合わせる必要があります。
     (というかOnNextで呼び出し元に値を渡すことができるので、そこで必要な値の型を指定します)
  • {何らかの処理}では戻り値としてDisposableを渡します。
     これは変数に入れて渡すことも可能ですが、Createのときに新規作成する必要があります  (Startなどで1回しか作成しないと、1回Completeした後Observable.Create直後に完了してしまうようになります)。
  • {何らかの処理}の中でobserver.OnNext(T)、observer.OnComplete()、observer.OnError(Exception)を呼び出すことで、呼び出し元に通知することができます。

呼び出し

このExecuteSomething()を呼び出すには、ExecuteSomething().Subscribe()を実行します。
Subscribeしないと{何らかの処理}の内容が実行されないので注意が必要です
(私はたまに忘れます\(^o^)/)。

ExecuteSomething()
    .Subscribe(
        result => {/* observer.OnNext実行時に呼ばれる(resultはOnNext(T)の引数) */},
        error => {/* observer.OnError実行時に呼ばれる(errorはOnError(Exception)の引数) */},
        () => {/* observer.OnComplete実行時に呼ばれる */});
  • 上記はOnNext、OnError、OnComplete全て書いていますが、不要な場合はOnError、OnCompleteを省略できます。

Error

{何らかの処理}の中でErrorが発生した時に呼び出し元に通知が欲しい場合はobserver.OnError(Exception)を実行します。

例えば下記の場合は引数としてFileNotFoundExceptionを渡します。

private IObservable LoadTexture(string path)
{
    return Observable.Create(observer =>
    {
        if (!File.Exists(path))
        {
            observer.OnError(new FileNotFoundException("ファイルがないアルヨ"));
            // OnErrorを実行しても処理自体は止まらないのでreturnで処理を中断.
            return new CompositeDisposable();
        }
~省略~
    });
}

なお、OnErrorを呼び出し元に返すためには呼び出し元、呼び出される側(ExecuteSomething、LoadTexture)の両方でOnErrorの処理を書いておく必要があり、どちらかが書かれていない場合は通常のErrorと同じように扱われます。

ここは注意が必要かもしれません。

private IObservable LoadTexture(string path)
    {
        return Observable.Create(observer =>
        {
            // パスから画像読み込み.
            var newTexture = new Texture2D(1, 1);
            newTexture.LoadImage(File.ReadAllBytes(path));
            observer.OnNext(newTexture);
            observer.OnCompleted();
            return new CompositeDisposable();
        });
    }

Observableをつなげる

例えば画像を読み込んで、それをMaterialのTextureにセットする処理を行う場合。

それぞれ下記のような関数を作ってみました。

private IObservable LoadTexture(string path)
{
    // パスから画像を読み込む.
    return Observable.Create(observer =>
    {
        if (!File.Exists(path))
        {
            observer.OnError(new FileNotFoundException("ファイルがないアルヨ"));
            return new CompositeDisposable();
        }
        var newTexture = new Texture2D(1, 1);
        newTexture.LoadImage(File.ReadAllBytes(path));
        observer.OnNext(newTexture);
        observer.OnCompleted();
        return new CompositeDisposable();
    });
}
private IObservable AttachMainTexture(Texture targetTexture)
{
    // Materialに画像をセットする.
    return Observable.Create(observer =>
    {
        TargetMaterial1.SetTexture("_MainTex", targetTexture);
        observer.OnCompleted();
        return new CompositeDisposable();
    });
}

これを実行する場合に、それぞれをSubscribeすることもできますが、 下記のように一纏めにすることができます。

// LoadTextureを呼び出して画像を読み込む.
LoadTexture(Application.dataPath + @"/files/image1.png")
    // LoadTextureのOnNextで渡された画像を使ってAttachMainTextureを呼び出す.
    .SelectMany(texture => AttachMainTexture(texture))
                .Subscribe(
                    result => Debug.Log("OnNext"),
                    () => Debug.Log("finished"));
  • あるObservableの処理が終わったあと、OnNextで渡された値を使ってそのまま別のObservableをSubscribeしたい場合、「SelectMany」を使うことができます。
  • 同じものを呼び出したい場合などは「Concat」を使います。
  • SubscribeのOnNext、OnError、OnCompleteは後から読んでいるAttachMainTextureのものが反映されるため、例えばLoadTextureのOnCompleteのタイミングで何かをしたい、ということであればSelectManyなどを使わず分割した方が良さそうです。

参考