距离上一次更新该文章已经过了 652 天,文章所描述的內容可能已经发生变化,请留意。
先来看看看看springboot整合dubbo的常规用法
下面是三个服务的调用链路

项目结构

服务提供者(例如pay-service)需要提供接口服务(@Service)

服务消费者(例如user-service)需要指定服务接口(@Reference),接口多实现可能需要配合version属性

这样就完成了基本的上游对下游服务的调用
探究下底层实现
知道dubbo是基于netty实现的,
所以它实现这种rpc调用的特点较其他(如feign-http)更适用高并发以及短链接的项目,接下来用netty的代码简单复现下基本的dubbo
应用
plaintext
1 | 这里改一下,dubbo不仅仅只有netty的实现,还支持rmi协议,http协议等等 |
配合dubbo的服务注册发现链路图

简单捋顺下思路,看看我们需要哪几样东西
- 一个共享容器,用来存储服务地址(ip,port等),当服务存在多个,还需要负载均衡,以及服务信息修改后的通知功能
- 为了确定唯一的服务接口信息,需要包含接口名(像dubbo一样,多实现可能需要再加一个version),方法名,方法参数类型列表,方法值列表
- netty,保证服务和调用方之间的通信(rpc调用)
这样我们就能实现一个简单dubbo了。好,开整
首先是pom依赖
xml1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22<dependencies>
<!-- 使用redis作为注册中心(共享容器)(当然zk等等可以作为数据统一的工具也可以)-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.5.2</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.8</version>
</dependency>
</dependencies>确定接口唯一信息体Invocation
java1
2
3
4
5
6
7
8
9
10
11
12
public class Invocation implements Serializable {
// 接口名称
String interfaceName;
// 方法名
String methodName;
// 参数类型列表
Class[] paramTypes;
// 参数值列表
Object[] params;
}服务信息,ip和port
java1
2
3
4
5
6
public class Url implements Serializable {
String hostName;
Integer port;
}注册服务的容器,在nettyserver启动时注入服务接口(这里不是存储ip port的共享容器,这里是netty-server用来注册服务接口的容器)
java1
2
3
4
5
6
7
8
9
10
11
12public class LocalRegister {
private static Map<String, Class> map = new ConcurrentHashMap<>();
public static void register(String interfaceName,Class implClass){
map.put(interfaceName,implClass);
}
public static Class get(String interfaceName){
return map.get(interfaceName);
}
}用来存储服务的ip和port,如果是多个(集群部署)还需要支持负载均衡
java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class RedisRegister {
static Jedis jedis;
static {
jedis = new Jedis("127.0.0.1", 6379);
}
// 注册服务
public void regist(String interfaceName, Url url){
try {
String s = jedis.get(interfaceName);
List<Url> urls = null;
if (s == null){
urls = new ArrayList<>();
}else {
JSONArray objects = JSONUtil.parseArray(s);
urls = objects.toList(Url.class);
}
urls.add(url);
jedis.set(interfaceName,JSONUtil.toJsonStr(urls));
} catch (Exception e) {
e.printStackTrace();
}
}
public Url get(String interfaceName) throws Exception{
String s = jedis.get(interfaceName);
JSONArray objects = JSONUtil.parseArray(s);
List<Url> urls = objects.toList(Url.class);
return LoadBalance.random(urls);
}
}负载均衡机制(dubbo支持4种,这里先简单使用dubbo默认的random)
java1
2
3
4
5
6
7
8
9
10
11/**
* 多个Url采用负载均衡(dubbo默认的random)
*/
public class LoadBalance {
public Url random(List<Url> urls){
Random random = new Random();
return urls.get(random.nextInt(urls.size()));
}
}需要一个netty的server端来接收请求数据
java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32public class NettyServer {
public static void start(String hostName, Integer port) {
// netty传统的一个老板,多个打工人模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
final ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// 该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。
//当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// netty内置了许多编码器和解码器供我们粘包和解包,这里因为我们需要传递Invocation对象,所以使用ObjectDecoder和ObjectEncoder
pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(new ObjectEncoder());
// 添加自定义处理器
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(hostName, port).sync();
System.out.println("服务提供方开始提供服务");
channelFuture.channel().closeFuture().sync();
}catch (Exception ignored){}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}server端自定义处理器
java1
2
3
4
5
6
7
8
9
10
11
12
13public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收到请求,处理请求
Invocation invocation = (Invocation) msg;
Class aClass = LocalRegister.get(invocation.getInterfaceName());
// 利用反射执行方法得到res
Method method = aClass.getMethod(invocation.getMethodName(), invocation.getParamTypes());
Object res = method.invoke(aClass.newInstance(), invocation.getParams());
// 写回netty,让client端监听到
ctx.writeAndFlush("Netty: "+res);
}
}定义接口和实现类(按照dubbo官方的服务最佳实践,接口应该与业务部分分离开来)
java1
2
3
4
5
6
7
8
9public interface HelloService {
String sayHello();
}
public class HelloServiceImpl implements HelloService {
public String sayHello() {
return "Hello";
}
}定义一个netty-client端来模拟发送数据
java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public class NettyClient {
public static NettyClientHandler clientHandler = null;
public static void initClient(String hostName,Integer port){
clientHandler = new NettyClientHandler();
final Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(clientHandler);
}
});
bootstrap.connect(hostName,port).sync();
} catch (InterruptedException ignored) {
}
}
}netty-client端自定义处理器
java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35/**
* 因为调用的时候,需要等待调用结果,再将结果返回,这需要一个过程,所以需要用到线程等待 wait notify方法
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;
private Invocation invocation;
private String result;
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//在run方法中会用得到
this.context = ctx;
}
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
notify();
}
public void setInvocation(Invocation invocation) {
this.invocation = invocation;
}
public synchronized Object call() throws Exception {
context.writeAndFlush(invocation);
wait();
return result;
}
}好,基本的逻辑已经完成,接下就是需要创建一个helloservice的代理对象来调用sayhello方法
创建一个获取代理对象的factory
java1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class ProxyFactory {
final static ExecutorService executorService = newFixedThreadPool(Runtime.getRuntime().availableProcessors());
/**
* 编写方法,使用代理模式,获取一个代理对象
*/
public static <T> T getProxy(final Class<T> interfaceClass) {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Invocation invocation = new Invocation(interfaceClass.getName(), method.getName(), method.getParameterTypes(), args);
if(clientHandler == null){
Url url = RedisRegister.get(interfaceClass.getName());
initClient(url.getHostName(),url.getPort());
}
clientHandler.setInvocation(invocation);
return executorService.submit(clientHandler).get();
}
});
}
}启动提供者
java1
2
3
4
5
6
7
8
9
10
11
12
13public class Provider {
public static void main(String[] args) throws IOException {
// 注册服务接口
LocalRegister.register(HelloService.class.getName(), HelloServiceImpl.class);
Url url = new Url(InetAddress.getLocalHost().getHostAddress(), 8080);
// 注册服务器信息,端口,ip
RedisRegister.regist(HelloService.class.getName(),url);
// 启动
NettyServer.start(url.getHostName(),url.getPort());
}
}
// 服务提供方开始提供服务启用消费者调用sayhello方法
java1
2
3
4
5
6
7
8
9
10public class Consumer {
public static void main(String[] args) {
// 获取helloservice的代理对象
HelloService service = ProxyFactory.getProxy(HelloService.class);
// 调用sayhello方法
System.out.println(service.sayHello());
}
}
// Netty: Hello到这里,一个简单基于netty的
dubbo
就完成了
代码仓库地址:
参考文章
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 小五的个人杂货铺!