流式通信

流的实现原理

Triple 协议的流模式

  • 从协议层面上看,Triple 是构建在 HTTP2 的基础之上的,所以它直接拥有了 HTTP2 的所有能力,因此具备了拆分 stream 和全双工的能力。

  • 在框架层面上,为用户提供了 StreamObserver 作为流式接口,对输入输出参数提供流式处理,框架会在发送和接收流式数据时,做出相应的接口调用,从而保证流的生命周期完整性。

启用 Triple 的新特性

流式流

流是 Dubbo3 提供的一种新的调用类型,建议在以下场景中使用流

  • 接口需要发送大量数据,这些数据无法放置在一个 RPC 请求或响应中,需要分批发送,但如果应用层无法解决传统多次 RPC 方式下的顺序和性能问题,如果需要保证顺序,则只能串行发送
  • 在流式场景中,需要按照发送顺序处理数据,并且数据本身没有确定的边界
  • 在推送场景中,在同一个调用的上下文中发送和处理多条消息

流分为以下三种类型

  • SERVER_STREAM(服务器流) SERVER_STREAM
  • CLIENT_STREAM(客户端流) CLIENT_STREAM
  • BIDIRECTIONAL_STREAM(双向流) BIDIRECTIONAL_STREAM

由于 java 语言的限制,BIDIRECTIONAL_STREAM 和 CLIENT_STREAM 的实现是相同的。

在 Dubbo3 中,流式接口被声明和使用为 SteamObserver,用户可以使用和实现该接口来发送和处理流式数据、异常和结束。

对于 Dubbo2 用户来说,他们可能对 StreamObserver 比较陌生,它是 Dubbo3 定义的一种流式类型。Dubbo2 中没有 Stream 类型,因此对迁移场景没有影响。

流语义保证

  • 提供消息边界,可以方便地单独处理消息
  • 严格有序,发送方的顺序与接收方的顺序一致
  • 全双工,无需等待发送
  • 支持取消和超时

非 PB 序列化流

API

public interface IWrapperGreeter {

    StreamObserver<String> sayHelloStream(StreamObserver<String> response);

    void sayHelloServerStream(String request, StreamObserver<String> response);
}

Stream 方法的方法输入参数和返回值是严格约定的。为了防止因编写错误导致问题,Dubbo3 框架侧会对参数进行检查,如果出现错误则抛出异常。对于 BIDIRECTIONAL_STREAM,需要注意的是,参数中的 StreamObserver 是响应流,返回值中的 StreamObserver 是请求流。

实现类

public class WrapGreeterImpl implements WrapGreeter {

    //...

    @Override
    public StreamObserver<String> sayHelloStream(StreamObserver<String> response) {
        return new StreamObserver<String>() {
            @Override
            public void onNext(String data) {
                System.out.println(data);
                response.onNext("hello,"+data);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable. printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
                response.onCompleted();
            }
        };
    }

    @Override
    public void sayHelloServerStream(String request, StreamObserver<String> response) {
        for (int i = 0; i < 10; i++) {
            response.onNext("hello," + request);
        }
        response.onCompleted();
    }
}

调用方法

delegate.sayHelloServerStream("server stream", new StreamObserver<String>() {
    @Override
    public void onNext(String data) {
        System.out.println(data);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable. printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }
});


StreamObserver<String> request = delegate.sayHelloStream(new StreamObserver<String>() {
    @Override
    public void onNext(String data) {
        System.out.println(data);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable. printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }
});
for (int i = 0; i < n; i++) {
    request.onNext("stream request" + i);
}
request.onCompleted();

使用 Protobuf 序列化的流

对于 Protobuf 序列化方式,建议编写 IDL,并使用 compiler 插件进行编译生成。生成的代码大致如下

public interface PbGreeter {

    static final String JAVA_SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter";
    static final String SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter";

    static final boolean inited = PbGreeterDubbo.init();
    
    //...

    void greetServerStream(org.apache.dubbo.sample.tri.GreeterRequest request, org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver);

    org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterRequest> greetStream(org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver);
}

上次修改时间:2023 年 1 月 2 日: 增强英文文档 (#1798) (95a9f4f6c1c)