在上一篇我提到了dubbo的rpc是基于netty实现的,而且也按照它的原理简单的写了些代码,大致的梳理了一个整体的链路.而这一篇,我打算在上一篇的基础上,把整体的代码运用到正式项目中,看看该怎样实现
1 | 这里改一下,dubbo不仅仅只有netty的实现,还支持rmi协议,http协议等等 |
整体的项目结构如下

- 服务层链路就是简单的调用消费端接口,然后消费端再去服务端发送远端请求
- 然后接口层还是按照dubbo的服务最佳实践提到的,分离维护
先来看下消费者的代码(pom就是一个web和我手写的rpc)
1 |
|
再来看下提供者的代码(这个PayService就是interface模块里的一个接口,如下)
1 |
|
interface公共接口模块
1 |
|
至于这个@MyDubboRefrence和@MyDubboService是什么我接下再进行描述
重点部分-dubbo-framework模块
先看下pom
1 | <dependencies> |
- 还如上一篇一样,只不过我加了些反射的东西
先来看下这个模块有哪些包

annotations包

- EnableDubboConsumer注解用于注在消费方,且注意到这里我@Import的类,一旦服务标注此注解,会初始化ServiceBeanDefinitionRegistry的逻辑
1
2
3
4
5
6
7
public EnableDubboConsumer {
} - EnableDubboProvider注解用于标注在提供方,同上,标注该注解的服务会依次初始化Import中的几个
1
2
3
4
5
6
7
8
public EnableDubboProvider {
} - 标注在公共接口上,客户端在调用之初会扫描注解下所有类,然后将其动态代理到spring中
1
2
3
4
5
6
7
8
9
10
11/**
* @author 小五
* 标注在接口上,使客户端在调用之初将该接口动态代理到spring中
*/
//VM将在运行期也保留注释,因此可以通过反射机制读取注解的信息
public MyDubboRefrence {
} - 标注在实现类上,一样通过扫描,使其被找到并注册到注册中心
1
2
3
4
5
6
7
8
9
10
11
12/**
* @author 小五
* 标注在实现类上,使其被找到并注册到注册中心
*/
//VM将在运行期也保留注释,因此可以通过反射机制读取注解的信息
public MyDubboService {
Class<?> value();
}consumer包

就是初始化netty-client的,发送远程调用通知使用,这段我还是像上一篇一样会在代理模式中用到
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28public class NettyClient {
public static NettyClientHandler clientHandler = null;
public static void initClient(String hostName,Integer port){
clientHandler = new NettyClientHandler();
final Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(clientHandler);
}
});
bootstrap.connect(hostName, port).sync();
} catch (InterruptedException ignored) {
}
}
}netty-client handler,消费端发送请求处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35/**
* 因为调用的时候,需要等待调用结果,再将结果返回,这需要一个过程,所以需要用到线程等待 wait notify方法
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;
private Invocation invocation;
private String result;
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//在run方法中会用得到
this.context = ctx;
}
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
notify();
}
public void setInvocation(Invocation invocation) {
this.invocation = invocation;
}
public synchronized Object call() throws Exception {
context.writeAndFlush(invocation);
wait(10000);
return result;
}
}discovery包
服务发现中心,这里还是基于redis实现的一个处理器,当消费端发送请求时会从redis中获取到提供方的地址信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21public class RedisDiscoveryCenter {
private Jedis jedis;
// random
// 服务发现
public Url get(String interfaceName) throws Exception{
String s = jedis.get(interfaceName);
JSONArray objects = JSONUtil.parseArray(s);
List<Url> urls = objects.toList(Url.class);
return urls.get(ThreadLocalRandom.current().nextInt(urls.size()));
}
public void jedis(String host, Integer port, String password){
Jedis jedis = new Jedis(host, port);
if (StrUtil.isNotEmpty(password)){
jedis.auth(password);
}
this.jedis = jedis;
}
}provider包

- netty-server,这里我实现了ApplicationContextAware和CommandLineRunner,重写ApplicationContextAware的setApplicationContext方法,其在spring初始化对象时会调用到这里,对redisRegisterCenter和url进行赋值操作,CommandLineRunner的run方法会在容器启动完毕时执行,然后异步启动netty-server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class NettyServer implements ApplicationContextAware, CommandLineRunner {
private RedisRegisterCenter redisRegisterCenter;
private Url url;
//用于存储业务接口和实现类的实例对象
private Map<String, Class> handlerMap = new HashMap<>();
// spring构造对象时会调用setApplicationContext方法,从而可以在方法中通过自定义注解获得用户的业务接口和实现
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
redisRegisterCenter = applicationContext.getBean(RedisRegisterCenter.class);
url = applicationContext.getBean(Url.class);
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(MyDubboService.class);
if (CollectionUtil.isNotEmpty(serviceBeanMap)) {
for (Object serviceBean : serviceBeanMap.values()) {
//从业务实现类上的自定义注解中获取到value,从来获取到业务接口的全名
String interfaceName = serviceBean.getClass()
.getAnnotation(MyDubboService.class).value().getName();
handlerMap.put(interfaceName, serviceBean.getClass());
}
}
}
/**
* 组件启动时会执行run,启动netty服务
* @throws Exception
*/
public void run(String... args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel channel)
throws Exception {
channel.pipeline()
.addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())))
.addLast(new ObjectEncoder())
.addLast(new NettyServerHandler(handlerMap));
}
}).option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(url.getHost(), url.getPort()).sync();
for (String clazzName : handlerMap.keySet()) {
redisRegisterCenter.register(clazzName,url);
}
log.info("mydubbo提供方启动");
future.channel().closeFuture().sync();
} catch (Exception e){
log.error("mydubbo提供方启动error: {}",e.getMessage());
}finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
} - netty-server-handler 提供方接收到请求的处理器,解析Invocation,利用反射执行方法得到结果并返回给消费端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private final Map<String, Class> handlerMap;
public NettyServerHandler(Map<String, Class> handlerMap) {
this.handlerMap = handlerMap;
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 接收到请求,处理请求
Invocation invocation = (Invocation) msg;
Class aClass = handlerMap.get(invocation.getInterfaceName());
// 利用反射执行方法得到res
Method method = aClass.getMethod(invocation.getMethodName(), invocation.getParamTypes());
Object res = method.invoke(aClass.newInstance(), invocation.getParams());
// 写回netty,让client端监听到
ctx.writeAndFlush(res);
}
}
proxy包
- BeanDefinitionRegistryPostProcessor,其父类是BeanFactoryPostProcessor,很熟悉,在aac类-refresh方法-invokeBeanFactoryPostProcessors方法中能够看到,简单说,这玩意是动态注入bean的,而这里我就是利用它将PayService动态注入到spring中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62/**
* 用于Spring动态注入自定义接口
*/
public class ServiceBeanDefinitionRegistry implements BeanDefinitionRegistryPostProcessor {
ApplicationContext applicationContext;
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
Properties properties = new Properties();
InputStream is = this.getClass().getResourceAsStream("/application.properties");
try {
if (is == null){
is = this.getClass().getResourceAsStream("/application.yml");
}
properties.load(is);
} catch (IOException ignored) {}
Set<Class<?>> typesAnnotatedWith = new Reflections(properties.getProperty("dubbo.interface.path"), Arrays.asList(
new SubTypesScanner(false)//允许getAllTypes获取所有Object的子类, 不设置为false则 getAllTypes 会报错.默认为true.
,new MethodParameterNamesScanner()//设置方法参数名称 扫描器,否则调用getConstructorParamNames 会报错
,new MethodAnnotationsScanner() //设置方法注解 扫描器, 否则getConstructorsAnnotatedWith,getMethodsAnnotatedWith 会报错
,new MemberUsageScanner() //设置 member 扫描器,否则 getMethodUsage 会报错, 不推荐使用,有可能会报错 Caused by: java.lang.ClassCastException: javassist.bytecode.InterfaceMethodrefInfo cannot be cast to javassist.bytecode.MethodrefInfo
,new TypeAnnotationsScanner()//设置类注解 扫描器 ,否则 getTypesAnnotatedWith 会报错
)).getTypesAnnotatedWith(MyDubboRefrence.class);
for (Class beanClazz : typesAnnotatedWith) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(beanClazz);
GenericBeanDefinition definition = (GenericBeanDefinition) builder.getRawBeanDefinition();
//在这里,我们可以给该对象的属性注入对应的实例。
//比如mybatis,就在这里注入了dataSource和sqlSessionFactory,
// 注意,如果采用definition.getPropertyValues()方式的话,
// 类似definition.getPropertyValues().add("interfaceType", beanClazz);
// 则要求在FactoryBean(本应用中即ServiceFactory)提供setter方法,否则会注入失败
// 如果采用definition.getConstructorArgumentValues(),
// 则FactoryBean中需要提供包含该属性的构造方法,否则会注入失败
String host = properties.getProperty("dubbo.redis.host");
Integer port = Integer.valueOf(properties.getProperty("dubbo.redis.port"));
String password = properties.getProperty("dubbo.redis.password");
RedisDiscoveryCenter redisDiscoveryCenter = new RedisDiscoveryCenter();
redisDiscoveryCenter.jedis(host,port,password);
AsyncTaskExecutor executor = ExecutorServicePool.executor();
definition.getPropertyValues().addPropertyValue("redisDiscoveryCenter", redisDiscoveryCenter);
definition.getPropertyValues().addPropertyValue("executor", executor);
definition.getConstructorArgumentValues().addGenericArgumentValue(beanClazz);
//注意,这里的BeanClass是生成Bean实例的工厂,不是Bean本身。
// FactoryBean是一种特殊的Bean,其返回的对象不是指定类的一个实例,
// 其返回的是该工厂Bean的getObject方法所返回的对象。
definition.setBeanClass(ProxyFactory.class);
//这里采用的是byType方式注入,类似的还有byName等
definition.setAutowireMode(GenericBeanDefinition.AUTOWIRE_BY_TYPE);
registry.registerBeanDefinition(beanClazz.getSimpleName(), definition);
}
}
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
}
} - ProxyFactory-FactoryBean:自定义bean的创建过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44/**
* @author 小五
*/
public class ProxyFactory<T> implements FactoryBean<T> {
private AsyncTaskExecutor executor;
private RedisDiscoveryCenter redisDiscoveryCenter;
private Class<T> interfaceClass;
public T getObject() throws Exception {
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Invocation invocation = new Invocation(interfaceClass.getName(), method.getName(), method.getParameterTypes(), args);
if(clientHandler == null){
Url url = redisDiscoveryCenter.get(interfaceClass.getName());
initClient(url.getHost(),url.getPort());
}
clientHandler.setInvocation(invocation);
return executor.submit(clientHandler).get();
}
});
}
public ProxyFactory(Class<T> interfaceClass) {
this.interfaceClass = interfaceClass;
}
public Class<?> getObjectType() {
return interfaceClass;
}
public boolean isSingleton() {
return true;
}
} - 还有一个注册中心
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* @author 小五
*/
public class RedisRegisterCenter {
private String host;
private int port;
private String password;
public Jedis jedis(){
Jedis jedis = new Jedis(host, port);
if (StrUtil.isNotEmpty(password)){
jedis.auth(password);
}
return jedis;
}
// 服务注册
public void register(String interfaceName, Url url){
try {
Jedis jedis = jedis();
String s = jedis.get(interfaceName);
List<Url> urls = null;
if (s == null){
urls = new ArrayList<>();
}else {
JSONArray objects = JSONUtil.parseArray(s);
urls = objects.toList(Url.class);
}
urls.add(url);
jedis.set(interfaceName,JSONUtil.toJsonStr(urls));
} catch (Exception e) {
e.printStackTrace();
}
}
}
整个核心包的开发并没过多的代码,整个流程是这样的
提供者在开启@EnableDubboProvider后,获取所有标注@MyDubboService的实现类,放到handlerMap中,key为其value值(所实现接口全路径),value为实现类的class,,供netty-server-handler使用.然后调用注册中心,将提供者项目的ip和port注册到redis中,然后启动netty-server等待请求过来,当请求发送给来,走handler的read方法,解析,反射,返回response
消费者在开启@EnableDubboConsumer后,走ServiceBeanDefinitionRegistry的逻辑,扫描dubbo.interface.path下所有标注了@MyDubboRefrence的接口,将其动态注入到spring中,以便项目可以对其进行依赖注入,(autowired,这里用的是byType),然后定义了具体创建该bean过程的ProxyFactory
当客户端发送请求后会走到ProxyFactory的getObject方法,然后组装Invocation,从服务发现中心获取到该服务的地址和port,发送到netty-server端,然后阻塞等待结果通知,