Class OrderedMemoryAwareThreadPoolExecutor
- java.lang.Object
  - java.util.concurrent.AbstractExecutorService
    - java.util.concurrent.ThreadPoolExecutor
      - org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor
        - org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor

- All Implemented Interfaces: Executor, ExecutorService
- Direct Known Subclasses: OrderedDownstreamThreadPoolExecutor
public class OrderedMemoryAwareThreadPoolExecutor extends MemoryAwareThreadPoolExecutor

A MemoryAwareThreadPoolExecutor which makes sure the events from the same Channel are executed sequentially.

NOTE: This thread pool inherits most characteristics of its super type, so please refer to MemoryAwareThreadPoolExecutor to understand the basics of how it works.

Event execution order
For example, let's say there are two executor threads that handle the events from two channels:

    -------------------------------------> Timeline ------------------------------------>

    Thread X: --- Channel A (Event A1) --.   .-- Channel B (Event B2) --- Channel B (Event B3) --->
                                          \ /
                                           X
                                          / \
    Thread Y: --- Channel B (Event B1) --'   '-- Channel A (Event A2) --- Channel A (Event A3) --->
As you see, the events from different channels are independent of each other. That is, an event of Channel B will not be blocked by an event of Channel A and vice versa, unless the thread pool is exhausted.

Also, it is guaranteed that the invocation will be made sequentially for the events from the same channel. For example, the event A2 is never executed before the event A1 is finished. (Although not recommended, if you want the events from the same channel to be executed simultaneously, please use MemoryAwareThreadPoolExecutor instead.)

However, it is not guaranteed that the invocation will be made by the same thread for the same channel. The events from the same channel can be executed by different threads. For example, the event A2 is executed by thread Y while the event A1 was executed by thread X.
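The ordering guarantee described above can be illustrated with a minimal, self-contained sketch that uses only java.util.concurrent. It is not Netty's implementation: KeyedSerialExecutor, SerialExecutor, and runDemo are hypothetical names, plain String keys stand in for Channels, and the memory-aware accounting is left out entirely. Each key gets its own serial queue, and every queued task is handed to a shared pool only after the previous task for the same key has finished, so events with the same key never overlap but may run on different threads.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical illustration of per-key ordering on a shared pool; not Netty code. */
final class KeyedSerialExecutor {
    private final Executor pool;
    private final ConcurrentMap<Object, SerialExecutor> children = new ConcurrentHashMap<>();

    KeyedSerialExecutor(Executor pool) {
        this.pool = pool;
    }

    /** Tasks sharing a key run one at a time, in submission order. */
    void execute(Object key, Runnable task) {
        children.computeIfAbsent(key, k -> new SerialExecutor(pool)).execute(task);
    }

    /** Serial queue in the style of the example in the java.util.concurrent.Executor Javadoc. */
    private static final class SerialExecutor implements Executor {
        private final Executor pool;
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private Runnable active; // guarded by 'this'

        SerialExecutor(Executor pool) {
            this.pool = pool;
        }

        @Override
        public synchronized void execute(Runnable task) {
            tasks.add(() -> {
                try {
                    task.run();
                } finally {
                    scheduleNext(); // hand the next queued task to the pool
                }
            });
            if (active == null) {
                scheduleNext();
            }
        }

        private synchronized void scheduleNext() {
            active = tasks.poll();
            if (active != null) {
                pool.execute(active);
            }
        }
    }

    /** Submits events A1..A3 and B1..B3 to two threads and returns the execution log. */
    static List<String> runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        KeyedSerialExecutor ordered = new KeyedSerialExecutor(pool);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(6);
        for (int i = 1; i <= 3; i++) {
            for (String channel : new String[] {"A", "B"}) {
                String event = channel + i;
                ordered.execute(channel, () -> {
                    log.add(event);
                    done.countDown();
                });
            }
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(log);
    }
}
```

In the returned log, A1 always precedes A2 and A2 precedes A3 (likewise for B), while the interleaving between the A and B events can differ from run to run.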
Using a different key other than Channel to maintain event order

OrderedMemoryAwareThreadPoolExecutor uses a Channel as the key for maintaining the event execution order, as explained in the previous section. Alternatively, you can extend it to change its behavior. For example, you can change the key to the remote IP of the peer:

    public class RemoteAddressBasedOMATPE extends OrderedMemoryAwareThreadPoolExecutor {

        ... Constructors ...

        @Override
        protected ConcurrentMap<Object, Executor> newChildExecutorMap() {
            // The default implementation returns a special ConcurrentMap that
            // uses identity comparison only (see IdentityHashMap).
            // Because SocketAddress does not work with identity comparison,
            // we need to employ a more generic implementation.
            return new ConcurrentHashMap<Object, Executor>();
        }

        @Override
        protected Object getChildExecutorKey(ChannelEvent e) {
            // Use the IP of the remote peer as a key.
            return ((InetSocketAddress) e.getChannel().getRemoteAddress()).getAddress();
        }

        // Make public so that you can call from anywhere.
        @Override
        public boolean removeChildExecutor(Object key) {
            return super.removeChildExecutor(key);
        }

        // Also made public, because the pruning loop below calls it from outside the class.
        @Override
        public Set<Object> getChildExecutorKeySet() {
            return super.getChildExecutorKeySet();
        }
    }

Please be very careful about memory leaks in the child executor map. You must call removeChildExecutor(Object) when the life cycle of the key ends (e.g. all connections from the same IP were closed). Also, please keep in mind that the key can appear again after calling removeChildExecutor(Object) (e.g. a new connection could come in from the same old IP after removal). If in doubt, prune the old unused or stale keys from the child executor map periodically, as in the following pseudocode:

    RemoteAddressBasedOMATPE executor = ...;

    on every 3 seconds:

      for (Iterator<Object> i = executor.getChildExecutorKeySet().iterator(); i.hasNext();) {
          InetAddress ip = (InetAddress) i.next();
          if (there is no active connection from 'ip' now &&
              there has been no incoming connection from 'ip' for last 10 minutes) {
              i.remove();
          }
      }

If the expected maximum number of keys is small and deterministic, you could use a weak key map such as ConcurrentWeakHashMap or a synchronized WeakHashMap instead of managing the life cycle of the keys yourself.
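One way the periodic pruning described above might be made concrete is sketched below. Everything here is an assumption for illustration: StaleKeyPruner, connectionOpened, connectionClosed, prune, and demo are hypothetical names, the bookkeeping maps are not part of Netty, and a plain concurrent Set stands in for the set returned by getChildExecutorKeySet(). The sketch only shows the two conditions from the pseudocode (no active connection, and no new connection for 10 minutes) applied through Iterator.remove().

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that tracks per-IP activity so stale keys can be pruned. */
final class StaleKeyPruner {
    private static final long IDLE_LIMIT_NANOS = TimeUnit.MINUTES.toNanos(10);

    private final Map<InetAddress, AtomicInteger> activeConnections = new ConcurrentHashMap<>();
    private final Map<InetAddress, Long> lastConnectNanos = new ConcurrentHashMap<>();

    /** Call when a connection from 'ip' is accepted. */
    void connectionOpened(InetAddress ip, long nowNanos) {
        activeConnections.computeIfAbsent(ip, k -> new AtomicInteger()).incrementAndGet();
        lastConnectNanos.put(ip, nowNanos);
    }

    /** Call when a connection from 'ip' is closed. */
    void connectionClosed(InetAddress ip) {
        AtomicInteger count = activeConnections.get(ip);
        if (count != null) {
            count.decrementAndGet();
        }
    }

    /** Removes keys with no active connection and no new connection for 10 minutes. */
    int prune(Set<Object> childExecutorKeys, long nowNanos) {
        int removed = 0;
        for (Iterator<Object> i = childExecutorKeys.iterator(); i.hasNext();) {
            InetAddress ip = (InetAddress) i.next();
            AtomicInteger active = activeConnections.get(ip);
            Long last = lastConnectNanos.get(ip);
            boolean idle = active == null || active.get() == 0;
            boolean stale = last == null || nowNanos - last >= IDLE_LIMIT_NANOS;
            if (idle && stale) {
                i.remove();
                activeConnections.remove(ip);
                lastConnectNanos.remove(ip);
                removed++;
            }
        }
        return removed;
    }

    /** Small demonstration: one idle address and one busy address, pruned after 11 minutes. */
    static Set<Object> demo() {
        try {
            InetAddress idleIp = InetAddress.getByAddress(new byte[] {10, 0, 0, 1});
            InetAddress busyIp = InetAddress.getByAddress(new byte[] {10, 0, 0, 2});
            StaleKeyPruner pruner = new StaleKeyPruner();
            Set<Object> keys = ConcurrentHashMap.newKeySet(); // stand-in for the executor's key set
            keys.add(idleIp);
            keys.add(busyIp);
            pruner.connectionOpened(idleIp, 0L);
            pruner.connectionClosed(idleIp);
            pruner.connectionOpened(busyIp, 0L); // still connected
            pruner.prune(keys, TimeUnit.MINUTES.toNanos(11));
            return keys;
        } catch (UnknownHostException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In a real application, prune could be invoked from a ScheduledExecutorService at a fixed rate (the text above suggests every 3 seconds). The sketch does not close the race between pruning and a new connection arriving from the same address; as noted above, a removed key can simply appear again.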
Nested Class Summary
- protected class OrderedMemoryAwareThreadPoolExecutor.ChildExecutor

Nested classes/interfaces inherited from class java.util.concurrent.ThreadPoolExecutor
ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy

Field Summary
- protected ConcurrentMap<Object,Executor> childExecutors

Constructor Summary
- OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize)
  Creates a new instance.
- OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit)
  Creates a new instance.
- OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory)
  Creates a new instance.
- OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory)
  Creates a new instance.

Method Summary
- protected void doExecute(Runnable task)
  Executes the specified task concurrently while maintaining the event order.
- protected Executor getChildExecutor(ChannelEvent e)
- protected Object getChildExecutorKey(ChannelEvent e)
- protected Set<Object> getChildExecutorKeySet()
- protected ConcurrentMap<Object,Executor> newChildExecutorMap()
- protected boolean removeChildExecutor(Object key)
- protected boolean shouldCount(Runnable task)
  Returns true if and only if the specified task should be counted to limit the global and per-channel memory consumption.

Methods inherited from class org.jboss.netty.handler.execution.MemoryAwareThreadPoolExecutor
beforeExecute, decreaseCounter, doUnorderedExecute, execute, getMaxChannelMemorySize, getMaxTotalMemorySize, getNotifyChannelFuturesOnShutdown, getObjectSizeEstimator, increaseCounter, remove, setMaxChannelMemorySize, setNotifyChannelFuturesOnShutdown, setObjectSizeEstimator, shutdownNow, shutdownNow, terminated
Methods inherited from class java.util.concurrent.ThreadPoolExecutor
afterExecute, allowCoreThreadTimeOut, allowsCoreThreadTimeOut, awaitTermination, finalize, getActiveCount, getCompletedTaskCount, getCorePoolSize, getKeepAliveTime, getLargestPoolSize, getMaximumPoolSize, getPoolSize, getQueue, getRejectedExecutionHandler, getTaskCount, getThreadFactory, isShutdown, isTerminated, isTerminating, prestartAllCoreThreads, prestartCoreThread, purge, setCorePoolSize, setKeepAliveTime, setMaximumPoolSize, setRejectedExecutionHandler, setThreadFactory, shutdown, toString
Methods inherited from class java.util.concurrent.AbstractExecutorService
invokeAll, invokeAll, invokeAny, invokeAny, newTaskFor, newTaskFor, submit, submit, submit
Field Detail
childExecutors
protected final ConcurrentMap<Object,Executor> childExecutors
Constructor Detail
OrderedMemoryAwareThreadPoolExecutor
public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize)
Creates a new instance.
- Parameters:
  corePoolSize - the maximum number of active threads
  maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
  maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.

OrderedMemoryAwareThreadPoolExecutor
public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit)
Creates a new instance.
- Parameters:
  corePoolSize - the maximum number of active threads
  maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
  maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
  keepAliveTime - the amount of time for an inactive thread to shut itself down
  unit - the TimeUnit of keepAliveTime

OrderedMemoryAwareThreadPoolExecutor
public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory)
Creates a new instance.
- Parameters:
  corePoolSize - the maximum number of active threads
  maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
  maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
  keepAliveTime - the amount of time for an inactive thread to shut itself down
  unit - the TimeUnit of keepAliveTime
  threadFactory - the ThreadFactory of this pool

OrderedMemoryAwareThreadPoolExecutor
public OrderedMemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory)
Creates a new instance.
- Parameters:
  corePoolSize - the maximum number of active threads
  maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
  maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
  keepAliveTime - the amount of time for an inactive thread to shut itself down
  unit - the TimeUnit of keepAliveTime
  objectSizeEstimator - the ObjectSizeEstimator of this pool
  threadFactory - the ThreadFactory of this pool

Method Detail
newChildExecutorMap
protected ConcurrentMap<Object,Executor> newChildExecutorMap()
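The class description explains that the default child executor map compares keys by identity, which is why a subclass keyed by remote address has to return a different map from newChildExecutorMap(). The difference can be seen with a small stand-alone sketch. It makes two assumptions: java.util.IdentityHashMap is used here only as a stand-in for the identity-comparing default map, and KeyComparisonDemo and sizes are hypothetical names. Two InetAddress objects for the same IP are equal according to equals() but are distinct objects, so an identity-based map would treat them as two keys.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical demo of identity-based versus equals-based key lookup. */
final class KeyComparisonDemo {
    /** Returns {identityMapSize, equalsMapSize} after inserting two equal but distinct keys. */
    static int[] sizes() {
        try {
            // Same IP, two separate objects, as would happen for two connections from one peer.
            InetAddress first = InetAddress.getByAddress(new byte[] {10, 0, 0, 1});
            InetAddress second = InetAddress.getByAddress(new byte[] {10, 0, 0, 1});

            Map<Object, String> byIdentity = new IdentityHashMap<>();
            byIdentity.put(first, "child executor 1");
            byIdentity.put(second, "child executor 2"); // treated as a different key

            Map<Object, String> byEquals = new ConcurrentHashMap<>();
            byEquals.put(first, "child executor 1");
            byEquals.put(second, "child executor 2"); // replaces the first entry

            return new int[] {byIdentity.size(), byEquals.size()};
        } catch (UnknownHostException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With an identity-based map each connection from the same IP would end up with its own child executor, and ordering per IP would be lost; an equals-based map such as ConcurrentHashMap keeps a single entry per address.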

getChildExecutorKey
protected Object getChildExecutorKey(ChannelEvent e)

removeChildExecutor
protected boolean removeChildExecutor(Object key)

doExecute
protected void doExecute(Runnable task)
Executes the specified task concurrently while maintaining the event order.
- Overrides:
  doExecute in class MemoryAwareThreadPoolExecutor

getChildExecutor
protected Executor getChildExecutor(ChannelEvent e)

shouldCount
protected boolean shouldCount(Runnable task)
Description copied from class: MemoryAwareThreadPoolExecutor
Returns true if and only if the specified task should be counted to limit the global and per-channel memory consumption. To override this method, you must call super.shouldCount() to make sure important tasks are not counted.
- Overrides:
  shouldCount in class MemoryAwareThreadPoolExecutor