S-JIS[2017-09-30/2017-10-02] 変更履歴

Java Fork/Join


概要

Javaスレッド関連のクラスとして、JDK1.7でFork/Joinフレームワークが導入された。
ForkJoinTaskクラスに「スレッドで実行する処理」を書き、ForkJoinPoolクラスを使って実行する。

Fork/Joinフレームワークは、処理を(再帰的に)分割して実行するのに適している。
つまり、「処理(データ)を分割してそれぞれを子スレッドで並列実行し、それが終わると自分自身のスレッドも終了」という処理が向いている。


Fork/Joinも内部ではExecutorServiceを使っている。

ForkJoinTask内部で(ForkJoinTaskを使って)子スレッドを生成した場合、子スレッドを優先して実行するようになっているらしい。
(単純にThreadExecutorService(Future)を使って子スレッドを作った場合、依存関係や優先順位は考慮されない)

また、joinメソッドで子スレッドの終了を待つが、ただ待つだけでなく、タスクを実行する(キューに入っているタスクをstealする)こともあるらしい。[2017-10-02]
これにより、無駄な(何も作業をしない)スレッドを減らしているようだ。


Fork/Joinを使ってスレッドで処理を実行する例。
(処理の分割はしていない。→処理を分割する例

ForkJoinTaskの型引数には、戻り値の型を指定する。

import java.util.concurrent.ForkJoinTask;
class ForkJoinExample1 extends ForkJoinTask<Long> {
	private static final long serialVersionUID = 1L;

	private long result;
	@Override
	protected boolean exec() {
		long sum = 0;
		for (int i = 1; i <= 100 * 10000; i++) {
			sum += i;
		}
		setRawResult(sum); // 結果保存

		return true;
	}
	@Override
	public Long getRawResult() {
		return result;
	}

	@Override
	protected void setRawResult(Long arg) {
		this.result = arg;
	}
}

実行方法は以下のようになる。

import java.util.concurrent.ForkJoinPool;
		ForkJoinPool pool = new ForkJoinPool();
		ForkJoinTask<Long> task = pool.submit(new ForkJoinExample1()); // 処理開始
		long result = task.join(); // 終了待ち+結果取得
		long result = pool.invoke(new ForkJoinExample1()); // 処理開始+終了待ち+結果取得

ForkJoinPoolを使わずに呼び出すことも出来る。
(ForkJoinTask#forkメソッドの内部でForkJoinPool(共通プール)を呼び出している)

		ForkJoinTask<Long> task = new ForkJoinExample1();
		task.fork(); // 処理開始
		long result = task.join(); // 終了待ち+結果取得
		long result = task.fork().join(); // 処理開始+終了待ち+結果取得

なお、処理中に発生した例外(ForkJoinTaskのexecメソッドから発生した例外)(RuntimeException系)は、joinメソッドからスローされる。


ForkJoinTaskのサブクラスであるRecursiveTaskクラスを使うと、setRawResult/getRawResultメソッドが実装されているので、以下のようになる。
(なお、戻り値が無い場合はRecursiveActionクラスを使う)

import java.util.concurrent.RecursiveTask;
class RecursiveExample1 extends RecursiveTask<Long> {
	private static final long serialVersionUID = 1L;
	@Override
	protected Long compute() {
		long sum = 0;
		for (int i = 1; i <= 100 * 10000; i++) {
			sum += i;
		}
		return sum; // 結果を返す
	}
}
		ForkJoinTask<Long> task = new RecursiveExample1();
		long result = task.fork().join(); // 処理開始+終了待ち+結果取得

Callableインターフェース(戻り値が不要な場合はRunnableインターフェース)を使って処理を書くことも出来る。
Java8(JDK1.8)ならラムダ式が使えるので、以下のようになる。

		ForkJoinTask<Long> task = ForkJoinTask.adapt(() -> {
			long sum = 0;
			for (int i = 1; i <= 100 * 10000; i++) {
				sum += i;
			}
			return sum;
		});

		long result = task.fork().join(); // 処理開始+終了待ち+結果取得

処理を分割する例

処理を分割する場合、ForkJoinTaskを実装した自クラスを再帰的に生成・呼び出す形になる。

import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
class RecursiveExample2 extends RecursiveTask<Long> {
	private static final long serialVersionUID = 1L;
	private final int start;
	private final int end;

	public RecursiveExample2(int start, int end) {
		this.start = start;
		this.end = end;
	}
	@Override
	protected Long compute() {
		int size = end - start;
		if (size <= 10000) {
			long sum = 0;
			for (int i = start; i <= end; i++) {
				sum += i;
			}
			return sum;
		}

		// 処理を分割し、自クラスを再帰的に呼び出す
		int split = start + size / 2;
		ForkJoinTask<Long> task1 = new RecursiveExample2(start, split).fork();
		ForkJoinTask<Long> task2 = new RecursiveExample2(split + 1, end).fork();
		return task1.join() + task2.join();
	}
}
		ForkJoinTask<Long> task = new RecursiveExample2(1, 100 * 10000);
		long result = task.fork().join(); // 処理開始+終了待ち+結果取得

ディレクトリーを走査する例

ディレクトリーを再帰的に走査し、ファイルを処理する例。

普通、ディレクトリーを走査する場合はJava7(JDK1.7)のFilesとJava8(JDK1.8)のStreamを使って、以下のように書く。

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;
		Path path = 〜;
		try (Stream<Path> stream = Files.walk(path)) {
			stream.filter(p -> !Files.isDirectory(p)).forEach(file -> {
				// ファイルの処理
				System.out.println(file);
			});
		} catch (IOException e) {
			throw new UncheckedIOException(e);
		}

しかしこれはシングルスレッドで処理される。
ファイルの処理の部分だけThreadExecutorService(Future)を使ってマルチスレッドにしても、それらが終了するのを待つのは難しい(きれいに書けない)。


ForkJoinTaskを使ってディレクトリーを走査すれば、マルチスレッドで処理できる。

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.RecursiveAction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
class TraverseTask extends RecursiveAction {
	private static final long serialVersionUID = 1L;
	private final Path path;

	public TraverseTask(Path path) {
		this.path = path;
	}
	@Override
	protected void compute() {
		if (Files.isDirectory(path)) {
			try (Stream<Path> stream = Files.list(path)) { // ファイル・ディレクトリー一覧
				stream.map(p -> new TraverseTask(p)) // ForkJoinTaskを生成
					.peek(task -> task.fork()) // fork実行
					.collect(Collectors.toList()).forEach(task -> task.join()); // 終了待ち
			} catch (IOException e) {
				throw new UncheckedIOException(e);
			}
			return;
		}

		// ファイルの処理
		System.out.println(path);
	}
}
		Path path = 〜;
		new TraverseTask(path).fork().join();

余談だが、一覧取得部分をメソッド参照で書くと以下のようになる。

			try (Stream<Path> stream = Files.list(path)) { // ファイル・ディレクトリー一覧
				stream.map(TraverseTask::new) // ForkJoinTaskを生成
					.peek(ForkJoinTask::fork) // fork実行
					.collect(Collectors.toList()).forEach(ForkJoinTask::join); // 終了待ち
			} catch (IOException e) {
				throw new UncheckedIOException(e);
			}

ForkJoinTaskを生成する部分をメソッド化すると、クラスを作らずに書ける。

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.Collectors;
import java.util.stream.Stream;
	static ForkJoinTask<?> traverse(Path path) {
		return ForkJoinTask.adapt(() -> {
			if (Files.isDirectory(path)) {
				try (Stream<Path> stream = Files.list(path)) { // ファイル・ディレクトリー一覧
					stream.map(p -> traverse(p)) // ForkJoinTaskを生成
						.peek(ForkJoinTask::fork) // fork実行
						.collect(Collectors.toList()).forEach(ForkJoinTask::join); // 終了待ち
				} catch (IOException e) {
					throw new UncheckedIOException(e);
				}
				return;
			}

			// ファイルの処理
			System.out.println(path);
		});
	}
		Path path = 〜;
		traverse(path).fork().join();

参考: ashigeruさんのディレクトリー削除メソッド(Files.delete()はディレクトリー内にファイルがあるとエラーになるので、再帰的に消していく必要がある)


Java目次へ戻る / 新機能へ戻る / 技術メモへ戻る
メールの送信先:ひしだま