响应式编程
功能描述
此功能基于 Triple 协议和 Project Reactor 实现,在 3.1.0
以上版本支持。用户只需要编写 IDL 文件并指定 protobuf 插件的对应 Generator 即可生成和使用支持响应式 API 的 Stub 代码。
共有四种调用模式,分别是 OneToOne、OneToMany、ManyToOne 和 ManyToMany,分别对应 Unary 调用、服务器流、客户端流和双向流。在 Reactor 的实现中,One 对应 Mono,Many 对应 Flux。
背景
Reactive Stream 提供了一套标准的异步流处理 API。在允许应用程序编写事件驱动的程序的同时,它还通过 BackPressure 确保节点稳定性。Triple 协议在通信协议层面为 Dubbo 框架增加了对流场景的支持。在此基础上,可以实现上层包括大文件传输和推送机制的业务需求。
Dubbo + Reactive Stream Stub 的组合模式可以为用户带来最便捷的流式方式,提升整个链路的异步性能。
参考用例
[https://github.com/apache/dubbo-samples/tree/master/dubbo-samples-triple-reactor](https://github.com/apache/dubbo-samples/tree/master/3-extensions/ protocol/dubbo-samples-triple-reactor)
使用场景
系统需要处理大量并发请求,而不会使任何服务器过载。拥有大量用户提供实时数据的系统,并希望确保系统能够处理负载而不会崩溃或变慢。
如何使用
有关 Triple 的使用和配置,请参考 在 IDL 中使用 Triple,并确保 Dubbo 版本> = 3.1.0。
添加必要的依赖项
要使用 Reactor Triple,您需要添加以下额外的依赖项。
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
设置 protobuf Maven 插件
只需将 mainClass 更改为 org.apache.dubbo.gen.tri.reactive.ReactorDubbo3TripleGenerator
并确保 ${compiler.version}
>= 3.1.0
<build>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
</pluginArtifact>
<protocPlugins>
<protocPlugin>
<id>dubbo</id>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-compiler</artifactId>
<version>${compiler.version}</version>
<mainClass>org.apache.dubbo.gen.tri.reactive.ReactorDubbo3TripleGenerator</mainClass>
</protocPlugin>
</protocPlugins>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
编写并编译 IDL 文件
IDL 文件的编写与原生 Triple 协议完全一致,编译后默认会在 target/generated-sources/protobuf/java
目录下看到相应的代码。
syntax = "proto3";
option java_multiple_files = true;
package org.apache.dubbo.samples.triple.reactor;
// The request message containing the user's name.
message GreeterRequest {
string name = 1;
}
// The response message containing the greetings
message GreeterReply {
string message = 1;
}
service GreeterService {
rpc greetOneToOne(GreeterRequest) returns (GreeterReply);
rpc greetOneToMany(GreeterRequest) returns (stream GreeterReply);
rpc greetManyToOne(stream GreeterRequest) returns (GreeterReply);
rpc greetManyToMany(stream GreeterRequest) returns (stream GreeterReply);
}
使用
- 添加服务器接口实现
package org.apache.dubbo.samples.triple.reactor.impl;
import org.apache.dubbo.samples.triple.reactor.DubboGreeterServiceTriple;
import org.apache.dubbo.samples.triple.reactor.GreeterReply;
import org.apache.dubbo.samples.triple.reactor.GreeterRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
public class GreeterServiceImpl extends DubboGreeterServiceTriple.GreeterServiceImplBase {
private static final Logger LOGGER = LoggerFactory. getLogger(GreeterServiceImpl. class);
@Override
public Flux<GreeterReply> greetManyToMany(Flux<GreeterRequest> request) {
return request.doOnNext(req -> LOGGER.info("greetManyToMany get data: {}", req))
.map(req -> GreeterReply. newBuilder(). setMessage(req. getName() + " -> server get"). build())
.doOnNext(res -> LOGGER.info("greetManyToMany response data: {}", res));
}
}
- 添加服务器接口启动类
package org.apache.dubbo.samples.triple.reactor;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ProtocolConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.ServiceConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.apache.dubbo.samples.triple.reactor.impl.GreeterServiceImpl;
public class ReactorServer {
private static final int PORT = 50052;
public static void main(String[] args) {
ServiceConfig<GreeterService> reactorService = new ServiceConfig<>();
reactorService.setInterface(GreeterService.class);
reactorService.setRef(new GreeterServiceImpl());
DubboBootstrap bootstrap = DubboBootstrap. getInstance();
bootstrap. application(new ApplicationConfig("tri-reactor-stub-server"))
.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
.protocol(new ProtocolConfig(CommonConstants.TRIPLE, PORT))
.service(reactorService)
.start();
}
}
- 添加客户端启动类和消费者程序
package org.apache.dubbo.samples.triple.reactor;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.bootstrap.DubboBootstrap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.IOException;
public class ReactorConsumer {
private static final Logger LOGGER = LoggerFactory. getLogger(ReactorConsumer. class);
private final GreeterService greeterService;
public ReactorConsumer() {
ReferenceConfig<GreeterService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setInterface(GreeterService.class);
referenceConfig.setProtocol(CommonConstants.TRIPLE);
referenceConfig.setProxy(CommonConstants.NATIVE_STUB);
referenceConfig.setTimeout(10000);
DubboBootstrap bootstrap = DubboBootstrap. getInstance();
bootstrap. application(new ApplicationConfig("tri-reactor-stub-server"))
.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
.reference(referenceConfig)
.start();
GreeterService greeterService = referenceConfig. get();
}
public static void main(String[] args) throws IOException {
ReactorConsumer reactorConsumer = new ReactorConsumer();
reactorConsumer.consumeManyToMany();
System.in.read();
}
private void consumeManyToMany() {
greeterService. greetManyToMany(Flux. range(1, 10)
.map(num->
GreeterRequest.newBuilder().setName(String.valueOf(num)).build())
.doOnNext(req -> LOGGER.info("consumeManyToMany request data: {}", req)))
.subscribe(res -> LOGGER.info("consumeManyToMany get response: {}", res));
}
}
启动服务器
启动消费者