先来看看看看springboot整合dubbo的常规用法

下面是三个服务的调用链路

项目结构

服务提供者(例如pay-service)需要提供接口服务(@Service)

服务消费者(例如user-service)需要指定服务接口(@Reference),接口多实现可能需要配合version属性

这样就完成了基本的上游对下游服务的调用

探究下底层实现

知道dubbo是基于netty实现的,
所以它实现这种rpc调用的特点较其他(如feign-http)更适用高并发以及短链接的项目,接下来用netty的代码简单复现下基本的dubbo应用

1
这里改一下,dubbo不仅仅只有netty的实现,还支持rmi协议,http协议等等

配合dubbo的服务注册发现链路图

简单捋顺下思路,看看我们需要哪几样东西

  1. 一个共享容器,用来存储服务地址(ip,port等),当服务存在多个,还需要负载均衡,以及服务信息修改后的通知功能
  2. 为了确定唯一的服务接口信息,需要包含接口名(像dubbo一样,多实现可能需要再加一个version),方法名,方法参数类型列表,方法值列表
  3. netty,保证服务和调用方之间的通信(rpc调用)

    这样我们就能实现一个简单dubbo了。好,开整

  • 首先是pom依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    <dependencies>
    <!-- 使用redis作为注册中心(共享容器)(当然zk等等可以作为数据统一的工具也可以)-->
    <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.5.2</version>
    </dependency>
    <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
    </dependency>
    <dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.5.8</version>
    </dependency>
    </dependencies>
  • 确定接口唯一信息体Invocation

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @AllArgsConstructor
    @Data
    public class Invocation implements Serializable {
    // 接口名称
    String interfaceName;
    // 方法名
    String methodName;
    // 参数类型列表
    Class[] paramTypes;
    // 参数值列表
    Object[] params;
    }
  • 服务信息,ip和port

    1
    2
    3
    4
    5
    6
    @Data
    @AllArgsConstructor
    public class Url implements Serializable {
    String hostName;
    Integer port;
    }
  • 注册服务的容器,在nettyserver启动时注入服务接口(这里不是存储ip port的共享容器,这里是netty-server用来注册服务接口的容器)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class LocalRegister {

    private static Map<String, Class> map = new ConcurrentHashMap<>();

    public static void register(String interfaceName,Class implClass){
    map.put(interfaceName,implClass);
    }

    public static Class get(String interfaceName){
    return map.get(interfaceName);
    }
    }
  • 用来存储服务的ip和port,如果是多个(集群部署)还需要支持负载均衡

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    @UtilityClass
    public class RedisRegister {

    static Jedis jedis;

    static {
    jedis = new Jedis("127.0.0.1", 6379);
    }
    // 注册服务
    public void regist(String interfaceName, Url url){
    try {
    String s = jedis.get(interfaceName);
    List<Url> urls = null;
    if (s == null){
    urls = new ArrayList<>();
    }else {
    JSONArray objects = JSONUtil.parseArray(s);
    urls = objects.toList(Url.class);
    }
    urls.add(url);
    jedis.set(interfaceName,JSONUtil.toJsonStr(urls));
    } catch (Exception e) {
    e.printStackTrace();
    }
    }

    public Url get(String interfaceName) throws Exception{
    String s = jedis.get(interfaceName);
    JSONArray objects = JSONUtil.parseArray(s);
    List<Url> urls = objects.toList(Url.class);
    return LoadBalance.random(urls);
    }
    }
  • 负载均衡机制(dubbo支持4种,这里先简单使用dubbo默认的random)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /**
    * 多个Url采用负载均衡(dubbo默认的random)
    */
    @UtilityClass
    public class LoadBalance {

    public Url random(List<Url> urls){
    Random random = new Random();
    return urls.get(random.nextInt(urls.size()));
    }
    }
  • 需要一个netty的server端来接收请求数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    public class NettyServer {
    public static void start(String hostName, Integer port) {
    // netty传统的一个老板,多个打工人模式
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
    final ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    // 该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。
    //当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。
    .childOption(ChannelOption.SO_KEEPALIVE,true)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    // netty内置了许多编码器和解码器供我们粘包和解包,这里因为我们需要传递Invocation对象,所以使用ObjectDecoder和ObjectEncoder
    pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
    pipeline.addLast(new ObjectEncoder());
    // 添加自定义处理器
    pipeline.addLast(new NettyServerHandler());
    }
    });
    ChannelFuture channelFuture = bootstrap.bind(hostName, port).sync();
    System.out.println("服务提供方开始提供服务");
    channelFuture.channel().closeFuture().sync();
    }catch (Exception ignored){}finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    }
    }
  • server端自定义处理器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 接收到请求,处理请求
    Invocation invocation = (Invocation) msg;
    Class aClass = LocalRegister.get(invocation.getInterfaceName());
    // 利用反射执行方法得到res
    Method method = aClass.getMethod(invocation.getMethodName(), invocation.getParamTypes());
    Object res = method.invoke(aClass.newInstance(), invocation.getParams());
    // 写回netty,让client端监听到
    ctx.writeAndFlush("Netty: "+res);
    }
    }
  • 定义接口和实现类(按照dubbo官方的服务最佳实践,接口应该与业务部分分离开来)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public interface HelloService {
    String sayHello();
    }
    public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello() {
    return "Hello";
    }
    }
  • 定义一个netty-client端来模拟发送数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class NettyClient {

    public static NettyClientHandler clientHandler = null;

    public static void initClient(String hostName,Integer port){
    clientHandler = new NettyClientHandler();
    final Bootstrap bootstrap = new Bootstrap();
    EventLoopGroup group = new NioEventLoopGroup();
    try {
    bootstrap.group(group)
    .channel(NioSocketChannel.class)
    .option(ChannelOption.TCP_NODELAY, true)
    .handler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
    pipeline.addLast(new ObjectEncoder());
    pipeline.addLast(clientHandler);

    }
    });
    bootstrap.connect(hostName,port).sync();
    } catch (InterruptedException ignored) {
    }
    }
    }
  • netty-client端自定义处理器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    /**
    * 因为调用的时候,需要等待调用结果,再将结果返回,这需要一个过程,所以需要用到线程等待 wait notify方法
    */
    public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext context;

    private Invocation invocation;

    private String result;


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //在run方法中会用得到
    this.context = ctx;
    }

    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    result = msg.toString();
    notify();
    }

    public void setInvocation(Invocation invocation) {
    this.invocation = invocation;
    }

    @Override
    public synchronized Object call() throws Exception {
    context.writeAndFlush(invocation);
    wait();
    return result;
    }
    }
  • 好,基本的逻辑已经完成,接下就是需要创建一个helloservice的代理对象来调用sayhello方法

  • 创建一个获取代理对象的factory

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class ProxyFactory {

    final static ExecutorService executorService = newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    /**
    * 编写方法,使用代理模式,获取一个代理对象
    */
    public static <T> T getProxy(final Class<T> interfaceClass) {
    return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Invocation invocation = new Invocation(interfaceClass.getName(), method.getName(), method.getParameterTypes(), args);
    if(clientHandler == null){
    Url url = RedisRegister.get(interfaceClass.getName());
    initClient(url.getHostName(),url.getPort());
    }
    clientHandler.setInvocation(invocation);
    return executorService.submit(clientHandler).get();
    }
    });
    }
    }
  • 启动提供者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class Provider {

    public static void main(String[] args) throws IOException {
    // 注册服务接口
    LocalRegister.register(HelloService.class.getName(), HelloServiceImpl.class);
    Url url = new Url(InetAddress.getLocalHost().getHostAddress(), 8080);
    // 注册服务器信息,端口,ip
    RedisRegister.regist(HelloService.class.getName(),url);
    // 启动
    NettyServer.start(url.getHostName(),url.getPort());
    }
    }
    // 服务提供方开始提供服务
  • 启用消费者调用sayhello方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public class Consumer {

    public static void main(String[] args) {
    // 获取helloservice的代理对象
    HelloService service = ProxyFactory.getProxy(HelloService.class);
    // 调用sayhello方法
    System.out.println(service.sayHello());
    }
    }
    // Netty: Hello

    到这里,一个简单基于netty的dubbo就完成了

代码仓库地址:

参考文章