Netty 简介 http://netty.io
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
本系列源码在https://github.com/sail-y/netty
Netty能做什么?
可以像tomcat一样做一个http服务器。
socket开发。
支持长连接开发,例如websocket。
例子1: Http 先忘记以前学过的servlet框架,netty并没有实现servlet的规范。
不多说,我们用netty先来写第一个例子,先跑起来试试看,后面再细说。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.sail.netty.firstexample;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;public class TestServer { public static void main (String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new TestServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(8899 ).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package com.sail.netty.firstexample;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.http.HttpServerCodec;public class TestServerInitializer extends ChannelInitializer <SocketChannel > { @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("httpServerCodec" , new HttpServerCodec()); pipeline.addLast("testHttpServerHandler" , new TestHttpServerHandler()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package com.sail.netty.firstexample;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.codec.http.*;import io.netty.util.CharsetUtil;public class TestHttpServerHandler extends SimpleChannelInboundHandler <HttpObject > { @Override protected void channelRead0 (ChannelHandlerContext ctx, HttpObject msg) throws Exception { ByteBuf content = Unpooled.copiedBuffer("Hello World" , CharsetUtil.UTF_8); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain" ); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); ctx.writeAndFlush(response); } }
虽然看起来复杂,但是几乎所有的netty程序的代码流程都是这样的。 定义好boss和worker的group->在childHandler定义ServerInitializer->在initChannel中定义通道处理器->实现通道处理器相应的回调方法。
注意loop group是死循环,必须手动停止,接下来我们来测试。
运行TestServer.
命令行测试:curl http://localhost:8899
接着我们看一下netty处理器的生命周期,改一下TestHttpServerHandler
的代码。
package com.sail.netty.firstexample;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

import java.net.URI;

/**
 * "Hello World" HTTP handler that also prints each handler lifecycle callback
 * to standard output so the order of the callbacks can be observed.
 */
public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {

    /**
     * Handles the request head only: prints the request method, skips
     * favicon requests, and otherwise writes a plain-text response.
     *
     * @param ctx context used to write the response
     * @param msg a decoded part of the HTTP message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        if (!(msg instanceof HttpRequest)) {
            return;
        }

        HttpRequest request = (HttpRequest) msg;
        System.out.println("请求方法名:" + request.method().name());

        String path = new URI(request.uri()).getPath();
        if ("/favicon.ico".equals(path)) {
            // Favicon requests are only logged; no response is written for them.
            System.out.println("请求favicon.ico");
            return;
        }

        ByteBuf body = Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8);
        FullHttpResponse reply =
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, body);
        HttpHeaders headers = reply.headers();
        headers.set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
        headers.set(HttpHeaderNames.CONTENT_LENGTH, body.readableBytes());

        ctx.writeAndFlush(reply);
    }

    /** Logs the callback name, then delegates to the superclass. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
        super.channelActive(ctx);
    }

    /** Logs the callback name, then delegates to the superclass. */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
        super.channelRegistered(ctx);
    }

    /** Logs the callback name, then delegates to the superclass. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerAdded");
        super.handlerAdded(ctx);
    }

    /** Logs the callback name, then delegates to the superclass. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive");
        super.channelInactive(ctx);
    }

    /** Logs the callback name, then delegates to the superclass. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelUnregistered");
        super.channelUnregistered(ctx);
    }
}
这个代码用curl
和浏览器访问得到的结果是不一样的,因为netty没有遵循servlet的规范,所以有些地方我们得自己处理。
这是curl
工具的输出:
这是浏览器第一次访问的输出:
这是浏览器第二次访问的输出:
那是因为curl每次请求完之后就主动断开了连接。而在HTTP 1.1协议下,浏览器默认使用keep-alive长连接,请求完成后连接不会立即关闭。所以我多等了一会儿之后,连接最终还是被关闭了,控制台也输出了channelInactive和channelUnregistered。
例子2: Socket 代码演示
server 先写server端代码
MyServer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package com.sail.netty.secondexample;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;public class MyServer { public static void main (String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new MyServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(8899 ).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
MyServerHandler.java
package com.sail.netty.secondexample;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.UUID;

/**
 * Prints each received string together with the sender's address and replies
 * with a freshly generated UUID.
 */
public class MyServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Logs the incoming message and writes a reply through the channel.
     *
     * @param ctx context of the receiving channel
     * @param msg the decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        String received = ctx.channel().remoteAddress() + ", " + msg;
        System.out.println(received);

        String reply = "from server: " + UUID.randomUUID();
        ctx.channel().writeAndFlush(reply);
    }

    /**
     * Prints the stack trace of the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
MyServerInitializer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package com.sail.netty.secondexample;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.LengthFieldPrepender;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.util.CharsetUtil;public class MyServerInitializer extends ChannelInitializer <SocketChannel > { @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0 ,4 ,0 ,4 )); pipeline.addLast(new LengthFieldPrepender(4 )); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyServerHandler()); } }
client 客户端代码
MyClient.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package com.sail.netty.secondexample;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;public class MyClient { public static void main (String[] args) throws InterruptedException { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyClientInitializer()); ChannelFuture channelFuture = bootstrap.connect("localhost" , 8899 ).sync(); channelFuture.channel().closeFuture().sync(); }finally { eventLoopGroup.shutdownGracefully(); } } }
MyClientHandler.java
package com.sail.netty.secondexample;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.EventExecutorGroup;

import java.time.LocalDateTime;

/**
 * Client handler of the socket example: sends an initial message once the
 * connection is active and answers every server message with the current time.
 */
public class MyClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the server address and the received message, then replies with
     * the current local date-time.
     *
     * @param ctx context of the receiving channel
     * @param msg the decoded text message from the server
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress());
        System.out.println("client output: " + msg);
        // Prefix spelling corrected (was "from clinet: ").
        ctx.writeAndFlush("from client: " + LocalDateTime.now());
    }

    /**
     * Prints the stack trace of the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Sends the first message as soon as the connection is established; the
     * server's reply then drives {@link #channelRead0}.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().writeAndFlush("1");
    }
}
MyClientInitializer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package com.sail.netty.secondexample;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.LengthFieldPrepender;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.util.CharsetUtil;public class MyClientInitializer extends ChannelInitializer <SocketChannel > { @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0 ,4 ,0 ,4 )); pipeline.addLast(new LengthFieldPrepender(4 )); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyClientHandler()); } }
例子3:聊天 下面做一个简单的消息广播,服务端1个,客户端3个,每个客户端上线的时候,服务端就对每个客户端广播上线的消息,下线同理。客户端发送消息的时候,服务端也会对每一个客户端进行广播。
MyChatServer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package com.sail.netty.thirdexample;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;public class MyChatServer { public static void main (String[] args) throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new MyChatServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(8899 ).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
MyChatServerHandler.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 package com.sail.netty.thirdexample;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.util.concurrent.GlobalEventExecutor;public class MyChatServerHandler extends SimpleChannelInboundHandler <String > { private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0 (ChannelHandlerContext ctx, String msg) throws Exception { Channel channel = ctx.channel(); channelGroup.forEach(ch -> { if (channel != ch) { ch.writeAndFlush(channel.remoteAddress() + " 发送的消息:" + msg + "\n" ); } else { ch.writeAndFlush("【自己】 " + msg + "\n" ); } }); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void handlerAdded (ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("【服务器】 - " + channel.remoteAddress() + " 加入\n" ); channelGroup.add(channel); } @Override public void handlerRemoved (ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("【服务器】 - " + channel.remoteAddress() + " 离开\n" ); } @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress() + " 上线" ); } @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress() + " 下线" ); } }
MyChatServerInitializer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package com.sail.netty.thirdexample;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import io.netty.handler.codec.Delimiters;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.util.CharsetUtil;public class MyChatServerInitializer extends ChannelInitializer <SocketChannel > { @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new DelimiterBasedFrameDecoder(4096 , Delimiters.lineDelimiter())); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyChatServerHandler()); } }
MyChatClient.java
package com.sail.netty.thirdexample;

import com.sail.netty.secondexample.MyClientInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * Console chat client: connects to localhost:8899 and sends every line typed
 * on standard input to the server.
 */
public class MyChatClient {

    /**
     * Connects to the chat server and forwards standard input line by line
     * until the input ends, then shuts the event loop group down.
     *
     * @param args unused
     * @throws Exception if connecting or reading standard input fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new MyChatClientInitializer());

            Channel channel = bootstrap.connect("localhost", 8899).sync().channel();

            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

            // readLine() returns null once standard input is exhausted; stop
            // there instead of sending the text "null" in an endless loop.
            String line;
            while ((line = br.readLine()) != null) {
                // The server frames messages on line delimiters, so re-append one.
                channel.writeAndFlush(line + "\r\n");
            }
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}
MyChatClientHandler.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package com.sail.netty.thirdexample;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.util.concurrent.EventExecutorGroup;public class MyChatClientHandler extends SimpleChannelInboundHandler <String > { @Override protected void channelRead0 (ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); } }
MyChatClientInitializer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package com.sail.netty.thirdexample;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import io.netty.handler.codec.Delimiters;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import io.netty.util.CharsetUtil;public class MyChatClientInitializer extends ChannelInitializer <SocketChannel > { @Override protected void initChannel (SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new DelimiterBasedFrameDecoder(4096 , Delimiters.lineDelimiter())); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new MyChatClientHandler()); } }
例子4:heartbeat(心跳) 下面演示一个集群中常见的心跳用netty的实现方式。
MyServer.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.sail.netty.forthexample;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;public class MyServer { public static void main (String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new MyServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(8899 ).sync(); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
MyServerHandler.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package com.sail.netty.forthexample;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.handler.timeout.IdleStateEvent;public class MyServerHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; String eventType = null ; switch (event.state()) { case READER_IDLE: eventType = "读空闲" ; break ; case WRITER_IDLE: eventType = "写空闲" ; break ; case ALL_IDLE: eventType = "读写空闲" ; break ; } System.out.println(ctx.channel().remoteAddress() + " 超时事件: " + eventType); ctx.channel().close(); } } }
MyServerInitializer.java
package com.sail.netty.forthexample;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
 * Configures heartbeat server channels with an idle-state detector followed
 * by {@link MyServerHandler}, which reacts to the idle events.
 */
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Appends the idle detector and the event handler to the pipeline.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                // Idle thresholds in seconds: 5 for reads, 7 for writes, 3 for both.
                .addLast(new IdleStateHandler(5, 7, 3, TimeUnit.SECONDS))
                .addLast(new MyServerHandler());
    }
}