Save Load
GitHub 切换暗/亮/自动模式 切换暗/亮/自动模式 切换暗/亮/自动模式 返回首页

Java9 特性 响应式流( Reactive Stream)

Java9 特性-响应式流(Reactive Stream)

转自:Java9 特性-响应式流(Reactive Stream)

作者:蜜糖的代码注释

链接:https://www.jianshu.com/p/eddea056e38a

来源:简书

什么是流

形象的比喻来说就是如同水一样绵绵不绝的数据形式。而抽象点来说,是有一个生产者(source)产生,由一个或者多个消费者(sink)消费的数据元素(item)序列。那从这个抽象的描述就可以看出,使用流来承担数据交互的模式就是咱们经常说的生产者/消费者模型,而这种模型也可以称之为发布者/订阅者模型(后文将使用这个名字,因为JDK中使用的是这个名字)。

对于流数据来说,一般有两种的数据流转方式:

  • 拉(pull)数据模式:订阅者向发布者索要数据。
  • 推(push)数据模式:发布者向订阅者推送数据(push)。

这两种模式都是描述的单次信息传递的方式。如果发布者产生信息的速度和订阅者消费信息的速度一致的话,那这两种方法都将是十分有效的数据流转方式。

流有什么问题

流的问题在于当两端的速度不匹配的时候(考虑一下各种mq主要处理的问题削峰平谷)。而速度的不匹配自然存在以下两种情况:

订阅者消费速度快

这种情况的时候会出现订阅者有处理能力了,但是订阅者无信息可以处理的情况。如果这种时候是同步的调用模式,则订阅者将会阻塞,直到有新的信息可以进行处理。而如果这时候是异步的信息处理模式,则订阅者可以在无消息处理的时候挂起,直接切换到其他的任务处理中(对于多核CPU的多线程来说)。也就是说,对于这种情况,比较理想的是异步推模式

发布者发布速度快

当发布者发布速度快的时候,会发生订阅者来不及处理数据的情况。如果是同步的情况下发布者会一直阻塞,而如果是异步模式则对于订阅者来说有两种处理方式(可以类比一下线程池设计)可以处理:

  • 损失数据:丢弃数据(在有限的队列缓存已经满了的情况下)
  • 不损失数据:加入队列缓存数据(订阅者需要有拥有无限的缓冲队列暂存数据,以确保不会溢出)

而还有另一种需要发布者加入的处理方式叫做背压(backpressure)。背压的实现方式是:由订阅者发出信号,让发布者降低信息的发布速度,从而让信息速度之间匹配。背压的优点是同样可以处理信息流速不一致问题。而更有意思的是,这时候信息的处理策略可以由发布者来选择:

  • 损失数据:丢弃数据(在有限的队列缓存已经满了的情况下)
  • 不损失数据:加入队列缓存数据(订阅者需要有拥有无限的缓冲队列暂存数据,以确保不会溢出)

没错,这两种情况是和订阅者一致的,不过选择权则由订阅者变成了发布者。

也就是说,在发布者发布速度快的时候,要么发布者直接同步阻塞,要么可以先根据消息的主要关心方(是发布者还是订阅者)来确定是否使用背压,然后再根据数据的类型判断是否接受数据丢弃(不丢弃可能会导致系统崩溃)。往往我们的发布者可以由上层的mq或者程序的应答机制保护消息的可用性。

那么结论是什么,我们需要异步非阻塞(订阅者消费快)、以及背压(发布者发布快)。

什么是响应式流

