计算机网络报告week10

RPC 协议

实验目的

  • 掌握RPC的工作原理
  • 会写RPC程序

实验任务

  • 学习并理解RPC的工作原理

  • 熟悉并掌握gRPC框架使用

协议简介

RPC协议

RPC(Remote Procedure Call Protocol)远程过程调用协议,一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。通俗的描述是:客户端在不知道调用细节的情况下,调用存在于远程计算机上的某个对象,就像调用本地应用程序中的对象一样。

RPC协议要点

RPC是协议:既然是协议就只是一套规范,那么就需要有人遵循这套规范来进行实现。目前典型的RPC实现包括:Dubbo、Thrift、GRPC、Hetty等。这里要说明一下,目前技术的发展趋势来看,实现了RPC协议的应用工具往往都会附加其他重要功能,例如Dubbo还包括了服务治等功能。

网络协议和网络IO模型对其透明:既然RPC的客户端认为自己是在调用本地对象。那么传输层使用的是TCP/UDP还是HTTP协议,又或者是一些其他的网络协议它就不需要关心了。既然网络协议对其透明,那么调用过程中,使用的是哪一种网络IO模型调用者也不需要关心。

信息格式对其透明:我们知道在本地应用程序中,对于某个对象的调用需要传递一些参数,并且会返回一个调用结果。至于被调用的对象内部是如何使用这些参数,并计算出处理结果的,调用方是不需要关心的。那么对于远程调用来说,这些参数会以某种信息格式传递给网络上的另外一台计算机,这个信息格式是怎样构成的,调用方是不需要关心的。

应该有跨语言能力:调用方实际上不清楚远程服务器的应用程序是使用什么语言运行的。那么对于调用方来说,无论服务器方使用的是什么语言,本次调用都应该成功,并且返回值也应该按照调用方程序语言所能理解的形式进行描述。

Client:

RPC协议的调用方。就像上文所描述的那样,最理想的情况是RPC Client在完全不知道有RPC框架存在的情况下发起对远程服务的调用。但实际情况来说Client或多或少的都需要指定RPC框架的一些细节。

Server:

在RPC规范中,这个Server并不是提供RPC服务器IP、端口监听的模块。而是远程服务方法的具体实现(在JAVA中就是RPC服务接口的具体实现)。其中的代码是最普通的和业务相关的代码,甚至其接口实现类本身都不知道将被某一个RPC远程客户端调用。

Stub/Proxy:

RPC代理存在于客户端,因为要实现客户端对RPC框架“透明”调用,那么客户端不可能自行去管理消息格式、不可能自己去管理网络传输协议,也不可能自己去判断调用过程是否有异常。这一切工作在客户端都是交给RPC框架中的“代理”层来处理的。

Message Protocol:

在上文我们已经说到,一次完整的client-server的交互肯定是携带某种两端都能识别的,共同约定的消息格式。RPC的消息管理层专门对网络传输所承载的消息信息进行编码和解码操作。目前流行的技术趋势是不同的RPC实现,为了加强自身框架的效率都有一套(或者几套)私有的消息格式。

Transfer/Network Protocol:

传输协议层负责管理RPC框架所使用的网络协议、网络IO模型。例如Hessian的传输协议基于HTTP(应用层协议);而Thrift的传输协议基于TCP(传输层协议)。传输层还需要统一RPC客户端和RPC服务端所使用的IO模型;

Selector/Processor:

存在于RPC服务端,用于服务器端某一个RPC接口的实现的特性(它并不知道自己是一个将要被RPC提供给第三方系统调用的服务)。所以在RPC框架中应该有一种“负责执行RPC接口实现”的角色。包括:管理RPC接口的注册、判断客户端的请求权限、控制接口实现类的执行在内的各种工作。

IDL:

