[Java] Integrating WebSockets and JMS with CDI Events in Java EE 7

原文はこちら。
https://blogs.oracle.com/brunoborges/entry/integrating_websockets_and_jms_with

Web開発について検討する場合、最近ではWebSocketも検討されているのではないでしょうか。そして、手に入れられるものであれば何でも統合してみたいと思われていることでしょう。これを超えたクールなもの、例えばJMS 2.0のようなクールなモノをひっさげてJava EE7がやってきます。そしてあなたは不思議に思うことでしょう。自身のWebサイトに接続されているすべてのWebSocketセッションに対してどうやって非同期メッセージを送ることができるのだろう、と。サーバーからのプッシュ型で、ポーリングなしで、本格的なやつです。
答えは非常にシンプルで、CDIを使うことです。またこれはJava EEの魔法の糊としても知られています。CDIを使うと、明らかにアプリケーションと異なる部分間での相互通信を開発者は構築することができます。WebSocketアプリケーションがJMSを使ってメッセージを送受信できるようにするためのすべての手順をたどっていきましょう。

1 - Creating the WebSocket Server Endpoint

まず、クライアントからメッセージを受信するWebSocketのサーバーエンドポイントを構築する必要があります。また、サーバープッシュを使いインバウンドのJMSメッセージペイロードを用いて非同期でクライアントに通知する必要もあります。
@Named
@ServerEndpoint("/websocket")
public class WebSocketEndpoint implements Serializable {
 // this object will hold all WebSocket sessions connected to this WebSocket
 // server endpoint (per JVM)
 private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<Session>()); 
ここで、3個の主要なメソッドをこのWebSocketに追加する必要があります。
@OnOpen public void onOpen(final Session session) { sessions.add(session); } 
@OnMessage public void onMessage(final String message, final Session client) { ... } 
@OnClose public void onClose(final Session session) { sessions.remove(session); }
onOpenonCloseで、このエンドポイントに接続しているすべてのユーザーセッションを管理することに注目して下さい。後でセッションをonMessageの中で使う方法をご覧頂きます。ここでは、JMS Queueにメッセージを送信するSessionBeanを作成しましょう。

2 - Creating the SessionBean to send JMS messages

WebSocketおよびJMSの仕様の両方で定義されている制限により、WebSocketのエンドポイントの内部で直接JMSオブジェクトを作成することはできません。そのため、SessionBeanを作成して、インバウンドWebSocketメッセージをJMS Queueに転送する必要があります。以下のようにQueueSenderSessionBeanというクラスを作成します。
@Named
@LocalBean
@Stateless public class QueueSenderSessionBean { ... }
このSessionBeanは@Statelessであり、CDIコンテキスト(@Named)の一部でもあります。そして、インタフェースを持たないので、@LocalBeanです。それでは、ビジネスメソッド(sendMessage)を追加しましょう。
public void sendMessage(String message) { ... } 
かなりわかりやすいですね。JMS 2.0のすごいことの一つは、宛先へのメッセージ送信がシンプルである、ということです。そのために、2個のオブジェクトを注入する必要があります。
@Resource(mappedName = "jms/myQueue")
private Queue myQueue;
@Inject private JMSContext jmsContext; 
JMSContextはJMS APIに追加された、新しいクラスの一つです。ドキュメントは以下からどうぞ。
Interface JMSContext
http://jms-spec.java.net/2.0/apidocs/javax/jms/JMSContext.html
これはConnectionとSessionをカプセル化し、デフォルトのConnectionFactoryを使いますが、これらはすべてのJava EE 7認定アプリケーションサーバーが提供するリソースです。
Interface Connection
https://jms-spec.java.net/2.0/apidocs/javax/jms/Connection.html
Interface Session
https://jms-spec.java.net/2.0/apidocs/javax/jms/Session.html
Interface ConnectionFactory
https://jms-spec.java.net/2.0/apidocs/javax/jms/ConnectionFactory.html
次は、ロジックを以前追加したメソッドに追加する必要があります。
jmsContext.createProducer().send(myQueue, message);
そして、SessionBeanに手をつけましょう。次は、SessionBeanとWebSocket間に糊を追加して、JMS宛先にメッセージを送信します。

