org.sdnplatform.sync.internal.rpc
Class RPCService

java.lang.Object
  extended by org.sdnplatform.sync.internal.rpc.RPCService

public class RPCService
extends java.lang.Object

A lightweight RPC mechanism built on netty.

Author:
readams

Nested Class Summary
protected  class RPCService.ConnectCFListener
          Wait for the client connection
protected  class RPCService.ConnectTask
          Periodically ensure that all the node connections are alive
protected static class RPCService.MessageWindow
          Maintain state for the pending message window for a given message type
protected static class RPCService.NodeConnection
          Connection state wrapper for node connections
protected static class RPCService.NodeConnectionState
          Various states for connections
protected static class RPCService.NodeMessage
          A pending message to be sent to a particular mode.
protected  class RPCService.SyncMessageWorker
          A worker thread responsible for reading sync messages off the queue and writing them to the appropriate node's channel.
 
Field Summary
protected  java.util.concurrent.ExecutorService bossExecutor
          ExecutorService used for netty boss threads
protected  org.jboss.netty.bootstrap.ClientBootstrap clientBootstrap
          Netty ClientBootstrap used for creating client connections
static int CONNECT_TIMEOUT
          Connect timeout for client connections
protected  java.util.HashMap<java.lang.Short,RPCService.NodeConnection> connections
          Node connections
protected  IDebugCounterService debugCounter
          Debug counter service
protected static org.slf4j.Logger logger
           
protected static int MAX_PENDING_MESSAGES
          The maximum number of outstanding pending messages for messages that use message windows
protected  java.util.concurrent.ConcurrentHashMap<java.lang.Short,RPCService.MessageWindow> messageWindows
          If we want to rate-limit certain types of messages, we can do so by limiting the overall number of outstanding messages.
protected  RPCPipelineFactory pipelineFactory
          ChannelPipelineFactory for creating connections
protected  SingletonTask reconnectTask
          Task to periodically ensure that connections are active
static int SEND_BUFFER_SIZE
          Buffer size for sockets
protected  org.jboss.netty.bootstrap.ServerBootstrap serverBootstrap
          Netty ServerBootstrap used for creating server connections
protected  boolean shutDown
          true after the shutdown() method is called.
protected  boolean started
          True after the run() method is called
protected static int SYNC_MESSAGE_POOL
          Number of workers in the sync message thread pool
protected  java.util.concurrent.ExecutorService syncExecutor
          A thread pool for handling sync messages.
protected  SyncManager syncManager
          Sync manager associated with this RPC service
protected  org.jboss.netty.util.internal.LinkedTransferQueue<RPCService.NodeMessage> syncQueue
          A queue for holding sync messages that are awaiting being written to the channel.
protected  java.util.concurrent.atomic.AtomicInteger transactionId
          Transaction ID used in message headers in the RPC protocol
protected static java.util.EnumSet<MessageType> windowedTypes
           
protected  java.util.concurrent.ExecutorService workerExecutor
          ExecutorService used for netty worker threads
 
Constructor Summary
RPCService(SyncManager syncManager, IDebugCounterService debugCounter)
           
 
Method Summary
 void disconnectNode(short nodeId)
          Remove the connection from the connection registry and clean up any remaining shrapnel
protected  void doNodeConnect(Node n)
          Connect to a remote node if appropriate
 int getTransactionId()
          Get a suitable transaction ID for sending a message
 boolean isConnected(short nodeId)
          Find out if a particular node is connected
 boolean isFullyConnected()
          Check whether all links are established
 void messageAcked(MessageType type, java.lang.Short nodeId)
          Called when a message is acknowledged by a remote node
protected  void nodeConnected(short nodeId, org.jboss.netty.channel.Channel channel)
          Add the node connection to the node connection map
 void run()
          Start the RPC service
 void shutdown()
          Stop the RPC service
protected  void startClientConnections()
          Ensure that all client connections are active
protected  void startClients(org.jboss.netty.channel.ChannelPipelineFactory pipelineFactory)
          Connect to remote servers.
protected  void startServer(org.jboss.netty.channel.ChannelPipelineFactory pipelineFactory)
          Start listening sockets
 boolean writeToNode(java.lang.Short nodeId, SyncMessage bsm)
          Write a message to the node specified
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

logger

protected static final org.slf4j.Logger logger

syncManager

protected SyncManager syncManager
Sync manager associated with this RPC service


debugCounter

protected IDebugCounterService debugCounter
Debug counter service


bossExecutor

protected java.util.concurrent.ExecutorService bossExecutor
ExecutorService used for netty boss threads


workerExecutor

