study

netty

4.x 사용자 가이드

User guide for 4.x

Netty 소개

  1. 유지 보수가 가능한 고성능, 고확장성 프로토콜 서버와 클라이언트를 빠르게 개발할 수 있는 비동기 이벤트 기반 네트워크 애플리케이션 프레임워크
  2. 프로토콜 서버 및 클라이언트와 같은 네트워크 애플리케이션을 빠르고 쉽게 개발할 수 있는 NIO 클라이언트 서버 프레임워크
  3. TCP 및 UDP 소켓 서버 개발과 같은 네트워크 프로그래밍을 크게 단순화하고 효율화
  4. FTP, SMTP, HTTP, 그리고 다양한 바이너리 및 텍스트 기반 레거시 프로토콜을 구현하면서 얻은 경험을 바탕으로 신중하게 설계

최소 요구 사항

  1. Netty 최신 버전과
  2. JDK 1.6 이상

Discard 서버 작성

세상에서 가장 단순한 프로토콜은 ‘Hello, World!’가 아니라, 수신된 데이터를 아무런 응답 없이 버리는 프로토콜입니다. DISCARD

Netty에서 생성된 I/O 이벤트를 처리하는 핸들러 구현

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
  1. DiscardServerHandlerChannelInboundHandlerAdapter의 구현체입니다.
    • ChannelInboundHandler, ChannelInboundHandler 오버라이드할 수 있는 다양한 이벤트 핸들러 메서드를 제공합니다.
    • 현재로서는 ChannelInboundHandlerAdapter 핸들러 인터페이스를 직접 구현하기보다는 확장하는 것만으로도 충분합니다.
  2. 여기서 이벤트 핸들러 메서드를 재정의합니다
    • channelRead() 이 메서드는 클라이언트로부터 새 데이터를 수신할 때마다 수신된 메시지와 함께 호출됩니다.
    • 이 예제에서 수신된 메시지의 유형은 ByteBuf 입니다.
  3. 프로토콜을 구현하려면 DISCARD핸들러가 수신된 메시지를 무시해야 합니다.
    • ByteBufrelease() 메서드를 통해 명시적으로 해제해야 하는 참조 카운트 객체입니다.
    • 핸들러에 전달된 참조 카운트 객체를 해제하는 것은 핸들러의 책임이라는 점에 유의하세요.
    • 일반적으로 channelRead()핸들러 메서드는 다음과 같이 구현됩니다.
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) {
       try {
         // Do something with msg
       } finally {
         ReferenceCountUtil.release(msg);
       }
      }
      
  4. 이벤트 exceptionCaught()핸들러 메서드는 Netty에서 I/O 오류로 인해 예외가 발생하거나, 이벤트 처리 중 발생한 예외로 인해 핸들러 구현에서 예외가 발생하면 Throwable과 함께 호출됩니다.
    • 대부분의 경우, 발생한 예외는 로깅되고 관련 채널은 여기서 닫힙니다.
    • 하지만 이 메서드의 구현 방식은 예외 상황을 처리하기 위해 수행하려는 작업에 따라 달라질 수 있습니다.
    • 예를 들어, 연결을 닫기 전에 오류 코드가 포함된 응답 메시지를 전송할 수 있습니다.

서버를 시작하는 main()메서드를 작성

package io.netty.example.discard;
    
