S-JIS[2013-12-20] 変更履歴

Java ExecutorService/Future


概要

Javaでスレッドを扱うと言えばJDK1.4まではThreadクラス・Runnableインターフェースだったが、
JDK1.5でExecutorService・Callableインターフェース・Futureが導入された。

ExecutorServiceでは、同時に実行するスレッド数を指定することが出来る。
処理を登録(submit)しても、指定したスレッド数までしか同時に実行されない。
(実行可能なスレッド数に足りない場合、登録したスレッドは実行待ちになる。実行中のスレッドの処理が終わると、待機していたスレッドが実行される)

Callableインターフェースは、ExecutorServiceで実行する処理を定義する為のインターフェース。
Runnableインターフェースは戻り値を指定することが出来ないし例外もRuntimeExceptionしかスローできないが、
Callableインターフェースは戻り値を指定することが出来るし、どんな例外をスローすることも出来る。

ExecutorServiceでCallableやRunnableを実行開始(submit)すると、Futureが返される。
別スレッドの処理の終了待ちはFutureに対して行う。
Futureを使うと、別スレッドで処理した結果を受け取るのが簡単になる。
また、別スレッドを実行する際のタイムアウト時間を指定することも出来る。


ExecutorServiceを使ってスレッドで処理を実行する例。

  ExecutorServiceを使った例 従来のThreadを使った例 説明
import文
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
Thread系はjava.langパッケージ配下なので、
明示的にインポートするものは無い。
1スレッド
Runnable
Task1 task = new Task1();

ExecutorService pool = Executors.newSingleThreadExecutor();
try {
  Future<?> future = pool.submit(task);
  try {
    future.get();

    long sum = task.getSum();
    System.out.println(sum);
  } catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
  }
} finally {
  pool.shutdownNow();
}
Task1 task = new Task1();

Thread thread = new Thread(task);

thread.start();
try {
  thread.join();

  long sum = task.getSum();
  System.out.println(sum);
} catch (InterruptedException e) {
  e.printStackTrace();
}
Thread#start()に該当するものは、
ExecutorService#submit()。

Thread#join()に相当するものは、
Future#get()。
class Task1 implements Runnable {
  private long sum;
  @Override
  public void run() {
    long sum = 0;
    for (int i = 1; i <= 100 * 10000; i++) {
      sum += i;
    }
    this.sum = sum;
  }
  public long getSum() {
    return sum;
  }
}
class Task1 implements Runnable {
  private long sum;
  @Override
  public void run() {
    long sum = 0;
    for (int i = 1; i <= 100 * 10000; i++) {
      sum += i;
    }
    this.sum = sum;
  }
  public long getSum() {
    return sum;
  }
}
Runnableインターフェースを実装した処理クラスは
ThreadでもExecutorServiceでも同じ。
1スレッド
Callable
Task1 task = new Task1();

ExecutorService pool = Executors.newSingleThreadExecutor();
try {
  Future<Long> future = pool.submit(task);
  try {
    long sum = future.get();


    System.out.println(sum);
  } catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
  }
} finally {
  pool.shutdownNow();
}
Task1 task = new Task1();

Thread thread = new Thread(task);

thread.start();
try {
  thread.join();

  long sum = task.getSum();
  System.out.println(sum);
} catch (InterruptedException e) {
  e.printStackTrace();
}
Callableインターフェースを使った場合は、
Future#get()で処理の戻り値を受け取ることが出来る。
Callable#call()で例外がスローされた場合は、
Future#get()でExecutionExceptionが発生する。
そのcauseがCallableでスローされた例外。
class Task1 implements Callable<Long> {
  @Override
  public Long call() throws Exception {
    long sum = 0;
    for (int i = 1; i <= 100 * 10000; i++) {
      sum += i;
    }
    return sum;
  }
}
class Task1 implements Runnable {
  private long sum;
  @Override
  public void run() {
    long sum = 0;
    for (int i = 1; i <= 100 * 10000; i++) {
      sum += i;
    }
    this.sum = sum;
  }
  public long getSum() {
    return sum;
  }
}
Callableインターフェースでは戻り値の型を指定できる。
callメソッドに処理を実装する。
戻り値を指定できるし、例外も何でもスローできる。
複数スレッド
Callable
ExecutorService pool = Executors.newFixedThreadPool(10);
try {
  List<Future<Long>> list = new ArrayList<>();
  for (int i = 0; i < 10; i++) {

    Future<Long> future = pool.submit(new Task1());
    list.add(future);
  }
  for (Future<Long> future : list) {
    try {
      future.get();
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
  }
} finally {
  pool.shutdownNow();
}

List<Thread> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
  Thread thread = new Thread(new Task1());
  thread.start();
  list.add(thread);
}
for (Thread thread : list) {
  try {
    thread.join();
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}
 

タイムアウト時間の指定

ExecutorServiceでsubmitする際にタイムアウト時間を指定することが出来る。

ExecutorService pool = Executors.newSingleThreadExecutor();
try {
	Future<Integer> future = pool.submit(new Task2());
	try {
		int result = future.get(5, TimeUnit.SECONDS);	//5秒でタイムアウト
		System.out.println(result);
	} catch (InterruptedException | ExecutionException e) {
		e.printStackTrace();
	} catch (TimeoutException e) {
		System.out.println("タイムアウト発生");
	}
} finally {
	pool.shutdownNow();
}

Future#get()メソッドでタイムアウト時間を指定する。
その時間が経過しても別スレッドの処理が終わらない場合、Future#get()でTimeoutExceptionが発生する。


スレッド内のタイムアウトの判定

一方、別スレッドで処理している側(RunnableやCallable)は、タイムアウト時間になったからと言って処理が中断されるわけでもなく、続行される。

ただ、スレッドとしてはキャンセル状態になる(割り込みが発生する)ので、判定することは出来る。

class Task2 implements Callable<Integer> {

	@Override
	public Integer call() throws Exception {
		long prev = System.currentTimeMillis();
		long end = prev + TimeUnit.SECONDS.toMillis(10); //10秒間待つ
		while (System.currentTimeMillis() < end) {
			long now = System.currentTimeMillis();
			if (now - prev >= 1000) {
				System.out.printf("%d\t%b%n", now, Thread.currentThread().isInterrupted());
				prev = now;
			}
		}
		return 0;
	}
}

キャンセル状態になるとThread#isInterrupted()がtrueになる。


また、Thread.sleep()でスリープしている場合は、InterruptedExceptionが発生する。
(なので、Thread.sleep()を使うときにInterruptedExceptionを握りつぶすのは良くない)

class Task2s implements Callable<Integer> {

	@Override
	public Integer call() throws Exception {
		try {
			// スレッド呼び出し側でタイムアウトすると、InterruptedExceptionが発生する
			Thread.sleep(TimeUnit.SECONDS.toMillis(10));
		} catch (InterruptedException e) {
			e.printStackTrace();
			throw e;
		}
		return 0;
	}
}

同様に、ProcessProcessBuilderで実行する別プロセス)のwaitFor()でもInterruptedExceptionが発生する。

class Task2p implements Callable<Integer> {

	@Override
	public Integer call() throws Exception {
		ProcessBuilder pb = new ProcessBuilder("cmd.exe");
		Process process = pb.start();
		try {
			return process.waitFor();
		} catch (InterruptedException e) {
			e.printStackTrace();
			throw e;
		}
	}
}

Callableの中からさらに別のFutureを実行している場合でも、連鎖してキャンセル状態(interrupted)になる。


Java目次へ戻る / 新機能へ戻る / 技術メモへ戻る
メールの送信先:ひしだま