分布式服务框架学习1-一个最简单的RPC服务

一、实现思路

  • SocketServer作为服务端,Socket作为客户端。
  • 定义公共接口,例如XXXService。
  • 在服务端,定义XXXService的实现类,例如XXXService。
  • 客户端将要调用XXXService、方法名、参数,通过Socket的中的OutPutStream传到服务端,服务端利用反射拿到要调用的实现类、方法、参数类型、参数进行本地调用,然后通过Socket返回给客户端。
1
整个过程,基本上述所描述,基本的RPC思路已经出来了(先不讨论分布式服务框架的层面),单单作为RPC已经足够,那么SocketServer和Socket对于要传的参数的含义、顺序等应该打成一致,其实也就是所谓的协议,后面在介绍dubbo源码的时候再进行分析。

二、实现代码(来自《分布式服务框架原理与实践》一书)

注意:一定要注意传参的顺序

1. 服务接口类(客户端与服务端共享)

1
2
3
4
5
6
7
8
9
10
11
12
package com.sohu.train.rpc.service;
/**
* echo服务
* @author leifu
* @Date 2016年5月26日
* @Time 下午2:08:13
*/
public interface EchoService {
String echo(String ping);
}

2. 服务实现类(服务端)

1
2
3
4
5
6
7
8
9
10
11
12
package com.sohu.train.rpc.service.impl;
import com.sohu.train.rpc.service.EchoService;
public class EchoServiceImpl implements EchoService {
@Override
public String echo(String ping) {
return "hello " + ping;
}
}

3. 通过Socket的形式(SocketServer实现)对外暴露服务(服务端)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.sohu.train.rpc.exporter;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
* @author leifu
* @Date 2016年5月26日
* @Time 下午2:10:41
*/
public class RpcExporter {
// 定义线程池
private static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public static void exporter(String hostname, int port) throws IOException {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(hostname, port));
try {
while (true) {
Socket socket = serverSocket.accept();
executor.execute(new ExporterTask(socket));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
serverSocket.close();
}
}
private static class ExporterTask implements Runnable {
Socket client;
public ExporterTask(Socket client) {
this.client = client;
}
@Override
public void run() {
ObjectInputStream input = null;
ObjectOutputStream output = null;
try {
//读取客户端的数据,并解析相应参数,利用反射最终生成目标对应,调用对应的方法。
input = new ObjectInputStream(client.getInputStream());
String interfaceName = input.readUTF();
String methodName = input.readUTF();
Class<?> service = Class.forName(interfaceName);
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
Object[] arguments = (Object[]) input.readObject();
Method method = service.getMethod(methodName, parameterTypes);
Object result = method.invoke(service.newInstance(), arguments);
output = new ObjectOutputStream(client.getOutputStream());
output.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (input != null) {
try {
input.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (output != null) {
try {
output.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (client != null) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}

4. 拿到目标服务的代理(客户端),利用的是反向代理

**很长时间没见动态代理了,注意这个importer方法拿到的EchoService的代理实现类,这个方法只是返回了代理对象,没有执行操作,当客户端拿到这个代理对象时候,调用echo方法就会执行InvocationHandler的invoke方法,这个方法实现对服务端的调用,所以如果使用现成的服务框架,这个动态代理的逻辑是透明的。**
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.sohu.train.rpc.importer;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
public class RpcImporter<T> {
public T importer(final Class<?> serviceClass, final InetSocketAddress addr) {
return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class<?>[]{serviceClass.getInterfaces()[0]}, new InvocationHandler() {
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
Socket socket = null;
ObjectOutputStream output = null;
ObjectInputStream input = null;
try {
socket = new Socket();
socket.connect(addr);
output = new ObjectOutputStream(socket.getOutputStream());
output.writeUTF(serviceClass.getName());
output.writeUTF(method.getName());
output.writeObject(method.getParameterTypes());
output.writeObject(args);
input = new ObjectInputStream(socket.getInputStream());
return input.readObject();
} catch (Exception e) {
e.printStackTrace();
} finally {
if(socket != null) {
socket.close();
}
if(output != null){
output.close();
}
if(input != null){
input.close();
}
}
return null;
}
});
}
}

5. 测试

初始化服务端,生成一个RpcImporter对象,调用importer方法拿到EchoService的代理类,使用代理类调用echo方法拿到返回结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.sohu.train.rpc.main;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import com.sohu.train.rpc.exporter.RpcExporter;
import com.sohu.train.rpc.importer.RpcImporter;
import com.sohu.train.rpc.service.EchoService;
import com.sohu.train.rpc.service.impl.EchoServiceImpl;
/**
* rpc测试
*
* @author leifu
* @Date 2016年5月26日
* @Time 下午3:30:49
*/
public class RpcTest {
private final static String hostName = "localhost";
private final static int port = 8088;
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
try {
RpcExporter.exporter(hostName, port);
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
TimeUnit.SECONDS.sleep(3);
RpcImporter<EchoService> importer = new RpcImporter<EchoService>();
EchoService echo = importer.importer(EchoServiceImpl.class, new InetSocketAddress(hostName, port));
System.out.println(echo.echo("carlosfu"));
}
}

三、总结

上述过程完成一个基本的RPC实现,很多服务框架虽然复杂,但是也是借助了这种思想,只不过分布式服务框架要做的事情要复杂的多,而且对上述很多东西做了一些抽象,例如协议,Invoker, Exporter等等,后期随着不断学习,相信会逐步了解。