3 - Forwarding an incoming WebSocket message to a JMS destination

これはまったく簡単です。やるべきことは、SessionBeanをWebSocketに注入し、エンドポイントのonMessage内でsendMessageメソッドを呼び出すことです。まず注入から始めますが、不具合のためにコンストラクタinjectionを実行する必要があります。WebSocketサーバーエンドポイントクラスのWebSocketEndpointを開き、以下のフィールドを追加します。
private QueueSenderSessionBean senderBean;
以下のコンストラクタを追加します。
@Inject
 public WebSocketEndpoint(QueueSenderSessionBean sb) {
     this.senderBean = sb;
}
次はシンプルにonMessage内からメソッドを呼び出します。
senderBean.sendMessage(message);
このアプリケーションの最初の部分はこれで完了です。このコードを使ってWebSocketクライアントからJMS宛先にメッセージを送信することができます。では次にこの逆をやってみましょう。JMS QueueからWebSocketクライアントにデータを送信してみましょう。

4 - Listening to a JMS Destination with a MessageDriven Bean

(面白い事実)まだご存知ない開発者もいらっしゃいますが、JMS APIはMessageDrivenアノテーションを指定しません。その代わりに、EJB仕様の一部であるため、JMSのみならず他の多くのもので利用できます。素晴らしいApache TomEEからDavid Blevinsが気付き、EJBの仕様に小さな変更を提案しました。その提案とは、リソースアダプタがmessagelistener-typeを提供するために、コネクタを必要とするというものです。しかし彼の提案では、MDBを使って様々なものをリスニングできるはずで、リスナーインタフェースはオプションであるべきだ、と言っています。一つの例は、Telnetコマンドのリスニングです。実にすばらしい!しかし、ここではJMS固有のユースケースに着目しましょう。
Annotation Type MessageDriven
http://docs.oracle.com/javaee/6/api/javax/ejb/MessageDriven.html
David Blevins (Twitter)
https://twitter.com/dblevins
Apache TomEE
http://tomee.apache.org/
Modernize Connector/MDB
https://java.net/jira/browse/EJB_SPEC-60
dblevins / mdb-improvements (GitHub)
https://github.com/dblevins/mdb-improvements
WebSocketクライアントからQueue宛先にメッセージを発行することができるようになっているので、処理のあと、どこかに転送する必要があります。JMS MDBを作り始めましょう(覚えておいて欲しいのは、すべてのMDBが暗黙のうちにJMSと関係があるわけではありません)。JMS ResourceAdapterコネクタが必要とするMessageListenerインタフェースを実装しましょう。
Interface MessageListener
http://docs.oracle.com/javaee/6/api/javax/jms/MessageListener.html
@Named
@MessageDriven(mappedName = "jms/myQueue")
public class WebSocketMDB implements MessageListener {
    @Override
    public void onMessage(Message msg) { ... }
}
これはすべてのJMS MDBの基本的なコードです。これから魔法をかけていきましょう。

5 - Firing CDI events with the JMS Message payload

仕様の制限のために、WebSocketサーバーのエンドポイントから直接JMS宛先をリスニングできないことをお話したことを覚えてますでしょうか。まぁ、実際には違うテクニックを使えばできるのですがね。CDI Eventについて聞いたことがないのであれば、このチュートリアルを進める前に以下をお読み下さい。
Using Events in CDI Applications (The Java EE 6 Tutorial)
http://docs.oracle.com/javaee/6/tutorial/doc/gkhic.html
Working with Events in CDI
https://netbeans.org/kb/docs/javaee/cdi-events.html
Decoupling event producers and event consumers in Java EE 6 using CDI and JMS (Juliano Viana)
https://weblogs.java.net/blog/jjviana/archive/2010/04/14/decoupling-event-producers-and-event-consumers-java-ee-6-using-cdi-a
読み終えましたか?それでは進めましょう。まず、Event修飾子です。WSJMSMessageアノテーションをプロジェクト内に作成します。
@Qualifier
  @Retention(RetentionPolicy.RUNTIME)
  @Target({ElementType.METHOD, ElementType.FIELD, ElementType.PARAMETER, ElementType.TYPE})
  public @interface WSJMSMessage {}
