登陆

极彩娱乐测试-「系列教程」手写RPC结构(1),看看100个线程一起调用情况如何

admin 2019-10-10 292人围观 ,发现0个评论

共享自己在Java方面的所思所想,期望你看完之后能有更多更深化的了解,欢迎重视(VX同号)

此文章将会是一个系列,从最简略的RPC结构开端,渐渐的引进zookeeper、netty、最终结合spring完结小的RPC结构,本篇会简略的介绍RPC是什么,RPC整个调用流程是什么,包括了什么组件。然后实践编写一个RPC实例,模仿100个线程调用以验证RPC的可用性,稳定性等。最终总结当时编写的RPC结构存在哪些问题,一个优异的RPC结构应该必备哪些功用点。

该RPC的代码可重视后私信「RPC」获取

目录

1、「系列教程」手写RPC结构(1),看看100个线程一起调用情况怎么

2、「系列教程」手写RPC结构(2) NIO模型学习

3、「系列教程」手写RPC结构(3) zookeeper怎么进行服务办理的?

什么是RPC

RPC(Remote Procedure Call),远程进程调用,可经过网络调用其他机器的服务恳求。RPC是一种标准,和TCP、UDP都没有联系,RCP能够选用TCP协议完结数据传输,乃至能够运用HTTP运用协议。RCP是C端形式,包括了服务端(服务供给方)、客户端(服务运用方),选用特定的网络传输协议,把数据依照特定的协议包装后进行传输操作等操作。先来了解下一个详细的RPC调用恳求的履行进程

本图来自网络

  • 1、服务调用方(Client)调用本地调用的办法调用本地署理目标
  • 2、署理目标将类称号、办法、参数等恳求数据依照恳求协议组装成Request
  • 3、经过Request数据从服务办理获取有用的服务端信息
  • 4、将Request数据依照序列化协议序列化后,运用网络传输协议经过网络发送到服务端中
  • 5、服务端接收到序列化后到数据,运用序列号协议反序列化操作生成Request数据
  • 6、经过Request数据找到详细的服务供给方,并调用履行特定的办法,计算出履行成果
  • 7、履行成果包装成Response,依照原路回来至客户端
  • 8、客户端解析Response,得到对应的履行成果,又或者是详细的过错信息

这便是一个完好的RPC调用进程,对运用方而言就只露出了本地署理目标,剩余的数据解析、运送等都被包装了,从服务供给方的视点看还有服务露出,如下图DUBBO的架构图。

RPC 实践

学习写RPC之前有必要先了解动态署理反射这两个知识点,如不了解先自行了解,本学习笔记不触及到此内容的介绍。

文件夹目录

Request目标

// lombok 
@Data
public class MethodParameter {
String className;
String methodName;
Object[] arguments;
Class[] parameterTypes;
@Override
public String toString() {
return JSON.toJSONString(this);
}
public static MethodParameter convert(InputStream inputStream) {
try {
ObjectInputStream input = new ObjectInputStream(inputStream);
String className = input.readUTF();
String methodName = input.readUTF();
Class[] parameterTypes = (Class[])input.readObject();
Object[] arguments = (Object[])input.readObject();
MethodParameter methodParameter = new MethodParameter();
methodParameter.setClassName(className);
methodParameter.setMethodName(methodName);
methodParameter.setArguments(arguments);
methodParameter.setParameterTypes(parameterTypes);
return methodParameter;
} catch (Exception e) {
throw new RuntimeException("解析恳求过错:" + e.getMessage());
}
}
}

能够很清楚的看到convert办法便是从一个输入流中读取出类称号、办法名等数据,组成一个MethodParameter目标,也便是上面所说的Request。

服务端 - 服务露出

