com.flat502.rox.processing
Class HttpRpcProcessor

java.lang.Object
  extended by com.flat502.rox.processing.HttpRpcProcessor
Direct Known Subclasses:
HttpRpcClient, HttpRpcServer

public abstract class HttpRpcProcessor
extends java.lang.Object

This abstract base class encapsulates all of the generic logic common to both client and server side communication using RPC over HTTP.

This particular implementation uses a single Thread (this class) to manage all I/O on the underlying collection of sockets. I/O is managed using a Selector instance. This thread may be shared with other instances by means of a ResourcePool.

I/O writes are queued and written whenever the target socket becomes available for writing. I/O reads are buffered in an instance of HttpMessageBuffer until a complete message is available (as determined by the HttpMessageBuffer.isComplete() method).

When a complete message has been received it is placed on a shared queue serviced by a pool of worker threads, themselves instances of HttpMessageHandler created through calls to the ResourcePool.newWorker() factory method on the underlying ResourcePool. A worker pool may be provided when an instance of this class is created. This allows multiple instances to share an underlying worker pool.

All I/O and any state changes on the underlying Selector (like updates to interest operations on channels, or new channel registrations) are handled directly by the thread backing this instance to avoid unexpected blocking due to platform inconsistencies in the underlying NIO implementation.
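The hand-off described above, where complete messages are placed on a shared queue and serviced by a pool of worker threads, can be sketched roughly as follows. This is a minimal illustration and not the library's implementation: SharedQueueSketch is a hypothetical class, java.util.concurrent.LinkedBlockingQueue stands in for the library's own queue, plain strings stand in for complete HTTP messages, and upper-casing stands in for message handling.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the I/O thread / worker pool split; not part of the library.
class SharedQueueSketch {

    // Enqueues each "complete message" and lets workerCount workers handle them.
    static List<String> process(List<String> completeMessages, int workerCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(completeMessages.size());

        Thread[] workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        String message = queue.take();       // blocks until work arrives
                        handled.add(message.toUpperCase());  // placeholder for real handling
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    // Interrupted by the caller: the worker exits.
                }
            });
            workers[i].setDaemon(true);
            workers[i].start();
        }

        try {
            // The I/O thread's role: hand over complete messages without processing them.
            for (String message : completeMessages) {
                queue.put(message);
            }
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            for (Thread worker : workers) {
                worker.interrupt();
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("ping", "pong"), 2));
    }
}
```

With several workers the order of handled messages is not guaranteed, which is one reason the I/O thread only enqueues and never processes.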


Field Summary
static java.lang.String ALL_CIPHER_SUITES
          Deprecated. Use SSLConfiguration.ALL_CIPHER_SUITES instead.
static java.lang.String ANON_CIPHER_SUITES
          Deprecated. Use SSLConfiguration.ANON_CIPHER_SUITES instead.
 
Constructor Summary
protected HttpRpcProcessor(boolean useHttps, ResourcePool workerPool)
          Initializes a new instance of this class.
protected HttpRpcProcessor(boolean useHttps, ResourcePool workerPool, SSLConfiguration sslCfg)
           
 
Method Summary
 int addWorker()
          Create a new worker instance and add it to the underlying thread pool.
 int addWorkers(int count)
          A convenience method for adding multiple worker threads in a single call.
protected  void deregisterChannel(java.nio.channels.SelectableChannel channel)
           
protected  void deregisterSocket(java.net.Socket socket)
           
protected  BlockingQueue getQueue()
          Returns a handle to the shared queue used by worker threads.
protected abstract  HttpMessageBuffer getReadBuffer(java.net.Socket socket)
          Called when data is available on a socket.
protected  java.nio.channels.Selector getSocketSelector()
          Returns a handle to the Selector this thread is using for all I/O.
 SSLConfiguration getSSLConfiguration()
           
protected  javax.net.ssl.SSLSession getSSLSession(java.net.Socket socket)
           
protected  java.util.Timer getTimer()
           
 int getWorkerCount()
          Get the number of worker threads currently responsible for this instance.
protected abstract  java.nio.ByteBuffer getWriteBuffer(java.net.Socket socket)
          Called when a socket becomes available for writing.
protected abstract  void handleMessageException(HttpMessageBuffer msg, java.lang.Exception e)
          An error handler invoked when an attempt to determine whether an HTTP message buffer constitutes a complete HTTP message raises an exception.
protected abstract  void handleProcessingException(java.net.Socket socket, java.lang.Exception e)
          An error handler invoked when an error occurs within the main processing loop.
protected  void handleSelectionKeyOperation(java.nio.channels.SelectionKey key)
          Central dispatch routine for handling I/O events on the underlying Selector.
protected abstract  void handleSSLHandshakeFinished(java.net.Socket socket, javax.net.ssl.SSLEngine engine)
           
protected abstract  void handleTimeout(java.net.Socket socket, java.lang.Exception cause)
           
protected  void initialize()
          Initializes this implementation.
protected  void initSelector(java.nio.channels.Selector selector)
          Create and initialize a Selector for all I/O operations.
protected  javax.net.ssl.SSLEngine initSocketSSLEngine(java.net.Socket socket)
           
protected  boolean isSharedWorkerPool()
           
protected  boolean isStarted()
           
protected abstract  boolean isWriteQueued(java.net.Socket socket)
          Called to find out if data is queued to be written to a socket.
protected  SSLSession newSSLSession(java.net.Socket socket)
           
protected abstract  ResourcePool newWorkerPool()
          Factory method for new worker pools.
protected abstract  void putWriteBuffer(java.net.Socket socket, java.nio.ByteBuffer data)
          Called when data is available to be written to a socket.
protected  void queueCancellation(java.nio.channels.spi.AbstractSelectableChannel channel)
           
protected  void queueRead(java.net.Socket socket)
           
protected  void queueRegistration(java.nio.channels.SocketChannel channel)
          Queues a new SocketChannel for registration with the underlying Selector.
protected  void queueWrite(java.net.Socket socket)
           
protected  void queueWrite(java.net.Socket socket, byte[] data, boolean close)
          Queues data to be written on the indicated Socket.
protected  void read(java.nio.channels.SelectionKey key)
          Reads any pending data from the socket indicated by the given SelectionKey.
protected  void registerChannel(java.nio.channels.SelectableChannel channel)
           
 void registerProfiler(Profiler p)
           
protected  void registerSocket(java.net.Socket socket, java.lang.String host, int port, boolean client)
           
 void registerSSLSessionPolicy(SSLSessionPolicy policy)
           
protected abstract  void removeReadBuffer(java.net.Socket socket)
          Called when a complete HTTP message has been identified.
protected abstract  void removeReadBuffers(java.net.Socket socket)
          Called when a socket is deregistered and any buffers associated with it should be released.
 int removeWorker()
          Removes a worker thread from the thread pool.
protected abstract  void removeWriteBuffer(java.net.Socket socket)
          Called when all of the data in a pending write buffer has been written to a socket.
protected abstract  void removeWriteBuffers(java.net.Socket socket)
          Called when a socket is deregistered and any buffers associated with it should be released.
 void setCipherSuitePattern(java.lang.String cipherSuitePattern)
          Set the regular expression used to select the SSL cipher suites to use for all connections from this point on.
 void setSSLHandshakeTimeout(int timeout)
          Configure a timeout value for SSL handshaking.
protected  boolean shouldUseHTTPS()
           
 void start()
           
 void stop()
           
protected abstract  void stopImpl()
           
protected  void write(java.nio.channels.SelectionKey key)
          Writes any pending data to the socket indicated by the given SelectionKey.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

ANON_CIPHER_SUITES

public static final java.lang.String ANON_CIPHER_SUITES
Deprecated. Use SSLConfiguration.ANON_CIPHER_SUITES instead.
A regular expression that matches only cipher suites that allow for anonymous key exchange.

See Also:
Constant Field Values

ALL_CIPHER_SUITES

public static final java.lang.String ALL_CIPHER_SUITES
Deprecated. Use SSLConfiguration.ALL_CIPHER_SUITES instead.
A regular expression that matches all cipher suites.

See Also:
Constant Field Values

Constructor Detail

HttpRpcProcessor

protected HttpRpcProcessor(boolean useHttps,
                           ResourcePool workerPool)
                    throws java.io.IOException
Initializes a new instance of this class.

Parameters:
useHttps - A flag indicating whether or not the underlying transport should use HTTPS (HTTP over SSL).
Throws:
java.io.IOException
See Also:
setCipherSuitePattern(String)

HttpRpcProcessor

protected HttpRpcProcessor(boolean useHttps,
                           ResourcePool workerPool,
                           SSLConfiguration sslCfg)
                    throws java.io.IOException
Throws:
java.io.IOException

Method Detail

addWorker

public int addWorker()
Create a new worker instance and add it to the underlying thread pool.

Worker threads are responsible for handling complete HTTP messages. All I/O is handled by this thread instance alone.

If an instance of this class is constructed and started without this method having been invoked, it is invoked automatically before processing begins.

Returns:
The number of worker threads backing this instance.

addWorkers

public int addWorkers(int count)
A convenience method for adding multiple worker threads in a single call.

Parameters:
count - The number of worker threads to add.
Returns:
The number of worker threads backing this instance.

getWorkerCount

public int getWorkerCount()
Get the number of worker threads currently responsible for this instance.

Returns:
The number of worker threads backing this instance.

removeWorker

public int removeWorker()
Removes a worker thread from the thread pool.

It's possible to reduce the size of the thread pool to zero, stopping all HTTP message processing. Incoming data will still be accepted, as will new connections (if this applies), but no processing will occur until the thread pool size is increased again.

Returns:
The number of worker threads backing this instance.
Throws:
java.lang.IllegalStateException - if this method is invoked while the thread pool is empty.

setCipherSuitePattern

public void setCipherSuitePattern(java.lang.String cipherSuitePattern)
Set the regular expression used to select the SSL cipher suites to use for all connections from this point on.

Parameters:
cipherSuitePattern - A regular expression for selecting the set of SSL cipher suites. A null value will be treated as matching all cipher suites.
Throws:
java.lang.IllegalStateException - If this instance was not configured to use HTTPS.
See Also:
ALL_CIPHER_SUITES, ANON_CIPHER_SUITES
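
As a rough illustration of selecting cipher suites with a regular expression, the sketch below filters a list of suite names against a pattern and treats null as matching everything, as documented above. CipherSuiteFilterSketch and its select method are hypothetical. The pattern ".*_anon_.*" is an assumed example and not necessarily the value of ANON_CIPHER_SUITES. Whether the library requires a full match or a partial match is also an assumption; a full match is used here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper; shows one way a cipher suite pattern could be applied.
class CipherSuiteFilterSketch {

    // Returns the suites whose names fully match the pattern.
    // A null pattern is treated as matching all suites.
    static String[] select(String[] supported, String cipherSuitePattern) {
        if (cipherSuitePattern == null) {
            return supported.clone();
        }
        Pattern pattern = Pattern.compile(cipherSuitePattern);
        List<String> selected = new ArrayList<>();
        for (String suite : supported) {
            if (pattern.matcher(suite).matches()) {
                selected.add(suite);
            }
        }
        return selected.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] supported = {
            "TLS_RSA_WITH_AES_128_CBC_SHA",
            "TLS_DH_anon_WITH_AES_128_CBC_SHA",
            "TLS_ECDH_anon_WITH_AES_256_CBC_SHA"
        };
        // Assumed example pattern for anonymous key exchange suites.
        for (String suite : select(supported, ".*_anon_.*")) {
            System.out.println(suite);
        }
    }
}
```

In a real setup the input array would typically come from SSLEngine.getSupportedCipherSuites() and the result would be passed to SSLEngine.setEnabledCipherSuites(String[]).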

setSSLHandshakeTimeout

public void setSSLHandshakeTimeout(int timeout)
Configure a timeout value for SSL handshaking.

If the remote server is not SSL-enabled, only a timeout can detect this, since a non-SSL server is waiting for a request from the client, which is in turn waiting for an SSL handshake to be initiated by the server.

This method controls the length of that timeout.

This timeout defaults to 10 seconds.

The new timeout affects only connections initiated subsequent to the completion of this method call.

Parameters:
timeout - The timeout (in milliseconds). A value of 0 indicates no timeout should be enforced (not recommended).
Throws:
java.lang.IllegalArgumentException - If the timeout provided is negative.
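
The documented timeout rules (negative values rejected, 0 meaning no timeout, a 10 second default) could be modelled along these lines. This is a sketch under assumptions: HandshakeTimeoutSketch, watch and firesWithin are hypothetical names, and the use of java.util.Timer is only suggested by the presence of getTimer() on this class.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a handshake timeout; not the library's code.
class HandshakeTimeoutSketch {
    private final Timer timer = new Timer(true);   // daemon timer thread
    private volatile int timeoutMillis = 10_000;   // documented default: 10 seconds

    void setTimeout(int timeout) {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        this.timeoutMillis = timeout;
    }

    // Schedules onTimeout for a new connection. Returns the task so the caller
    // can cancel it once the handshake finishes, or null when the timeout is 0.
    TimerTask watch(final Runnable onTimeout) {
        int timeout = timeoutMillis;   // only connections started from now on see a new value
        if (timeout == 0) {
            return null;
        }
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                onTimeout.run();
            }
        };
        timer.schedule(task, timeout);
        return task;
    }

    // Demo helper: reports whether the timeout callback fired within waitMillis.
    static boolean firesWithin(int timeoutMillis, long waitMillis) {
        HandshakeTimeoutSketch sketch = new HandshakeTimeoutSketch();
        sketch.setTimeout(timeoutMillis);
        CountDownLatch fired = new CountDownLatch(1);
        sketch.watch(fired::countDown);
        try {
            return fired.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(firesWithin(50, 2000));
    }
}
```

Reading the timeout once when a connection is first watched is what makes a later change apply only to subsequently initiated connections.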

getSSLConfiguration

public SSLConfiguration getSSLConfiguration()

newSSLSession

protected SSLSession newSSLSession(java.net.Socket socket)

getSSLSession

protected javax.net.ssl.SSLSession getSSLSession(java.net.Socket socket)

isStarted

protected boolean isStarted()

start

public void start()
           throws java.io.IOException
Throws:
java.io.IOException

stop

public void stop()
          throws java.io.IOException
Throws:
java.io.IOException

stopImpl

protected abstract void stopImpl()
                          throws java.io.IOException
Throws:
java.io.IOException

shouldUseHTTPS

protected boolean shouldUseHTTPS()

initialize

protected void initialize()
                   throws java.io.IOException
Initializes this implementation.

Sub-classes must invoke this method after their constructor has completed its initialization.

This implementation invokes initSelector(Selector) to initialize the underlying Selector.

Throws:
java.io.IOException - if an error occurs during initialization.

queueRegistration

protected void queueRegistration(java.nio.channels.SocketChannel channel)
Queues a new SocketChannel for registration with the underlying Selector.

The update is queued internally and the selecting thread is awoken to apply the change. This removes the risk that platform-specific discrepancies between NIO implementations cause the registration to block indefinitely.

Parameters:
channel - The SocketChannel to register.
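
The queue-and-wake-up pattern described here can be sketched with plain java.nio as shown below. QueuedRegistrationSketch, applyPending, runOnce and demo are hypothetical names, and registering for OP_READ is an assumption. The sketch shows only the general technique, not this class's internals.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration of deferring registrations to the selecting thread.
class QueuedRegistrationSketch {
    private final Selector selector;
    private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();

    QueuedRegistrationSketch(Selector selector) {
        this.selector = selector;
    }

    // Safe to call from any thread: record the request, then wake the selector.
    void queueRegistration(SocketChannel channel) {
        pending.add(channel);
        selector.wakeup();
    }

    // Called only by the selecting thread, so register() never races with select().
    int applyPending() throws IOException {
        int applied = 0;
        SocketChannel channel;
        while ((channel = pending.poll()) != null) {
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
            applied++;
        }
        return applied;
    }

    // One iteration of a selecting loop: wait, then apply queued changes.
    void runOnce() throws IOException {
        selector.select();               // returns at once if wakeup() was already called
        applyPending();
        selector.selectedKeys().clear(); // a real loop would dispatch these keys
    }

    // Demo helper: returns the number of keys registered after one iteration.
    static int demo() {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            QueuedRegistrationSketch sketch = new QueuedRegistrationSketch(selector);
            sketch.queueRegistration(channel);
            sketch.runOnce();
            return selector.keys().size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```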

queueCancellation

protected void queueCancellation(java.nio.channels.spi.AbstractSelectableChannel channel)

getTimer

protected java.util.Timer getTimer()

getQueue

protected BlockingQueue getQueue()
Returns a handle to the shared queue used by worker threads.

Returns:
A handle to the shared queue.

getSocketSelector

protected java.nio.channels.Selector getSocketSelector()
Returns a handle to the Selector this thread is using for all I/O.

Returns:
A handle to the Selector.

handleSelectionKeyOperation

protected void handleSelectionKeyOperation(java.nio.channels.SelectionKey key)
                                    throws java.io.IOException
Central dispatch routine for handling I/O events on the underlying Selector.

This implementation defers to the read(SelectionKey) or write(SelectionKey) method appropriately.

A sub-class that overrides this method should defer to this implementation for any SelectionKey it is not interested in.

Parameters:
key - The SelectionKey for the socket on which an I/O operation is pending.
Throws:
java.io.IOException - if an error occurs while processing the pending event.

write

protected void write(java.nio.channels.SelectionKey key)
              throws java.io.IOException
Writes any pending data to the socket indicated by the given SelectionKey.

This implementation checks for data using the getWriteBuffer(Socket) method. If data is available, as much as possible is written to the socket.

Parameters:
key - The SelectionKey indicating the socket available for writing.
Throws:
java.io.IOException - If an error occurs while attempting to write to the indicated socket.

read

protected void read(java.nio.channels.SelectionKey key)
             throws java.io.IOException
Reads any pending data from the socket indicated by the given SelectionKey.

This implementation retrieves an HttpMessageBuffer instance for the indicated socket using the getReadBuffer(Socket) method. Data present on the socket is added to this buffer and if a complete HTTP message has been received it is enqueued on the shared queue.

Parameters:
key - The SelectionKey indicating the socket available for reading.
Throws:
java.io.IOException - If an error occurs while attempting to read from the indicated socket.

queueWrite

protected void queueWrite(java.net.Socket socket,
                          byte[] data,
                          boolean close)
Queues data to be written on the indicated Socket.

The data is queued internally and the interest operations set on the associated SocketChannel is updated to indicate that a write operation is desired.

Parameters:
socket - The socket to which the data should be written.
data - The data to be written
close - The socket should be closed after the write completes

initSelector

protected void initSelector(java.nio.channels.Selector selector)
                     throws java.io.IOException
Create and initialize a Selector for all I/O operations.

Parameters:
selector -
Throws:
java.io.IOException - If an error occurs while attempting to create the Selector.

newWorkerPool

protected abstract ResourcePool newWorkerPool()
Factory method for new worker pools.

This is invoked from the constructor to create a new worker pool when one is not provided.

Returns:
A new ResourcePool.

isSharedWorkerPool

protected boolean isSharedWorkerPool()

handleMessageException

protected abstract void handleMessageException(HttpMessageBuffer msg,
                                               java.lang.Exception e)
                                        throws java.io.IOException
An error handler invoked when an attempt to determine whether an HTTP message buffer constitutes a complete HTTP message raises an exception.

Parameters:
msg - The message buffer the exception is associated with.
e - The exception that was raised.
Throws:
java.io.IOException - Implementations may raise an IOException, since there may be a requirement to notify the remote party of the error (which in turn will require network I/O).

handleProcessingException

protected abstract void handleProcessingException(java.net.Socket socket,
                                                  java.lang.Exception e)
An error handler invoked when an error occurs within the main processing loop.

Parameters:
socket - The socket the exception is associated with. This may be null if the exception is not specific to a particular socket.
e - The exception that was raised.

handleTimeout

protected abstract void handleTimeout(java.net.Socket socket,
                                      java.lang.Exception cause)

removeReadBuffer

protected abstract void removeReadBuffer(java.net.Socket socket)
Called when a complete HTTP message has been identified.

Parameters:
socket - The socket on which a complete message has been received.

removeReadBuffers

protected abstract void removeReadBuffers(java.net.Socket socket)
Called when a socket is deregistered and any buffers associated with it should be released.

Parameters:
socket - The socket that has been deregistered.

getReadBuffer

protected abstract HttpMessageBuffer getReadBuffer(java.net.Socket socket)
Called when data is available on a socket.

Implementations must return a buffer, even if this means creating a new instance. The same buffer should be returned for a given socket until the removeReadBuffer(Socket) method is invoked, ensuring that message fragmentation is correctly handled.

Parameters:
socket - The socket on which data has arrived.
Returns:
A message buffer for the given socket.
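
One way a sub-class might satisfy this contract is a per-socket map that creates a buffer on first use and keeps returning it until it is removed. In the sketch below, ReadBufferRegistrySketch is a hypothetical class and ByteArrayOutputStream stands in for HttpMessageBuffer, whose constructor is not documented here. A single buffer per socket is also an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-socket read buffer registry; ByteArrayOutputStream stands in
// for HttpMessageBuffer.
class ReadBufferRegistrySketch {
    private final Map<Socket, ByteArrayOutputStream> buffers = new HashMap<>();

    // Always returns a buffer: the existing one for this socket, or a new one.
    synchronized ByteArrayOutputStream getReadBuffer(Socket socket) {
        ByteArrayOutputStream buffer = buffers.get(socket);
        if (buffer == null) {
            buffer = new ByteArrayOutputStream();
            buffers.put(socket, buffer);
        }
        return buffer;
    }

    // Forgets the buffer once a complete message has been identified, so the
    // next read on this socket starts a fresh message.
    synchronized void removeReadBuffer(Socket socket) {
        buffers.remove(socket);
    }

    public static void main(String[] args) {
        ReadBufferRegistrySketch registry = new ReadBufferRegistrySketch();
        Socket socket = new Socket();   // unconnected; used only as a map key
        ByteArrayOutputStream first = registry.getReadBuffer(socket);
        first.write('G');               // a fragment arrives
        // The same instance comes back, so fragments accumulate in one place.
        System.out.println(first == registry.getReadBuffer(socket));
    }
}
```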

putWriteBuffer

protected abstract void putWriteBuffer(java.net.Socket socket,
                                       java.nio.ByteBuffer data)
Called when data is available to be written to a socket.

Parameters:
socket - The socket on which the data should be sent.
data - The data to be written.

isWriteQueued

protected abstract boolean isWriteQueued(java.net.Socket socket)
Called to find out if data is queued to be written to a socket.

Parameters:
socket - The socket on which the data should be sent.
Returns:
true if there is a buffer waiting to be written on the given socket

removeWriteBuffer

protected abstract void removeWriteBuffer(java.net.Socket socket)
Called when all of the data in a pending write buffer has been written to a socket.

Parameters:
socket - The socket the buffer was associated with.

removeWriteBuffers

protected abstract void removeWriteBuffers(java.net.Socket socket)
Called when a socket is deregistered and any buffers associated with it should be released.

Parameters:
socket - The socket the buffer was associated with.

getWriteBuffer

protected abstract java.nio.ByteBuffer getWriteBuffer(java.net.Socket socket)
Called when a socket becomes available for writing.

Implementations should return a buffer only if one has been added using putWriteBuffer(Socket, ByteBuffer). The same buffer should be returned for a given socket until the removeWriteBuffer(Socket) method is invoked, ensuring that large messages that must be written in multiple fragments are correctly handled.

Parameters:
socket - The socket that is available for writing.
Returns:
A data buffer for the given socket.
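
The interplay between putWriteBuffer, getWriteBuffer, isWriteQueued and removeWriteBuffer when a message needs several writes might look roughly like this. WriteBufferRegistrySketch, ThrottledChannel, writePending and callsToDrain are hypothetical names. The throttled channel simulates a socket that accepts only a few bytes per call, and keeping one pending buffer per socket is an assumption.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-socket write buffer registry with a partial-write step.
class WriteBufferRegistrySketch {
    private final Map<Socket, ByteBuffer> buffers = new HashMap<>();

    synchronized void putWriteBuffer(Socket socket, ByteBuffer data) { buffers.put(socket, data); }
    synchronized ByteBuffer getWriteBuffer(Socket socket) { return buffers.get(socket); }
    synchronized boolean isWriteQueued(Socket socket) { return buffers.containsKey(socket); }
    synchronized void removeWriteBuffer(Socket socket) { buffers.remove(socket); }

    // Writes as much as the channel accepts. The buffer stays registered until it
    // is drained, so the next call resumes where this one stopped.
    boolean writePending(Socket socket, WritableByteChannel channel) throws IOException {
        ByteBuffer buffer = getWriteBuffer(socket);
        if (buffer == null) {
            return true;                 // nothing queued
        }
        channel.write(buffer);
        if (buffer.hasRemaining()) {
            return false;                // partial write: wait for the next writable event
        }
        removeWriteBuffer(socket);
        return true;
    }

    // Simulated socket channel that accepts at most maxPerCall bytes per write.
    static final class ThrottledChannel implements WritableByteChannel {
        private final int maxPerCall;
        ThrottledChannel(int maxPerCall) { this.maxPerCall = maxPerCall; }
        @Override public int write(ByteBuffer src) {
            int n = Math.min(maxPerCall, src.remaining());
            src.position(src.position() + n);
            return n;
        }
        @Override public boolean isOpen() { return true; }
        @Override public void close() { }
    }

    // Demo helper: counts the write calls needed to drain a message (maxPerCall must be > 0).
    static int callsToDrain(int messageSize, int maxPerCall) {
        WriteBufferRegistrySketch registry = new WriteBufferRegistrySketch();
        Socket socket = new Socket();    // unconnected; used only as a map key
        registry.putWriteBuffer(socket, ByteBuffer.allocate(messageSize));
        ThrottledChannel channel = new ThrottledChannel(maxPerCall);
        int calls = 0;
        try {
            do {
                calls++;
            } while (!registry.writePending(socket, channel));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(callsToDrain(10, 4));   // 4 + 4 + 2 bytes: prints 3
    }
}
```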

queueRead

protected void queueRead(java.net.Socket socket)

queueWrite

protected void queueWrite(java.net.Socket socket)

registerSSLSessionPolicy

public void registerSSLSessionPolicy(SSLSessionPolicy policy)

registerProfiler

public void registerProfiler(Profiler p)

registerChannel

protected void registerChannel(java.nio.channels.SelectableChannel channel)

deregisterChannel

protected void deregisterChannel(java.nio.channels.SelectableChannel channel)

registerSocket

protected void registerSocket(java.net.Socket socket,
                              java.lang.String host,
                              int port,
                              boolean client)
                       throws java.io.IOException
Throws:
java.io.IOException

deregisterSocket

protected void deregisterSocket(java.net.Socket socket)

initSocketSSLEngine

protected javax.net.ssl.SSLEngine initSocketSSLEngine(java.net.Socket socket)
                                               throws javax.net.ssl.SSLException
Throws:
javax.net.ssl.SSLException

handleSSLHandshakeFinished

protected abstract void handleSSLHandshakeFinished(java.net.Socket socket,
                                                   javax.net.ssl.SSLEngine engine)