哦哇資訊網

netty入門到彈幕實戰

由 青槓山上 發表于 美食2022-12-01

一、IO模型

阻塞式IO模型

非阻塞式IO模型

IO複用

訊號驅動式IO

非同步IO

Linux IO流程

2。 各個IO模型的比較

NIO的優勢

事件驅動模型 避免多執行緒 單執行緒處理多工

非阻塞,IO讀寫不再阻塞,而是返回0

基於通道的傳輸,比基於流更有效率

更高階的IO函式,零複製

IO多路複用大大提高了java網路應用的可伸縮性和實用性

NIO的缺點

程式設計困難

陷阱重重

3。TCP粘包,拆包問題

問題原因:TCP是面向位元組流的協議,本身沒有訊息邊界;傳送端和接收端的緩衝區可能把多個訊息合併成一個包傳送(粘包),也可能把一個訊息拆成多個包傳送(拆包)。

Netty解決方案:

4。Netty的零複製

傳統複製方式

資料從磁碟讀取到核心的read buffer

資料從核心緩衝區複製到使用者緩衝區

資料從使用者緩衝區複製到核心的socket buffer

資料從核心的socket buffer複製到網絡卡介面(硬體)的緩衝區

零複製

呼叫transferTo,資料從檔案由DMA引擎複製到核心read buffer

接著DMA從核心read buffer將資料複製到網絡卡介面buffer

Netty中的零複製體現在這三個方面:

1。bytebuffer

Netty傳送和接收訊息主要使用bytebuffer,bytebuffer使用堆外記憶體(Direct Memory)直接進行Socket讀寫,省去了資料從堆內記憶體到堆外記憶體的一次複製。

2。Composite Buffers

傳統的ByteBuffer,如果需要將兩個ByteBuffer中的資料組合到一起,我們需要首先建立一個size=size1+size2大小的新的陣列,然後將兩個陣列中的資料複製到新的陣列中。但是使用Netty提供的組合ByteBuf,就可以避免這樣的操作,因為CompositeByteBuf並沒有真正將多個Buffer組合起來,而是儲存了它們的引用,從而避免了資料的複製,實現了零複製。

3。對於FileChannel.transferTo的使用

Netty中使用了FileChannel的transferTo方法,該方法依賴於作業系統實現零複製。

二、Netty元件

Channel – 對應NIO中的channel

EventLoop ——對應NIO中的while迴圈

ChannelHandler和ChannelPipeline ——對應NIO客戶邏輯實現handleRead和handleWrite

ByteBuf ——對應Nio中的ByteBuffer

Bootstrap和ServerBootstrap ——對應NIO中的Selector、ServerSocketChannel等的建立、配置、啟動

Reactor執行緒模型

Reactor執行緒模型有三種形式:

1。單執行緒模型:

2。多執行緒模型:

netty入門到彈幕實戰

3。主從多執行緒模型(multiple Reactor)

netty入門到彈幕實戰

Netty對這三種模式都有支援

三、簡單的例子

1。引入pom包

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.42.Final</version>
</dependency>

2。伺服器端

public class TimeServer { public void bind(int port) throws InterruptedException { EventLoopGroup boosGroup=new NioEventLoopGroup(); EventLoopGroup workerGroup=new NioEventLoopGroup(); try{ ServerBootstrap b=new ServerBootstrap(); b。group(boosGroup,workerGroup) 。channel(NioServerSocketChannel。class) 。option(ChannelOption。SO_BACKLOG,1024) 。childHandler(new ChildChannelHandler()); //繫結埠,同步等待成功 ChannelFuture future = b。bind(port)。sync(); //等待服務端監聽關閉 future。channel()。closeFuture()。sync(); }finally { //優雅退出,釋放執行緒池資源 boosGroup。shutdownGracefully(); workerGroup。shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new TimeServer()。bind(8888); } private class ChildChannelHandler extends ChannelInitializer { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel。pipeline()。addLast(new TimeServerHandler()); } }}

public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf=(ByteBuf)msg; byte[] req=new byte[buf。readableBytes()]; buf。readBytes(req); String body=new String(req,“UTF-8”); System。out。println(body); SimpleDateFormat simpleDateFormat = new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss”); String currentTime=simpleDateFormat。format(new Date()); ByteBuf resp = Unpooled。copiedBuffer(currentTime。getBytes()); ctx。write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx。flush(); }}

實戰彈幕系統

java:

public class WebSocketDanmuServer { private int port; public WebSocketDanmuServer(int port) { this。port = port; } public void run(){ EventLoopGroup bossGroup=new NioEventLoopGroup(1); EventLoopGroup workGroup=new NioEventLoopGroup(8); try { ServerBootstrap b=new ServerBootstrap(); b。group(bossGroup,workGroup) 。channel(NioServerSocketChannel。class) 。childHandler(new WebsocketDanmuServerInitializer()) 。option(ChannelOption。SO_BACKLOG,128) 。childOption(ChannelOption。SO_KEEPALIVE,true); System。out。println(“彈幕系統啟動了 ”+port); ChannelFuture future = b。bind(port)。sync(); future。channel()。closeFuture()。sync(); } catch (InterruptedException e) { e。printStackTrace(); }finally { workGroup。shutdownGracefully(); bossGroup。shutdownGracefully(); } } public static void main(String[] args) { new WebSocketDanmuServer(8080)。run(); }}