实际上IDL(接口定义语言)并不是RPC实现中所必须的。但是需要跨语言的RPC框架一定会有IDL部分的存在。这是因为要找到一个各种语言能够理解的消息结构、接口定义的描述形式。如果您的RPC实现没有考虑跨语言性,那么IDL部分就不需要包括,例如JAVA RMI因为就是为了在JAVA语言间进行使用,所以JAVA RMI就没有相应的IDL。

Task1:运行项目

结合代码分析rpc流程和其中涉及的关键组件。

新知识 annotation

在anno这个文件夹下存放的是Java注解

Annotation其实是代码里的特殊标记,这些标记可以在编译、类加载、运行时被读取,并执行相应的处理。通过使用Annotation,程序开发人员可以在不改变原有逻辑的情况下,在源文件嵌入一些补充信息。代码分析工具、开发工具和部署工具可以通过这些补充信息进行验证或者进行部署。

Annotation提供了一条为程序元素设置元数据的方法,从某些方面来看,Annotation就像修饰符一样被使用,可用于修饰包、类、构造器、方法、成员变量、参数、局部变量的声明,这些信息被存储在Annotation的“name=value”对中。

  • 使用@Retention

  @Retention只能用于修饰一个Annotation定义,用于指定该Annotation可以保留多长时间,如果未设置@RetentionAnnotation的有效范围为枚举常量Class表示的范围@Retention包含一个RetentionPolicy类型的value成员变量,所以使用@Retention时必须为该value成员变量指定值。

value成员变量的值只能是如下三个:

  1. RetentionPolicy.CLASS: 编译器将把注释记录在class文件中。当运行Java程序时,JVM不在保留注释,这是默认值。
  2. RetentionPolicy.RUNTIME: 编译器将把注释记录在class文件中。当运行Java程序时,JVM也会保留注释,程序可以通过反射获取该注释。
  3. RetentionPolicy.SOURCE: 注解仅存在于源码中,在class字节码文件中不包含。
  • 使用@Target

  @Target也是用于修饰一个Annotation定义,它用于指定被修饰Annotation能用于修饰哪些程序元素。如果未设置@target,说明annotation适用于所有程序元素。@Target Annotation也包含一个名为value的成员变量,该成员变量只能是如下几个:

  1. ElementType.ANNOTATION_TYPE: 指定该策略的Annotation只能修饰Annotation。
  2. ElementType.CONSTRUCTOR: 指定该策略的Annotation能修饰构造器。
  3. ElementType.FIELD: 指定该策略的Annotation只能修饰成员变量。
  4. ElementType.LOCAL_VARIABLE: 指定该策略的Annotation只能修饰局部变量。
  5. ElementType.METHOD: 指定该策略的Annotation只能修饰方法。
  6. ElementType.PACKAGE: 指定该策略的Annotation只能修饰包定义。
  7. ElementType.PARAMETER: 指定该策略的Annotation可以修饰参数。
  8. ElementType.TYPE: 指定该策略的Annotation可以修饰类、接口(包括注释类型)或枚举定义。

在这里,我们使用了 @Retention 中的 RetentionPolicy.RUNTIME 和 @Target 中的 ElementType.TYPE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package qjm.rpc.anno;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* rpc服务类注解
* @author QJM
*
*/
@Target(ElementType.TYPE)//适用于类、接口、枚举
@Retention(RetentionPolicy.RUNTIME)//运行时加载到JVM中
public @interface RpcService {
//实现接口
Class<?> value();
}

在定义 Annotation 类型时,需要用关键字 @interface ,这个关键字的隐含意思是继承了 java.lang.annotation.Annotation接口。上面这个Annotation类型只包含一个泛型类型的成员Class<?> value() (如果在所定义的Annotation类型中只包含一个成员,通常将成员名称命名为value)

流程分析

流程如下所示:

根据上面的流程示意图,我们依次拆分这个项目:因为 ServerTest 代码很简单,就是启动一个RpcServer 实例,因此我们从ClientTest出发,用递归思路来剖析一下

