はじめに
このブログ記事は”Oracle Certified Java Programmer, Gold SE 11認定資格(Java Gold SE11)”合格に向けた学習備忘録のまとめです。
< この学習備忘録について >
内容や重要な点が抜けていた場合には適宜更新していきます
Java Gold合格において重要な単元
Java Goldの学習においては、下記の単元が非常に重要となってきます。
- 関数型インタフェース
- ラムダ式
- JavaストリームAPI
- 並列処理
今回は太字になっている部分、つまり「並列処理」について私がこの章の学習で特に重要だと感じたポイントや、つまずきやすかった箇所を、具体的なコード例とともに備忘録としてまとめていきます。
私と同じようにJava Goldの合格を目指して勉強されている皆様にとって、少しでもお役に立てれば幸いです。
なぜ「並列処理」が重要なのか?
システム開発の現場、特に複数のユーザーが同時にアクセスするWebアプリケーションなどでは、「複数の処理が同時に行われる」状況はごく当たり前に発生します。
「複数で呼び出す」とは、まさに複数のスレッドが、同時に、またはごく近いタイミングで同じメソッドや共有リソースにアクセスしようとすることを指します。
このような状況でデータの整合性を保ち、安全に処理を行うために「並列処理」の知識は不可欠です。それでは、具体的な学習内容を見ていきましょう。
synchronizedによるスレッドセーフの実現
複数のスレッドから同時にアクセスされると問題が起きる可能性がある処理には、synchronizedを使ってロックをかけることで、一度に一つのスレッドしか実行できないようにします。
// このメソッドはスレッドセーフではない
public void increase(int val) {
num += val;
}
// synchronizedブロックで排他制御を行い、スレッドセーフを保証
public void decrease(int val) {
synchronized (this) { num -= val; }
}
// メソッド自体を同期化し、スレッドセーフなシングルトンパターンを実現
public static synchronized Foo getFoo() {
if(obj == null) obj = new Foo();
return obj;
}
使い分けのポイント
- synchronized ブロックを使う
┗ メソッド内の必要な部分だけを同期の対象としたい場合。ロックの範囲を絞ることで、パフォーマンスの向上(並列性の向上)が期待できる。 - メソッドに synchronized を使う
┗ メソッド内の処理すべてを同期の対象としたい場合にシンプルで便利。
BlockingDeque:両端キューの操作
Deque(デック)は「Double Ended Queue」の略で、その名の通り、コレクションの両端(先頭と末尾)から要素の追加・取り出しができるキューです。
BlockingDequeは、それに加えてスレッドセーフな操作を提供します。
コード例
import java.util.concurrent.*;
public class Test {
public static void main(String[] args) {
try {
BlockingDeque<String> dqueue = new LinkedBlockingDeque<>();
dqueue.offer("a"); // Dequeの状態:[ a ]
dqueue.offerFirst("b", 2, TimeUnit.SECONDS); // 先頭に"b"を追加。状態:[ b, a ]
dqueue.offerLast("c", 5, TimeUnit.MILLISECONDS); // 末尾に"c"を追加。状態:[ b, a, c ]
System.out.print(dqueue.pollFirst(7, TimeUnit.NANOSECONDS)); // 先頭から"b"を取り出す
System.out.print(dqueue.pollLast(1, TimeUnit.MILLISECONDS)); // 末尾から"c"を取り出す
} catch(InterruptedException e) { }
}
}
// 出力結果:b c
パラレルストリームで処理を高速化
大量のデータを扱う際に、処理を複数のスレッドに分割して並列実行することで、全体の処理時間を短縮するのがパラレルストリームです。
パラレルストリームの生成方法
- Collection(ListやSetなど)からは parallelStream() メソッドを呼び出します。
- 配列やIntStreamなどからは、まず stream() でシーケンシャルストリームを生成し、その後に parallel() メソッドで並列モードに切り替えます。
よくある間違い①:List から parallel() は呼び出せない
// コンパイルエラーになる例
Arrays.asList("100", "200", "300")
.parallel() // Listオブジェクトにはparallel()メソッドはない
.map(Integer::parseInt)
.reduce(0, (s1, s2) -> s1 + s2);
// 正しい例
Arrays.asList("100", "200", "300")
.parallelStream() // ListからはparallelStream()を呼び出す
.map(Integer::parseInt)
.reduce(0, (s1, s2) -> s1 + s2);
よくある間違い②:reduce の引数と戻り値の型
reduceの第二引数(Accumulator)は、BinaryOperator<T>という関数型インタフェースを実装します。これは、2つの同じ型の引数を受け取り、それと同じ型の戻り値を返す必要があります。
// コンパイルエラーになる例
Arrays.asList("123", "456", "789")
.parallelStream() // この時点ではStream<String>
.reduce(0, (s1, s2) -> s1.length() + s2.length()); // 引数s1, s2はString型だが、戻り値がint型のためエラー
// 正しい例
Arrays.asList("123", "456", "789")
.parallelStream()
.map(s -> s.length()) // mapでStream<Integer>に変換
.reduce(0, (s1, s2) -> s1 + s2); // 引数も戻り値もInteger型でOK
Runnable と Callable
非同期で実行したいタスクを定義する際、RunnableとCallableという2つのインタフェースが使われます。
- Runnable
┗ run()メソッドを持つ。引数なし、戻り値なし(void)。 - Callable
┗ call()メソッドを持つ。引数なし、戻り値あり。
タスクの結果を取得したい場合はCallable、実行するだけでよい場合はRunnableを選びます。
ExecutorServiceでスレッドを管理
ExecutorServiceは、スレッドプールを管理し、非同期タスクの実行を効率化するためのフレームワークです。これを使うことで、手動でThreadを生成・管理する手間が省け、より安全に並行処理を実装できます。
Executorsクラスの便利なファクトリメソッド
ExecutorServiceを生成する際には、Executorsクラスが提供する便利なファクトリメソッドを使うのが一般的です。用途に応じて最適なスレッドプールを簡単に作成できます。代表的なものをいくつか見ていきましょう。
newFixedThreadPool(int nThreads)
特徴
・指定された数(nThreads)の固定数のスレッドを持つスレッドプールを作成します。
・すべてのスレッドが使用中の場合、新しく依頼されたタスクはキューに入って待機します。
・サーバーのリソースが無限ではないため、動作するスレッド数を制限して、システムの負荷を安定させたい場合に非常に有効です。
主な用途
・同時に実行するスレッド数を厳密に管理したい場合。
・サーバーのパフォーマンスを安定させたい場合。newCachedThreadPool()
特徴
・必要に応じてスレッドを動的に作成・破棄する柔軟なスレッドプールです。
・スレッドが60秒間使われないと自動的に終了・破棄されるため、リソースの無駄遣いを防ぎます。
・ただし、スレッド数の上限が実質的にないため、タスクが大量に投入されるとスレッドが無限に生成され、システムリソースを枯渇させる危険性があります。
主な用途
・比較的短時間で終わるタスクが、不規則な間隔で大量に実行される可能性がある場合。newSingleThreadExecutor()
特徴
・スレッドが1つだけのスレッドプールです。
・すべてのタスクは、投入された順に1つずつ実行されることが保証されます(FIFO: First-In, First-Out)。
主な用途
・タスクの実行順序を厳密に保証したい場合。newScheduledThreadPool(int corePoolSize)
特徴
・指定した時間後にタスクを実行したり、定期的にタスクを繰り返し実行したりできる高機能なスレッドプールです。
・ScheduledExecutorServiceという、ExecutorServiceを拡張したインタフェースのインスタンスを返します。このサービスが提供する定期実行メソッドには、よく似ていて混同しやすい2つがあります。・scheduleAtFixedRate()
┗「一定間隔(Rate)」で実行します。前のタスクの開始時間を起点とするため、処理が長引くと次のタスクは遅れを取り戻すように即座に実行されることがあります。・scheduleWithFixedDelay()
┗「一定の遅延(Delay)」を挟みます。前のタスクが完全に終了してから指定時間を待つため、常に「処理時間 + 遅延時間」の間隔が保たれます。
execute()
vs submit()
execute(Runnable task)
┗ タスクを実行するだけ。戻り値は void。submit(Runnable task) / submit(Callable<T> task)
┗ タスクを実行し、その結果をラップした Future オブジェクトを返す。
Futureは、非同期処理の「予約券」や「注文票」のようなものです。 submit()でタスクを依頼すると、まずFuture(予約券)が返ってきます。実際の処理(料理)は別のスレッドで行われ、結果が欲しくなったらfuture.get()を呼び出すことで、処理の完了を待って結果(料理)を受け取ることができます。
コード例
// コンパイルエラーになる例
// execute()はvoidを返すため、Future<Object>型のlistに追加できない
list.add(service.execute(() -> i));
// 正しい例
// submit()はFuture<?>を返すため、listに追加できる
list.add(service.submit(() -> i));
忘れがちなshutdown()
ExecutorService service = Executors.newScheduledThreadPool(2);
try {
// タスクの実行
} finally {
if (service != null) {
service.shutdown(); // 必ず終了処理を入れる
}
}
findFirst() の挙動
findFirst()は、ストリームの最初の要素を取得する終端操作です。これは、ストリームがシーケンシャルでもパラレルでも、論理的な順序の最初の要素を返すことが保証されています。
コード例(シーケンシャル vs パラレル)
char c1 = List.of('a', 'b', 'c')
.stream()
.findFirst() // 'a'が取得される
.get();
char c2 = List.of('x', 'y', 'z')
.parallelStream()
.findFirst() // 'x'が取得される
.get();
まとめ
今回は「並列処理」の単元をざっと振り返ってみました。synchronizedによる排他制御から、ExecutorServiceやパラレルストリームを使った高度な並行処理まで、覚えるべき概念は多いですが、一つひとつ整理していくと理解が深まります。
特にExecutorService周りのファクトリメソッドの使い分けや、submit()とFutureの関係、定期実行メソッドの違いなどは、試験でも問われやすいポイントだと感じました。
Java Goldへの道はまだ続きますが、一緒に頑張っていきましょう!
コメント