Netty tutorial – High speed custom codecs with ReplayingDecoder

Published on — Filed under bare metal

The full project for this tutorial can be found on GitHub

It's been a while since I've written about anything other than windroplr's updates, so today I'm going back to Netty.

Instead of covering some exotic advanced feature, this time we're going back to basics: creating a custom I/O protocol.

One of the most awesome features of Netty is the ReplayingDecoder – which is why I don't understand the lack of a proper tutorial in the examples.

The less curious won't RTFM unless they really have to, which means that all-things-awesome must be made very visible. This is an attempt to do just that.

ReplayingDecoder 101

First off, the ReplayingDecoder only makes sense over stream-based transports such as TCP. Basically, it gives you the ability to set checkpoints while reading structured, variable-sized messages, without having to manually check the available bytes.

What that means is that, unlike with other decoders, you can request as many bytes as you want from the buffer. If they aren't available, the operation fails silently (and nothing is consumed) until the socket reads more data. If enough bytes are available, the buffer is drained and you mark a checkpoint. With a checkpoint set, if the next read operation fails (fewer bytes available than requested), decoding restarts from the last saved checkpoint.

And all that happens automagically!
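To appreciate what the checkpoints buy you, here's a sketch of the manual bookkeeping a decoder would otherwise need, written with plain java.nio.ByteBuffer for illustration (the class and method names here are mine, not Netty's):

```java
import java.nio.ByteBuffer;

public class ManualCheckDemo {

    // Returns the payload if a complete frame is buffered, or null to wait for more bytes.
    public static byte[] tryDecode(ByteBuffer buf) {
        // Header is version (1) + type (1) + payload length (4) = 6 bytes.
        if (buf.remaining() < 6) {
            return null;
        }
        buf.mark();
        byte version = buf.get();
        byte type = buf.get();
        int length = buf.getInt();
        if (buf.remaining() < length) {
            buf.reset(); // rewind: the payload hasn't fully arrived yet
            return null;
        }
        byte[] payload = new byte[length];
        buf.get(payload);
        return payload;
    }
}
```

Every read site needs its own availability check and rewind logic; ReplayingDecoder hides all of that behind the checkpoint() calls.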

Now on to the code...

The message structure

The message being passed is extremely simple:

VERSION        - 1 byte
TYPE           - 1 byte
PAYLOAD_LENGTH - 4 bytes
PAYLOAD        - N bytes (depends on the value read at PAYLOAD_LENGTH)

As you can see, it's basically an envelope to carry other stuff. There's no point in going into detail about what gets transported inside the PAYLOAD, as it's not really relevant here. If it helps, think of it as the bytes of a string ("someString".getBytes()).
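To make the layout concrete, here's a quick sketch of the frame for a 5-byte payload, using plain java.nio.ByteBuffer just for illustration (the actual encoder below uses Netty's ChannelBuffer):

```java
import java.nio.ByteBuffer;

public class FrameLayoutDemo {

    public static byte[] frame(byte version, byte type, byte[] payload) {
        // 1 (VERSION) + 1 (TYPE) + 4 (PAYLOAD_LENGTH) + N (PAYLOAD)
        ByteBuffer buf = ByteBuffer.allocate(6 + payload.length);
        buf.put(version);
        buf.put(type);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }
}
```

For frame((byte) 1, (byte) 2, "hello".getBytes()) you get an 11-byte frame: one version byte, one type byte, the big-endian int 5, then the five payload bytes.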

This message would yield the following enum to define the states:

public enum DecodingState {
    VERSION,
    TYPE,
    PAYLOAD_LENGTH,
    PAYLOAD,
}

And here’s the corresponding Envelope class:

public class Envelope {
    private Version version;
    private Type type;
    private byte[] payload;
 
    public Envelope() {
    }
 
    public Envelope(Version version, Type type, byte[] payload) {
        this.version = version;
        this.type = type;
        this.payload = payload;
    }
 
    // getters & setters
}
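The Version and Type enums are omitted here; both simply map a constant to its wire byte. Here's a minimal sketch of what Version could look like (the constant name and byte value are assumptions; only fromByte() and getByteValue() are taken from the code below):

```java
public enum Version {
    VERSION1((byte) 1); // hypothetical constant; use whatever versions your protocol defines

    private final byte byteValue;

    Version(byte byteValue) {
        this.byteValue = byteValue;
    }

    public byte getByteValue() {
        return byteValue;
    }

    public static Version fromByte(byte b) {
        for (Version version : values()) {
            if (version.byteValue == b) {
                return version;
            }
        }
        throw new IllegalArgumentException("Unknown version: " + b);
    }
}
```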

Encoding

Encoding is the easy part: you have a POJO whose fields you've filled, and you want to turn it into a ChannelBuffer. To accomplish this we use the good old OneToOneEncoder:

@ChannelHandler.Sharable
public class Encoder extends OneToOneEncoder {
    // ...
    public static ChannelBuffer encodeMessage(Envelope message)
            throws IllegalArgumentException {
        // verify that no fields are set to null
 
        // version(1b) + type(1b) + payload length(4b) + payload(nb)
        int size = 6 + message.getPayload().length;
 
        ChannelBuffer buffer = ChannelBuffers.buffer(size);
        buffer.writeByte(message.getVersion().getByteValue());
        buffer.writeByte(message.getType().getByteValue());
        buffer.writeInt(message.getPayload().length);
        buffer.writeBytes(message.getPayload());
 
        return buffer;
    }
 
    @Override
    protected Object encode(ChannelHandlerContext channelHandlerContext,
                            Channel channel, Object msg) throws Exception {
        if (msg instanceof Envelope) {
            return encodeMessage((Envelope) msg);
        } else {
            return msg;
        }
    }
 
    // ...
}

Nothing fancy here, so on to the decoder.

Decoding

First, the definition:

public class Decoder extends ReplayingDecoder<Decoder.DecodingState> {
    // ....
}

ReplayingDecoder is parameterized with an enum whose constants represent the decoding states: in this case, the DecodingState enum shown above under "The message structure", which is nested inside Decoder (hence Decoder.DecodingState).

And here's the juice:

    // ...
 
    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel,
                            ChannelBuffer buffer, DecodingState state)
            throws Exception {
 
        switch (state) {
            case VERSION:
                this.message.setVersion(Version.fromByte(buffer.readByte()));
                checkpoint(DecodingState.TYPE);
                // intentional fall-through: try to read the next field right away
            case TYPE:
                this.message.setType(Type.fromByte(buffer.readByte()));
                checkpoint(DecodingState.PAYLOAD_LENGTH);
                // intentional fall-through
            case PAYLOAD_LENGTH:
                int size = buffer.readInt();
                if (size <= 0) {
                    throw new Exception("Invalid content size");
                }
                byte[] content = new byte[size];
                this.message.setPayload(content);
                checkpoint(DecodingState.PAYLOAD);
                // intentional fall-through
            case PAYLOAD:
                buffer.readBytes(this.message.getPayload(), 0,
                                 this.message.getPayload().length);
 
                try {
                    return this.message;
                } finally {
                    this.reset();
                }
            default:
                throw new Exception("Unknown decoding state: " + state);
        }
    }
 
    private void reset() {
        checkpoint(DecodingState.VERSION);
        this.message = new Envelope();
    }
 
    // ...

The first thing you should notice is the fall-through in the switch. The (intended) consequence is that the only way to exit the decode() method is either by completely reading a message or via the special exception the buffer throws when there isn't enough data.

Also, notice that after each read, there's a call to checkpoint(). This is what tells the ReplayingDecoder from where to restart in case the next read fails.

Special exception? But aren’t exceptions slow?

Slow or not, simply returning null would certainly be faster... but in this case the ReplayingDecoder uses a cached exception (always the same instance, so there's no overhead of filling in the stack trace every time it's thrown). This clever technique lets you code as if the buffer were always full, with no manual checking of available bytes, without incurring the performance penalty of stack trace creation.
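The pattern is easy to reproduce: create a single shared instance and override fillInStackTrace() so throwing it never pays for capturing the stack. This is a standalone sketch of the technique, not Netty's actual internal class:

```java
public class CachedSignal extends Error {

    // One shared instance, created once at class-load time.
    public static final CachedSignal INSTANCE = new CachedSignal();

    private CachedSignal() {
    }

    @Override
    public synchronized Throwable fillInStackTrace() {
        // Skip the expensive stack walk: throwing this costs little more than a jump.
        return this;
    }
}
```

Because fillInStackTrace() is a no-op, the instance carries no stack trace; the decoder catches it internally and rewinds the buffer to the last checkpoint.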

The rest of the project

The main part of this tutorial focuses on the encoding/decoding logic. The rest of the project is a simple test so you can see for yourself the throughput you can achieve with Netty.

There's an echo server that sends back every Envelope it receives, and a client that floods the server with Envelope messages.

I've also included a version that uses the traditional Java serialization so you can compare the results yourself, both in terms of generated traffic and throughput.

Here's what I got, running this on two VAIO VGN-SR19VN laptops (Core 2 Duo P8400, 2.26GHz) over gigabit Ethernet, after proper warmup:

The payload field of the Envelope contains 175 bytes

Custom Codec:

Sent and received 100000 in 2.983s
That's 33523.297 echoes per second!
(...) sent: 27118868b, recv: 22967814b

Java serialization:

Sent and received 100000 in 24.057s
That's 4156.7944 echoes per second!
(...) sent: 51848520b, recv: 51410040b

I'll let the figures speak for themselves :)

If you're looking to decode more complex structured messages, take a look at this other post.