public class RpcExploreService {
private Map objectMap = new HashMap<>();
public void explore(String className, Object object) {
objectMap.put(className, object);
}
public Object invoke(MethodParameter methodParameter) {
Object object = objectMap.get(methodParameter.getClassName());
if (object == null) {
throw new RuntimeException("无对应履行类:" + methodParameter.getClassName());
}
Method method = null;
try {
method = object.getClass().getMethod(methodParameter.getMethodName(), methodParameter.get恐龙图片大全ParameterTypes());
} catch (NoSuchMethodException e) {
throw new RuntimeException("无对应履行办法:" + methodParameter.getClassName() + ", 办法:" + methodParameter.getMethodName());
}
try {
Object result = method.invoke(object, methodParameter.getArguments());
System.out.println(methodParameter);
return result;
} catch (Exception e) {
throw new RuntimeException("invoke办法履行失利:" + e.getMessage());
}
}
}

服务露出存储了一个Map objectMap目标,一切可对外供给服务的都有必要添加到该容器中,以便于收到网络数据后能找到对应的服务,然后选用反射invoke调用,回来得到的成果。

服务端 - 网络数据处理

public class IOService implements Runnable{
private int port;
private ServerSocket serverSocket;
private RpcExploreService rpcExploreService;
private volatile boolean flag;
public IOService(RpcExploreService rpcExploreService, int port) throws IOException {
this.rpcExploreService = rpcExploreService;
this.port = port;
this.serverSocket = new ServerSocket(port);
this.fla极彩娱乐测试-「系列教程」手写RPC结构(1),看看100个线程一起调用情况如何g = true;
System.out.println("服务端启动了");
// 高雅封闭
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
flag = false;
System.out.println("服务端封闭了");
}
});
}
@Override
public void run() {
while (flag) {
Socket socket = null;
try {
socket = serverSocket.accept();
} catch (IOException e) {
}
if (socket == null) {
continue;
}
new Thread(new ServerSocketRunnable(socket)).start();
}
}
class ServerSocketRunnable implements Runnable {
private Socket socket;
public ServerSocketRunnable(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
MethodParameter methodParameter = MethodParameter.convert(inputStream);
Object result = rpcExploreService.invoke(methodParameter);
ObjectOutputStream output = new ObjectOutputStream(outputStream);
output.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}

简略的BIO模型,敞开了一个ServerSocket后,接收到数据后就把套接字丢给一个新的线程处理,ServerSocketRunnable承受一个socket后,解分出MethodParameter这个恳求目标,然后调用服务露出的invoke办法,再写回到socket传输给客户端

客户端 - 服务订阅

public class RpcUsedService {
private Map proxyObjectMap = new HashMap<>();
private Map classMap = new HashMap<>();
private IOClient ioClient;
public void setIoClient(IOClient ioClient) {
this.ioClient = ioClient;
}
public void register(Class clazz) {
String className = clazz.getName();
classMap.put(className, clazz);
if (!clazz.isInterface()) {
throw new RuntimeException("暂时只支撑接口类型的");
}
try {
RpcInvocationHandler handler = new RpcInvocationHandler();
handler.setClazz(clazz);
Object proxyInstance = Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, handler);
proxyObjectMap.put(className, proxyInstance);
// 然后需求包装起来
} catch (Exception e) {
e.printStackTrace();
}
}
public T get(Class clazz) {
String className = clazz.getName();
return (T) proxyObjectMap.get(className);
}
class RpcInvocationHandler implements InvocationHandler {
private Class clazz;
public void setClazz(Class clazz) {
this.clazz = clazz;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 实践上proxy没啥用途,不需求实在的反invoke射
MethodParameter methodParameter = new MethodParameter();
methodParameter.se极彩娱乐测试-「系列教程」手写RPC结构(1),看看100个线程一起调用情况如何tClassName(clazz.getName());
methodParameter.setMethodName(method.getName());
methodParameter.setArguments(args);
methodParameter.setParameterTypes(method.getParameterTypes());
return ioClient.invoke(methodParameter);
}
}
}

服务运用方需求运用register进行服务的注册,会生成对应的本地署理目标,后续只需求经过本地署理目标。

客户端 - 网络处理

