Netty4自学笔记 (3) - Netty NIO Server和Client 样例说明
更新节奏缓慢,因为每晚学习注意力不够集中,学习进展缓慢。本还给自己找了一大堆其他理由,但摸着良心问自己,似乎只有这个理由说的通。
想搞懂的太多,却始终没搞明白。先看一个用Netty编写的NIO Server的样例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.coderli.nettylab.guide;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* @author lihongzhe 2018/7/24 23:19
*/
public class NettyNioServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(7060).sync(); // (5)
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
ServerHanler代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.coderli.nettylab.guide;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* @author lihongzhe 2018/7/24 23:58
*/
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("Receive Msg.");
((ByteBuf) msg).release();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
上述代码改自Netty官方手册。NettyNioServer代码中做了几处标记,分别对应我们在Netty4 自学笔记(2)中讨论的关键点,简析如下:
- 标记1是构建了两个线程池,我们在Java NIO学习中提到的,针对Select的实现方式,如果想要实现并发,只需要在事件处理时,启用多线程处理即可,此处的workerGroup正是为此服务。而在Netty中,不仅仅对于事件到达后的处理启用了线程池,为了实现高并发,Netty开启多个线程注册多个Selector同时处理事件。Netty中,默认线程池数量为,cpu核心数 * 2(NettyRuntime.availableProcessors() * 2),个人认为Selector线程数过多意义也不大,关键还是在于事件分发后的后续处理,即work线程。同样,这里worker线程池的默认数量与boss一致,因此在业务实现中应注意异步处理worker中的回调,以免堵塞worker,影响并发。
- 标记2的ServerBootstrap是Netty封装的统一配置并启动Server的启动器,在Java NIO学习中提到的Channel(标记3)和事件处理器Handler(标记4),都统一配置在此。标记3处指定不同类型的Channel(例如:Nio、Oio),底层便可方便的在不同的通信模式下进行切换,上层无感知。
- 标记5,一切配置好后,启动器绑定到指定端口。
从样例代码的直观感觉来说,Netty提供了良好的封装,无论是Server还是事件处理的Handler,Netty几乎帮我们做好了一切。对于开发人员来说,只需要关注于Netty提供的Handler的回调时机,开发自己的业务逻辑,比直接使用Java NIO的API节约了很多的开工作量,而且保证了代码的健壮性和多线程支持。
因此,我下一步的思路就是去研究一下Netty中Handler的回调机制,真正掌握才可开发逻辑正确的代码。
本文的最后,把Client端的代码补充完整,以便调试,
NettyNioClient
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.coderli.nettylab.guide;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* @author lihongzhe 2018/8/6 22:55
*/
public class NettyNioClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture f = b.connect("127.0.0.1", 7060).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
ClientHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.coderli.nettylab.guide;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
/**
* @author lihongzhe 2018/8/6 23:13
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel Registered, Client.");
ctx.fireChannelRegistered();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("Handler added.");
buf = ctx.alloc().buffer(4);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release();
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m);
m.release();
if (buf.readableBytes() >= 4) {
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
后续的研究,我也会基于上述代码加以改造和调试。
本文由作者按照 CC BY 4.0 进行授权