Netty学习3-Netty服务端基本结构

一、介绍

1. 版本介绍

netty的版本大致分为netty3.x和netty4.x、netty5.x

2. netty可以运用在那些领域?

    1. 分布式进程通信
      例如: hadoop、dubbo、akka等具有分布式功能的框架,底层RPC通信都是基于netty实现的,这些框架使用的版本通常都还在用netty3.x
    1. 游戏服务器开发
      最新的游戏服务器有部分公司可能已经开始采用netty4.x 或 netty5.x

二、依赖

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
<version>3.10.4.Final</version>
</dependency>

三、启动Netty服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.sohu.tv.netty.train.server.main;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import com.sohu.tv.netty.train.server.handler.ServerHelloHandler;
/**
* netty服务端
*
* @author leifu
* @Date 2016-5-29
* @Time 上午10:10:56
*/
public class NettyServer {
private final static int PORT = 8888;
public static void main(String[] args) {
// 服务类
ServerBootstrap bootstrap = new ServerBootstrap();
// boss线程池负责监控端口,worker线程池负责数据读写
ExecutorService bossPool = Executors.newCachedThreadPool();
ExecutorService workerPool = Executors.newCachedThreadPool();
// 设置niosocket工厂
bootstrap.setFactory(new NioServerSocketChannelFactory(bossPool, workerPool));
// 设置管道工厂
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("helloHandler", new ServerHelloHandler());
return pipeline;
}
});
bootstrap.bind(new InetSocketAddress(PORT));
System.out.println("netty server start!");
}
}

nettyServer的初始化基本是这样的,可以参考一下注释,不是很复杂,应该是封装的比较好,可以看一下Dubbo对于Netty的服务端是差不多的,就是一些细节不太一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
/*
* Copyright 1999-2011 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.remoting.transport.netty;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.ExecutorUtil;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.Server;
import com.alibaba.dubbo.remoting.transport.AbstractServer;
import com.alibaba.dubbo.remoting.transport.dispather.ChannelHandlers;
/**
* NettyServer
*
* @author qian.lei
* @author chao.liuc
*/
public class NettyServer extends AbstractServer implements Server {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
private Map<String, Channel> channels; // <ip:port, channel>
private ServerBootstrap bootstrap;
private org.jboss.netty.channel.Channel channel;
public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
@Override
protected void doOpen() throws Throwable {
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
@Override
protected void doClose() throws Throwable {
try {
if (channel != null) {
// unbind.
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
if (channels != null && channels.size() > 0) {
for (com.alibaba.dubbo.remoting.Channel channel : channels) {
try {
channel.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
if (bootstrap != null) {
// release external resource.
bootstrap.releaseExternalResources();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
if (channels != null) {
channels.clear();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
public Collection<Channel> getChannels() {
Collection<Channel> chs = new HashSet<Channel>();
for (Channel channel : this.channels.values()) {
if (channel.isConnected()) {
chs.add(channel);
} else {
channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
}
}
return chs;
}
public Channel getChannel(InetSocketAddress remoteAddress) {
return channels.get(NetUtils.toAddressString(remoteAddress));
}
public boolean isBound() {
return channel.isBound();
}
}

四、Handler:处理客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package com.sohu.tv.netty.train.server.handler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
/**
* 服务端handler
* @author leifu
* @Date 2016-5-29
* @Time 上午10:20:23
*/
public class ServerHelloHandler extends SimpleChannelHandler {
/**
* 接收消息
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
System.out.println("client messageReceived");
// System.out.println("message is: " + e.getMessage());
// 方法一:直接解析MessageEvent
// ChannelBuffer message = (ChannelBuffer) e.getMessage();
// String s = new String(message.array());
// System.out.println("string message is " + s);
// 方法二:在NettyServer的管道中定义decoder,详见StringDecoder
String s = (String) e.getMessage();
System.out.println("string message is " + s);
// 回写消息
// 方法一:直接发送ChannelBuffer
String reply = "hello";
// ChannelBuffer replyBuffer = ChannelBuffers.copiedBuffer(reply.getBytes());
// ctx.getChannel().write(replyBuffer);
// 方法二:在NettyServer的管道中定义encoder,详见StringDecoder
ctx.getChannel().write(reply);
super.messageReceived(ctx, e);
}
/**
* 捕获异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
System.out.println("client exceptionCaught");
super.exceptionCaught(ctx, e);
}
/**
* 新建连接
*/
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("client channelConnected");
super.channelConnected(ctx, e);
}
/**
* 连接必须是已经建立的,关闭通道的时候才会触发
*/
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("client channelDisconnected");
super.channelDisconnected(ctx, e);
}
/**
* channel关闭的时候触发
*/
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
System.out.println("client channelClosed");
super.channelClosed(ctx, e);
}
}

基本说明:

SimpleChannelHandler处理消息接收和写:

  • messageReceived接收消息
    解析消息的方法有两种,详见代码注释,注意如果想直接使用string,请在NettyServer加入new StringDecoder(),具体可以参考StringDecoder的decode方法。
    发送消息的方法有两种,详见代码注释,注意如想想直接使用string,请在NettyServer加入new StringEncoder(),具体可以参考StringEncoder的encode方法。

  • channelConnected新连接(例如:用来检测IP是否是黑名单)

  • channelDisconnected链接关闭,(例如:用户断线的时候清楚用户的缓存数据等)

五、使用Telnet测试

  1. windows7默认没有提供telnet,需要配置一下既可。具体可以参考:http://jingyan.baidu.com/article/870c6fc3cd6fa9b03fe4bee4.html

  2. 使用方法,cmd或者powershell

  • 连接服务端
    1
    telnet ip port

连接

  • 发送命令
    连接后,使用ctrl + ]就可以发送命令
    send
  • 服务端的日志

    1
    2
    3
    4
    netty server start!
    client channelConnected
    client messageReceived
    string message is hello
  • 接受命令
    使用ctrl + ]进入后,回车即可看到服务端返回的消息
    接受命令