ServerTest和ClientTest两个文件模拟的是本地的客户端,在不知道调用细节的情况下,调用存在于远程计算机上的某个对象也就是PersonService中的两个方法,就像调用本地应用程序中的对象一样。

首先,rpc-demo 项目是一个maven项目。其主体架构如下:

根目录
pom.xml Maven的pom文件
src/
main/ 项目主体目录根
java 源代码目录
test/ 项目测试目录根
java 测试代码目录
target/ 输出目录根

从ClientTest出发

首先我们来看 test/java/qjm.rpc.test/ClientTest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ClientTest {

public static void main(String[] args) {
RpcClientProxy proxy = new RpcClientProxy("127.0.0.1",9998);
PersonService service = proxy.getProxy(PersonService.class);

System.out.println(service.getInfo());

Person person = new Person();
person.setAge(23);
person.setName("Qjm");
person.setSex("男");
System.out.println(service.printInfo(person));
}
}

在这个测试文件中,首先调用了RpcClientProxy 的构造方法,创建一个RpcClientProxy对象。然后当proxy对象调用getProxy方法的时候, 会被 invoke 方法拦截,并执行 invoke 方法。

PrcClientProxy

首先, getProxy 会接收一个泛型,然后返回一个代理对象。

1
2
3
4
5
@SuppressWarnings("unchecked")
public <T>T getProxy(Class<T> clazz){
// clazz不是接口不能使用JDK动态代理
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, RpcClientProxy.this);
}

这里,传入的是 PersonService.class也就是获得这个接口所对应的Class实例,这个实例中提供了两个方法:Person getInfo(),printInfo(Person person)

接下来,当 ClientTest 调用 service.getInfo() 时,代理对象的方法被调用时会被invoke方法拦截,执行invoke方法

  • 封装参数,用于发送到服务器,定位服务、执行服务
  • 链接服务器调用服务
1
2
3
4
5
6
7
8
9
10
11
public Object invoke(Object obj, Method method, Object[] params) throws Throwable {
//封装参数
RpcRequest request = new RpcRequest();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParamTypes(method.getParameterTypes());
request.setParams(params);
//链接服务器调用服务
RpcClient client = new RpcClient();
return client.start(request, host, port);
}

在封装参数阶段,首先,会创建一个RpcRequest 对象来标准化参数。关于RpcRequest我们之后再介绍。然后,我们给request设置完整的类名、方法名和参数类型,为调用服务做准备。这里,

  • 类名:qjm.rpc.test.imp.PersonService
  • 方法名:getInfo
  • 参数类型: [],因为 getinfo() 是没有参数的
  • 参数:null

封装完毕之后,会新建一个RpcClient 对象,传入刚刚封装好的 Rpcrequest对象,本地ip和端口作为参数,并启动它。

RpcClient

