在上一篇我提到了dubbo的rpc是基于netty实现的,而且也按照它的原理简单的写了些代码,大致的梳理了一个整体的链路.而这一篇,我打算在上一篇的基础上,把整体的代码运用到正式项目中,看看该怎样实现

1
这里改一下,dubbo不仅仅只有netty的实现,还支持rmi协议,http协议等等

整体的项目结构如下

  • 服务层链路就是简单的调用消费端接口,然后消费端再去服务端发送远端请求
  • 然后接口层还是按照dubbo的服务最佳实践提到的,分离维护

先来看下消费者的代码(pom就是一个web和我手写的rpc)

1
2
3
4
5
6
7
8
9
10
@RestController
public class ConsumerController {
@Autowired
ApplicationContext applicationContext;
@GetMapping("/{name}")
public String test(@PathVariable String name){
PayService payService = applicationContext.getBean(PayService.class);
return payService.pay(name);
}
}

再来看下提供者的代码(这个PayService就是interface模块里的一个接口,如下)

1
2
3
4
5
6
7
@MyDubboService(PayService.class)
public class PayserviceImpl implements PayService {
@Override
public String pay(String name) {
return name+": 支付100元";
}
}

interface公共接口模块

1
2
3
4
@MyDubboRefrence
public interface PayService {
String pay(String name);
}

至于这个@MyDubboRefrence和@MyDubboService是什么我接下再进行描述

重点部分-dubbo-framework模块

先看下pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<dependencies>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reflections/reflections -->
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>${reflections.version}</version>
</dependency>
</dependencies>
  • 还如上一篇一样,只不过我加了些反射的东西

    先来看下这个模块有哪些包

