|
|
Javaのスレッド関連のクラスとして、JDK1.7でFork/Joinフレームワークが導入された。
ForkJoinTaskクラスに「スレッドで実行する処理」を書き、ForkJoinPoolクラスを使って実行する。
Fork/Joinフレームワークは、処理を(再帰的に)分割して実行するのに適している。
つまり、「処理(データ)を分割してそれぞれを子スレッドで並列実行し、それが終わると自分自身のスレッドも終了」という処理が向いている。
Fork/Joinも内部ではExecutorServiceを使っている。
ForkJoinTask内部で(ForkJoinTaskを使って)子スレッドを生成した場合、子スレッドを優先して実行するようになっているらしい。
(単純にThreadやExecutorService(Future)を使って子スレッドを作った場合、依存関係や優先順位は考慮されない)
また、joinメソッドで子スレッドの終了を待つが、ただ待つだけでなく、タスクを実行する(キューに入っているタスクをstealする)こともあるらしい。[2017-10-02]
これにより、無駄な(何も作業をしない)スレッドを減らしているようだ。
Fork/Joinを使ってスレッドで処理を実行する例。
(処理の分割はしていない。→処理を分割する例)
ForkJoinTaskの型引数には、戻り値の型を指定する。
import java.util.concurrent.ForkJoinTask;
class ForkJoinExample1 extends ForkJoinTask<Long> { private static final long serialVersionUID = 1L; private long result;
@Override protected boolean exec() { long sum = 0; for (int i = 1; i <= 100 * 10000; i++) { sum += i; } setRawResult(sum); // 結果保存 return true; }
@Override public Long getRawResult() { return result; } @Override protected void setRawResult(Long arg) { this.result = arg; } }
実行方法は以下のようになる。
import java.util.concurrent.ForkJoinPool;
ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Long> task = pool.submit(new ForkJoinExample1()); // 処理開始 long result = task.join(); // 終了待ち+結果取得
long result = pool.invoke(new ForkJoinExample1()); // 処理開始+終了待ち+結果取得
var task = pool.lasySubmit(new ForkJoinExample1()); // Java19[2022-09-25]
// →YUICHI SAKURABAさんのJEPでは語れないJava 19
ForkJoinPoolを使わずに呼び出すことも出来る。
(ForkJoinTask#forkメソッドの内部でForkJoinPool(共通プール)を呼び出している)
ForkJoinTask<Long> task = new ForkJoinExample1(); task.fork(); // 処理開始 long result = task.join(); // 終了待ち+結果取得
long result = task.fork().join(); // 処理開始+終了待ち+結果取得
なお、処理中に発生した例外(ForkJoinTaskのexecメソッドから発生した例外)(RuntimeException系)は、joinメソッドからスローされる。
ForkJoinTaskのサブクラスであるRecursiveTaskクラスを使うと、setRawResult/getRawResultメソッドが実装されているので、以下のようになる。
(なお、戻り値が無い場合はRecursiveActionクラスを使う)
import java.util.concurrent.RecursiveTask;
class RecursiveExample1 extends RecursiveTask<Long> { private static final long serialVersionUID = 1L;
@Override protected Long compute() { long sum = 0; for (int i = 1; i <= 100 * 10000; i++) { sum += i; } return sum; // 結果を返す } }
ForkJoinTask<Long> task = new RecursiveExample1(); long result = task.fork().join(); // 処理開始+終了待ち+結果取得
Callableインターフェース(戻り値が不要な場合はRunnableインターフェース)を使って処理を書くことも出来る。
Java8(JDK1.8)ならラムダ式が使えるので、以下のようになる。
ForkJoinTask<Long> task = ForkJoinTask.adapt(() -> { long sum = 0; for (int i = 1; i <= 100 * 10000; i++) { sum += i; } return sum; }); long result = task.fork().join(); // 処理開始+終了待ち+結果取得
処理を分割する場合、ForkJoinTaskを実装した自クラスを再帰的に生成・呼び出す形になる。
import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask;
class RecursiveExample2 extends RecursiveTask<Long> { private static final long serialVersionUID = 1L;
private final int start; private final int end; public RecursiveExample2(int start, int end) { this.start = start; this.end = end; }
@Override protected Long compute() { int size = end - start; if (size <= 10000) { long sum = 0; for (int i = start; i <= end; i++) { sum += i; } return sum; } // 処理を分割し、自クラスを再帰的に呼び出す int split = start + size / 2; ForkJoinTask<Long> task1 = new RecursiveExample2(start, split).fork(); ForkJoinTask<Long> task2 = new RecursiveExample2(split + 1, end).fork(); return task1.join() + task2.join(); } }
ForkJoinTask<Long> task = new RecursiveExample2(1, 100 * 10000); long result = task.fork().join(); // 処理開始+終了待ち+結果取得
ディレクトリーを再帰的に走査し、ファイルを処理する例。
普通、ディレクトリーを走査する場合はJava7(JDK1.7)のFilesとJava8(JDK1.8)のStreamを使って、以下のように書く。
import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.stream.Stream;
Path path = 〜; try (Stream<Path> stream = Files.walk(path)) { stream.filter(p -> !Files.isDirectory(p)).forEach(file -> { // ファイルの処理 System.out.println(file); }); } catch (IOException e) { throw new UncheckedIOException(e); }
しかしこれはシングルスレッドで処理される。
ファイルの処理の部分だけThreadやExecutorService(Future)を使ってマルチスレッドにしても、それらが終了するのを待つのは難しい(きれいに書けない)。
ForkJoinTaskを使ってディレクトリーを走査すれば、マルチスレッドで処理できる。
import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.concurrent.RecursiveAction; import java.util.stream.Collectors; import java.util.stream.Stream;
class TraverseTask extends RecursiveAction { private static final long serialVersionUID = 1L;
private final Path path; public TraverseTask(Path path) { this.path = path; }
@Override protected void compute() { if (Files.isDirectory(path)) { try (Stream<Path> stream = Files.list(path)) { // ファイル・ディレクトリー一覧 stream.map(p -> new TraverseTask(p)) // ForkJoinTaskを生成 .peek(task -> task.fork()) // fork実行 .collect(Collectors.toList()).forEach(task -> task.join()); // 終了待ち } catch (IOException e) { throw new UncheckedIOException(e); } return; } // ファイルの処理 System.out.println(path); } }
Path path = 〜; new TraverseTask(path).fork().join();
余談だが、一覧取得部分をメソッド参照で書くと以下のようになる。
try (Stream<Path> stream = Files.list(path)) { // ファイル・ディレクトリー一覧 stream.map(TraverseTask::new) // ForkJoinTaskを生成 .peek(ForkJoinTask::fork) // fork実行 .collect(Collectors.toList()).forEach(ForkJoinTask::join); // 終了待ち } catch (IOException e) { throw new UncheckedIOException(e); }
ForkJoinTaskを生成する部分をメソッド化すると、クラスを作らずに書ける。
import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.concurrent.ForkJoinTask; import java.util.stream.Collectors; import java.util.stream.Stream;
static ForkJoinTask<?> traverse(Path path) { return ForkJoinTask.adapt(() -> { if (Files.isDirectory(path)) { try (Stream<Path> stream = Files.list(path)) { // ファイル・ディレクトリー一覧 stream.map(p -> traverse(p)) // ForkJoinTaskを生成 .peek(ForkJoinTask::fork) // fork実行 .collect(Collectors.toList()).forEach(ForkJoinTask::join); // 終了待ち } catch (IOException e) { throw new UncheckedIOException(e); } return; } // ファイルの処理 System.out.println(path); }); }
Path path = 〜; traverse(path).fork().join();
参考: ashigeruさんのディレクトリー削除メソッド(Files.delete()はディレクトリー内にファイルがあるとエラーになるので、再帰的に消していく必要がある)