手撕RPC框架——引入Netty

本文最后更新于 2024年8月9日 上午

上篇文章中我们使用了Socket编程实现了一个简单的RPC调用的demo,此篇文章,引入Netty框架来进一步提升框架性能。每篇文章对应的八股知识我会在收集整理相关资料后发出,敬请期待!😊

项目地址:【代码随想录知识星球】项目分享-手撕RPC框架

相关理论基础

简单介绍一下Netty,对比Socket编程Netty有哪些优势!

Netty 是一个 NIO 客户端服务器框架,可快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了网络编程,例如 TCP 和 UDP 套接字服务器。

Netty核心组件

Channel

Channel是 Java NIO 的一个基本构造。可以看作是传入或传出数据的载体。因此,它可以被打开或关闭,连接或者断开连接。

EventLoop 与 EventLoopGroup

EventLoop 定义了Netty的核心抽象,用来处理连接的生命周期中所发生的事件,在内部,将会为每个Channel分配一个EventLoop。

EventLoopGroup 是一个 EventLoop 池,包含很多的 EventLoop。

Netty 为每个 Channel 分配了一个 EventLoop,用于处理用户连接请求、对用户请求的处理等所有事件。EventLoop 本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个 Channel 的所有 IO 事件。

一个 Channel 一旦与一个 EventLoop 相绑定,那么在 Channel 的整个生命周期内是不能改变的。一个 EventLoop 可以与多个 Channel 绑定。即 Channel 与 EventLoop 的关系是 n:1,而 EventLoop 与线程的关系是 1:1。

ServerBootstrap 与 Bootstrap

Bootstarp 和 ServerBootstrap 被称为引导类,指对应用程序进行配置,并使他运行起来的过程。Netty处理引导的方式是使你的应用程序和网络层相隔离。

Bootstrap 是客户端的引导类,Bootstrap 在调用 bind()(连接UDP)和 connect()(连接TCP)方法时,会新创建一个 Channel,仅创建一个单独的、没有父 Channel 的 Channel 来实现所有的网络交换。

ServerBootstrap 是服务端的引导类,ServerBootstarp 在调用 bind() 方法时会创建一个 ServerChannel 来接受来自客户端的连接,并且该 ServerChannel 管理了多个子 Channel 用于同客户端之间的通信。

ChannelHandler 与 ChannelPipeline

ChannelHandler 是对 Channel 中数据的处理器,这些处理器可以是系统本身定义好的编解码器,也可以是用户自定义的。这些处理器会被统一添加到一个 ChannelPipeline 的对象中,然后按照添加的顺序对 Channel 中的数据进行依次处理。

ChannelFuture

Netty 中所有的 I/O 操作都是异步的,即操作不会立即得到返回结果,所以 Netty 中定义了一个 ChannelFuture 对象作为这个异步操作的“代言人”,表示异步操作本身。如果想获取到该异步操作的返回值,可以通过该异步操作对象的addListener() 方法为该异步操作添加监 NIO 网络编程框架 Netty 听器,为其注册回调:当结果出来后马上调用执行。

与Socekt相比优势

  • 由BIO转换为NIO

    • 阻塞(Block):往往需要等待缓冲区中的数据准备好过后才处理其他的事情,否则==一直等待在那里==。
      阻塞IO
    • 非阻塞(Non-Block):当我们的进程访问我们的数据缓冲区的时候,如果数据没有准备好则直接返回,==不会等待==。如果数据已经准备好,也直接返回。
      非阻塞IO
  • 可以自主编写 编码/解码器,序列化器等等,可拓展性和灵活性高

  • 支持TCP,UDP多种传输协议;支持堵塞返回和异步返回

实现思路

Netty所起到的作用

  • 客户端调用RpcClient.sendRequest方法 → NettyClientInitializer → Encoder编码 → 发送request
  • 服务端RpcServer接收 → NettyServerInitializer → Decoder解码 → NettyRPCServerHandler → getResponse调用 → 返回response
  • 客户端接收 → NettyServerInitializer → Decoder解码 → NettyRPCServerHandler处理结果并返回给上层

代码实现

在此部分我们将理论转化为代码,分别实现客户端和服务端。

项目目录结构

项目目录结构

定义RpcClient接口

1
2
3
4
5
public interface RpcClient {

//定义底层通信的方法
RpcResponse sendRequest(RpcRequest request);
}

配置netty对消息的处理机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class NettyClientInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//使用长度字段解码器 消息格式 【长度】【消息体】,解决沾包问题
pipeline.addLast(
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
//计算当前待发送消息的长度,写入到前4个字节中
pipeline.addLast(new LengthFieldPrepender(4));

//使用Java序列化方式,netty的自带的解码编码支持传输这种结构
pipeline.addLast(new ObjectEncoder());
//使用了Netty中的ObjectDecoder,它用于将字节流解码为 Java 对象。
//在ObjectDecoder的构造函数中传入了一个ClassResolver 对象,用于解析类名并加载相应的类。
pipeline.addLast(new ObjectDecoder(new ClassResolver() {
@Override
public Class<?> resolve(String className) throws ClassNotFoundException {
return Class.forName(className);
}
}));

pipeline.addLast(new NettyClientHandler());
}
}

