Netty案例研究:Transport 的遷移

2021-12-02 09:42 更新

本節(jié)將會(huì)通過(guò)一個(gè)簡(jiǎn)單的應(yīng)用程序來(lái)讓您看看在 Netty  Transport(傳輸) 是如何工作的,這個(gè)應(yīng)用程序要做的事情非常簡(jiǎn)單,只需要連接好客戶端,并且向客戶端發(fā)送字符串“Hi!”的信息,信息發(fā)送完之后連接就斷開了。

不用 Netty 實(shí)現(xiàn) I/O 和 NIO

我們將不用 Netty 實(shí)現(xiàn) I/O 和 NIO,而是使用 JDK API 來(lái)實(shí)現(xiàn) I/O 和 NIO。下面這個(gè)例子,是使用阻塞 IO 實(shí)現(xiàn)的例子:

Listing 4.1 Blocking networking without Netty

public class PlainOioServer {

    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port);     //1
        try {
            for (;;) {
                final Socket clientSocket = socket.accept();    //2
                System.out.println("Accepted connection from " + clientSocket);

                new Thread(new Runnable() {                        //3
                    @Override
                    public void run() {
                        OutputStream out;
                        try {
                            out = clientSocket.getOutputStream();
                            out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));                            //4
                            out.flush();
                            clientSocket.close();                //5

                        } catch (IOException e) {
                            e.printStackTrace();
                            try {
                                clientSocket.close();
                            } catch (IOException ex) {
                                // ignore on close
                            }
                        }
                    }
                }).start();                                        //6
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

1.綁定服務(wù)器到指定的端口。

2.接受一個(gè)連接。

3.創(chuàng)建一個(gè)新的線程來(lái)處理連接。

4.將消息發(fā)送到連接的客戶端。

5.一旦消息被寫入和刷新時(shí)就 關(guān)閉連接。

6.啟動(dòng)線程。

上面的方式可以工作正常,但是這種阻塞模式在大連接數(shù)的情況就會(huì)有很嚴(yán)重的問(wèn)題,如客戶端連接超時(shí),服務(wù)器響應(yīng)嚴(yán)重延遲,性能無(wú)法擴(kuò)展。為了解決這種情況,我們可以使用異步網(wǎng)絡(luò)處理所有的并發(fā)連接,但問(wèn)題在于 NIO 和 OIO 的 API 是完全不同的,所以一個(gè)用 OIO 開發(fā)的網(wǎng)絡(luò)應(yīng)用程序想要使用 NIO 重構(gòu)代碼幾乎是重新開發(fā)。

下面代碼是使用 NIO 實(shí)現(xiàn)的例子:

Listing 4.2 Asynchronous networking without Netty

public class PlainNioServer {
    public void serve(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);                                            //1
        Selector selector = Selector.open();                        //2
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);    //3
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
        for (;;) {
            try {
                selector.select();                                    //4
            } catch (IOException ex) {
                ex.printStackTrace();
                // handle exception
                break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();    //5
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) {                //6
                        ServerSocketChannel server =
                                (ServerSocketChannel)key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_WRITE |
                                SelectionKey.OP_READ, msg.duplicate());    //7
                        System.out.println(
                                "Accepted connection from " + client);
                    }
                    if (key.isWritable()) {                //8
                        SocketChannel client =
                                (SocketChannel)key.channel();
                        ByteBuffer buffer =
                                (ByteBuffer)key.attachment();
                        while (buffer.hasRemaining()) {
                            if (client.write(buffer) == 0) {        //9
                                break;
                            }
                        }
                        client.close();                    //10
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                        // 在關(guān)閉時(shí)忽略
                    }
                }
            }
        }
    }
}

1.綁定服務(wù)器到制定端口

2.打開 selector 處理 channel

3.注冊(cè) ServerSocket 到 ServerSocket ,并指定這是專門用來(lái)接受連接的。

4.等待新的事件來(lái)處理。這將阻塞,直到一個(gè)事件是傳入。

5.從收到的所有事件中 獲取 SelectionKey 實(shí)例。

