【Java】CompletableFuture で遊ぶ 1
はじめに
以前 Java と C# を使ってローカルファイルを開く、というのをやりました。
C# では(意味があったかどうかはともかくとして) async/await で非同期に処理を行っていましたが、これに近いことを Java で実現するにはどうすれば良いかな~というのが今回のお話です。
CompletableFuture を使う
ググってみたところ、 Java 8 から導入された CompletableFuture を使うのが良さそうだったので、試してみることにしました。
【before】App.java
package SearchContainedFiles; import java.nio.file.Path; import java.nio.file.Paths; import SearchContainedFiles.FileLoaders.FileLoader; public class App { public static void main(String[] args) { if(args == null || args.length < 2){ System.out.println("Need two args"); return; } for(Path file: FileLoader.Search(Paths.get(args[0]), args[1])){ System.out.println(file.toString()); } } }
【after】App.java
package SearchContainedFiles; import java.nio.file.Path; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import SearchContainedFiles.FileLoaders.FileLoader; public class App { public static void main(String[] args) { if (args == null || args.length < 2) { System.out.println("Need two args"); return; } Supplier< Path[]> loadedPaths = () -> FileLoader.Search(Paths.get(args[0]), args[1]); CompletableFuture< Path[]> supply = CompletableFuture.supplyAsync(loadedPaths); try { Path[] paths = supply.get(); for(Path file: paths){ System.out.println(file.toString()); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
これでローカルファイルを読み込む処理( FileLoader.Search() )が非同期(別スレッド)で実行されます。
呼び出すメソッドでは( async/await をつけるような)変更が必要なく、呼び出し元の方も単語としては await のような「待つ」といった意味合いのメソッドは見当たりません。
が、ちゃんと非同期の処理が完了するまで待った上で全部の処理が完了します。
どのようにしてこれらの処理が動いているのかを追ってみることにしました。
何をしているのか
まず、 CompletableFuture を含め、 Future を使った非同期処理というのは、 Future パターン と呼ばれるデザインパターンで実行されるとのこと。
Java本格入門 で挙げられている Future パターンの例では、下記 3 つのクラスが登場します。
- ExecutorService
- Future< String>
- Callable< String>
流れとしては下記のような感じだと解釈しています。
(実際の処理は本を見てもらうとして)
では先ほどのサンプルコードが、これと同じような動きになっているのかを見てみたいと思います。
(Completable)Future が出てくるあたり完全に同じでないにしろ、近そうな予感はありますね。
ソースコード
ソースコードは OpenJDKのサイト から入手できます。
今回は下記を ZIP 形式でダウンロードしてきました(ページ左にリンクがあります)。
コードを辿る冒険
今回の処理の中心となるのは下記のクラスです。
これらはいずれも src > java.base > share > classes > java > util > concurrent にあります。
supplyAsync
コードの内容としては、まず CompletableFuture.supplyAsync(Supplier< Path>) からスタートします。
まず引数となる Supplier は、 T (ここでは Path )を返す get() を持つだけの interface です。
で、 supplyAsync は asyncSupplyStage(ASYNC_POOL, supplier) を呼びます。
この ASYNC_POOL は、 ForkJoinPool.getCommonPoolParallelism() が 1 より大きい場合は ForkJoinPool のインスタンスを返します。
また、AccessController.doPrivileged ~ により、 ForkJoinPool に付与されている権限で実行されます。
ForkJoinPool.java
~省略~ common = AccessController.doPrivileged(new PrivilegedAction< >() { public ForkJoinPool run() { return new ForkJoinPool((byte)0); }}); ~省略~
getCommonPoolParallelism() ですが、これが 1 以下になる場合、というのはシングルコアの PC ということなのでしょうか。
ちなみに Surface Pro 6(Core i5-8250U) で実行したところ、 7 が返ってきました。
コア数とイコールでもないのが気になるところですが、今回はスキップすることにします。
ForkJoinPool クラスは AbstractExecutorService を継承しており、 Future パターンで登場した ExecutorService の役割をになっているようです。
asyncSupplyStage
さて asyncSupplyStage に進みます。
ここでは ForkJoinPool.execute に AsyncSupply< U>(ompletableFuture< U> d, Supplier< U> f) を渡しています( U は今回の場合 Path[] )。
ということで、 Callbale< String> にあたるクラスは AsyncSupply< U> ということなのだと思います。
次は ForkJoinPool.java で定義されている AsyncSupply を見てみます。
AsyncSupply
ForkJoinPool.java > AsyncSupply< T>
~省略~ public void run() { CompletableFuture< T> d; Supplier< ? extends T> f; if ((d = dep) != null && (f = fn) != null) { dep = null; fn = null; if (d.result == null) { try { d.completeValue(f.get()); } catch (Throwable ex) { d.completeThrowable(ex); } } d.postComplete(); } } ~省略~
ここで気になる処理は 4 つです。
- CompletableFuture< T>.result
- CompletableFuture< T>.completeValue()
- Supplier< ? extends T>.get()
- CompletableFuture< T>.postComplete()
1.はここまで追ってきた処理には登場しなかったような気がします。
他で代入している値を見ると、複数の処理が行われた時に、先に実行されたタスクでエラーが発生した場合に、処理をスキップして終了する( 4.で )ためのもののようです。
2.は RESULT.compareAndSet(this, null, (t == null) ? NIL : t) ( t は Supplier< ? extends T>.get() )を返しています。
この RESULT が何かというと、このような内容になっています。
( try ~ catch などは省いています)
MethodHandles.Lookup l = MethodHandles.lookup(); VarHandle RESULT = l.findVarHandle(CompletableFuture.class, "result", Object.class);
これにより、 CompletableFuture にある volatile Object result; への参照が取得できます。
VarHandle は 動的に変数への強い型を持った参照を得るためのもので、ここでは CompletableFuture の result に対する参照を取得している、と。
なおここでアクセスチェックが行われるために、 AccessController.doPrivileged ~ が必要となるようです。
2.に戻ると、 compareAndSet によって Supplier< ? extends T>.get() の戻り値を result にセットしているようです。
ここは後でもう少し調べてみることにします。
最後 4.ですが、処理完了後に実行するために登録されている処理を、順に実行して外していく、ということを行っているようです。
が、ここだけに限らないのですが、 CompletableFuture をはじめ同じ型のインスタンスが複数登場して混乱してきているので、後日修正するかもしれません/(^o^)\
compareAndSet について
さて、ちょこちょこと登場している VarHandle.compareAndSet 。
これが何をしているのかちょっとだけ調べてみることにしました。
例えばこのようなクラスがあったとして。
VarHSample2.java
package SearchContainedFiles; class VarHSample2{ String message; void call(){ System.out.println("VarHSample2 " + message); } }
このようにすると、 VarHSample2.java の message の中身が変更されます。
VarHSample.java
package SearchContainedFiles; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; class VarHSample { void call(){ MethodHandles.Lookup l = MethodHandles.lookup(); VarHandle handle; VarHSample2 sample2 = new VarHSample2(); try{ // 引数は 1. ターゲットとなるクラス 2. 変更したい変数名 3. 2.の型 handle = l.findVarHandle(VarHSample2.class, "message", String.class); // 引数は 1. findVarHandle の第一引数のインスタンス // 2. findVarHandle の第二引数の変数の処理実行前の値 // 3. 2.の値が変数と等しかった場合に代入される値 boolean result = handle.compareAndSet(sample2, null, "hello!"); System.out.println(result); // true } catch(IllegalAccessException e){ System.out.println(e.getMessage()); } catch(NoSuchFieldException e){ System.out.println(e.getMessage()); } sample2.call(); // 「VarHSample2 hello!」と出力される. } }
- findVarHandle で、ターゲットとなる変数は、呼び出し元である VarHSample.java からアクセス可能である必要があります(アクセス不可の場合、実行時にエラー)。
- findVarHandle と compareAndSet の第一引数の型は同じである必要があり、違っていると実行時にエラーがでます。
- compareAndSet の第二引数の値が、 findVarHandle の第二引数で指定する変数と異なる場合、戻り値は false となり、変数の値はそのままになります。
上記を踏まえてもう一度先ほどのコードを見てみると...
MethodHandles.Lookup l = MethodHandles.lookup(); VarHandle RESULT = l.findVarHandle(CompletableFuture.class, "result", Object.class); // T は Supplier< ? extends T>.get() の戻り値. RESULT.compareAndSet(this, null, (t == null) ? NIL : t)
ForkJoinPool.java > AsyncSupply< T> の completeValue 実行時に、 result が null で、かつ Supplier< ? extends T>.get() が null でなかった場合に T (今回は Path[] )が代入される、という動きになっているようです。
ここで一旦切りまして、次は get() で処理の完了を待ち受けてみたいと思います。