|
|
Javaのスレッド関連のクラスとして、JDK1.7でFork/Joinフレームワークが導入された。
ForkJoinTaskクラスに「スレッドで実行する処理」を書き、ForkJoinPoolクラスを使って実行する。
Fork/Joinフレームワークは、処理を(再帰的に)分割して実行するのに適している。
つまり、「処理(データ)を分割してそれぞれを子スレッドで並列実行し、それが終わると自分自身のスレッドも終了」という処理が向いている。
Fork/Joinも内部ではExecutorServiceを使っている。
ForkJoinTask内部で(ForkJoinTaskを使って)子スレッドを生成した場合、子スレッドを優先して実行するようになっているらしい。
(単純にThreadやExecutorService(Future)を使って子スレッドを作った場合、依存関係や優先順位は考慮されない)
また、joinメソッドで子スレッドの終了を待つが、ただ待つだけでなく、タスクを実行する(キューに入っているタスクをstealする)こともあるらしい。[2017-10-02]
これにより、無駄な(何も作業をしない)スレッドを減らしているようだ。
Fork/Joinを使ってスレッドで処理を実行する例。
(処理の分割はしていない。→処理を分割する例)
ForkJoinTaskの型引数には、戻り値の型を指定する。
import java.util.concurrent.ForkJoinTask;
class ForkJoinExample1 extends ForkJoinTask<Long> {
private static final long serialVersionUID = 1L;
private long result;
@Override
protected boolean exec() {
long sum = 0;
for (int i = 1; i <= 100 * 10000; i++) {
sum += i;
}
setRawResult(sum); // 結果保存
return true;
}
@Override
public Long getRawResult() {
return result;
}
@Override
protected void setRawResult(Long arg) {
this.result = arg;
}
}
実行方法は以下のようになる。
import java.util.concurrent.ForkJoinPool;
ForkJoinPool pool = new ForkJoinPool(); ForkJoinTask<Long> task = pool.submit(new ForkJoinExample1()); // 処理開始 long result = task.join(); // 終了待ち+結果取得
long result = pool.invoke(new ForkJoinExample1()); // 処理開始+終了待ち+結果取得
var task = pool.lasySubmit(new ForkJoinExample1()); // Java19[2022-09-25]
// →YUICHI SAKURABAさんのJEPでは語れないJava 19
ForkJoinPoolを使わずに呼び出すことも出来る。
(ForkJoinTask#forkメソッドの内部でForkJoinPool(共通プール)を呼び出している)
ForkJoinTask<Long> task = new ForkJoinExample1(); task.fork(); // 処理開始 long result = task.join(); // 終了待ち+結果取得
long result = task.fork().join(); // 処理開始+終了待ち+結果取得
なお、処理中に発生した例外(ForkJoinTaskのexecメソッドから発生した例外)(RuntimeException系)は、joinメソッドからスローされる。
ForkJoinTaskのサブクラスであるRecursiveTaskクラスを使うと、setRawResult/getRawResultメソッドが実装されているので、以下のようになる。
(なお、戻り値が無い場合はRecursiveActionクラスを使う)
import java.util.concurrent.RecursiveTask;
class RecursiveExample1 extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
@Override
protected Long compute() {
long sum = 0;
for (int i = 1; i <= 100 * 10000; i++) {
sum += i;
}
return sum; // 結果を返す
}
}
ForkJoinTask<Long> task = new RecursiveExample1(); long result = task.fork().join(); // 処理開始+終了待ち+結果取得
Callableインターフェース(戻り値が不要な場合はRunnableインターフェース)を使って処理を書くことも出来る。
Java8(JDK1.8)ならラムダ式が使えるので、以下のようになる。
ForkJoinTask<Long> task = ForkJoinTask.adapt(() -> {
long sum = 0;
for (int i = 1; i <= 100 * 10000; i++) {
sum += i;
}
return sum;
});
long result = task.fork().join(); // 処理開始+終了待ち+結果取得
処理を分割する場合、ForkJoinTaskを実装した自クラスを再帰的に生成・呼び出す形になる。
import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask;
class RecursiveExample2 extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;
private final int start;
private final int end;
public RecursiveExample2(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int size = end - start;
if (size <= 10000) {
long sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
}
// 処理を分割し、自クラスを再帰的に呼び出す
int split = start + size / 2;
ForkJoinTask<Long> task1 = new RecursiveExample2(start, split).fork();
ForkJoinTask<Long> task2 = new RecursiveExample2(split + 1, end).fork();
return task1.join() + task2.join();
}
}
ForkJoinTask<Long> task = new RecursiveExample2(1, 100 * 10000); long result = task.fork().join(); // 処理開始+終了待ち+結果取得
ディレクトリーを再帰的に走査し、ファイルを処理する例。
普通、ディレクトリーを走査する場合はJava7(JDK1.7)のFilesとJava8(JDK1.8)のStreamを使って、以下のように書く。
import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.stream.Stream;
Path path = 〜;
try (Stream<Path> stream = Files.walk(path)) {
stream.filter(p -> !Files.isDirectory(p)).forEach(file -> {
// ファイルの処理
System.out.println(file);
});
} catch (IOException e) {
throw new UncheckedIOException(e);
}
しかしこれはシングルスレッドで処理される。
ファイルの処理の部分だけThreadやExecutorService(Future)を使ってマルチスレッドにしても、それらが終了するのを待つのは難しい(きれいに書けない)。
ForkJoinTaskを使ってディレクトリーを走査すれば、マルチスレッドで処理できる。
import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.concurrent.RecursiveAction; import java.util.stream.Collectors; import java.util.stream.Stream;
class TraverseTask extends RecursiveAction {
private static final long serialVersionUID = 1L;
private final Path path;
public TraverseTask(Path path) {
this.path = path;
}
@Override
protected void compute() {
if (Files.isDirectory(path)) {
try (Stream<Path> stream = Files.list(path)) { // ファイル・ディレクトリー一覧
stream.map(p -> new TraverseTask(p)) // ForkJoinTaskを生成
.peek(task -> task.fork()) // fork実行
.collect(Collectors.toList()).forEach(task -> task.join()); // 終了待ち
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return;
}
// ファイルの処理
System.out.println(path);
}
}
Path path = 〜; new TraverseTask(path).fork().join();
余談だが、一覧取得部分をメソッド参照で書くと以下のようになる。
try (Stream<Path> stream = Files.list(path)) { // ファイル・ディレクトリー一覧
stream.map(TraverseTask::new) // ForkJoinTaskを生成
.peek(ForkJoinTask::fork) // fork実行
.collect(Collectors.toList()).forEach(ForkJoinTask::join); // 終了待ち
} catch (IOException e) {
throw new UncheckedIOException(e);
}
ForkJoinTaskを生成する部分をメソッド化すると、クラスを作らずに書ける。
import java.io.IOException; import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.concurrent.ForkJoinTask; import java.util.stream.Collectors; import java.util.stream.Stream;
static ForkJoinTask<?> traverse(Path path) {
return ForkJoinTask.adapt(() -> {
if (Files.isDirectory(path)) {
try (Stream<Path> stream = Files.list(path)) { // ファイル・ディレクトリー一覧
stream.map(p -> traverse(p)) // ForkJoinTaskを生成
.peek(ForkJoinTask::fork) // fork実行
.collect(Collectors.toList()).forEach(ForkJoinTask::join); // 終了待ち
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return;
}
// ファイルの処理
System.out.println(path);
});
}
Path path = 〜; traverse(path).fork().join();
参考: ashigeruさんのディレクトリー削除メソッド(Files.delete()はディレクトリー内にファイルがあるとエラーになるので、再帰的に消していく必要がある)