Reactive Streams 是一项非阻塞背压的异步流处理标准的倡议,当然,如果我这个翻译看的不清楚的话就还是看原文吧(http://www.reactive-streams.org/)。

Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure.

响应式流(Reactive Streams)概念被提出是在2013年,旨在处理上一小节中由于流速问题而产生的几种问题:订阅者订阅者的阻塞、由订阅者(数据下游)来选择是依赖无限队列(数据不丢)或直接丢弃数据。

而对于一项标准而言,它的目是自然是用更少的协议来描述交互。而响应式流的模型也是十分简单:

  • 订阅者异步的向发布者请求N个元素。
  • 发布者一步的向订阅者发送M(0<M<=N)个元素。

基于这个模型,响应式流可以做到pull模型和push模型流处理机制之间动态切换。当订阅者较慢时,发送者没有新数量的请求则发布者进入等待请求信息的状态,相当于pull模型;当订阅者更快时,相当于发布者没有新的信息,订阅者进入到等待消息发送的状态相当于push模型。

Java中的响应式流

对于响应式流,在2015年的时候确定了关于其Java API,具体的详情也也可以参考上面的链接。其中定义了4个API,具体为:

  • Publisher
  • Subscriber
  • Subscription
  • Processor<T,R>

对他们的定义为:

Publisher(发布者)

是一个假定上游会产生无限数据的信息发布者。他们会向有发送请求的订阅者推送元素

Subscriber(订阅者)

订阅者会从发布者那里领取令牌,然后根据令牌向发布者发送“获取请求”。同时当发布者部分准备好元素的时候,会通过令牌对订阅者进行调用,进行数据消费。

Subscription(令牌)

发布者和订阅者通过令牌来进行信息通信的约定。主要有:开始订阅、信息获取、信息推送、异常、结束、取消订阅。

Processor(处理器)

可以通过处理器连接发布者、订阅者以及其他处理器。Processor本身同时继承了Publisher与Subscriber接口,所以可以对元素进行处理转发。主要用于让数据从T转换为R。同时,由于Processor本身也可以接入Processor,所以Processor可以组成链来对数据进行处理。

一次完整的调用流程大概可以描述为:

  1. 订阅者向发布者发送订阅请求。
  2. 发布者根据订阅请求生成令牌发送给订阅者。
  3. 订阅者根据令牌向发布者发送请求N个数据。
  4. 发送者根据订阅者的请求数量返回M(M<=N)个数据
  5. 重复3,4
  6. 数据发送完毕后由发布者发送给订阅者结束信号

而Java API中的接口如下所示,其中所有的方法都是void,因为所有的方法都是异步执行的。

public interface Publisher<T> {
    //用于1.中订阅请求
    public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
    //用于2.中回调发送令牌
    public void onSubscribe(Subscription s);
    //用于3.用于接受4中发送过来的数据
    public void onNext(T t);
    //用于3,4,5接收中间异常了之后的调用
    public void onError(Throwable t);
    //用于6.中结束信号的回调
    public void onComplete();
}
public interface Subscription {
    //用于3.的发送请求N个数据
    public void request(long n);
    //用于3,4,5订阅者异步的向
    public void cancel();
}
public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

JDK中的响应式流

Java API中的流程使用方式看起来比较简单,但API背后的具体实现由于是全异步交互以及涉及具体背压处理而很困难。而JDK9中为用户提供了 Publisher 接口的简单实现,让开发人员可以基于此来扩展出自己的实际需求。

JDK 9中的响应式流功能提供在 java.util.concurrent 包下,全响应式流的 API 接口被封装到 Flow 接口中,其中包括需要使用的接口以及静态方法,关于上一小节中接口方法的详细描述也可以参见该接口上的方法描述。其中的静态接口为:

Flow.Processor<T,R>
Flow.Publisher<T>
Flow.Subscriber<T>
Flow.Subscription

除去上一小节说的4个接口外,Flow中还包含了一个默认方法defaultBufferSize(),用于返回默认的令牌中的缓冲区大小,而默认的值为: DEFAULT_BUFFER_SIZE = 256

除去 Flow 外,其中还有一个刚刚说到的 Publisher 的简单实现类SubmissionPublisher。该接口在实现了 publisher 之外还实现了 AutoCloseable 接口,所以可以直接用 try 块来进行资源的管理。

尽管JDK 9中没有提供 Subscriber 的简单实现,但是在 SubmissionPublisher 中提供了一个 consume(Consumer<? super T> consumer) 方法,用于让开发人员可以直接消费消息发布者的所有元素。实际上是在内部实现了简单的 Subscriber 为 ConsumerSubscriber,但是并不是 public 的,所以不能直接使用

简单的例子

根据JDK 9中提供的SubmissionPublisher<T>咱们来写一个小例子。

    public static void main(String[] args) {
        // 用于承接返回值的任务
        CompletableFuture<Void> task;
        // try-with-resource来控制资源
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            System.out.println("默认缓冲容量: " + publisher.getMaxBufferCapacity());
            // 传入打印方法来处理元素
            task = publisher.consume(System.out::println);
            // 打印数字,调用发布者进行信息处理
            IntStream.range(1, 6)
                    .forEach(publisher::submit);
        }
        if (task != null) {
            try {
                // 当所有订阅者处理完毕后调用
                task.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

在这个例子里面进行了以下几件事。

  1. 声明一个CompletableFuture用于捕获后续的处理事件。
  2. 开启资源用于进行流消息订阅
  3. 设置流的订阅方法(订阅者)
  4. 进行发布者的信息发送
  5. 阻塞主方法等待处理完毕后结束

其中 pub.getMaxBufferCapacity() 会打印默认的缓存空间256。在调用 publisher.consume 的时候,是奖传入的 Consumer 在内部封装成一个 Subscribr 的简单实现类,用于订阅信息的发送,实时上后续数据的订阅者就是在这步创建的。

当 publisher 进行调用的时候,调用 submit 发送数据,publisher 有两个方法用于发送数据,一个是 submit,一个是 offer。两个方法下面实际都是调用的 doOffer 方法,所以,offer 方法提供了置顶延迟时间后丢弃的策略,而 submit 是 offer 的简单实现,是一致阻塞不丢弃。

最后

不得不说响应式流是 java 中响应式编程的基础,而 JDK 9 中也提供了 Reactive Streams 的“简单”实现。之所示简单是打引号的是因为实际上还有点绕的,有兴趣的同学可以追一下 SubmissionPublisher 的实现,有一些思想的经典实现,比如用整数中的 7 位来作为状态机。在下一篇中我们再聊一下 JDK 9 中的数据交互顺序。