Netty粘包问题的常见解决方案

粘包和拆包问题也叫做粘包和半包问题,它是指在数据传输时,接收方未能正常读取到一条完整数据的情况(只读取了部分数据,或多读取到了另一条数据的情况)就叫做粘包或拆包问题。

粘包问题

粘包问题是指在网络通信中,发送方连续发送的多个小数据包被接收方一次性接收的现象。这可能是因为底层传输层协议(如 TCP)会将多个小数据包合并成一个大的数据块进行传输,导致接收方在接收数据时一次性接收了多个数据包,造成粘连。

粘包问题实例

拆包/半包问题

拆包问题是指发送方发送的一个大数据包被接收方拆分成多个小数据包进行接收的现象。这可能是因为底层传输层协议(如 TCP)将一个大数据包拆分成多个小的数据块进行传输,导致接收方在接收数据时分别接收了多个小数据包,造成拆开。

拆包问题实例

xxxxxxxxxx public class TestServer {    public static void main(String[] args) {        UserService userService=new UserServiceImpl();​        ServiceProvider serviceProvider=new ServiceProvider();        serviceProvider.provideServiceInterface(userService);​        RpcServer rpcServer=new SimpleRPCRPCServer(serviceProvider);        rpcServer.start(9999);   }}java

粘包问题通常发生在 TCP/IP 协议中,因为 TCP 是面向连接的传输协议,它是以“流”的形式传输数据的,而“流”数据是没有明确的开始和结尾边界的,所以就会出现粘包问题

常见解决方案

  • 固定大小方法:发送方和接收方固定发送数据大小,当字符长度不够时用空字符弥补,有了固定大小之后就知道每条消息的具体边界了,这样就没有粘包的问题了。
  • 自定义数据协议(定义数据长度):在 TCP 协议的基础上封装一层自定义数据协议,在自定义数据协议中,包含数据头(存储数据的大小)和 数据的具体内容,这样服务端得到数据之后,通过解析数据头就可以知道数据的具体长度了,也就没有粘包的问题了。
  • 特殊分割符:以特殊的字符结尾,比如以“\n”结尾,这样我们就知道数据的具体边界了,从而避免了粘包问题。

以上三种方案中,第一种固定大小的方法可能会造成网络流量的浪费,以及传输性能慢的问题;第二种解决方案实现难度大,且不利于维护,所以比较推荐的是第三种方案,使用特殊分隔符来区分消息的边界,从而避免粘包问题。

Netty解决方案

PS:在 Netty 中,解码器(Decoder)起着非常重要的作用。解码器主要负责将从网络中接收到的原始字节流数据转换为应用程序能够理解的 Java 对象或消息格式。使用解码器可以解决粘包和拆包问题、协议转换问题、消息编码(如文本转换为字节流)等问题。

  • 使用定长解码器(FixedLengthFrameDecoder):每个数据包都拥有固定的长度,接收端根据固定长度对数据进行切分,从而解决了粘包问题。
  • 使用行分隔符解码器(LineBasedFrameDecoder):以行为单位进行数据包的解码,从而解决粘包问题。
  • 使用分隔符解码器(DelimiterBasedFrameDecoder):使用特定的分隔符来标识消息边界,这样接收端可以根据分隔符正确切分消息。
  • 使用长度字段解码器(LengthFieldBasedFrameDecoder):在消息头部加入表示消息长度的字段,接收端根据长度字段来确定消息的边界,而从解决粘包问题。

NettyClientHandler类

指定对接收消息的处理方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
// 接收到response, 给channel设计别名,让sendRequest里读取response
AttributeKey<RpcResponse> key = AttributeKey.valueOf("RPCResponse");
//相当于将channel和特定信息进行绑定
ctx.channel().attr(key).set(response);
ctx.channel().close();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//异常处理
cause.printStackTrace();
ctx.close();
}
}

服务端重构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@AllArgsConstructor
public class NettyRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvider;
@Override
public void start(int port) {
// netty 服务线程组boss负责建立连接, work负责具体的请求
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
System.out.println("netty服务端启动了");
try {
//启动netty服务器
ServerBootstrap serverBootstrap = new ServerBootstrap();
//初始化
serverBootstrap.group(bossGroup,workGroup).channel(NioServerSocketChannel.class)
//NettyClientInitializer这里 配置netty对消息的处理机制
.childHandler(new NettyServerInitializer(serviceProvider));
//同步堵塞
ChannelFuture channelFuture=serverBootstrap.bind(port).sync();
//死循环监听
channelFuture.channel().closeFuture().sync();
}catch (InterruptedException e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}

@Override
public void stop() {
}
}

NettyRPCServerHandler类

NettyServerInitializer类 和NettyClientInitializer类似,接收来自客户端的信息,并解析调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@AllArgsConstructor
public class NettyRPCServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
private ServiceProvider serviceProvider;
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception {
//接收request,读取并调用服务
RpcResponse response = getResponse(request);
ctx.writeAndFlush(response);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvider.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}

手撕RPC框架——引入Netty
https://love-enough.github.io/2024/07/07/手撕RPC框架——引入Netty/
作者
GuoZihan
发布于
2024年7月7日
更新于
2024年8月9日
许可协议