annotations包

  • EnableDubboConsumer注解用于注在消费方,且注意到这里我@Import的类,一旦服务标注此注解,会初始化ServiceBeanDefinitionRegistry的逻辑
    1
    2
    3
    4
    5
    6
    7
    @Documented
    @Inherited
    @Target({ ElementType.TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    @Import(ServiceBeanDefinitionRegistry.class)
    public @interface EnableDubboConsumer {
    }
  • EnableDubboProvider注解用于标注在提供方,同上,标注该注解的服务会依次初始化Import中的几个
    1
    2
    3
    4
    5
    6
    7
    8
    @Documented
    @Inherited
    @Target({ ElementType.TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    @EnableAsync
    @Import({RedisRegisterCenter.class, Url.class,NettyServer.class})
    public @interface EnableDubboProvider {
    }
  • 标注在公共接口上,客户端在调用之初会扫描注解下所有类,然后将其动态代理到spring中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /**
    * @author 小五
    * 标注在接口上,使客户端在调用之初将该接口动态代理到spring中
    */
    @Documented
    @Inherited
    @Target({ ElementType.TYPE })
    @Retention(RetentionPolicy.RUNTIME)//VM将在运行期也保留注释,因此可以通过反射机制读取注解的信息
    @Component
    public @interface MyDubboRefrence {
    }
  • 标注在实现类上,一样通过扫描,使其被找到并注册到注册中心
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
    * @author 小五
    * 标注在实现类上,使其被找到并注册到注册中心
    */
    @Documented
    @Inherited
    @Target({ ElementType.TYPE })
    @Retention(RetentionPolicy.RUNTIME)//VM将在运行期也保留注释,因此可以通过反射机制读取注解的信息
    @Component
    public @interface MyDubboService {
    Class<?> value();
    }

    consumer包

  • 就是初始化netty-client的,发送远程调用通知使用,这段我还是像上一篇一样会在代理模式中用到

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class NettyClient {

    public static NettyClientHandler clientHandler = null;

    public static void initClient(String hostName,Integer port){
    clientHandler = new NettyClientHandler();
    final Bootstrap bootstrap = new Bootstrap();
    EventLoopGroup group = new NioEventLoopGroup();
    try {
    bootstrap.group(group)
    .channel(NioSocketChannel.class)
    .option(ChannelOption.TCP_NODELAY, true)
    .handler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
    pipeline.addLast(new ObjectEncoder());
    pipeline.addLast(clientHandler);

    }
    });
    bootstrap.connect(hostName, port).sync();
    } catch (InterruptedException ignored) {
    }
    }
    }
  • netty-client handler,消费端发送请求处理器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    /**
    * 因为调用的时候,需要等待调用结果,再将结果返回,这需要一个过程,所以需要用到线程等待 wait notify方法
    */
    public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext context;

    private Invocation invocation;

    private String result;


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //在run方法中会用得到
    this.context = ctx;
    }

    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    result = msg.toString();
    notify();
    }

    public void setInvocation(Invocation invocation) {
    this.invocation = invocation;
    }

    @Override
    public synchronized Object call() throws Exception {
    context.writeAndFlush(invocation);
    wait(10000);
    return result;
    }
    }

    discovery包

  • 服务发现中心,这里还是基于redis实现的一个处理器,当消费端发送请求时会从redis中获取到提供方的地址信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public class RedisDiscoveryCenter {

    private Jedis jedis;

    // random
    // 服务发现
    public Url get(String interfaceName) throws Exception{
    String s = jedis.get(interfaceName);
    JSONArray objects = JSONUtil.parseArray(s);
    List<Url> urls = objects.toList(Url.class);
    return urls.get(ThreadLocalRandom.current().nextInt(urls.size()));
    }

    public void jedis(String host, Integer port, String password){
    Jedis jedis = new Jedis(host, port);
    if (StrUtil.isNotEmpty(password)){
    jedis.auth(password);
    }
    this.jedis = jedis;
    }
    }

    provider包

  • netty-server,这里我实现了ApplicationContextAware和CommandLineRunner,重写ApplicationContextAware的setApplicationContext方法,其在spring初始化对象时会调用到这里,对redisRegisterCenter和url进行赋值操作,CommandLineRunner的run方法会在容器启动完毕时执行,然后异步启动netty-server
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    @Slf4j
    @Component
    public class NettyServer implements ApplicationContextAware, CommandLineRunner {

    private RedisRegisterCenter redisRegisterCenter;

    private Url url;

    //用于存储业务接口和实现类的实例对象
    private Map<String, Class> handlerMap = new HashMap<>();

    // spring构造对象时会调用setApplicationContext方法,从而可以在方法中通过自定义注解获得用户的业务接口和实现
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    redisRegisterCenter = applicationContext.getBean(RedisRegisterCenter.class);
    url = applicationContext.getBean(Url.class);
    Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(MyDubboService.class);
    if (CollectionUtil.isNotEmpty(serviceBeanMap)) {
    for (Object serviceBean : serviceBeanMap.values()) {
    //从业务实现类上的自定义注解中获取到value,从来获取到业务接口的全名
    String interfaceName = serviceBean.getClass()
    .getAnnotation(MyDubboService.class).value().getName();
    handlerMap.put(interfaceName, serviceBean.getClass());
    }
    }
    }

    /**
    * 组件启动时会执行run,启动netty服务
    * @throws Exception
    */
    @Async
    @Override
    public void run(String... args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel channel)
    throws Exception {
    channel.pipeline()
    .addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
    .addLast(new ObjectEncoder())
    .addLast(new NettyServerHandler(handlerMap));
    }
    }).option(ChannelOption.SO_BACKLOG, 128)
    .childOption(ChannelOption.SO_KEEPALIVE, true);


    ChannelFuture future = bootstrap.bind(url.getHost(), url.getPort()).sync();
    for (String clazzName : handlerMap.keySet()) {
    redisRegisterCenter.register(clazzName,url);
    }
    log.info("mydubbo提供方启动");
    future.channel().closeFuture().sync();
    } catch (Exception e){
    log.error("mydubbo提供方启动error: {}",e.getMessage());
    }finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
    }
    }
    }
  • netty-server-handler 提供方接收到请求的处理器,解析Invocation,利用反射执行方法得到结果并返回给消费端
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private final Map<String, Class> handlerMap;

    public NettyServerHandler(Map<String, Class> handlerMap) {
    this.handlerMap = handlerMap;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // 接收到请求,处理请求
    Invocation invocation = (Invocation) msg;
    Class aClass = handlerMap.get(invocation.getInterfaceName());
    // 利用反射执行方法得到res
    Method method = aClass.getMethod(invocation.getMethodName(), invocation.getParamTypes());
    Object res = method.invoke(aClass.newInstance(), invocation.getParams());
    // 写回netty,让client端监听到
    ctx.writeAndFlush(res);
    }
    }