public class WebsocketDanmuServerInitializer extends ChannelInitializer { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel。pipeline(); pipeline。addLast(“http-decodec”,new HttpRequestDecoder()); pipeline。addLast(“http-aggregator”,new HttpObjectAggregator(65536)); pipeline。addLast(“http-encodec”,new HttpResponseEncoder()); pipeline。addLast(“http-chunked”,new ChunkedWriteHandler()); /* pipeline。addLast(new HttpServerCodec()); pipeline。addLast(new HttpObjectAggregator(64*1024)); pipeline。addLast(new ChunkedWriteHandler()); */ pipeline。addLast(“http-request”,new HttpRequestHandler(“/ws”)); pipeline。addLast(“WebSocket-protocol”,new WebSocketServerProtocolHandler(“/ws”)); pipeline。addLast(“WebSocket-request”,new TextWebSocketFrameHandler()); }}

public class HttpRequestHandler extends SimpleChannelInboundHandler { private final String wsUri; private static final File INDEX; static { URL location=HttpRequestHandler。class。getProtectionDomain()。getCodeSource()。getLocation(); try { String path = location。toURI() + “WebsocketDanMu。html”; path = !path。contains(“file:”) ? path : path。substring(5); INDEX = new File(path); } catch (URISyntaxException e) { throw new IllegalStateException(“Unable to locate WebsocketChatClient。html”, e); } } public HttpRequestHandler(String wsUri) { this。wsUri = wsUri; } private static void send100Continue(ChannelHandlerContext ctx) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion。HTTP_1_1, HttpResponseStatus。CONTINUE); ctx。writeAndFlush(response); } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { if(wsUri。equalsIgnoreCase(request。getUri())){ ctx。fireChannelRead(request。retain()); }else { if(HttpHeaders。is100ContinueExpected(request)){ send100Continue(ctx); } RandomAccessFile file = new RandomAccessFile(INDEX, “r”);//4 HttpResponse response = new DefaultHttpResponse(request。getProtocolVersion(), HttpResponseStatus。OK); response。headers()。set(HttpHeaders。Names。CONTENT_TYPE, “text/html; charset=UTF-8”); boolean keepAlive = HttpHeaders。isKeepAlive(request); if (keepAlive) { //5 response。headers()。set(HttpHeaders。Names。CONTENT_LENGTH, file。length()); response。headers()。set(HttpHeaders。Names。CONNECTION, HttpHeaders。Values。KEEP_ALIVE); } ctx。write(response); //6 if (ctx。pipeline()。get(SslHandler。class) == null) { //7 ctx。write(new DefaultFileRegion(file。getChannel(), 0, file。length())); } else { ctx。write(new ChunkedNioFile(file。getChannel())); } ChannelFuture future = ctx。writeAndFlush(LastHttpContent。EMPTY_LAST_CONTENT); //8 if (!keepAlive) { future。addListener(ChannelFutureListener。CLOSE); //9 } file。close(); } }}

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler { public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor。INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { Channel incoming=ctx。channel(); for (Channel channel:channels){ if(channel!=incoming){ channel。writeAndFlush(new TextWebSocketFrame(msg。text())); }else { channel。writeAndFlush(new TextWebSocketFrame(“我傳送的 ”+msg。text())); } } } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2) Channel incoming = ctx。channel(); // Broadcast a message to multiple Channels channels。writeAndFlush(new TextWebSocketFrame(“[SERVER] - ” + incoming。remoteAddress() + “ 加入”)); channels。add(incoming); System。out。println(“Client:”+incoming。remoteAddress() +“加入”); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3) Channel incoming = ctx。channel(); // Broadcast a message to multiple Channels channels。writeAndFlush(new TextWebSocketFrame(“[SERVER] - ” + incoming。remoteAddress() + “ 離開”)); System。err。println(“Client:”+incoming。remoteAddress() +“離開”); // A closed Channel is automatically removed from ChannelGroup, // so there is no need to do “channels。remove(ctx。channel());” } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5) Channel incoming = ctx。channel(); System。out。println(“Client:”+incoming。remoteAddress()+“線上”); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6) Channel incoming = ctx。channel(); System。err。println(“Client:”+incoming。remoteAddress()+“掉線”); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) // (7) throws Exception { Channel incoming = ctx。channel(); System。err。println(“Client:”+incoming。remoteAddress()+“異常”); // 當出現異常就關閉連線 cause。printStackTrace(); ctx。close(); }}

html:

<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> 彈幕網站 開啟彈幕<!-- dm start -->

<!-- d_screen start -->
X
<!-- end d_screen --> <!-- send start -->
<!-- end send -->
<!-- end dm --><!---->

TAG: newCTXpublicfunctionTOP