定義した修飾子を使い、CDIは出力されたイベントをオブザーバーオブジェクトに接続することができます。WebSocketMDBに戻って、先ほど作成した修飾子を使って、Eventディスパッチャーを追加します。
Interface Event<T>
http://docs.oracle.com/javaee/6/api/javax/enterprise/event/Event.html
@Inject
@WSJMSMessage
Event<Message> jmsEvent; 
ロジックをonMessageメソッドに追加しましょう。
jmsEvent.fire(msg);

6 - Listening to CDI events within the WebSocket server endpoint

これがこの記事のサーバー側の最後のパートです。次はクライアント側でJavascriptのコード記述方法をご覧頂きます。Messageペイロードを使って、MDBが出力したCDIイベントをリスニングしましょう。再度WebSocketEndpointクラスを開き、以下のメソッドを追加しましょう。
public void onJMSMessage(@Observes @WSJMSMessage Message msg) {
     try {
         for (Session s : sessions) {
             s.getBasicRemote().sendText("message from JMS: " + msg.getBody(String.class));
         }
    } catch (IOException | JMSException ex) {
            Logger.getLogger(WebSocketEndpoint.class.getName()).log(Level.SEVERE, null, ex);
    }
}
@Observesと以前定義した@WSJMSMessage修飾子に着目しましょう。これでCDIにMDBが発したイベントをリスニングするように指示するのです。
Annotation Type Observes
http://docs.oracle.com/javaee/6/api/javax/enterprise/event/Observes.html

7 - Client-side Javascript to connect with the WebSocket server endpoint

これがしばらくの間Internet界隈に出てきましたが、Java固有でもJava EE固有でもありません。とはいえ、とにかく基本的にこれです。
// note the final path is the same defined inside WebSocketEndpoint class at @ServerEndpoint
websocketSession = new WebSocket('ws://' + document.location.host + '/your-app-context-root/websocket');
このサンプルで使っているJavascriptとHTMLインタフェースは以下のリンクからどうぞ。
brunoborges / javaee7-jms-websocket-example (GitHub)
Javascript
https://github.com/brunoborges/javaee7-jms-websocket-example/blob/master/src/main/webapp/myapp.js
HTMLインタフェース
https://github.com/brunoborges/javaee7-jms-websocket-example/blob/master/src/main/webapp/index.html

Conclusion

この記事がJava EE 7での開発を始めるにあたって役立つことを期待しています。そして。CDIやWebSocket、JMS、EJBとの統合の可能性を理解する上で役立つことを願っています。これらはこのエントリの主要なポイントです。
  • WebSocketクライアントと非同期通信できること(session.getAsyncRemote()を使って非同期でメッセージを送信できるわけですけれども)
  • アプリケーションの任意の場所でWebSocketクライアントに対しサーバープッシュすることができる、ということ
  • JMS Topicを使って、クラスタ全体でWebSocketクライアントセッションへのサーバーからプッシュされた通信をスケールすることができる、ということ
    これはおそらくこの設定における最も興味深いところの一つでしょう。QueueのかわりにTopicを使うと、クラスタ全体でアプリケーションに接続している全てのWebSocketセッションに対し、データをプッシュすることができます。Webサーバーあたり、およそ6万4000個のクライアントセッションという限界があり、この例では静的なsynchronized Setを使ってこれらの参照を保持しています。クラスタの場合を想像してみましょう。これをクラスタ化されたTopicのサブスクライバに変えると、サーバーからプッシュされたデータをスケールアップすることができます。
このプロジェクトのソースコードはGitHubのjavaee7-jms-websocket-exampleにUpしてあります。
Bruno Borgesのリポジトリ(GitHub)
https://github.com/brunoborges/
brunoborges / javaee7-jms-websocket-example (Java EE 7 Example for JMS and WebSockets integration)
https://github.com/brunoborges/javaee7-jms-websocket-example/

0 件のコメント:

コメントを投稿