手撕RPC框架——实现简单的RPC调用

本文最后更新于 2024年7月12日 凌晨

一、场景设计

现在A,B位于不同的服务器上,但现在A想调用B的某个方法,如何实现呢?

服务端B
有一个用户表

  1. UserService 里有一个功能:getUserByUserId(Integer id)
  2. UserServiceImpl 实现了UserService接口和方法

客户端A
调用getUserByUserId方法, 内部传一个Id给服务端,服务端查询到User对象返回给客户端

如何实现以上调用过程呢?

二、设计思路

主要考虑客户端、服务端、以及双方如何通信才能实现此功能

2.1 客户端的设计

  1. 调用getUserByUserId方法时,内部将调用信息处理后发送给服务端B,告诉B我要获取User
  2. 外部调用方法,内部进行其它的处理——这种场景我们可以使用动态代理的方式,改写原本方法的处理逻辑

2.2 服务端的设计

  1. 监听到A的请求后,接收A的调用信息,并根据信息得到A想调用的服务与方法
  2. 根据信息找到对应的服务,进行调用后将结果发送回给A

2.3 通信设计

  1. 使用Java的socket网络编程进行通信
  2. 为了方便A ,B之间 对接收的消息进行处理,我们需要将请求信息和返回信息封装成统一的消息格式

三、代码实现

在此部分我们将理论转化为代码,分别实现客户端和服务端。

项目目录结构

项目结构

3.1 定义用户信息

1
2
3
4
5
6
7
8
9
10
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
// 客户端和服务端共有的
private Integer id;
private String userName;
private Boolean sex;
}

3.2 用户服务接口

1
2
3
4
5
6
public interface UserService {
// 客户端通过这个接口调用服务端的实现类
User getUserByUserId(Integer id);
//新增一个功能
Integer insertUserId(User user);
}

3.3 用户服务接口实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class UserServiceImpl implements UserService {
@Override
public User getUserByUserId(Integer id) {
System.out.println("客户端查询了"+id+"的用户");
// 模拟从数据库中取用户的行为
Random random = new Random();
User user = User.builder().userName(UUID.randomUUID().toString())
.id(id)
.sex(random.nextBoolean()).build();
return user;
}

@Override
public Integer insertUserId(User user) {
System.out.println("插入数据成功"+user.getUserName());
return user.getId();
}
}

3.4 定义消息格式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//定义请求信息格式RpcRequest
@Data
@Builder
public class RpcRequest implements Serializable {
//服务类名,客户端只知道接口
private String interfaceName;
//调用的方法名
private String methodName;
//参数列表
private Object[] params;
//参数类型
private Class<?>[] paramsType;
}

//定义返回信息格式RpcResponse(类似http格式)
@Data
@Builder
public class RpcResponse implements Serializable {
//状态码
private int code;
//状态信息
private String message;
//具体数据
private Object data;
//构造成功信息
public static RpcResponse sussess(Object data){
return RpcResponse.builder().code(200).data(data).build();
}
//构造失败信息
public static RpcResponse fail(){
return RpcResponse.builder().code(500).message("服务器发生错误").build();
}
}

3.5 实现动态代理类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@AllArgsConstructor
public class ClientProxy implements InvocationHandler {
//传入参数service接口的class对象,反射封装成一个request
private String host;
private int port;

//jdk动态代理,每一次代理对象调用方法,都会经过此方法增强(反射获取request对象,socket发送到服务端)
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//构建request
RpcRequest request=RpcRequest.builder()
.interfaceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.params(args).paramsType(method.getParameterTypes()).build();
//IOClient.sendRequest 和服务端进行数据传输
RpcResponse response= IOClient.sendRequest(host,port,request);
return response.getData();
}
public <T>T getProxy(Class<T> clazz){
Object o = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
return (T)o;
}
}

3.6 封装信息传输类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class IOClient {
//这里负责底层与服务端的通信,发送request,返回response
public static RpcResponse sendRequest(String host, int port, RpcRequest request){
try {
Socket socket=new Socket(host, port);
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());

oos.writeObject(request);
oos.flush();

RpcResponse response=(RpcResponse) ois.readObject();
return response;
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
return null;
}
}
}