RpcClient对象的作用就是会把传入的请求对象通过socket发送给RpcServer,收到server传回来的数据之后,经过处理并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class RpcClient {

public Object start(RpcRequest request, String host, int port) throws Throwable{
// 首先,用传入的ip和端口 创建一个 socket对象
Socket server = new Socket(host, port);

InputStream in = null;
ObjectInputStream oin = null;
OutputStream out = null;
ObjectOutputStream oout = null;
try {
// 1. 发送请求数据
out = server.getOutputStream();
oout = new ObjectOutputStream(out);
oout.writeObject(request);//把请求体通过socket推出
oout.flush();

// 2. 获取返回数据,强转参数类型
in = server.getInputStream();
oin = new ObjectInputStream(in);
Object res = oin.readObject(); //从返回流中读取返回的对象
RpcResponse response = null;
if(!(res instanceof RpcResponse)){ //如果返回的对象不是RpcResponse实例,报错
throw new RuntimeException("返回参数不正确");
}else{ // 如果是,那么进行一个显式类型转换
response = (RpcResponse) res;
}

// 3. 返回结果
if(response.getError() != null){ //服务器产生异常
throw response.getError();
}
return response.getResult();
}finally{
try { //关闭流
if(in != null) in.close();
if(oin != null) oin.close();
if(out != null) out.close();
if(oout != null) oout.close();
if(server != null) server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

}
RpcRequest

invoke方法和RpcClient中已经提到了多次RpcRequest,现在我们来分析一下这个类

这个类实现了一个Serializable 接口,也就是将一个类序列化。一个类的对象要想序列化成功,必须满足两个条件:

  • 该类必须实现 java.io.Serializable 接口。
  • 该类的所有属性必须是可序列化的。如果有一个属性不是可序列化的,则该属性必须注明是短暂的。

Java的序列化机制中,一个对象可以表示为一个字节序列,该字节序列包括该对象的数据、有关对象类型的信息和存储在对象中的数据的类型。将序列化对象写入文件之后,可以从文件中读取出来,并对其进行反序列化。

因为RpcRequest,RpcResponse的实例化对象是要通过socket在client和server之间传递的,因此我们这里需要将其序列化。

RpcRequest 的实例化对象是 RpcClient 发出的 ,是RpcServer 接收的

我们来看看RpcRequest 方法中的变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class RpcRequest implements Serializable{
/*
serialVersionUID作用:
序列化时为了保持版本的兼容性,即在版本升级时反序列化仍保持对象的唯一性。
默认的1L,比如:private static final long serialVersionUID = 1L;
*/
private static final long serialVersionUID = 1L;
// 存放当前请求方法所在的类的名字
private String className;
// 存放当前请求方法的名字
private String methodName;
// 存放当前请求方法的参数的数据类型,因为参数的类型各不相同,这里需要使用泛型
private Class<?>[] paramTypes;
// 存放请求方法的参数
private Object[] params;
// 剩下的方法都是上面变量的 getter和setter,这里略去不讲
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParamTypes() {
return paramTypes;
}
public void setParamTypes(Class<?>[] paramTypes) {
this.paramTypes = paramTypes;
}
public Object[] getParams() {
return params;
}
public void setParams(Object[] params) {
this.params = params;
}
}
RpcResponse

RpcResponse类,是RpcServer 处理并发出的,是RpcClient接收的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class RpcResponse implements Serializable {
//serialVersionUID 和 RpcRequest 一样
private static final long serialVersionUID = 1L;
// 当参数错误时,用来存放错误
private Throwable error;
// 存放RpcServer处理 RpcRequest后的结果
private Object result;

// 下面是参数的getter和setter
public Throwable getError() {
return error;
}
public void setError(Throwable error) {
this.error = error;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
}

RpcServer

现在我们来讲一下 RpcServer类,这个类比较复杂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
public class RpcServer {
/**
* 启动rpc服务
* @param port 监听端口
* @param clazz 服务类所在包名,多个用英文逗号隔开
*/
// 这里有两个参数,第一个是端口,第二个是
public void start(int port, String clazz) {
ServerSocket server = null;
try {
// 1. 创建socket连接
server = new ServerSocket(port);
// 2. 获取所有rpc服务类,即发布服务。services 是 键值对类型的
Map<String, Object> services = getService(clazz);
// 3. 创建线程池
/*
ThreadPoolExecutor 的参数:
int corePoolSize, 线程池中的线程数量
int maximumPoolSize, 线程池中最大允许存放的线程数量
long keepAliveTime, 当线程数大于核心数时,这是多余的空闲线程在终止之前等待新任务的最大时间。
TimeUnit unit,保持活动时间参数的时间单位,这里设为 秒
BlockingQueue<Runnable> workQueue,在执行任务之前用于保存任务的队列。这个队列将只保存execute方法提交的Runnable任务。
*/
Executor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
while(true){
// 4. 获取客户端连接
Socket client = server.accept();
// 5. 查找并执行服务
RpcService service = new RpcService(client, services);
//利用线程池,来执这个RpcService,这里 execute方法的对象一定要是Runnable的
executor.execute(service);
}
} catch (IOException e) {
e.printStackTrace();
}finally{
//关闭监听
if(server != null)
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

/**
* 实例化所有rpc服务类,并返回键值对
* @param clazz 服务类所在包名,多个用英文逗号隔开
* @return
*/
public Map<String,Object> getService(String clazz){
try {
Map<String, Object> services = new HashMap<String, Object>();
// 这里是将传入的字符串数组用 , 依次分割其中的元素。但是我们传入的只有一个元素:qjm.rpc.test
// 因此:clazzes: [qjm.rpc.test]
String[] clazzes = clazz.split(",");
List<Class<?>> classes = new ArrayList<Class<?>>();
for(String cl:clazzes){
List<Class<?>> classList = getClasses(cl);
classes.addAll(classList);
}
//对每一个文件夹进行查找,并找出有 @RpcService 注解的类,放入classes列表中
//classes: [class qjm.rpc.test.imp.PersonServiceImpl]
//对classes中的类进行循环实例化
for(Class<?> cla:classes){
Object obj = cla.newInstance();
// key 值 是该类的名字,value值是该类的一个实例化对象
services.put(cla.getAnnotation(qjm.rpc.anno.RpcService.class).value().getName(), obj);
}
//此时的services: {qjm.rpc.test.imp.PersonService=qjm.rpc.test.imp.PersonServiceImpl@d716361}
return services;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* 获取包下所有有@RpcSercive注解的类
* @param pckgname
* @return
* @throws ClassNotFoundException
*/
public static List<Class<?>> getClasses(String pckgname) throws ClassNotFoundException {
List<Class<?>> classes = new ArrayList<Class<?>>();
File directory = null;
try {
ClassLoader cld = Thread.currentThread().getContextClassLoader();
if (cld == null)
throw new ClassNotFoundException("Can't get class loader.");
String path = pckgname.replace('.', '/');
URL resource = cld.getResource(path);
if (resource == null)
throw new ClassNotFoundException("No resource for " + path);
directory = new File(resource.getFile());
} catch (NullPointerException x) {
throw new ClassNotFoundException(pckgname + " (" + directory + ") does not appear to be a valid package a");
}
if (directory.exists()) {
//获取所有文件
String[] files = directory.list();
File[] fileList = directory.listFiles();
for (int i = 0;fileList != null && i < fileList.length; i++) {
File file = fileList[i];
//判断是否是Class文件
if (file.isFile() && file.getName().endsWith(".class")) {
Class<?> clazz = Class.forName(pckgname + '.' + files[i].substring(0, files[i].length() - 6));
if(clazz.getAnnotation(qjm.rpc.anno.RpcService.class) != null){
classes.add(clazz);
}
}else if(file.isDirectory()){ //如果是目录,递归查找
List<Class<?>> result = getClasses(pckgname+"."+file.getName());
if(result != null && result.size() != 0){
classes.addAll(result);
}
}
}
} else{
throw new ClassNotFoundException(pckgname + " does not appear to be a valid package b");
}
return classes;
}
}
imp文件夹

我们在RpcServer中实例化了这个文件夹中的PersonServiceImpl类,现在来看看这个类长啥样:

这个类继承自 PersonService 接口,接口中定义了 getInfo()方法和printInfo()方法

此外,之前说的 Annotation在这里被派上了用场,因为这个类被标记了,所以能被RpcServer快速地找出并标记、实例化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package qjm.rpc.test.imp;

import qjm.rpc.anno.RpcService;

@RpcService(PersonService.class)
public class PersonServiceImpl implements PersonService{

public Person getInfo() {
Person person = new Person();
person.setAge(22);
person.setName("qjm");
person.setSex("男");
return person;
}

public boolean printInfo(Person person) {
if(person != null){
System.out.println(person);
return true;
}
return false;
}

}

当调用getInfo()时,会返回一个person对象,当调用printInfo()时,会返回一个布尔值

Person类的定义如下:

定义了名字、年龄和性别以及它们的getter和setter,此外,还重写了toString函数,也就是当我们打印一个Person对象时,会按照我们定义的格式输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Person implements Serializable{

private static final long serialVersionUID = 1L;

private String name;
private int age;
private String sex;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getSex() {
return sex;
}
public void setSex(String sex) {
this.sex = sex;
}
@Override
public String toString() {
return "Person [name=" + name + ", age=" + age + ", sex=" + sex + "]";
}
}

RpcService

RpcServer在与RpcClient 建立连接后,会新建一个 RpcService 对象,传入参数为:client 连接以及键值对类型的services

在这个类中,首先会读取流中的 RpcRequest, 然后在services中查找并执行RpcRequest中包含着的本地客户端要请求的方法,最后返回执行后的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class RpcService implements Runnable{

private Socket client;
private Map<String,Object> services;

/**
* @param client 客户端
* @param services 所有服务
*/
// 这是一个构造器
public RpcService(Socket client, Map<String, Object> services) {
super();
this.client = client;
this.services = services;
}



public void run() {
InputStream in = null;
ObjectInputStream oin = null;
OutputStream out = null;
ObjectOutputStream oout = null;
RpcResponse response = new RpcResponse();
try {
// 1. 获取流
in = client.getInputStream();
oin = new ObjectInputStream(in);
out = client.getOutputStream();
oout = new ObjectOutputStream(out);

// 2. 获取RpcRequest中的请求数据,强转参数类型
Object param = oin.readObject();
RpcRequest request = null;
if(!(param instanceof RpcRequest)){
response.setError(new Exception("参数错误"));
oout.writeObject(response);
oout.flush();
return;
}else{
request = (RpcRequest) param;
}

// 3. 查找并执行服务方法
// 首先用request.getClassName()获取类名,然后再键值对中用类名查找得到实例化的对象
Object service = services.get(request.getClassName());
// 已经获取了实例化的对象,接下来要根据方法名称、方法的参数的数据类型来获得具体的方法
Class<?> clazz= service.getClass();
Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());
// 最后,调用该方法,并返回结果
Object result = method.invoke(service, request.getParams());
// 4. 将结果存放到 RpcResponse 实例化对象中,并通过socket 传回。
response.setResult(result);
oout.writeObject(response);
oout.flush();
return;
} catch (Exception e) {
try { //异常处理
if(oout != null){
response.setError(e);
oout.writeObject(response);
oout.flush();
}
} catch (Exception e1) {
e1.printStackTrace();
}
return;
}finally{
try { // 关闭流
if(in != null) in.close();
if(oin != null) oin.close();
if(out != null) out.close();
if(oout != null) oout.close();
if(client != null) client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

}

还是回到ClientTest的这行代码: System.out.println(service.getInfo());

通过这样绕一大圈,最后是在 RpcService中通过method.invoke(service, request.getParams()); 实现的。返回一个Person对象,这个对象放在RpcResponse中传回RpcClient, RpcClient收到之后再返回客户端 response.getResult();

我们再将其输出,就得到了 Person [name=qjm, age=22, sex=男] 这个结果。

同理,对于System.out.println(service.printInfo(person)); 这行代码,传入了一个我们定义的Person对象。然后,RpcService会调用放在PersonServicelmpl 中的printInfo(person)方法。在这个方法中,如果 person对象不为空,那么就会在客户的serviceTest端输出person的信息,然后返回true。这个true通过socket 传给RpcClient并经其返回给客户的clientTest端,打印为True。

这样,一个基于rpc协议的项目就被我们分析完了。

gRPC框架使用

Task2:按照下面步骤完成gRPC使用,并将其中关键步骤和实验结果写到实验报告中。

gRPC:在 gRPC 里客户端应用可以像调用本地对象一样直接调用另一台不同的机器上服务端应用的方法,使得您能够更容易地创建分布式应用和服务。与许多 RPC 系统类似,gRPC 也是基于以下理念:定义一个服务,指定其能够被远程调用的方法(包含参数和返回类型)。在服务端实现这个接口,并运行一个 gRPC 服务器来处理客户端调用。在客户端拥有一个存根能够像服务端一样的方法。

新建一个普通的Maven项目

配置pom文件,导入grpc的依赖和插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.grpcprojects</groupId>
<artifactId>grpcExercise3</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<grpc-version>1.20.0</grpc-version>
</properties>
<dependencies>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
<version>${grpc-version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc-version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc-version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc-version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.1</version>
<configuration> <protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId> <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.9.1:exe:${os.detected.classifier}</pluginArtifact>
<protoSourceRoot>src/main/proto</protoSourceRoot>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

编写proto文件

在项目main目录下新建一个proto文件夹,再在此文件夹下创建一个helloworld.proto文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}

编译proto文件

  • 右击Maven.Projects\protobuf\protobuf:compile ,选择run,生成用于序列化的java文件。
  • 再右击Maven.Projects\protobuf\protobuf:compile-custom,选择run,生成用于rpc的java代码。

添加客户端和服务端代码

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package helloworld;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;

import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* A simple client that requests a greeting from the {@link HelloWorldServer}.
*/
public class HelloWorldClient {
private static final Logger logger = Logger.getLogger(HelloWorldClient.class.getName());

private final ManagedChannel channel;
private final GreeterGrpc.GreeterBlockingStub blockingStub;

/** Construct client connecting to HelloWorld server at {@code host:port}. */
public HelloWorldClient(String host, int port) {
this(ManagedChannelBuilder.forAddress(host, port)
// Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
// needing certificates.
.usePlaintext()
.build());
}

/** Construct client for accessing HelloWorld server using the existing channel. */
HelloWorldClient(ManagedChannel channel) {
this.channel = channel;
blockingStub = GreeterGrpc.newBlockingStub(channel);
}

public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
}

/** Say hello to server. */
public void greet(String name) {
logger.info("Will try to greet " + name + " ...");
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
response = blockingStub.sayHello(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}
logger.info("Greeting: " + response.getMessage());
}

/**
* Greet server. If provided, the first element of {@code args} is the name to use in the
* greeting.
*/
public static void main(String[] args) throws Exception {
HelloWorldClient client = new HelloWorldClient("localhost", 50051);
try {
/* Access a service running on the local machine on port 50051 */
String user = "world";
if (args.length > 0) {
user = args[0]; /* Use the arg as the name to greet if provided */
}
client.greet(user);
} finally {
client.shutdown();
}
}
}
服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package helloworld;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.examples.helloworld.GreeterGrpc;
import io.grpc.examples.helloworld.HelloReply;
import io.grpc.examples.helloworld.HelloRequest;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.logging.Logger;

/**
* Server that manages startup/shutdown of a {@code Greeter} server.
*/
public class HelloWorldServer {
private static final Logger logger = Logger.getLogger(HelloWorldServer.class.getName());

private Server server;

private void start() throws IOException {
/* The port on which the server should run */
int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new GreeterImpl())
.build()
.start();
logger.info("Server started, listening on " + port);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
System.err.println("*** shutting down gRPC server since JVM is shutting down");
HelloWorldServer.this.stop();
System.err.println("*** server shut down");
}
});
}

private void stop() {
if (server != null) {
server.shutdown();
}
}
/**
* Await termination on the main thread since the grpc library uses daemon threads.
*/
private void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}

/**
* Main launches the server from the command line.
*/
public static void main(String[] args) throws IOException, InterruptedException {
final HelloWorldServer server = new HelloWorldServer();
server.start();
server.blockUntilShutdown();
}

static class GreeterImpl extends GreeterGrpc.GreeterImplBase {

@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}

一开始,会报错:

但事实上这是因为还没有把proto文件更新到项目中去,点击Maven上菜单栏中的文件夹图标即可修复。运行结果如下:

项目结构如下:

-------------本文结束,感谢您的阅读-------------