public class IOClient {
private String ip;
private int port;
public IOClient(String ip, int port) throws IOException {
this.ip = ip;
this.port = port;
}
public Object invoke(MethodParameter methodParameter) {
Socket socket = null;
try {
socket = new Socket(ip, port);
OutputStream outputStream = socket.getOutputStream();
ObjectOutputStream ouput = new ObjectOutputStream(outputStream);
ouput.writeUTF(methodParameter.getClassName());
ouput.writeUTF(methodParameter.getMethodName());
ouput.writeObject(methodParameter.getParameterTypes());
ouput.writeObject(methodParameter.getArguments());
InputStream inputStream = socket.getInputStream();
ObjectInputStream input = new ObjectInputStream(inputStream);
return input.readObject();
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
return null;
}
}

署理目标被调用后生成一个MethodParameter目标,经过此IOClient把数据传输到服务端,而且回来对应的数据。

实践

服务端

public class Service {
public static void main(String[] args) {
RpcExploreService rpcExploreService = new RpcExploreService();
// 传入的字符串是接口的全称号
rpcExploreService.explore("new2019.rpc.rpc_v1.expore.Helloworld", new HelloWorldImpl());
try {
Runnable ioService = new IOService(rpcExploreService, 10001);
new Thread(ioService).start极彩娱乐测试-「系列教程」手写RPC结构(1),看看100个线程一起调用情况如何();
// 敞开了端口为10001的服务监听
} catch (IOException e) {
}
}
}

客户端

public class Client {
public static void main(String[] args) {
RpcUsedService rpcUsedService = new RpcUsedService();
rpcUsedService.register(Helloworld.class);
try {
IOClient ioClient = new IOClient("127.0.0.1", 10001);
// 网络套接字链接 同上是10001端口
rpcUsedService.setIoClient(ioClient);
Helloworld helloworld = rpcUsedService.get(Helloworld.class);
// 生成的本地署理目标 proxy
for(int i=0; i< 100; i++) {
// 敞开了100个县城
new Thread(() -> {
long start = System.currentTimeMillis();
int a = new Random().nextInt(100);
int b = new Random().nextInt(100);
int c = helloworld.add(a, b);
// .add 操作便是屏蔽了一切的细节,供给给客户端运用的办法
System.out.println("a: " + a + ", b:" + b + ", c=" + c + ", 耗时:" + (System.currentTimeMillis() - start));
}).start();
}
} catch (IOException e) {
}
}
}

测验服务

// Helloworld 接口
public interface Helloworld {
String hi();
int add(int a, int b);
}
// Helloworld 接口 完成类
public class HelloWorldImpl implements Helloworld {
@Override
public String hi() {
return "ok";
}
@Override
public int add(int a, int b) {
long start = System.currentTimeMillis();
try {
Thread.sleep(new Random().nextInt(10000));
// 成心添加了耗时操作,以便于模仿实在的调用操作
} catch (InterruptedException e) {
e.pr极彩娱乐测试-「系列教程」手写RPC结构(1),看看100个线程一起调用情况如何intStackTrace();
}
int c = a + b;
System.out.println(Thread.currentThread().getName() + " 耗时:" + (System.currentTimeMillis() - start));
return c;
}
}

运转作用

服务端履行成果


客户端调用成果

总结 & 考虑

这仅仅一个十分简略的RPC实践,包括了服务露出、服务注册(Proxy生成)、BIO模型进行网络传输,java默许的序列化办法,对RPC有一个开始的知道和了解,知道RPC有必要包括的模块

不过仍是有许多需求优化的点以改善。

  • IO模型:运用的是BIO模型,能够改善换成NIO模型,引进netty
  • 池化:不要随意新建线程,一切的线程都应有线程池统一办理
  • 服务发现:本地模仿的小demo,并没有服务发现,能够选用zk办理
  • 序列化:java自身自带的序列化功率很低,能够换成Hessian(DUBBO默许选用其作为序列化东西)、Protobuf(Protobuf是由Google提出的一种支撑多语言的跨渠道的序列化结构)等

还有例如服务计算、高雅下线、负载均衡等也都是一个老练的RPC结构有必要要考虑到的点。

该RPC的代码可重视后私信「RPC」获取

请关注微信公众号
微信二维码
不容错过
Powered By Z-BlogPHP