3.7 定义服务端Server接口

1
2
3
4
5
public interface RpcServer {
//开启监听
void start(int port);
void stop();
}

3.8 实现RpcServer接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@AllArgsConstructor
public class SimpleRPCRPCServer implements RpcServer {
private ServiceProvider serviceProvide;
@Override
public void start(int port) {
try {
ServerSocket serverSocket=new ServerSocket(port);
System.out.println("服务器启动了");
while (true) {
//如果没有连接,会堵塞在这里
Socket socket = serverSocket.accept();
//有连接,创建一个新的线程执行处理
new Thread(new WorkThread(socket,serviceProvide)).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void stop() {
}
}

3.9 实现WorkThread类

WorkThread类负责启动线程和客户端进行数据传输,WorkThread类中的getResponse方法负责解析收到的request信息,寻找服务进行调用并返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@AllArgsConstructor
public class WorkThread implements Runnable{
private Socket socket;
private ServiceProvider serviceProvide;
@Override
public void run() {
try {
ObjectOutputStream oos=new ObjectOutputStream(socket.getOutputStream());
ObjectInputStream ois=new ObjectInputStream(socket.getInputStream());
//读取客户端传过来的request
RpcRequest rpcRequest = (RpcRequest) ois.readObject();
//反射调用服务方法获取返回值
RpcResponse rpcResponse=getResponse(rpcRequest);
//向客户端写入response
oos.writeObject(rpcResponse);
oos.flush();
} catch (IOException | ClassNotFoundException e) {
e.printStackTrace();
}
}
private RpcResponse getResponse(RpcRequest rpcRequest){
//得到服务名
String interfaceName=rpcRequest.getInterfaceName();
//得到服务端相应服务实现类
Object service = serviceProvide.getService(interfaceName);
//反射调用方法
Method method=null;
try {
method= service.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParamsType());
Object invoke=method.invoke(service,rpcRequest.getParams());
return RpcResponse.sussess(invoke);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
System.out.println("方法执行错误");
return RpcResponse.fail();
}
}
}

3.10 实现本地服务存放器

因为一个服务器会有多个服务,所以需要设置一个本地服务存放器serviceProvider存放服务,在接收到服务端的request信息之后,我们在本地服务存放器找到需要的服务,通过反射调用方法,得到结果并返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//本地服务存放器
public class ServiceProvider {
//集合中存放服务的实例
private Map<String,Object> interfaceProvider;

public ServiceProvider(){
this.interfaceProvider=new HashMap<>();
}
//本地注册服务
public void provideServiceInterface(Object service){
String serviceName=service.getClass().getName();
Class<?>[] interfaceName=service.getClass().getInterfaces();

for (Class<?> clazz:interfaceName){
interfaceProvider.put(clazz.getName(),service);
}

}
//获取服务实例
public Object getService(String interfaceName){
return interfaceProvider.get(interfaceName);
}
}

3.11 客户端主程序

1
2
3
4
5
6
7
8
9
10
11
12
13
public class TestClient {
public static void main(String[] args) {
ClientProxy clientProxy=new ClientProxy("127.0.0.1",9999);
UserService proxy=clientProxy.getProxy(UserService.class);

User user = proxy.getUserByUserId(1);
System.out.println("从服务端得到的user="+user.toString());

User u=User.builder().id(100).userName("wxx").sex(true).build();
Integer id = proxy.insertUserId(u);
System.out.println("向服务端插入user的id"+id);
}
}

3.12 服务端主程序

1
2
3
4
5
6
7
8
9
10
11
public class TestServer {
public static void main(String[] args) {
UserService userService=new UserServiceImpl();

ServiceProvider serviceProvider=new ServiceProvider();
serviceProvider.provideServiceInterface(userService);

RpcServer rpcServer=new SimpleRPCRPCServer(serviceProvider);
rpcServer.start(9999);
}
}

手撕RPC框架——实现简单的RPC调用
https://love-enough.github.io/2024/07/06/手撕RPC框架——实现简单的RPC调用/
作者
GuoZihan
发布于
2024年7月6日
更新于
2024年7月12日
许可协议