[Java] JAX-RS 2.1 Server Sent Events

原文はこちら。
https://blogs.oracle.com/PavelBucek/entry/jax_rs_2_1_server

このブログの通常の書き方に比べて、このエントリのほうがより良い言葉(そして文法的に正しい)だとすれば、それは以下の文章がSantiagoが作成したJAX-RS 2.1仕様の一部だからです。

この章に対するフィードバックは、users@jax-rs-spec.java.netまでご連絡ください。

Server Sent Events

Server-sent events (SSE) は元々、HTML5の中でW3Cが導入した仕様ですが、現在ではWHATWGがメンテナンスしています。
HTML Standard / Server-sent events
https://html.spec.whatwg.org/#server-sent-events
これはサーバからクライアントへの1方向のチャネルを確立する方法を提供します。接続は長時間にわたり続きます。サーバから送信される複数のイベントのために再利用されますが、これはHTTPプロトコルを基にしています。クライアントは特別なメディアタイプ(text/event-stream)をHTTP HeaderのAcceptで指定することで、SSE接続の開通をリクエストします。
イベントは構造化され、イベント、データ、ID、リトライ、およびコメントといういくつかのフィールドが含まれています。SSEは、イベントフィールドがトピックに対応するメッセージングプロトコルであり、idフィールドを使用してイベントの順序を検証し、連続性を保証することができます。コネクションが何らかの理由で中断された場合、IDをリクエストヘッダーに入れて、サーバーに対し過去のイベントを再実行させることができます。ただしこれは、すべての実装でサポートされているわけではない、オプションの動作です。イベントペイロードはデータフィールドで運ばれ、テキスト形式でなければなりません。 リトライは再接続を制御するために使用されます。コメントは一般的な目的のフィールドであり、接続を維持するためにも使用できます。

Client API

SSE用のJAX-RSクライアントAPIは、HTML5の対応するJavaScript APIに触発されたものですが、異なる言語の使用やJava 9からFlow APIをサポートしたいという願望に由来する変更を伴いました。しかしながら、そのバージョンのJavaの依存性の導入はありませんでした。
Reactive Programming with JDK 9 Flow API
https://community.oracle.com/docs/DOC-1006738
[Java] Reactive Programming with JDK 9 Flow API
https://orablogs-jp.blogspot.jp/2016/10/reactive-programming-with-jdk-9-flow-api.html

クライアントAPIへのエントリポイントは、SseEventSource型です。これは、JAX-RS APIの他のクラスと同様のビルダーを提供します。SseEventSourceは、リソースの場所を使ってすでに構成済みのWebTargetから作成されます。SseEventSourceはWebTargetの機能を複製せず、SSEのための必要なロジックのみを追加します。

以下は、SSE接続を開いてちょっとの間メッセージを読む例です。
try (final SseEventSource eventSource = SseEventSource.target(target).build()) {
    eventSource.subscribe(System.out::println);
    eventSource.open();
    Thread.sleep(500); // consume events for 500 ms.
} catch (InterruptedException e) {
    // ...
} 
この例でわかるように、SseEventSourceはAutoCloseableを実装しています。 ソースを開く前に、クライアントはイベント・ストリームをサブスクライブし、各イベントを単に出力するイベント・コンシューマーを登録します。onSubscribe、onComplete、onErrorといった他のライフサイクルイベントのためのハンドラもサポートされていますが、簡単にするためにonEventのみを上記の例で示しています。

Server API

JAX-RS SSEサーバーAPIを使って、接続を受け入れ、1個以上のクライアント(サブスクライバ)に対しイベントを送信します。SseEventSinkを挿入し、text/event-streamメディアタイプを生成するリソースメソッドは、SSEリソースメソッドです。

以下の例は、SSE接続を受け入れ、接続を閉じる前にエグゼキュータ・スレッドを使用して3つのイベントを送信しています。
@GET
@Path("eventStream")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void eventStream(@Context SseEventSink eventSink, @Context Sse sse) {
    executor.execute(() -> {
        try(SseEventSink sink = eventSink) {
            eventSink.onNext(sse.newEvent("event1"));
            eventSink.onNext(sse.newEvent("event2"));
            eventSink.onNext(sse.newEvent("event3"));
            eventSink.close();
        } catch (IOException e) {
        // handle exception
        }
    });
}
SSEリソースメソッドは、着信接続を表すオブジェクト(この場合はSseEventSink)がリソースメソッドに注入されるという点で、非同期処理の場合と同様のパターンに従います。
上記の例では、イベントとブロードキャスタのファクトリメソッドを提供するSse型も挿入しています。SseEventSourceと同様に、インターフェースSseEventSinkもauto-closeableであるため、上記のtry-with-resourcesステートメントを使用することに注意してください。

Broadcasting

アプリケーションは、同時に複数のクライアントへイベントを送信する必要があるかもしれませんが、こうしたアクションは、JAX-RSでブロードキャストと呼ばれます。1つのSseBroadcasterに対し、複数のSseEventSinkを登録(またはサブスクライブ)することができます。そのため、すべてのSseBroadcasterは、Java 9 Flow APIで定義された意味でのパブリッシャーでもあり、Flow.Publisher<T>と同じメソッドをサポートしますが、Java 9への依存を避けるために直接インターフェースを拡張しません。より正確には、SseBroadcasterはFlow.Publisher<OutboundSseEvent>を実装します。

挿入されたSseインスタンスでnewBroadcasterというメソッドを呼び出すことによってのみbroadcasterを作成することができます。SseBroadcasterのライフサイクルとスコープはアプリケーションによって完全に管理されます(JAX-RSランタイムが管理するわけではありません)。以下の例は、broadcasterの利用方法を紹介しています。リソースクラスの@Singletonアノテーションに着目してください。
@Path("/")
@Singleton
public class SseResource {
    private final Sse sse;
    private final SseBroadcaster sseBroadcaster;
    
    public SseResource(@Context Sse sse) {
        this.sse = sse;
        this.sseBroadcaster = sse.newBroadcaster();
    }

    @GET
    @Path("subscribe")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void subscribe(@Context SseEventSink eventSink) {
        eventSink.onNext(sse.newEvent("welcome!"));
        sseBroadcaster.subscribe(eventSink);
    }

    @POST
    @Path("broadcast")
    @Consumes(MediaType.MULTIPART_FORM_DATA)
    public void broadcast(@FormParam("event") String event) {
        sseBroadcaster.broadcast(sse.newEvent(event));
    }
}

Environment

SseEventSourceクラスは、RuntimeDelegateに基づく既存のJAX-RSメカニズムを使用して、サービス名javax.ws.rs.sse.SseEventSource.Builderを使用する実装を検索します。 javax.ws.rs.sseの大部分の型はスレッドセーフです。 スレッドセーフの詳細については、Javadocを参照してください。

0 件のコメント:

コメントを投稿