6.檢查該事件是一個(gè)新的連接準(zhǔn)備好接受。

7.接受客戶端,并用 selector 進(jìn)行注冊(cè)。

8.檢查 socket 是否準(zhǔn)備好寫數(shù)據(jù)。

9.將數(shù)據(jù)寫入到所連接的客戶端。如果網(wǎng)絡(luò)飽和,連接是可寫的,那么這個(gè)循環(huán)將寫入數(shù)據(jù),直到該緩沖區(qū)是空的。

10.關(guān)閉連接。

如你所見,即使它們實(shí)現(xiàn)的功能是一樣,但是代碼完全不同。下面我們將用 Netty 來(lái)實(shí)現(xiàn)相同的功能。

采用 Netty 實(shí)現(xiàn) I/O 和 NIO

下面代碼是使用 Netty 作為網(wǎng)絡(luò)框架編寫的一個(gè)阻塞 IO 例子:

Listing 4.3 Blocking networking with Netty

public class NettyOioServer {

    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();        //1

            b.group(group)                                    //2
             .channel(OioServerSocketChannel.class)
             .localAddress(new InetSocketAddress(port))
             .childHandler(new ChannelInitializer<SocketChannel>() {//3
                 @Override
                 public void initChannel(SocketChannel ch) 
                     throws Exception {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {            //4
                         @Override
                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                             ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5
                         }
                     });
                 }
             });
            ChannelFuture f = b.bind().sync();  //6
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();        //7
        }
    }
}

1.創(chuàng)建一個(gè) ServerBootstrap

2.使用 OioEventLoopGroup 允許阻塞模式(OIO)

3.指定 ChannelInitializer 將給每個(gè)接受的連接調(diào)用

4.添加的 ChannelHandler 攔截事件,并允許他們作出反應(yīng)

5.寫信息到客戶端,并添加 ChannelFutureListener 當(dāng)一旦消息寫入就關(guān)閉連接

6.綁定服務(wù)器來(lái)接受連接

7.釋放所有資源

下面代碼是使用 Netty NIO 實(shí)現(xiàn)。

Netty NIO 版本

下面是 Netty NIO 的代碼,只是改變了一行代碼,就從 OIO 傳輸 切換到了 NIO。

Listing 4.4 Asynchronous networking with Netty

public class NettyNioServer {

    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();    //1
            b.group(new NioEventLoopGroup(), new NioEventLoopGroup())   //2
             .channel(NioServerSocketChannel.class)
             .localAddress(new InetSocketAddress(port))
             .childHandler(new ChannelInitializer<SocketChannel>() {    //3
                 @Override
                 public void initChannel(SocketChannel ch) 
                     throws Exception {
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {    //4
                         @Override
                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
                             ctx.writeAndFlush(buf.duplicate())                //5
                                .addListener(ChannelFutureListener.CLOSE);
                         }
                     });
                 }
             });
            ChannelFuture f = b.bind().sync();                    //6
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();                    //7
        }
    }
}

1.創(chuàng)建一個(gè) ServerBootstrap

2.使用 NioEventLoopGroup 允許非阻塞模式

3.指定 ChannelInitializer 將給每個(gè)接受的連接調(diào)用

4.添加的 ChannelInboundHandlerAdapter() 接收事件并進(jìn)行處理

5.寫信息到客戶端,并添加 ChannelFutureListener 當(dāng)一旦消息寫入就關(guān)閉連接

6.綁定服務(wù)器來(lái)接受連接

7.釋放所有資源

我們之前提到過(guò) Netty 使用的是統(tǒng)一的 API,所以 Netty 中實(shí)現(xiàn)的每個(gè)傳輸都是用了同樣的 API,你使用什么來(lái)實(shí)現(xiàn)并不在它的關(guān)心范圍內(nèi)。Netty 通過(guò)操作接口 Channel 、ChannelPipeline 和 ChannelHandler 來(lái)實(shí)現(xiàn)。

使用基于 Netty 傳輸的好處顯而易見了吧,那么接下來(lái)我們就來(lái)看看傳輸?shù)?API。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)