JMSは、メッセージのやりとりをする方法を提供している。JavaEEの仕様のひとつで、JNDIの機能も利用している。
「メッセージ」というのは、オブジェクト指向で言う「メッセージ」と同じような意味。
すなわち“メッセージの送信”とは、実際にはメソッド呼び出しをすること。
その際に受け渡せるデータは、プリミティブ型やStringやJavaオブジェクト(要シリアライズ)と、XMLデータらしい。
これを非同期で送受信するのがJMSの役割。
例えば、複数のスレッドからメッセージをJMSが管理するキューへ入れ、別プロセスでそのキューからメッセージを読み取ることによって非同期の処理を実現する。
あるいは、1つのメッセージをJMS経由で複数の受信先へ送ることも出来る。
前者のような非同期処理は自分でThreadを起動すれば実現できるが、JMSを使えば高度なことが便利に出来る。
JMSは、JMSの機能を提供しているJ2EEサーバーが無いと使用できない。
例えばWebLogicはJMSを使用できるが、Tomcatは提供していない(JBossASは使える)。
JMSが管理するキューにメッセージを入れ、他方からメッセージを取り出す。
送信側 | J2EEサーバー | 受信側 | ||
---|---|---|---|---|
アプリS1 | → | キュー1 | → | アプリR1 |
アプリS2 | ||||
アプリS3 | ||||
アプリS4 | → | キュー2 | → | アプリR2 |
JMSでは、送信側は「プロデューサー」(もしくはセンダー)、受信側の事は「コンシューマー」(レシーバー)と呼ぶらしい。
PTPでは、同一のキューに対し複数のコンシューマー(受信アプリ)を登録(作成・接続)すると、メッセージはどれか1つ(ランダム?)に届けられるようだ。[2008-08-05]
(1つのメッセージが複数のコンシューマー全てに送信されるようなことは無い→というか、それはPTPではない)
まず、J2EEサーバーに対して使用するキューを登録(作成・設定)してやる必要がある。
→WebLogic10のJMSキューの登録方法
→JBoss4.2.3のJMSキューの定義方法 [2008-08-16]
JMS_FACTORY | QUEUE_NAME | その他 | ||
---|---|---|---|---|
WLS 10.0J | ○ | weblogic.jms.ConnectionFactory | MyQueue0 | |
○ | javax.jms.QueueConnectionFactory | MyQueue0 | ||
× | QueueConnectionFactory | MyQueue0 | JNDIに登録されていないっぽい。 | |
JBoss 4.2.3 | ○ | QueueConnectionFactory | queue/MyQueue1 | |
○ | ConnectionFactory | queue/MyQueue1 | ||
× | javax.jms.QueueConnectionFactory | queue/MyQueue1 |
public static final String JMS_FACTORY = "weblogic.jms.ConnectionFactory"; //WebLogic10 public static final String JMS_FACTORY = "ConnectionFactory"; //JBoss4.2.3 public static final String QUEUE_NAME = "MyQueue0"; //WebLogic10 public static final String QUEUE_NAME = "queue/MyQueue1"; //JBoss4.2.3
送信側のプログラムは、以下のような感じ。
import javax.jms.*;
import javax.naming.*;
〜
public static void main(String args[]) throws Exception {
// コンテキスト作成
Context ctx = initContext(); //APサーバー毎に異なる
// コネクションファクトリーとキューを取得
QueueConnectionFactory factory;
Queue queue;
try {
factory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
queue = (Queue) ctx.lookup(QUEUE_NAME);
} finally {
ctx.close();
}
QueueConnection connection = null;
QueueSession session = null;
QueueSender sender = null;
try {
// コネクションを作成
connection = factory.createQueueConnection();
// セッションを作成
session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
// キューセンダーを作成
sender = session.createSender(queue);
// メッセージ作成
TextMessage msg = session.createTextMessage();
msg.setText("テキストメッセージ!");
// ObjectMessage msg = session.createObjectMessage();
// msg.setObject(new Serializable() {
// private static final long serialVersionUID = 1L;
// });
// メッセージ送信
connection.start(); //これは無くても動くような…
sender.send(msg);
System.out.println("送信:" + msg);
} finally {
// if (sender != null) {
// try { sender.close(); } catch (JMSException e) {}
// }
// if (session != null) {
// try { session.close(); } catch (JMSException e) {}
// }
if (connection != null) {
try { connection.close(); } catch (JMSException e) {}
}
}
}
コンパイルするためには、JMSのライブラリーをクラスパスに加える必要がある。
C:\bea\modules\javax.jms_1.1.jar
WebLogic10を起動した状態でこのプログラムを実行すると、メッセージがWebLogicのキューに入れられる。
WebLogicの管理コンソールの「サービス」→「メッセージング」→「JMSモジュール」⇒「SystemModule-0」⇒「Queue-0」⇒「モニタ」でメッセージ(キュー)の状態が見られる。
キューからメッセージを全く取り出していない状態だと、メッセージをキューに入れると「最大メッセージ数」が増えてゆく。
import javax.jms.*; import javax.naming.*; 〜 public static void main(String[] args) throws Exception { // コンテキスト作成 Context ctx = initContext(); // コネクションファクトリーとキューを取得 QueueConnectionFactory factory; Queue queue; try { factory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY); queue = (Queue) ctx.lookup(QUEUE_NAME); } finally { ctx.close(); } QueueConnection connection = null; QueueSession session = null; QueueReceiver receiver = null; try { // コネクションを作成 connection = factory.createQueueConnection(); // セッションを作成 session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); // キューレシーバーを作成 receiver = session.createReceiver(queue); // メッセージ受信開始 connection.start(); for (;;) { Message msg = receiver.receive(); if (msg instanceof TextMessage) { TextMessage tm = (TextMessage) msg; System.out.println("受信:" + tm.getText()); } else if (msg instanceof ObjectMessage) { ObjectMessage om = (ObjectMessage) msg; Object obj = om.getObject(); System.out.println("受信:" + obj); } } } finally { // if (receiver != null) { // try { receiver.close(); } catch (JMSException e) {} // } // if (session != null) { // try { session.close(); } catch (JMSException e) {} // } if (connection != null) { try { connection.close(); } catch (JMSException e) {} } } }
WebLogicの管理コンソールの「サービス」→「メッセージング」→「JMSモジュール」⇒「SystemModule-0」⇒「Queue-0」⇒「モニタ」でメッセージ(キュー)の状態が見られる。
その表の「現在のコンシューマ数」が、そのキューに接続している受信プログラム(コンシューマー)の数だと思われる。
ウェブアプリ(サーブレット)からキューへ送信する場合、コンテキストの取得方法以外はバッチサンプルと全く同じでもよい。[2008-08-02]
public class JmsSampleServlet extends HttpServlet { @Override protected void doGet(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { try { sendQueue(req.getRequestURI()); } catch (NamingException e) { throw new ServletException(e); } catch (JMSException e) { throw new ServletException(e); } PrintWriter out = res.getWriter(); out.println("<html>"); out.println("<head><title>JMS送信サンプル</title></head>"); out.println("<body>JMSキューへ送信しました。</body>"); out.println("</html>"); } void sendQueue(String text) throws NamingException, JMSException { // コンテキスト作成 Context ctx = new InitialContext(); // 以下、バッチ系と全く同じ 〜 TextMessage msg = session.createTextMessage(); msg.setText(text); 〜 } }
InitialContextは、ウェブアプリの場合、「new InitialContext()」とすればそのAPサーバーを使う設定になるらしい。
しかし、接続ファクトリーやキューなどのJNDIオブジェクトはスレッドセーフなので、コンテキストから取得するのは最初の1回だけでいいらしい。[2008-08-03]
(サーブレットはマルチスレッドで動作するので、スレッドセーフなオブジェクトでない限り、何らかの対策をとらないと使えない)
(コンテキストを取得するのもそれなりにコストの高い処理なので、取得は極力減らしたい)
(ただし、実行中に送付先がエラーによって無効になった場合は再取得できるように作っておくべきらしい)
public class JmsSampleServlet extends HttpServlet { /** サーブレットの初期化時に一度だけ呼ばれる */ @Override public void init() throws ServletException { try { lookupJNDIObjects(); } catch (NamingException e) { throw new ServletException(e); } } private QueueConnectionFactory factory; private Queue queue; /** JNDIオブジェクト(接続ファクトリーやキュー)を取得する */ protected void lookupJNDIObjects() throws NamingException { Context ctx = new InitialContext(); try { // コネクションファクトリーを取得 factory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY); // キューを取得 queue = (Queue) ctx.lookup(QUEUE_NAME); } finally { ctx.close(); } } // // doGetは最初に作ったものと同じ // void sendQueue(String text) throws NamingException, JMSException { QueueConnectionFactory f = factory; Queue q = queue; if (f == null || q == null) { // JNDIオブジェクトが無効化されていたら、再取得を行う。 synchronized (this) { lookupJNDIObjects(); f = factory; q = queue; } } QueueConnection connection = null; try { // コネクションを作成 connection = f.createQueueConnection(); // セッションを作成 QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); // キューセンダーを作成 QueueSender sender = session.createSender(q); // メッセージ作成 TextMessage msg = session.createTextMessage(); msg.setText(text); // メッセージ送信 // connection.start(); //これは無くても動くような… sender.send(msg); System.out.println("送信:" + msg); } catch (JMSException e) { // エラーが発生したら、JNDIオブジェクトを無効化する。 // 送信先サーバーが停止すると無効になる可能性がある為。 synchronized (this) { factory = null; queue = null; } throw e; } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) {} } } } }
→EJB3.0のMessageDriven [2008-08-03]
createQueueSession()の第2引数には、acknowledgeMode(確認応答モード)を指定する。[2008-10-04]
JMSでは、受信側にメッセージがちゃんと届いたことを確認する「応答」というものがあるらしい。
「どういう状態になったら応答を返すのか」を指定するのがacknowledgeMode。
acknowledgeMode | 説明 |
---|---|
Session.AUTO_ACKNOWLEDGE | 自動的に確認応答される。 |
Session.CLIENT_ACKNOWLEDGE | クライアントが(明示的に)Message#acknowledge()メソッドを呼び出すと確認応答される。 |
Session.DUPS_OK_ACKNOWLEDGE | 遅延確認応答(なんだろそれ) |
参考: