分布式框架RPC实现及优化
一个实现了Netty网络通信,ZooKeeper注册中心的RPC框架
Netty
对于需要处理大量并发连接的应用程序,或者即时通信相关的网络应用项目(聊天服务器),Netty是一个更好的选择。它简化了网
络通信的复杂性,提供了丰富的API和灵活的定制能力。
而对于一些简单的网络通信需求,或者对性能要求不高的项目,可以考虑使用NI0来降低开发成本,比如超大文件的写入和导出。
它通过 Pipeline + 事件驱动 实现高效的非阻塞处理
核心设计
- I/O 多路复用:单线程通过
Selector
监听多个 Channel 事件(连接、读写)。 - 责任链模式:每个 Channel 的
Pipeline
由多个ChannelHandler
组成,按顺序处理数据流。 - 事件驱动:仅在有实际 I/O 事件(如数据到达)时触发 Handler 执行。
RPC项目架构
简单架构
简单架构分为客户端、服务端,服务端包含需要调用的方法。
eg:
A想调用B中的方法,例如B中有一个User表,可以通过UseService的getById得到用户属性。
客户端A想要调用getById方法,传入一个id给服务端b,b返回查询对象。
若方法都在A端,则直接传统方法访问数据库即可,但在b端实现则需要用到动态代理转发。
通过动态代理,可以在方法前后增强,即封装为网络通信并获取返回数据。
1、首先声明B中的方法:
common里Pojo实体类建立User类,包含id、名称、性别,UserService里定义一个接口,具体实现在impl中,模拟查询并返回数据。
User类需要实现序列化接口,表示该对象可以序列化。
2、定义A发给B的请求消息RpcRequest和返回的响应RpcResponse,类似于JavaWeb中的DTO
Rpc请求包括四个:服务类名,即要调用的类;方法名,即调用类里的什么方法;参数列表与类型,定义传入的参数和类型,用来定位方法并传入参数。
通过这四个参数,服务端可以通过反射调用获得参数并返回:
RpcResponse如下:
包含状态码,信息、返回数据等。
3、客户端创建IoClient用于实现通信底层逻辑,ClientProxy用于对调用方法使用动态代理来调用IoClient,TestClient用于测试调用。
简单框架底层使用Socket通信,创建OOS用于发送序列化,OIS用于接受序列化对象,随后调用oos.writeObject(requese)将获取的对象序列化发送并刷新,reponse通过ois.readObject读取并反序列化。
命名host和port作为传入的服务器地址和端口,重写invoke函数,是动态代理的核心逻辑,用于封装请求并处理服务器响应。
每次调用方法,传入动态代理,构建request并发送,reponse接受返回数据并返回给调用者。
使用getProxy方法获得该代理对象,传入class参数即可。
TestClient
测试方法创建ClientProxy对象并初始化ip和端口,指定访问地址,随后传入userService类获得代理对象,调用方法。
即本地只需要有UserService的接口,不需要有实现逻辑,服务端B需要有该接口和对应实现方法。
4、服务端需要创建RpcServer接口以及实现(分为SimpleRPCServer和ThreadPoolRPCServer),提供端口监听功能,ServiceProvider用于实现一个简单的注册中心,告诉服务端本地实现了什么服务并获取,WorkThread用于处理客户端请求,调用本地服务并返回响应;TestServer用于启动测试服务端。
RPCServer如下。
SimpleRPCServer用于创建一个serverSocket实现端口监听,为堵塞监听。
ThreadPoolRPCServer类是通过线程池管理和执行任务,处理并发能力。相比简单版每个请求都创建一个新线程,该方法有一套线程管理模式。
简单的注册中心,其中Map存储数据如下,对应接口类名和实现对象:
注册时提供一个实现接口的实例,即可通过service.getClass().getName()获得该实现类对应类名,然后通过service.getClass().getInterfaces(); 获得该类实现的所有方法(一个实现类可能实现多种方法),然后把接口名作为key,实现类实例作为value存入map。
workThread用于实现调用逻辑,本类实现了Runnable接口,可以多线程接受请求并调用本地。
首先通过ois获取对应通信数据并反序列化,调用本类里定义的getResponse获得返回数据,并通过oos序列化发送回去。
GetResponse中的关键逻辑:
1 |
|
Test部分创建服务实现类,实例化服务中心并注册,实例化服务端并启动。
以上即实现了简单的rpc框架。
总结:
前置:定义Requeset,Response,User
客户端:定义RpcClient用于实现与B段网络连接(SOCKET),ClientProxy动态代理用于对调用的Userservice进行封装,把传入的参数(ID)封装成Request,包括接口名、方法、参数、参数类型以方便服务端定位方法。
服务端:定义RpcServer用于监听端口(包括SimpleServer和ThreadPool),ServerProvider用于提供注册服务方便知道本地有什么服务(Map类型),WorkThread用于实现方法,包括通过Socket接收参数、处理参数、返回参数;
引入netty框架
客户端
对于引入netty框架,客户端需要创建三个文件,NettyClientHandler负责接收和处理来自服务器的RpcResponse对象;NettyClientInitializer用于初始化channel和ChannelPipeline,其中channel是网络通信的基本单元,ChannelPipeline为处理消息的责任链,包含一系列ChannelHnadler,每个都做不同操作,如编码解码等;NettyRpcClient,作用等同于前面的IOclient,用于处理网络发送的底层逻辑。
1、NettyClientHandler
channerRead0为核心方法,用于读取返回数据,输入中的ctx是ChannelHandlerContext类型的参数,它是 Netty中每个处理器(ChannelHandler )的上下文对象,代表了当前 I/0 操作的环境。response 是返回的响应信息。
ctx.channel()用于获取当前通道,即于服务端的网络连接。
AttributeKey
2、NettyClientInitializer
用于初始化客户端的channel和channelPipeLine,即定义这个发送器怎么发送,编码器解码器等等。
类中的方法initChannel用于给每个SocketChannel(即新的连接)初始化,并初始化一个独立的流水线用于连接上所有数据的操作。
1 |
|
0,4字节,即一个int的字节,表示长度。
随后添加编码器(序列化),解码器用于接收到的字节流解码回java对象,这里重写通过根据类名解析java类,并转化为该类的对象。
最后把Handler加入流水线。
3、NettyRpcClient
作为客户端调用rpc逻辑,首先客户端初始化,bootstrap为netty用于启动客户端的对象,负责设置于服务器的连接配置,eventLoopGroup为netty的线程池,用于处理I/O操作,NIO为基于非阻塞IO实现。
重写SendReuqest作为发送请求方法,首先ChannelFuture 用于阻塞链接服务端,sync代表同步阻塞直接连接成功,然后channel = channelFuture.channel()获得当前连接通道,类似于socket。随后实现发送数据并阻塞获得结果,调用NettyHandler里添加的key对应的Response,获得响应并返回。
4、修改ClientProxy,选择nettyClient,修改test,选择nettyClient。
总结:需要将客户端RPCClient改为Netty版本,初始化bootstrap并传入初始化器,在初始化器中声明pipeline操作,包括解决粘包、编码器解码器、加入Handler处理逻辑等,其中Handler处理器用于接收服务器返回数据并绑定key在channel中方便后续查询。然后NettyRpcClient后半部分执行Channel发送逻辑和接收逻辑。
服务端
服务端需要创建2个部分,NettyServerHandler,类似于workThread,用于接收客户端数据并返回响应;NettyRpcInitializer,用于初始化,NettyRpcServer用于监听请求。
1、NettyServerHandler
重写的channelRead0用于读取数据,并调用getRespnse得到响应,并发送回去。
2、NettyRpcInitializer
与客户端一致,发送流程和流水线都一样,差别在于多一个注册中心的字段,最后调用NettyServerHandler。
3、NettyRpcServer
与之前的simpleServer,ThreadPoolServer相同,都是实现接口,不同的是,前两者是负责监听端口,每当有request发送,则通过workThread新建一个线程进行操作,而Netty通过IO多路复用实现单Selector(BossGroup)对多channel实现监听,把数据处理逻辑都放到pipeLine中。
首先启动监听时初始化bossGroup和WordGroup,使用ServerBootSrap启动服务器,并指定配置两个线程组,分别用于处理链接请求和IO请求,并指定使用NIO通道。
使用bind将服务端绑定到端口,此时会启动一个监听套接字,随时准备接收,
总结:使用NettyServer初始化bossGroup和workGroup,使用bootStrap启动器绑定初始化器,定义channel pipeline接收的逻辑,并在pipeline中处理输出,其中pipeline最后调用Handler,调用getResponse获得返回。
引入ZooKeeper注册中心
客户端
1、创建ServiceCenter服务中心
InnetSocketAddress是一个类,包含网络地址的IP和端口号。
对应实现类:
2、修改NettyRpcClient
从固定端口号改为ZK传入。
改为
3、修改ClientProxy
选择netty客户端并且不用传参。
改为:
4、修改TestClient
创建代理对象时,不用从客户端这传入端口、地址等信息了
总结:声明一个服务中心,提前写好对应的IP和端口号,修改NettyClient中输入ip和端口的方式,初始化时自动获得一个服务中心的对象。
服务端
1、创建ServiceRegister注册中心。
实现类:
2、修改ServiceProvider
3、修改TestServer
总结:服务端创建一个注册中心,提前定义好ZK的ip和端口,修改ServerProvider,获得自身的ip和端口号,生成本地注册表时发送一份注册到ZK。