proxy包

  • BeanDefinitionRegistryPostProcessor,其父类是BeanFactoryPostProcessor,很熟悉,在aac类-refresh方法-invokeBeanFactoryPostProcessors方法中能够看到,简单说,这玩意是动态注入bean的,而这里我就是利用它将PayService动态注入到spring中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    /**
    * 用于Spring动态注入自定义接口
    */
    public class ServiceBeanDefinitionRegistry implements BeanDefinitionRegistryPostProcessor {

    @Autowired
    ApplicationContext applicationContext;

    @Override
    public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
    Properties properties = new Properties();
    InputStream is = this.getClass().getResourceAsStream("/application.properties");
    try {
    if (is == null){
    is = this.getClass().getResourceAsStream("/application.yml");
    }
    properties.load(is);
    } catch (IOException ignored) {}
    Set<Class<?>> typesAnnotatedWith = new Reflections(properties.getProperty("dubbo.interface.path"), Arrays.asList(
    new SubTypesScanner(false)//允许getAllTypes获取所有Object的子类, 不设置为false则 getAllTypes 会报错.默认为true.
    ,new MethodParameterNamesScanner()//设置方法参数名称 扫描器,否则调用getConstructorParamNames 会报错
    ,new MethodAnnotationsScanner() //设置方法注解 扫描器, 否则getConstructorsAnnotatedWith,getMethodsAnnotatedWith 会报错
    ,new MemberUsageScanner() //设置 member 扫描器,否则 getMethodUsage 会报错, 不推荐使用,有可能会报错 Caused by: java.lang.ClassCastException: javassist.bytecode.InterfaceMethodrefInfo cannot be cast to javassist.bytecode.MethodrefInfo
    ,new TypeAnnotationsScanner()//设置类注解 扫描器 ,否则 getTypesAnnotatedWith 会报错
    )).getTypesAnnotatedWith(MyDubboRefrence.class);
    for (Class beanClazz : typesAnnotatedWith) {
    BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz);
    GenericBeanDefinition definition = (GenericBeanDefinition) builder.getRawBeanDefinition();
    //在这里,我们可以给该对象的属性注入对应的实例。
    //比如mybatis,就在这里注入了dataSource和sqlSessionFactory,
    // 注意,如果采用definition.getPropertyValues()方式的话,
    // 类似definition.getPropertyValues().add("interfaceType", beanClazz);
    // 则要求在FactoryBean(本应用中即ServiceFactory)提供setter方法,否则会注入失败
    // 如果采用definition.getConstructorArgumentValues(),
    // 则FactoryBean中需要提供包含该属性的构造方法,否则会注入失败

    String host = properties.getProperty("dubbo.redis.host");
    Integer port = Integer.valueOf(properties.getProperty("dubbo.redis.port"));
    String password = properties.getProperty("dubbo.redis.password");
    RedisDiscoveryCenter redisDiscoveryCenter = new RedisDiscoveryCenter();
    redisDiscoveryCenter.jedis(host,port,password);
    AsyncTaskExecutor executor = ExecutorServicePool.executor();
    definition.getPropertyValues().addPropertyValue("redisDiscoveryCenter", redisDiscoveryCenter);
    definition.getPropertyValues().addPropertyValue("executor", executor);
    definition.getConstructorArgumentValues().addGenericArgumentValue(beanClazz);

    //注意,这里的BeanClass是生成Bean实例的工厂,不是Bean本身。
    // FactoryBean是一种特殊的Bean,其返回的对象不是指定类的一个实例,
    // 其返回的是该工厂Bean的getObject方法所返回的对象。
    definition.setBeanClass(ProxyFactory.class);

    //这里采用的是byType方式注入,类似的还有byName等
    definition.setAutowireMode(GenericBeanDefinition.AUTOWIRE_BY_TYPE);
    registry.registerBeanDefinition(beanClazz.getSimpleName(), definition);
    }
    }

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

    }
    }
  • ProxyFactory-FactoryBean:自定义bean的创建过程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    /**
    * @author 小五
    */
    @RequiredArgsConstructor
    public class ProxyFactory<T> implements FactoryBean<T> {

    @Setter
    private AsyncTaskExecutor executor;

    @Setter
    private RedisDiscoveryCenter redisDiscoveryCenter;

    private Class<T> interfaceClass;

    @Override
    public T getObject() throws Exception {
    return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Invocation invocation = new Invocation(interfaceClass.getName(), method.getName(), method.getParameterTypes(), args);
    if(clientHandler == null){
    Url url = redisDiscoveryCenter.get(interfaceClass.getName());
    initClient(url.getHost(),url.getPort());
    }
    clientHandler.setInvocation(invocation);
    return executor.submit(clientHandler).get();
    }
    });
    }

    public ProxyFactory(Class<T> interfaceClass) {
    this.interfaceClass = interfaceClass;
    }

    @Override
    public Class<?> getObjectType() {
    return interfaceClass;
    }

    @Override
    public boolean isSingleton() {
    return true;
    }
    }
  • 还有一个注册中心
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47

    /**
    * @author 小五
    */
    @NoArgsConstructor
    @AllArgsConstructor
    @Getter
    @Setter
    @Configuration
    @EnableConfigurationProperties
    @ConfigurationProperties(prefix = "dubbo.redis")
    public class RedisRegisterCenter {

    private String host;

    private int port;

    private String password;

    @Bean
    public Jedis jedis(){
    Jedis jedis = new Jedis(host, port);
    if (StrUtil.isNotEmpty(password)){
    jedis.auth(password);
    }
    return jedis;
    }

    // 服务注册
    public void register(String interfaceName, Url url){
    try {
    Jedis jedis = jedis();
    String s = jedis.get(interfaceName);
    List<Url> urls = null;
    if (s == null){
    urls = new ArrayList<>();
    }else {
    JSONArray objects = JSONUtil.parseArray(s);
    urls = objects.toList(Url.class);
    }
    urls.add(url);
    jedis.set(interfaceName,JSONUtil.toJsonStr(urls));
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

整个核心包的开发并没过多的代码,整个流程是这样的

提供者在开启@EnableDubboProvider后,获取所有标注@MyDubboService的实现类,放到handlerMap中,key为其value值(所实现接口全路径),value为实现类的class,,供netty-server-handler使用.然后调用注册中心,将提供者项目的ip和port注册到redis中,然后启动netty-server等待请求过来,当请求发送给来,走handler的read方法,解析,反射,返回response

消费者在开启@EnableDubboConsumer后,走ServiceBeanDefinitionRegistry的逻辑,扫描dubbo.interface.path下所有标注了@MyDubboRefrence的接口,将其动态注入到spring中,以便项目可以对其进行依赖注入,(autowired,这里用的是byType),然后定义了具体创建该bean过程的ProxyFactory

当客户端发送请求后会走到ProxyFactory的getObject方法,然后组装Invocation,从服务发现中心获取到该服务的地址和port,发送到netty-server端,然后阻塞等待结果通知,

代码仓库地址(https://gitee.com/xiaowu_wang/mydubbo.git)