S-JIS[2013-12-20/2023-09-23] 変更履歴

Java ExecutorService/Future


概要

Javaでスレッドを扱うと言えばJDK1.4まではThreadクラス・Runnableインターフェースだったが、
JDK1.5でExecutorService・Callableインターフェース・Futureが導入された。

ExecutorServiceでは、同時に実行するスレッド数を指定することが出来る。
処理を登録(submit)しても、指定したスレッド数までしか同時に実行されない。
(実行可能なスレッド数に足りない場合、登録したスレッドは実行待ちになる。実行中のスレッドの処理が終わると、待機していたスレッドが実行される)

Callableインターフェースは、ExecutorServiceで実行する処理を定義する為のインターフェース。
Runnableインターフェースは戻り値を指定することが出来ないし例外もRuntimeExceptionしかスローできないが、
Callableインターフェースは戻り値を指定することが出来るし、どんな例外をスローすることも出来る。

ExecutorServiceでCallableやRunnableを実行開始(submit)すると、Futureが返される。
別スレッドの処理の終了待ちはFutureに対して行う。
Futureを使うと、別スレッドで処理した結果を受け取るのが簡単になる。
また、別スレッドを実行する際のタイムアウト時間を指定することも出来る。


ExecutorServiceを使ってスレッドで処理を実行する例。

  ExecutorServiceを使った例 従来のThreadを使った例 説明
import文
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
Thread系はjava.langパッケージ配下なので、
明示的にインポートするものは無い。
1スレッド
Runnable
Task1 task = new Task1();

ExecutorService service = Executors.newSingleThreadExecutor();
try {
  Future<?> future = service.submit(task);
  try {
    future.get();

    long sum = task.getSum();
    System.out.println(sum);
  } catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
  }
} finally {
  service.shutdownNow();
}
Task1 task = new Task1();

Thread thread = new Thread(task);

thread.start();
try {
  thread.join();

  long sum = task.getSum();
  System.out.println(sum);
} catch (InterruptedException e) {
  e.printStackTrace();
}
Thread#start()に該当するものは、
ExecutorService#submit()。

Thread#join()に相当するものは、
Future#get()。
class Task1 implements Runnable {
  private long sum;
  @Override
  public void run() {
    long sum = 0;
    for (int i = 1; i <= 100 * 10000; i++) {
      sum += i;
    }
    this.sum = sum;
  }
  public long getSum() {
    return sum;
  }
}
class Task1 implements Runnable {
  private long sum;
  @Override
  public void run() {
    long sum = 0;
    for (int i = 1; i <= 100 * 10000; i++) {
      sum += i;
    }
    this.sum = sum;
  }
  public long getSum() {
    return sum;
  }
}
Runnableインターフェースを実装した処理クラスは
ThreadでもExecutorServiceでも同じ。
1スレッド
Callable
Task1 task = new Task1();

ExecutorService service = Executors.newSingleThreadExecutor();
try {
  Future<Long> future = service.submit(task);
  try {
    long sum = future.get();


    System.out.println(sum);
  } catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
  }
} finally {
  service.shutdownNow();
}
Task1 task = new Task1();

Thread thread = new Thread(task);

