[Java] Reactive Programming with JDK 9 Flow API

原文はこちら。
https://community.oracle.com/docs/DOC-1006738

リアクティブ・プログラミング(Reactive Programming)は非同期データ・アイテムのストリームを処理するものです。アプリケーションはデータ・アイテムが発生すると、そのデータ・アイテムに反応します。Rahul Srivastavaによるこの記事で、JDK 9 Flow APIを使った例をご紹介します。

What is Reactive Programming ?

Reactive Programmingとは、非同期データ項目のストリームを処理するものです。ここで、アプリケーションはデータ項目が発生するとそのデータに対応します。データストリームは基本的に時系列で発生するデータ項目の並び(シーケンス)です。このモデルは、データをストリームとして処理するため、インメモリデータの反復処理と比較するとメモリ効率がよいものです。

Reactive Programmingモデルでは、Publisher(パブリッシャ)とSubscriber(サブスクライバ)が存在します。Publisherはデータストリームを発行(publish)し、Subscriberは非同期で当該データストリームを購読(subscribe)します。

このモデルは、Processorを使ってストリーム上で動作するより高い高階関数を提供するメカニズムも提供します。Processorはデータストリームを変換します。この際、PublisherやSubscriberを変更する必要はありません。Processor (もしくはProcessorのチェーン)はPublisherとSubscriberの間に位置し、データストリームを次々変換します。PublisherとSubscriberはデータストリームに発生する変換とは無関係です。


Why Reactive Programming ?

  • よりシンプルなコードで、より可読性が高い
  • ボイラープレートを取り除き、ビジネスロジックに集中できる
  • 低レベルのスレッド、同期、並行性の問題を取り除くことができる
  • ストリーム処理はメモリ効率がよい
  • このモデルを適用すると、ほとんど全ての種類の問題を解決することができる

JDK 9 Flow API

JDK 9のFlow APIはデファクトスタンダードのReactive Streams Specificationに対応します。Reactive Streams SpecificationはReactive Programmingを標準化するイニシアティブの一つです。
Reactive Streams Specification
http://www.reactive-streams.org/ 
既にいくつかの実装がReactive Streams Specificationをサポートしています。
Reactive Streams 1.0.0 is here!
http://www.reactive-streams.org/announce-1.0.0 



Flow API Interfaces

@FunctionalInterface  
public static interface Flow.Publisher<T> { 
    public void    subscribe(Flow.Subscriber<? super T> subscriber); 
}  
 
public static interface Flow.Subscriber<T> { 
    public void    onSubscribe(Flow.Subscription subscription); 
    public void    onNext(T item) ; 
    public void    onError(Throwable throwable) ; 
    public void    onComplete() ; 
}  
 
public static interface Flow.Subscription { 
    public void    request(long n); 
    public void    cancel() ; 
}  
 
public static interface Flow.Processor<T,R>  extends Flow.Subscriber<T>, Flow.Publisher<R> { 
} 
Flow API (とReactive Streams API)は、ある点では、Iteretor(イテレータ)やObserver(オブザーバ)パターンからのアイデアの組み合わせです。Iteretorは、アプリケーションがソースからアイテムを引っ張るというプル・モデルです。Observerは、ソースからの項目をアプリケーションにプッシュするプッシュ・モデルです。Flow APIを使うと、アプリケーションは最初にN個の項目を要求し、その後Publisherは最大でN
個の項目をSubscriberにプッシュします。それゆえ、PullプログラミングモデルとPushプログラミングモデルの組合せなのです。

The Subscriber

SubscriberはコールバックのためにPublisherを購読します。データ項目はリクエストがない限りSubscriberにプッシュされませんが、複数の項目がリクエストされる可能性があります。指定されたSubscriptionに対するSubscriberのメソッド呼び出しは厳密に順序付けされます。アプリケーションは、Subscriberが利用可能な、以下のコールバックに反応することができます。
コールバック(Callback) 説明
onSubscribe 指定されたSubscriptionのための、任意の他のSubscriberメソッドを呼び出す前に呼び出されるメソッド
onNext Subscriptionの次の項目を伴って呼び出されるメソッド
onError PublisherもしくはSubscriptionによってリカバリできないエラーが発生したときに呼び出されるメソッド。その後、Subscriptionは他のSubscriberメソッドを呼び出さない。
PublisherがSubscriberにデータ項目を発行できないというエラーに遭遇した場合、SubscriberはonErrorを受け取り、その後それ以上のメッセージを受信しない。
onComplete エラーによって未終了のSubscriptionのために追加のSubscriberメソッド呼び出しが発生しないことがわかっている場合に呼び出されるメソッド。このあと、別のSubscriberメソッドはSubscriptionから呼び出されることはない。
後続のメッセージがSubscriberに発行されないことがわかっている場合、SubscriberはonCompleteを受け取る。

Subscriberのサンプル
import java.util.concurrent.Flow.*; 
... 
 
public class MySubscriber<T> implements Subscriber<T> { 
  private Subscription subscription; 
 
  @Override 
  public void onSubscribe(Subscription subscription) { 
    this.subscription = subscription; 
    subscription.request(1); //a value of  Long.MAX_VALUE may be considered as effectively unbounded 
  } 
 
