Class BufferedWriteHandler

java.lang.Object
org.jboss.netty.channel.SimpleChannelHandler
org.jboss.netty.handler.queue.BufferedWriteHandler
All Implemented Interfaces:
ChannelDownstreamHandler, ChannelHandler, ChannelUpstreamHandler, LifeCycleAwareChannelHandler

public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCycleAwareChannelHandler
Emulates buffered write operation. This handler stores all write requests into an unbounded Queue and flushes them to the downstream when flush() method is called.

Here is an example that demonstrates the usage:

 BufferedWriteHandler bufferedWriter = new BufferedWriteHandler();
 ChannelPipeline p = ...;
 p.addFirst("buffer", bufferedWriter);

 ...

 Channel ch = ...;

 // msg1, 2, and 3 are stored in the queue of bufferedWriter.
 ch.write(msg1);
 ch.write(msg2);
 ch.write(msg3);

 // and will be flushed on request.
 bufferedWriter.flush();
 

Auto-flush

The write request queue is automatically flushed when the associated Channel is disconnected or closed. However, it does not flush the queue otherwise. It means you have to call flush() before the size of the queue increases too much. You can implement your own auto-flush strategy by extending this handler:
 public class AutoFlusher extends BufferedWriteHandler {

     private final AtomicLong bufferSize = new AtomicLong();

     @Override
     public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) {
         super.writeRequested(ctx, e);

         ChannelBuffer data = (ChannelBuffer) e.getMessage();
         int newBufferSize = bufferSize.addAndGet(data.readableBytes());

         // Flush the queue if it gets larger than 8KiB.
         if (newBufferSize > 8192) {
             flush();
             bufferSize.set(0);
         }
     }
 }
 

Consolidate on flush

If there are two or more write requests in the queue and all their message type is ChannelBuffer, they can be merged into a single write request to save the number of system calls.
 BEFORE consolidation:            AFTER consolidation:
 +-------+-------+-------+        +-------------+
 | Req C | Req B | Req A |------\\| Request ABC |
 | "789" | "456" | "123" |------//| "123456789" |
 +-------+-------+-------+        +-------------+
 
This feature is disabled by default. You can override the default when you create this handler or call flush(boolean). If you specified true when you call the constructor, calling flush() will always consolidate the queue. Otherwise, you have to call flush(boolean) with true to enable this feature for each flush.

The disadvantage of consolidation is that the ChannelFuture and its ChannelFutureListeners associated with the original write requests might be notified later than when they are actually written out. They will always be notified when the consolidated write request is fully written.

The following example implements the consolidation strategy that reduces the number of write requests based on the writability of a channel:

 public class ConsolidatingAutoFlusher extends BufferedWriteHandler {

     public ConsolidatingAutoFlusher() {
         // Enable consolidation by default.
         super(true);
     }

     @Override
     public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
         ChannelConfig cfg = e.getChannel().getConfig();
         if (cfg instanceof NioSocketChannelConfig) {
             // Lower the watermark to increase the chance of consolidation.
             cfg.setWriteBufferLowWaterMark(0);
         }
         super.channelOpen(e);
     }

     @Override
     public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
         super.writeRequested(ctx, et);
         if (e.getChannel().isWritable()) {
             flush();
         }
     }

     @Override
     public void channelInterestChanged(
             ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
         if (e.getChannel().isWritable()) {
             flush();
         }
     }
 }
 

Prioritized Writes

You can implement prioritized writes by specifying an unbounded priority queue in the constructor of this handler. It will be required to design the proper strategy to determine how often flush() should be called. For example, you could call flush() periodically, using HashedWheelTimer every second.