thread.start();
try {
  thread.join();

  long sum = task.getSum();
  System.out.println(sum);
} catch (InterruptedException e) {
  e.printStackTrace();
}
Callableインターフェースを使った場合は、
Future#get()で処理の戻り値を受け取ることが出来る。
Callable#call()で例外がスローされた場合は、
Future#get()でExecutionExceptionが発生する。
そのcauseがCallableでスローされた例外。
class Task1 implements Callable<Long> {
  @Override
  public Long call() throws Exception {
    long sum = 0;
    for (int i = 1; i <= 100 * 10000; i++) {
      sum += i;
    }
    return sum;
  }
}
class Task1 implements Runnable {
  private long sum;
  @Override
  public void run() {
    long sum = 0;
    for (int i = 1; i <= 100 * 10000; i++) {
      sum += i;
    }
    this.sum = sum;
  }
  public long getSum() {
    return sum;
  }
}
Callableインターフェースでは戻り値の型を指定できる。
callメソッドに処理を実装する。
戻り値を指定できるし、例外も何でもスローできる。
複数スレッド
Callable
ExecutorService service = Executors.newFixedThreadPool(10);
try {
  List<Future<Long>> list = new ArrayList<>();
  for (int i = 0; i < 10; i++) {

    Future<Long> future = service.submit(new Task1());
    list.add(future);
  }
  for (Future<Long> future : list) {
    try {
      future.get();
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
  }
} finally {
  service.shutdownNow();
}

List<Thread> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
  Thread thread = new Thread(new Task1());
  thread.start();
  list.add(thread);
}
for (Thread thread : list) {
  try {
    thread.join();
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}
 
複数スレッド
終了待ち
List<Task1> taskList = new ArrayList<Task1>(10);
for (int i = 0; i < 10; i++) {
  taskList.add(new Task1());
}

try {
  List<Future<Long>> result = service.invokeAll(taskList);
} catch(InterruptedException e) {
  e.printStackTrace();
} finally {
  service.shutdownNow();
}
  invokeAllで複数タスクを同時に実行し、全てが終わるまで待つ。[2020-10-08]

なお、 タスク内で例外が発生しても、invokeAllからはスローされない。
Futureからgetしたときにスローされる。
(つまり全スレッドが終了してからでないと例外を取得できない。
個別に処理したい場合はafterExecuteメソッドを使用する)

クローズ
try (var service = Executors.newFixedThreadPool(10)) {
  〜
}
  Java19でExecutorServiceがAutoCloseableを実装した。[2022-09-25]
これにより、shutdown()を呼び出す代わりにtry-with-resources構文が使えるようになった。
仮想スレッド
try (var service = Executors.newVirtualThreadPerTaskExecutor()) {
  var future = service.submit(task); // 実行開始
  future.get(); // 終了待ち
}
  Java21で仮想スレッドが使えるようになった。[2023-09-23]

タイムアウト時間の指定

ExecutorServiceでsubmitする際にタイムアウト時間を指定することが出来る。

ExecutorService service = Executors.newSingleThreadExecutor();
try {
	Future<Integer> future = service.submit(new Task2());
	try {
		int result = future.get(5, TimeUnit.SECONDS);	//5秒でタイムアウト
		System.out.println(result);
	} catch (InterruptedException | ExecutionException e) {
		e.printStackTrace();
	} catch (TimeoutException e) {
		System.out.println("タイムアウト発生");
	}
} finally {
	service.shutdownNow();
}

Future#get()メソッドでタイムアウト時間を指定する。
その時間が経過しても別スレッドの処理が終わらない場合、Future#get()でTimeoutExceptionが発生する。


スレッド内のタイムアウトの判定

一方、別スレッドで処理している側(RunnableやCallable)は、タイムアウト時間になったからと言って処理が中断されるわけでもなく、続行される。

ただ、スレッドとしてはキャンセル状態になる(割り込みが発生する)ので、判定することは出来る。

class Task2 implements Callable<Integer> {

	@Override
	public Integer call() throws Exception {
		long prev = System.currentTimeMillis();
		long end = prev + TimeUnit.SECONDS.toMillis(10); //10秒間待つ
		while (System.currentTimeMillis() < end) {
			long now = System.currentTimeMillis();
			if (now - prev >= 1000) {
				System.out.printf("%d\t%b%n", now, Thread.currentThread().isInterrupted());
				prev = now;
			}
		}
		return 0;
	}
}

キャンセル状態になるとThread#isInterrupted()がtrueになる。


また、Thread.sleep()でスリープしている場合は、InterruptedExceptionが発生する。
(なので、Thread.sleep()を使うときにInterruptedExceptionを握りつぶすのは良くない)

class Task2s implements Callable<Integer> {

	@Override
	public Integer call() throws Exception {
		try {
			// スレッド呼び出し側でタイムアウトすると、InterruptedExceptionが発生する
			Thread.sleep(TimeUnit.SECONDS.toMillis(10));
		} catch (InterruptedException e) {
			e.printStackTrace();
			throw e;
		}
		return 0;
	}
}

同様に、ProcessProcessBuilderで実行する別プロセス)のwaitFor()でもInterruptedExceptionが発生する。

class Task2p implements Callable<Integer> {

	@Override
	public Integer call() throws Exception {
		ProcessBuilder pb = new ProcessBuilder("cmd.exe");
		Process process = pb.start();
		try {
			return process.waitFor();
		} catch (InterruptedException e) {
			e.printStackTrace();
			throw e;
		}
	}
}

Callableの中からさらに別のFutureを実行している場合でも、連鎖してキャンセル状態(interrupted)になる。


afterExecute

Executors.newFixedThreadPool()から返るThreadPoolExecutorクラスには、タスクの実行前に呼ばれるbeforeExecuteメソッドや、タスクの終了後に呼ばれるafterExecuteメソッドがある。[2020-11-04]

ExecutorServiceのinvokeAllで複数タスクを同時に実行した場合、タスク内で例外が発生してもinvokeAllからスローされるわけではなく、全スレッドの実行が終了した後に返ってくるFutureのget()を呼び出したときにスローされる。
しかしafterExecuteメソッドは、タスク内から例外がスローされた時点で呼ばれる。


自分でbeforeExecuteやafterExecuteメソッドの中を実装したい場合は、Executors.newFixedThreadPool()を使う代わりに、自前でThreadPoolExecutorインスタンスを生成する。

import java.util.concurrent.ThreadPoolExecutor;
	private static ExecutorService newExecutorService(int nThreads) {
//		return Executors.newFixedThreadPool(nThreads);

		return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()) {

			@Override
			protected void beforeExecute(Thread t, Runnable r) {
				// TODO Auto-generated method stub
			}

			@Override
			protected void afterExecute(Runnable r, Throwable t) {
				// TODO Auto-generated method stub
			}
		};
	}

afterExecuteメソッドには引数にThrowableがあるので、タスクで例外が発生したらここに入ってくるように見える
しかしそれは罠で、実際には自分がsubmitinvokeAllで指定したタスク(CallableやRunnable)はFutureTaskというクラス(Futureの実装)にラップされており、発生した例外はFutureTaskの中で保持されている。
(そして引数のRunnableは、実体がFutureTaskである)

したがって、タスク内で発生した例外をafterExecuteメソッドで参照するには、以下のようにする必要がある。

			@Override
			protected void afterExecute(Runnable r, Throwable t) {
				Future<?> task = (Future<?>) r;
				try {
					task.get();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (ExecutionException e) {
					Throwable cause = e.getCause(); // タスク内で発生した例外
					cause.printStackTrace();
				}
			}

Java目次へ戻る / 新機能へ戻る / 技術メモへ戻る
メールの送信先:ひしだま