protected java.util.concurrent.ExecutorService workerExecutor
ExecutorService used for netty worker threads


clientBootstrap

protected org.jboss.netty.bootstrap.ClientBootstrap clientBootstrap
Netty ClientBootstrap used for creating client connections


serverBootstrap

protected org.jboss.netty.bootstrap.ServerBootstrap serverBootstrap
Netty ServerBootstrap used for creating server connections


pipelineFactory

protected RPCPipelineFactory pipelineFactory
ChannelPipelineFactory for creating connections


connections

protected java.util.HashMap<java.lang.Short,RPCService.NodeConnection> connections
Node connections


transactionId

protected java.util.concurrent.atomic.AtomicInteger transactionId
Transaction ID used in message headers in the RPC protocol


SEND_BUFFER_SIZE

public static final int SEND_BUFFER_SIZE
Buffer size for sockets

See Also:
Constant Field Values

CONNECT_TIMEOUT

public static final int CONNECT_TIMEOUT
Connect timeout for client connections

See Also:
Constant Field Values

started

protected boolean started
True after the run() method is called


shutDown

protected volatile boolean shutDown
true after the shutdown() method is called.


reconnectTask

protected SingletonTask reconnectTask
Task to periodically ensure that connections are active


messageWindows

protected java.util.concurrent.ConcurrentHashMap<java.lang.Short,RPCService.MessageWindow> messageWindows
If we want to rate-limit certain types of messages, we can do so by limiting the overall number of outstanding messages. The number of such messages will be stored in the RPCService.MessageWindow


windowedTypes

protected static final java.util.EnumSet<MessageType> windowedTypes

syncExecutor

protected java.util.concurrent.ExecutorService syncExecutor
A thread pool for handling sync messages. These messages require a separate pool since writing to the node can be a blocking operation while waiting for window capacity, and blocking the I/O threads could lead to deadlock

See Also:
RPCService.SyncMessageWorker

syncQueue

protected org.jboss.netty.util.internal.LinkedTransferQueue<RPCService.NodeMessage> syncQueue
A queue for holding sync messages that are awaiting being written to the channel.

See Also:
RPCService.SyncMessageWorker

SYNC_MESSAGE_POOL

protected static final int SYNC_MESSAGE_POOL
Number of workers in the sync message thread pool

See Also:
Constant Field Values

MAX_PENDING_MESSAGES

protected static final int MAX_PENDING_MESSAGES
The maximum number of outstanding pending messages for messages that use message windows

See Also:
Constant Field Values
Constructor Detail

RPCService

public RPCService(SyncManager syncManager,
                  IDebugCounterService debugCounter)
Method Detail

run

public void run()
Start the RPC service


shutdown

public void shutdown()
Stop the RPC service


getTransactionId

public int getTransactionId()
Get a suitable transaction ID for sending a message

Returns:
the unique transaction iD

writeToNode

public boolean writeToNode(java.lang.Short nodeId,
                           SyncMessage bsm)
                    throws java.lang.InterruptedException
Write a message to the node specified

Parameters:
nodeId - the node ID
bsm - the message to write
Returns:
true if the message was actually written to the channel. Note this is not the same as having been sent to the other node.
Throws:
java.lang.InterruptedException

disconnectNode

public void disconnectNode(short nodeId)
Remove the connection from the connection registry and clean up any remaining shrapnel

Parameters:
nodeId -

isFullyConnected

public boolean isFullyConnected()
Check whether all links are established

Returns:

isConnected

public boolean isConnected(short nodeId)
Find out if a particular node is connected

Parameters:
nodeId -
Returns:
true if the node is connected

messageAcked

public void messageAcked(MessageType type,
                         java.lang.Short nodeId)
Called when a message is acknowledged by a remote node

Parameters:
type - the message type
nodeId - the remote node

startServer

protected void startServer(org.jboss.netty.channel.ChannelPipelineFactory pipelineFactory)
Start listening sockets


nodeConnected

protected void nodeConnected(short nodeId,
                             org.jboss.netty.channel.Channel channel)
Add the node connection to the node connection map

Parameters:
nodeId - the node ID for the channel
channel - the new channel

startClients

protected void startClients(org.jboss.netty.channel.ChannelPipelineFactory pipelineFactory)
Connect to remote servers. We'll initiate the connection to any nodes with a lower ID so that there will be a single connection between each pair of nodes which we'll use symmetrically


doNodeConnect

protected void doNodeConnect(Node n)
Connect to a remote node if appropriate

Parameters:
bootstrap - the client bootstrap object
n - the node to connect to

startClientConnections

protected void startClientConnections()
Ensure that all client connections are active