  @Override 
  public void onNext(T item) { 
    System.out.println("Got : " + item); 
    subscription.request(1); //a value of  Long.MAX_VALUE may be considered as effectively unbounded 
  } 
 
  @Override 
  public void onError(Throwable t) { 
    t.printStackTrace(); 
  } 
 
  @Override 
  public void onComplete() { 
    System.out.println("Done"); 
  } 
} 

The Publisher

Publisherはデータ項目のストリームを登録されたSubscriberに発行(publish)します。Publisherは通常はExecutorを使って、データ項目をSubscriberに非同期で発行します。Publisherは各SubscriptionのSubscriberのメソッド呼び出しの厳密な順序付けを保証します。

JDKのSubmissionPublisherを使ってデータ項目のストリームをSubscriberに発行する例
import java.util.concurrent.SubmissionPublisher; 
... 
    //Create Publisher 
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
 
    //Register Subscriber 
    MySubscriber<String> subscriber = new MySubscriber<>(); 
    publisher.subscribe(subscriber); 
 
    //Publish items 
    System.out.println("Publishing Items...");
    String[] items={"1", "x", "2", "x", "3", "x"};
    Arrays.asList(items).stream().forEach(i -> publisher.submit(i));  
    publisher.close();  

The Subscription

Flow.PublisherとFlow.Subscriberをリンクします。Subscriberは要求された場合のみデータ項目を受信します。Subscriptionを介していつでもキャンセルすることができます。
メソッド(Method) 説明
request このsubscriptionに対する現在の満たされていない要求に対して、指定されたn件の項目を追加する
cancel (最終的に)Subscriberにメッセージの受信を停止させる

The Processor

SubscriberとPublisherの両方として機能するコンポーネントです。Processorは、PublisherとSubscriberの間に位置し、ストリームを次々と変換します。一つ以上のProcessorが互いに繋がることができます。チェーンの最後のProcessorの結果がSubscriberによって処理されます。JDKは具体的なProcessorを提供しませんので、必要となる任意のProcessorの実装は個々人に任されています。

StringをIntegerに変換するProcessorのサンプル
import java.util.concurrent.Flow.*; 
import java.util.concurrent.SubmissionPublisher; 
... 
 
public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements Processor<T, R> { 
 
  private Function function; 
  private Subscription subscription; 
 
  public MyTransformProcessor(Function<? super T, ? extends R> function) { 
    super(); 
    this.function = function; 
  } 
 
  @Override 
  public void onSubscribe(Subscription subscription) { 
    this.subscription = subscription; 
    subscription.request(1); 
  } 
 
  @Override 
  public void onNext(T item) { 
    submit((R) function.apply(item)); 
    subscription.request(1); 
  } 
 
  @Override 
  public void onError(Throwable t) { 
    t.printStackTrace(); 
  } 
 
  @Override 
  public void onComplete() { 
    close(); 
  } 
} 
Processorを使ってデータストリームを変換するコードのサンプル
import java.util.concurrent.SubmissionPublisher; 
... 
 
    //Create Publisher 
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
 
    //Create Processor and Subscriber 
    MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x")); 
 
    MyTransformProcessor<String, Integer> transformProcessor = new MyTransformProcessor<>(s -> Integer.parseInt(s)); 
 
    MySubscriber<Integer> subscriber = new MySubscriber<>(); 
 
    //Chain Processor and Subscriber 
    publisher.subscribe(filterProcessor); 
    filterProcessor.subscribe(transformProcessor); 
    transformProcessor.subscribe(subscriber); 
 
    System.out.println("Publishing Items..."); 
    String[] items = {"1", "x", "2", "x", "3", "x"}; 
    Arrays.asList(items).stream().forEach(i -> publisher.submit(i)); 
    publisher.close(); 

Back pressure

Subscriberが消費するペースよりもはるかに速いペースでPublisherがデータ項目を生成している場合に、Back Pressure(背圧)がかかります。
Back-Pressure
http://www.reactivemanifesto.org/glossary#Back-Pressure
未処理のデータ項目がバッファリングされているバッファサイズが制限されることがあります。Flow APIは、Back Pressureを合図したり、対処したりするようなAPIを提供しませんが、Back Pressureを処理する実装する上では様々な戦略が考えられます。RxJavaでのBack Pressureの処理方法を確認してください。

Summary

Reactive Programming APIがJDK 9に加わるのは幸先がよいことです。他の多くの製品は製品機能にアクセスするためのReactive Programming APIも提供しています。Flow APIを使うと、プログラマーはリアクティブなプログラムを書き始めることができますが、エコシステムがもっと進化する必要があります。

例えば、リアクティブ・プログラムであってもやはり最終的には、これまでのAPIを使ってデータベースにアクセスする可能性があります。というのも、全てのデータベースがReactive ProgrammingのAPIをサポートしているわけではないからです。つまり、リアクティブ・プログラムが依存するAPIが、リアクティブ・プログラミング・モデルをまだサポートしていない可能性がある、ということです。

References

About the Author

Rahul Srivastavaは以前ApacheでXerces2-Jプロジェクトのコミッタをつとめていました。現在はOracleのApplication Server開発チームの技術スタッフの主要メンバーです。

0 件のコメント:

コメントを投稿