import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
    
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run();
    }
}
  1. NioEventLoopGroup는 I/O 작업을 처리하는 다중 스레드 이벤트 루프입니다.
    • Netty는 EventLoopGroup 다양한 종류의 전송에 대해 다양한 구현을 제공합니다.
    • 이 예제에서는 서버 측 애플리케이션을 구현하므로 두 가지 NioEventLoopGroup 구현을 사용합니다.
    • ‘boss’라고 불리는 첫 번째 구현은 들어오는 연결을 수락합니다.
    • ‘worker’라고 불리는 두 번째 구현은 보스(boss)가 연결을 수락하고 수락된 연결을 워커(worker)에 등록하면 수락된 연결의 트래픽을 처리합니다.
    • 사용되는 스레드 수와 생성된 Channel 스레드에 매핑되는 방식은 구현에 따라 다르며, EventLoopGroup 생성자를 통해 구성할 수도 있습니다.
  2. ServerBootstrap 서버를 설정하는 유틸리티 클래스입니다.
    • Channel 를 사용하여 서버를 직접 설정할 수 있습니다. 하지만 이 과정은 번거로우므로 대부분의 경우 직접 설정할 필요가 없습니다.
  3. 여기에서는 들어오는 연결을 수락하기 위한 새로운 Channel을 생성하는 NioServerSocketChannel 클래스를 사용하도록 지정합니다.

  4. 여기에 지정된 핸들러는 새로 수락된 Channel마다 항상 실행됩니다.
    • ChannelInitializer는 사용자가 새로운 Channel을 설정할 수 있도록 도와주는 특별한 핸들러입니다.
    • 보통은 DiscardServerHandler 같은 핸들러를 추가해서, 새로운 ChannelChannelPipeline을 구성하여 네트워크 애플리케이션을 구현하게 됩니다.
    • 애플리케이션이 복잡해질수록, 파이프라인에 더 많은 핸들러를 추가하게 되고, 결국 이 익명 클래스를 별도의 최상위 클래스(top-level class)로 분리하게 될 가능성이 높습니다.
  5. 또한 Channel 구현체에 따라 특정 매개변수를 설정할 수도 있습니다.
    • 여기서는 TCP/IP 서버를 작성하고 있으므로, tcpNoDelay, keepAlive 같은 소켓 옵션을 설정할 수 있습니다.
    • 지원되는 ChannelOptions에 대한 개요는 ChannelOption의 API 문서와 각 ChannelConfig 구현체를 참고하세요.
  6. 여기서 option()childOption()의 차이에 주목해야 합니다.
    • option()은 들어오는 연결을 수락하는 NioServerSocketChannel에 적용됩니다.
    • childOption()은 부모 ServerChannel이 수락한 Channel(이 경우 NioSocketChannel)에 적용됩니다.
  7. 이제 준비가 끝났습니다.
    • 마지막으로 할 일은 포트에 바인딩해서 서버를 시작하는 것입니다.
    • 여기서는 서버가 실행되는 모든 네트워크 인터페이스 카드(NIC)의 8080 포트에 바인딩합니다.
    • 필요하다면 bind() 메서드를 여러 번 호출해, 다른 바인드 주소로도 서버를 띄울 수 있습니다.

수신된 데이터 살펴보기, 테스트

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    try {
        while (in.isReadable()) { // (1)
            System.out.print((char) in.readByte());
            System.out.flush();
        }
    } finally {
        ReferenceCountUtil.release(msg); // (2)
    }
}
  1. 이 비효율적인 루프는 실제로 다음과 같이 간단하게 만들 수 있습니다: System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  2. 또는, 여기서 in.release()를 호출할 수도 있습니다. 다시 telnet 명령어를 실행하면, 서버가 받은 데이터를 출력하는 것을 확인할 수 있습니다.
    discard 서버의 전체 소스 코드는 배포판(distribution)의 io.netty.example.discard 패키지에 위치해 있습니다.

에코 서버 작성

이번에는 ECHO 프로토콜을 구현하여, 받은 데이터를 다시 클라이언트로 보내는 방법을 배워보겠습니다.
이전 섹션에서 구현한 discard 서버와의 유일한 차이점은, 받은 데이터를 콘솔에 출력하는 대신 받은 데이터를 다시 클라이언트로 보내는 것입니다.

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg); // (1)
        ctx.flush(); // (2)
    }
  1. ChannelHandlerContext 객체는 다양한 I/O 이벤트와 작업을 트리거할 수 있는 여러 연산을 제공합니다.
    • 여기서는 write(Object)를 호출하여 받은 메시지를 그대로(write verbatim) 씁니다.
    • 참고로, DISCARD 예제에서처럼 받은 메시지를 직접 해제(release)하지 않았습니다. 그 이유는 Netty가 메시지를 네트워크로 전송할 때 자동으로 해제해주기 때문입니다.
  2. ctx.write(Object)만 호출한다고 해서 메시지가 바로 네트워크로 전송되는 것은 아닙니다.
    • 내부적으로 버퍼에 저장되고, ctx.flush()를 호출해야 실제로 전송됩니다. 또는, 한 줄로 간단히 ctx.writeAndFlush(msg)를 호출할 수도 있습니다.

