Spring Boot 整合 Dubbo 的常规用法

下面是三个服务的调用链路:

项目结构:

服务提供者(例如 pay-service)需要提供接口服务(@Service):

服务消费者(例如 user-service)需要指定服务接口(@Reference),接口多实现可能需要配合 version 属性:

这样就完成了基本的上游对下游服务的调用。

探究底层实现

Dubbo 不仅仅只有 Netty 的实现,还支持 RMI 协议、HTTP 协议等等。所以它实现这种 RPC 调用的特点较其他(如 feign-http)更适用高并发以及短链接的项目。接下来用 Netty 的代码简单复现下基本的 Dubbo 应用。

配合 Dubbo 的服务注册发现链路图:

简单捋顺下思路,看看我们需要哪几样东西:

  1. 一个共享容器,用来存储服务地址(IP、端口等),当服务存在多个时,还需要负载均衡,以及服务信息修改后的通知功能
  2. 为了确定唯一的服务接口信息,需要包含接口名(像 Dubbo 一样,多实现可能需要再加一个 version)、方法名、方法参数类型列表、方法值列表
  3. Netty,保证服务和调用方之间的通信(RPC 调用)

这样我们就能实现一个简单 Dubbo 了。好,开整!

项目依赖

首先是 pom 依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<dependencies>
<!-- 使用redis作为注册中心(共享容器)(当然zk等等可以作为数据统一的工具也可以)-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.5.2</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.8</version>
</dependency>
</dependencies>

核心类定义

确定接口唯一信息体 Invocation

1
2
3
4
5
6
7
8
9
10
11
12
@AllArgsConstructor
@Data
public class Invocation implements Serializable {
// 接口名称
String interfaceName;
// 方法名
String methodName;
// 参数类型列表
Class[] paramTypes;
// 参数值列表
Object[] params;
}

服务信息,IP 和 Port:

1
2
3
4
5
6
@Data
@AllArgsConstructor
public class Url implements Serializable {
String hostName;
Integer port;
}

注册服务的容器,在 NettyServer 启动时注入服务接口(这里不是存储 IP、端口的共享容器,这里是 Netty-Server 用来注册服务接口的容器):

1
2
3
4
5
6
7
8
9
10
11
12
public class LocalRegister {

private static Map<String, Class> map = new ConcurrentHashMap<>();

public static void register(String interfaceName, Class implClass){
map.put(interfaceName, implClass);
}

public static Class get(String interfaceName){
return map.get(interfaceName);
}
}

用来存储服务的 IP 和端口,如果是多个(集群部署)还需要支持负载均衡:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@UtilityClass
public class RedisRegister {

static Jedis jedis;

static {
jedis = new Jedis("127.0.0.1", 6379);
}

// 注册服务
public void regist(String interfaceName, Url url){
try {
String s = jedis.get(interfaceName);
List<Url> urls = null;
if (s == null){
urls = new ArrayList<>();
} else {
JSONArray objects = JSONUtil.parseArray(s);
urls = objects.toList(Url.class);
}
urls.add(url);
jedis.set(interfaceName, JSONUtil.toJsonStr(urls));
} catch (Exception e) {
e.printStackTrace();
}
}

public Url get(String interfaceName) throws Exception{
String s = jedis.get(interfaceName);
JSONArray objects = JSONUtil.parseArray(s);
List<Url> urls = objects.toList(Url.class);
return LoadBalance.random(urls);
}
}

负载均衡机制(Dubbo 支持 4 种,这里先简单使用 Dubbo 默认的 random):

1
2
3
4
5
6
7
8
9
10
11
/**
* 多个Url采用负载均衡(dubbo默认的random)
*/
@UtilityClass
public class LoadBalance {

public Url random(List<Url> urls){
Random random = new Random();
return urls.get(random.nextInt(urls.size()));
}
}

Netty Server 端

需要一个 Netty 的 Server 端来接收请求数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class NettyServer {
public static void start(String hostName, Integer port) {
// netty传统的一个老板,多个打工人模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
final ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。
// 当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// netty内置了许多编码器和解码器供我们粘包和解包,这里因为我们需要传递Invocation对象,所以使用ObjectDecoder和ObjectEncoder
pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(new ObjectEncoder());
// 添加自定义处理器
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(hostName, port).sync();
System.out.println("服务提供方开始提供服务");
channelFuture.channel().closeFuture().sync();
} catch (Exception ignored){
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

Server 端自定义处理器:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收到请求,处理请求
Invocation invocation = (Invocation) msg;
Class aClass = LocalRegister.get(invocation.getInterfaceName());
// 利用反射执行方法得到res
Method method = aClass.getMethod(invocation.getMethodName(), invocation.getParamTypes());
Object res = method.invoke(aClass.newInstance(), invocation.getParams());
// 写回netty,让client端监听到
ctx.writeAndFlush("Netty: " + res);
}
}

定义接口和实现类(按照 Dubbo 官方的服务最佳实践,接口应该与业务部分分离开来):

1
2
3
4
5
6
7
8
9
10
public interface HelloService {
String sayHello();
}

public class HelloServiceImpl implements HelloService {
@Override
public String sayHello() {
return "Hello";
}
}

Netty Client 端

定义一个 Netty-Client 端来模拟发送数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class NettyClient {

public static NettyClientHandler clientHandler = null;

public static void initClient(String hostName, Integer port){
clientHandler = new NettyClientHandler();
final Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(clientHandler);
}
});
bootstrap.connect(hostName, port).sync();
} catch (InterruptedException ignored) {
}
}
}

Netty-Client 端自定义处理器(因为调用的时候,需要等待调用结果,再将结果返回,这需要一个过程,所以需要用到线程等待 waitnotify 方法):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

private ChannelHandlerContext context;
private Invocation invocation;
private String result;

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 在run方法中会用得到
this.context = ctx;
}

@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
notify();
}

public void setInvocation(Invocation invocation) {
this.invocation = invocation;
}

@Override
public synchronized Object call() throws Exception {
context.writeAndFlush(invocation);
wait();
return result;
}
}

代理工厂

创建一个获取代理对象的 Factory:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ProxyFactory {

final static ExecutorService executorService = newFixedThreadPool(Runtime.getRuntime().availableProcessors());

/**
* 编写方法,使用代理模式,获取一个代理对象
*/
public static <T> T getProxy(final Class<T> interfaceClass) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Invocation invocation = new Invocation(interfaceClass.getName(), method.getName(), method.getParameterTypes(), args);
if (clientHandler == null){
Url url = RedisRegister.get(interfaceClass.getName());
initClient(url.getHostName(), url.getPort());
}
clientHandler.setInvocation(invocation);
return executorService.submit(clientHandler).get();
}
});
}
}

运行测试

启动提供者:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class Provider {

public static void main(String[] args) throws IOException {
// 注册服务接口
LocalRegister.register(HelloService.class.getName(), HelloServiceImpl.class);
Url url = new Url(InetAddress.getLocalHost().getHostAddress(), 8080);
// 注册服务器信息,端口,ip
RedisRegister.regist(HelloService.class.getName(), url);
// 启动
NettyServer.start(url.getHostName(), url.getPort());
}
}
// 服务提供方开始提供服务

启用消费者调用 sayHello 方法:

1
2
3
4
5
6
7
8
9
10
public class Consumer {

public static void main(String[] args) {
// 获取helloservice的代理对象
HelloService service = ProxyFactory.getProxy(HelloService.class);
// 调用sayhello方法
System.out.println(service.sayHello());
}
}
// Netty: Hello

到这里,一个简单基于 Netty 的 Dubbo 就完成了。

代码仓库地址

参考文章