流式通信模式

对于特定用例,请参考:[dubbo-samples-triple/pojo](https://github.com/apache/dubbo-samples/tree/master/3-extensions/protocol/dubbo-samples-triple/src/main/java /org/apache/dubbo/sample/tri/pojo);

开启 Triple 的新特性 - Stream (流)

Stream 是 Dubbo3 提供的一种新的调用类型,建议在以下场景中使用 stream

  • 接口需要发送大量数据。这些数据无法放在一个 RPC 请求或响应中,需要分批发送。但是,如果应用层无法解决传统多个 RPC 方法的顺序和性能问题,如果需要保证顺序,则只能串行发送
  • 在流式场景中,数据需要按发送顺序进行处理,数据本身没有明确的边界
  • 在推送场景中,多个消息在同一个调用的上下文中发送和处理

Stream 分为以下三种类型

  • SERVER_STREAM (服务器流) SERVER_STREAM
  • CLIENT_STREAM (客户端流) CLIENT_STREAM
  • BIDIRECTIONAL_STREAM (双向流) BIDIRECTIONAL_STREAM

由于 java 语言的限制,BIDIRECTIONAL_STREAM 和 CLIENT_STREAM 的实现是相同的。

在 Dubbo3 中,流接口声明和使用为 SteamObserver,用户可以使用和实现此接口来发送和处理流数据、异常和结束。

对于 Dubbo2 用户来说,他们可能不熟悉 StreamObserver,它是 Dubbo3 定义的一种流类型。Dubbo2 中没有 Stream 类型,因此它对迁移场景没有影响。

Stream 语义保证

  • 提供消息边界,可以轻松地分别处理消息
  • 严格有序,发送者的顺序与接收者的顺序一致
  • 全双工,无需等待发送
  • 支持取消和超时

非 PB 序列化流

  1. api
public interface IWrapperGreeter {

     StreamObserver<String> sayHelloStream(StreamObserver<String> response);

     void sayHelloServerStream(String request, StreamObserver<String> response);
}

Stream 方法的输入参数和返回值严格约定。为了防止因编写错误导致的问题,Dubbo3 框架侧会对参数进行检查,如果出现错误则抛出异常。对于 BIDIRECTIONAL_STREAM,需要注意的是,参数中的 StreamObserver 是响应流,返回值参数中的 StreamObserver 是请求流。

  1. 实现类
public class WrapGreeterImpl implements WrapGreeter {

     //...

     @Override
     public StreamObserver<String> sayHelloStream(StreamObserver<String> response) {
         return new StreamObserver<String>() {
             @Override
             public void onNext(String data) {
                 System.out.println(data);
                 response.onNext("hello,"+data);
             }

             @Override
             public void onError(Throwable throwable) {
                 throwable. printStackTrace();
             }

             @Override
             public void onCompleted() {
                 System.out.println("onCompleted");
                 response.onCompleted();
             }
         };
     }

     @Override
     public void sayHelloServerStream(String request, StreamObserver<String> response) {
         for (int i = 0; i < 10; i++) {
             response.onNext("hello," + request);
         }
         response.onCompleted();
     }
}
  1. 调用方法
delegate.sayHelloServerStream("server stream", new StreamObserver<String>() {
     @Override
     public void onNext(String data) {
         System.out.println(data);
     }

     @Override
     public void onError(Throwable throwable) {
         throwable. printStackTrace();
     }

     @Override
     public void onCompleted() {
         System.out.println("onCompleted");
     }
});


StreamObserver<String> request = delegate.sayHelloStream(new StreamObserver<String>() {
     @Override
     public void onNext(String data) {
         System.out.println(data);
     }

     @Override
     public void onError(Throwable throwable) {
         throwable. printStackTrace();
     }

     @Override
     public void onCompleted() {
         System.out.println("onCompleted");
     }
});
for (int i = 0; i < n; i++) {
     request.onNext("stream request" + i);
}
request.onCompleted();

使用 Protobuf 序列化的流

对于 Protobuf 序列化方式,建议编写 IDL 并使用 compiler 插件进行编译生成。生成的代码大致如下

public interface PbGreeter {

     static final String JAVA_SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter";
     static final String SERVICE_NAME = "org.apache.dubbo.sample.tri.PbGreeter";

     static final boolean inited = PbGreeterDubbo.init();
    
     //...

     void greetServerStream(org.apache.dubbo.sample.tri.GreeterRequest request, org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver);

     org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterRequest> greetStream(org.apache.dubbo.common.stream.StreamObserver<org.apache.dubbo.sample.tri.GreeterReply> responseObserver);
}

完整用例

  1. 编写 Java 接口

    import org.apache.dubbo.common.stream.StreamObserver;
    import org.apache.dubbo.hello.HelloReply;
    import org.apache.dubbo.hello.HelloRequest;
    
    public interface IGreeter {
        /**
         * <pre>
         * Sends greeting by stream
         * </pre>
         */
        StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver);
    
    }
    
  2. 编写实现类

    public class IStreamGreeterImpl implements IStreamGreeter {
    
        @Override
        public StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver) {
    
            return new StreamObserver<HelloRequest>() {
                private List<HelloReply> replyList = new ArrayList<>();
    
                @Override
                public void onNext(HelloRequest helloRequest) {
                    System.out.println("onNext receive request name:" + helloRequest.getName());
                   replyList.add(HelloReply.newBuilder()
                       .setMessage("receive name:" + helloRequest.getName())
                       .build());
               }
    
               @Override
               public void onError(Throwable cause) {
                   System.out.println("onError");
                   replyObserver.onError(cause);
               }
    
               @Override
               public void onCompleted() {
                   System.out.println("onComplete receive request size:" + replyList.size());
                   for (HelloReply reply : replyList) {
                       replyObserver.onNext(reply);
                   }
                   replyObserver.onCompleted();
               }
           };
       }
    }
    
  3. 创建 Provider

    public class StreamProvider {
        public static void main(String[] args) throws InterruptedException {
            ServiceConfig<IStreamGreeter> service = new ServiceConfig<>();
            service.setInterface(IStreamGreeter.class);
            service.setRef(new IStreamGreeterImpl());
            service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051));
            service.setApplication(new ApplicationConfig("stream-provider"));
            service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
            service. export();
            System.out.println("dubbo service started");
            new CountDownLatch(1). await();
        }
    }
    
  4. 创建 Consumer

    public class StreamConsumer {
        public static void main(String[] args) throws InterruptedException, IOException {
            ReferenceConfig<IStreamGreeter> ref = new ReferenceConfig<>();
            ref. setInterface(IStreamGreeter. class);
            ref. setCheck(false);
            ref.setProtocol(CommonConstants.TRIPLE);
            ref. setLazy(true);
            ref. setTimeout(100000);
            ref. setApplication(new ApplicationConfig("stream-consumer"));
            ref.setRegistry(new RegistryConfig("zookeeper://mse-6e9fda00-p.zk.mse.aliyuncs.com:2181"));
            final IStreamGreeter iStreamGreeter = ref. get();
    
            System.out.println("dubbo ref started");
            try {
    
                StreamObserver<HelloRequest> streamObserver = iStreamGreeter.sayHello(new StreamObserver<HelloReply>() {
                    @Override
                    public void onNext(HelloReply reply) {
                        System.out.println("onNext");
                        System.out.println(reply.getMessage());
                    }
    
                    @Override
                    public void onError(Throwable throwable) {
                        System.out.println("onError:" + throwable.getMessage());
                    }
    
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                });
    
                streamObserver.onNext(HelloRequest.newBuilder()
                    .setName("tony")
                    .build());
    
                streamObserver.onNext(HelloRequest.newBuilder()
                    .setName("nick")
                    .build());
    
                streamObserver.onCompleted();
            } catch (Throwable t) {
                t. printStackTrace();
            }
            System.in.read();
        }
    }
    
  5. 运行 Provider 和 Consumer,可以看到请求正常返回

    onNext
    receive name:tony
    onNext
    receive name:nick
    onCompleted

常见问题

  1. protobuf 类未找到

由于 Triple 协议底层需要依赖 protobuf 协议进行传输,即使定义的服务接口没有使用 protobuf,也需要将 protobuf 依赖引入环境中。

         <dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.19.4</version>
</dependency>

上次修改时间:2023 年 1 月 2 日:Enhance en docs (#1798) (95a9f4f6c1c)