다시 telnet 명령어를 실행하면, 서버가 클라이언트가 보낸 데이터를 그대로 다시 보내는 것을 확인할 수 있습니다.
ECHO 서버의 전체 소스 코드는 배포판(distribution)의 io.netty.example.echo 패키지에 위치해 있습니다.

타임 서버 작성

이번 섹션에서 구현할 프로토콜은 TIME 프로토콜입니다.

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
    final ByteBuf time = ctx.alloc().buffer(4); // (2)
    time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
    
    final ChannelFuture f = ctx.writeAndFlush(time); // (3)
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) {
            assert f == future;
            ctx.close();
        }
    }); // (4)
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
} } ``` 1. 설명했듯이, channelActive() 메서드는 연결이 수립되어 트래픽을 생성할 준비가 되었을 때 호출됩니다. - 이 메서드에서 현재 시간을 나타내는 32비트 정수를 작성해 보겠습니다.
  1. 새로운 메시지를 보내려면, 메시지를 담을 새로운 버퍼를 할당해야 합니다.
    • 우리는 32비트 정수를 쓸 것이므로, 최소 4바이트 용량의 ByteBuf가 필요합니다.
    • ChannelHandlerContext.alloc()을 통해 현재 ByteBufAllocator를 가져오고, 새 버퍼를 할당합니다.
  2. 그다음, 평소처럼 메시지를 작성(write)합니다.

    그런데, 여기서 flip은 어디 있나요?
    NIO에서는 메시지를 보내기 전에 java.nio.ByteBuffer.flip()을 호출하곤 했죠.
    ByteBuf에는 이런 메서드가 없습니다. 그 이유는 ByteBuf가 읽기(read)와 쓰기(write)를 위해 두 개의 포인터를 가지고 있기 때문입니다.
    쓰기(write) 시에는 writer index가 증가하고, 읽기(read) 시에는 reader index가 변하지 않습니다.
    reader index와 writer index는 각각 메시지의 시작 위치와 끝 위치를 나타냅니다.

    반면, NIO 버퍼는 flip을 호출하지 않으면 메시지 내용의 시작과 끝을 정확히 알 수 없어서, flip을 잊으면 데이터가 전송되지 않거나 잘못 전송되는 문제가 발생할 수 있습니다.
    Netty에서는 서로 다른 작업 유형에 대해 포인터가 분리되어 있기 때문에 이러한 문제가 발생하지 않습니다.
    익숙해지면 flip을 고민하지 않아도 되어 훨씬 편리합니다 — flip 없는 삶이 시작되는 것이죠!

  1. 그렇다면, 쓰기 요청이 완료되었을 때 어떻게 알 수 있을까요?
    • 방법은 간단합니다. 반환된 ChannelFutureChannelFutureListener를 추가하면 됩니다.
    • 여기서는 작업이 완료되면 채널을 닫는 익명 ChannelFutureListener를 생성했습니다.
    • 또는, 미리 정의된 리스너를 사용해 코드를 더 간단히 할 수도 있습니다: f.addListener(ChannelFutureListener.CLOSE);
    • 우리의 Time 서버가 제대로 동작하는지 테스트하려면, UNIX의 rdate 명령어를 사용할 수 있습니다: $ rdate -o <port> -p <host>
    • 여기서 는 main() 메서드에서 지정한 포트 번호이며, 는 보통 localhost입니다.

시간 클라이언트 작성

DISCARD나 ECHO 서버와 달리, TIME 프로토콜에서는 클라이언트가 필요합니다. 그 이유는 사람이 32비트 바이너리 데이터를 달력상의 날짜로 바로 변환할 수 없기 때문입니다. 이번 섹션에서는 서버가 제대로 동작하는지 확인하는 방법과, Netty로 클라이언트를 작성하는 방법을 배웁니다.

Netty에서 서버와 클라이언트의 가장 큰 차이점은 서버와 클라이언트에서 사용하는 Bootstrap과 Channel 구현체가 다르다는 것입니다. 다음 코드를 참고해 보세요:

package io.netty.example.time;

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        
        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
  1. BootstrapServerBootstrap과 유사하지만, 서버가 아닌 채널(클라이언트 측 채널이나 연결이 없는 채널)을 위해 사용됩니다.
  2. 하나의 EventLoopGroup만 지정하면, 이는 boss 그룹과 worker 그룹 모두에 사용됩니다. 다만, 클라이언트 측에서는 boss 그룹이 실제로 사용되지는 않습니다.
  3. 서버에서 NioServerSocketChannel을 사용했던 것과 달리, 클라이언트 측 채널을 생성할 때는 NioSocketChannel을 사용합니다.
  4. 클라이언트 측 SocketChannel에는 부모가 없기 때문에, ServerBootstrap에서 사용했던 childOption()은 사용하지 않습니다.
  5. 또한 서버에서 bind()를 호출했던 것과 달리, 클라이언트에서는 connect() 메서드를 호출해야 합니다.

보시다시피, 서버 측 코드와 크게 다르지 않습니다. 그렇다면 ChannelHandler 구현은 어떻게 해야 할까요? 서버로부터 32비트 정수를 수신하고, 사람이 읽을 수 있는 형식으로 변환하며, 변환된 시간을 출력하고, 연결을 종료해야 합니다.

package io.netty.example.time;

import java.util.Date;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg; // (1)
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. TCP/IP에서 Netty는 피어(peer)로부터 전송된 데이터를 ByteBuf로 읽습니다. 겉보기에는 매우 간단해 보이고, 서버 측 예제와 크게 다르지 않아 보입니다. 하지만, 이 핸들러는 가끔 IndexOutOfBoundsException을 발생시키며 제대로 동작하지 않을 수 있습니다. 이 문제가 왜 발생하는지는 다음 섹션에서 다룹니다.

스트림 기반 전송 처리

소켓 버퍼의 작은 주의점

TCP/IP와 같은 스트림 기반 전송에서, 수신된 데이터는 소켓 수신 버퍼에 저장됩니다.
안타깝게도, 스트림 기반 전송의 버퍼는 패킷 큐가 아니라 바이트 큐입니다.
즉, 두 개의 메시지를 독립적인 패킷으로 전송하더라도, 운영체제는 이를 두 개의 메시지로 처리하지 않고 단순히 바이트의 집합으로 처리합니다.
따라서, 읽어들인 데이터가 원격 피어가 작성한 것과 정확히 일치한다고 보장할 수 없습니다.

예를 들어, 운영체제의 TCP/IP 스택이 세 개의 패킷을 수신했다고 가정해 봅시다: |ABC|DEF|GHI|
스트림 기반 프로토콜의 일반적인 특성 때문에, 애플리케이션에서는 다음과 같이 조각(fragmented) 형태로 읽힐 가능성이 높습니다: |AB|CDEFG|H|I|
따라서 수신 측(서버든 클라이언트든 상관없이)은, 수신 데이터를 애플리케이션 로직에서 이해할 수 있는 하나 이상의 의미 있는 프레임으로 합쳐야 합니다.
위 예제의 경우, 수신된 데이터는 다음과 같이 프레임으로 정리되어야 합니다: |ABC|DEF|GHI|

첫 번째 해결 방법

이제 TIME 클라이언트 예제로 돌아가 봅시다. 여기서도 같은 문제가 발생할 수 있습니다. 32비트 정수는 매우 작은 데이터이므로 자주 조각화되지 않지만, 조각화 가능성은 존재하며 트래픽이 많아질수록 그 가능성은 증가합니다.

간단한 해결책은 내부 누적 버퍼(cumulative buffer)를 생성하고, 4바이트 전체가 수신될 때까지 기다리는 것입니다. 다음은 이 문제를 해결한 수정된 TimeClientHandler 구현 예제입니다.

package io.netty.example.time;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4); // (1)
    }
    
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        buf.release(); // (1)
        buf = null;
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        buf.writeBytes(m); // (2)
        m.release();
        
        if (buf.readableBytes() >= 4) { // (3)
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. ChannelHandler에는 두 가지 생명주기(lifecycle) 리스너 메서드가 있습니다:
    • handlerAdded()와 handlerRemoved()
    • 이 메서드 안에서는 임의의 초기화/종료 작업을 수행할 수 있지만, 오래 블로킹(block)되는 작업은 피해야 합니다.
  2. 먼저, 모든 수신 데이터를 buf에 누적(cumulate)해야 합니다.
  3. 그 다음, 핸들러는 buf에 충분한 데이터가 있는지 확인해야 합니다.
    • 이번 예제에서는 4바이트가 필요하며, 충분하다면 실제 비즈니스 로직을 진행합니다.
    • 만약 충분하지 않다면, Netty는 추가 데이터가 도착할 때 channelRead()를 다시 호출하고, 결국 모든 4바이트가 누적됩니다.

두 번째 해결 방법

첫 번째 해결책으로 TIME 클라이언트 문제는 해결되었지만, 수정된 핸들러는 깔끔해 보이지 않습니다. 길이가 가변적인 필드와 같이 여러 필드로 구성된 더 복잡한 프로토콜을 상상해 보세요. 단일(ChannelInboundHandler) 구현은 금세 유지보수가 어려워집니다.

이미 눈치채셨겠지만, ChannelPipeline에 여러 ChannelHandler를 추가할 수 있습니다. 따라서 하나의 거대한 ChannelHandler를 여러 모듈화된 핸들러로 분리하면 애플리케이션의 복잡도를 줄일 수 있습니다. 예를 들어, TimeClientHandler를 다음과 같이 두 개의 핸들러로 나눌 수 있습니다:

다행히도, Netty는 첫 번째 핸들러를 즉시 작성할 수 있도록 돕는 확장 가능한 클래스를 제공합니다.

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
        if (in.readableBytes() < 4) {
            return; // (3)
        }
        
        out.add(in.readBytes(4)); // (4)
    }
}
  1. ByteToMessageDecoderChannelInboundHandler를 구현한 클래스이며, 조각(fragmentation) 문제를 쉽게 처리할 수 있도록 도와줍니다.
  2. ByteToMessageDecoder는 새로운 데이터가 수신될 때마다 내부적으로 유지되는 누적 버퍼(cumulative buffer)를 이용하여 decode() 메서드를 호출합니다.
  3. decode()는 누적 버퍼에 충분한 데이터가 없으면 out에 아무 것도 추가하지 않을 수 있습니다.
    • 이 경우, 더 많은 데이터가 수신되면 ByteToMessageDecoderdecode()를 다시 호출합니다.
  4. decode()가 out에 객체를 추가하면, 이는 디코더가 메시지를 성공적으로 디코딩했다는 의미입니다.
    • 이때, ByteToMessageDecoder는 누적 버퍼에서 이미 읽은 부분을 제거합니다.
    • 여러 메시지를 한 번에 디코딩할 필요는 없다는 점을 기억하세요.
    • ByteToMessageDecoder는 out에 아무 것도 추가하지 않을 때까지 계속 decode()를 호출합니다.

이제 ChannelPipeline에 삽입할 또 다른 핸들러가 생겼으므로, TimeClient의 ChannelInitializer 구현을 수정해야 합니다.

b.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
    }
});

만약 모험심이 있는 사람이라면, ReplayingDecoder를 사용해 디코더를 훨씬 더 간단하게 만들 수도 있습니다. 자세한 내용은 API 레퍼런스를 참고해야 합니다.

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

또한, Netty는 대부분의 프로토콜을 매우 쉽게 구현할 수 있는 즉시 사용 가능한 디코더들을 제공합니다. 이러한 디코더를 사용하면 단일 거대한 핸들러로 인해 유지보수가 어려워지는 문제를 피할 수 있습니다. 자세한 예제는 다음 패키지를 참고하세요:

ByteBuf 대신 POJO 사용하기

지금까지 살펴본 모든 예제는 프로토콜 메시지의 기본 데이터 구조로 ByteBuf를 사용했습니다. 이번 섹션에서는 TIME 프로토콜의 클라이언트와 서버 예제를 개선하여, ByteBuf 대신 POJO를 사용하도록 해보겠습니다.

ChannelHandler에서 POJO를 사용하는 장점은 명확합니다. ByteBuf에서 정보를 추출하는 코드를 핸들러에서 분리함으로써, 핸들러가 더 유지보수 가능하고 재사용 가능하게 됩니다.

TIME 클라이언트와 서버 예제에서는 32비트 정수 하나만 읽기 때문에 ByteBuf를 직접 사용하는 것이 큰 문제가 되지 않습니다. 하지만 실제 프로토콜을 구현할 때는 이러한 분리가 필요함을 알게 될 것입니다.

먼저, UnixTime이라는 새로운 타입을 정의해 봅시다.

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;
    
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }
    
    public UnixTime(long value) {
        this.value = value;
    }
        
    public long value() {
        return value;
    }
        
    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

이제 TimeDecoder를 수정하여 ByteBuf 대신 UnixTime 객체를 생성하도록 할 수 있습니다.

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    if (in.readableBytes() < 4) {
        return;
    }

    out.add(new UnixTime(in.readUnsignedInt()));
}

업데이트된 디코더를 사용하면, TimeClientHandler는 더 이상 ByteBuf를 사용하지 않습니다:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    UnixTime m = (UnixTime) msg;
    System.out.println(m);
    ctx.close();
}

훨씬 간단하고 우아하죠? 같은 기법을 서버 측에도 적용할 수 있습니다. 이번에는 먼저 TimeServerHandler를 업데이트해 봅시다:

@Override
public void channelActive(ChannelHandlerContext ctx) {
    ChannelFuture f = ctx.writeAndFlush(new UnixTime());
    f.addListener(ChannelFutureListener.CLOSE);
}

이제 남은 부분은 인코더(encoder)입니다. 인코더는 ChannelOutboundHandler를 구현하며, UnixTime 객체를 다시 ByteBuf로 변환합니다. 메시지를 인코딩할 때는 패킷 조각화(fragmentation)나 조립(assembly)을 처리할 필요가 없으므로, 디코더보다 훨씬 간단합니다.

package io.netty.example.time;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        ctx.write(encoded, promise); // (1)
    }
}
  1. 이 한 줄에 몇 가지 중요한 내용이 있습니다:
    • 원래의 ChannelPromise를 그대로 전달하여, Netty가 실제로 데이터를 네트워크로 쓸 때 성공 또는 실패로 표시할 수 있도록 합니다.
    • ctx.flush()는 호출하지 않았습니다. 별도의 핸들러 메서드 void flush(ChannelHandlerContext ctx)가 flush 동작을 오버라이드하도록 제공됩니다.

더 간단하게 만들기 위해, MessageToByteEncoder를 사용할 수도 있습니다:

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

마지막으로 남은 작업은 서버 측 ChannelPipeline에 TimeEncoder를 TimeServerHandler 앞에 삽입하는 것입니다. 이 부분은 간단한 연습 문제로 남겨둡니다.

애플리케이션 종료

Netty 애플리케이션을 종료하는 것은 일반적으로, 생성한 모든 EventLoopGroupshutdownGracefully()로 종료하면 간단합니다. 이 메서드는 Future를 반환하며, EventLoopGroup이 완전히 종료되고 해당 그룹에 속한 모든 채널이 닫혔을 때 알림을 받을 수 있습니다.

요약

이번 장에서는 Netty를 간단히 살펴보고, Netty 위에서 완전히 동작하는 네트워크 애플리케이션을 작성하는 방법을 시연했습니다. 더 자세한 Netty 정보는 다음 장에서 확인할 수 있습니다. 또한, io.netty.example 패키지에 있는 Netty 예제들을 직접 살펴보는 것을 권장합니다.

마지막으로, 커뮤니티는 항상 여러분의 질문과 아이디어를 기다리고 있으며, 이를 통해 Netty와 그 문서가 지속적으로 개선됩니다.