public class SocketServerTCPIP extends Thread implements SocketServerInterface
TCPChannel object representing the channel. The constructor automatically runs itself as a thread. Objects of this class will Accept, Read, and Write MLMessage objects, and maintain the socket connections until they are broken at the far end.

Copyright: Copyright 2003-2014, Knowledge Science Group, University of Calgary. Permission to use, copy, modify, distribute and sell this software and its documentation for any purpose is hereby granted without fee, provided that the above copyright notice appear in all copies and that both that copyright notice and this permission notice appear in supporting documentation. The Knowledge Science Group makes no representations about the suitability of this software for any purpose. It is provided "as is" without express or implied warranty.
Thread.State, Thread.UncaughtExceptionHandler
Modifier and Type | Field and Description |
---|---|
private static Vector<SocketServerTCPIP> | all - All the socket servers in the process. |
private static Charset | ENCODING - Used in #readAMessage(SocketChannel) and writeAMessage(SelectionKey) to decode/encode the incoming message buffer. |
(package private) static int | HEADER_SIZE - The size of the header. |
private InetAddress | IPaddress - The local IP address of socket. |
private int | IPport - The port number the socket is listening on. |
private static int | numberOfPortsToTry - When the IP port given is negative, ports are tried in sequence (n++) until an open port is found; this limits the number of tries. |
private LimitedSet<MLMessage> | recentMessages - A buffer to remember recently received messages to detect possible endless loops of resending messages. |
private boolean | recoveryMode - True if we are lost in the channel and scanning for a valid header. |
private ConcurrentLinkedQueue<Pair<SocketChannel,TCPChannel>> | registerQueue - Used by registerChannel(SocketChannel, TCPChannel) and run() to safely do new registrations without generating an interrupt on the socket listener thread. |
private Selector | selector - The selector object used in the run() method. |
private static int | SIGNATURE - A 4-byte signature prepended to each message sent or received to identify the start of the message. |
private ServerSocket | socket - The server socket listening on port IPport. |
private static boolean | useLoopbackAsFallback - If we fail to get a local host address, setting this to true will use the loopback address instead. |
MAX_PRIORITY, MIN_PRIORITY, NORM_PRIORITY
Constructor and Description |
---|
SocketServerTCPIP(int IPport) - Constructs a new SocketServer listening on the specified port. |
Modifier and Type | Method and Description |
---|---|
private TCPChannel | addConnection(URLDescriptor url, SelectionKey key, InfiniteReadWriteByteBuffer initBuffer) - Creates a new TCPChannel object, and links it to url. |
void | closePort() - Closes the socket server's open port and returns it to the system. |
void | exit() - This method is ignored. |
static SocketServerTCPIP[] | get() |
String | getHostAddress() - Retrieves the address of the listening socket. |
int | getLocalPort() - Retrieves the port of the listening socket. |
String | getLocalPortAsString() - Retrieves the port of the listening socket as a String. |
void | queueAWrite(SelectionKey key) - External threads may call this method to queue a write on the channel of key. |
private boolean | queueMessage(MLMessage message) - Queues a message that has been read to a (probably) local agent. |
private int | readABlockHeader(byte[] buffer) - Reads a header, checking the signature (first 4 bytes) and returning the size (in bytes) of the body that follows the header. |
private int | readData(SelectionKey key) - Attempts to read an arbitrary number of bytes from key.channel() and places them in key.attachment().buffer, where key.attachment() is a TCPChannel object or an InfiniteReadWriteByteBuffer. |
private void | readMessages(SelectionKey key, Vector<SocketChannel> orphanedChannels) - Reads from the key's TCP/IP channel, tries to interpret messages, and queues them to the relevant agent's event queue. |
private static String | readyOpsToString(int code) - Utility method to print out the OP codes in a more legible manner in the log messages. |
private void | recover(InfiniteReadWriteByteBufferInterface buffer) - Attempts to recover from corrupt data by scanning for a legal header in the byte buffer. |
(package private) void | registerChannel(SocketChannel channel, TCPChannel tcpChannel) - External threads may call this method to register the channel. |
private void | removeConnection(SelectionKey key) - Removes the URL as an attachment from the key, cancels the key, and removes the key/channel from the URL. |
void | run() - The runnable obligation for this thread. |
private void | setPort(int IPport) - Sets the port to listen on. |
private void | writeAMessage(SelectionKey key) - Called when the key.channel is ready for writing, but there may be nothing there to write. |
activeCount, checkAccess, clone, countStackFrames, currentThread, destroy, dumpStack, enumerate, getAllStackTraces, getContextClassLoader, getDefaultUncaughtExceptionHandler, getId, getName, getPriority, getStackTrace, getState, getThreadGroup, getUncaughtExceptionHandler, holdsLock, interrupt, interrupted, isAlive, isDaemon, isInterrupted, join, join, join, resume, setContextClassLoader, setDaemon, setDefaultUncaughtExceptionHandler, setName, setPriority, setUncaughtExceptionHandler, sleep, sleep, start, stop, stop, suspend, toString, yield
equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
interrupt, isAlive
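private static final int numberOfPortsToTry
When the IP port given is negative, ports are tried in sequence (n++) until an open port is found; this limits the number of tries.
private static final boolean useLoopbackAsFallback
If we fail to get a local host address, setting this to true will use the loopback address instead.
private static final Charset ENCODING
Used in #readAMessage(SocketChannel) and writeAMessage(SelectionKey) to decode/encode the incoming message buffer.
private static final int SIGNATURE
A 4-byte signature prepended to each message sent or received to identify the start of the message.
See also: writeAMessage(SelectionKey), #readABlockHeader(SocketChannel), Constant Field Values
private static Vector<SocketServerTCPIP> all
All the socket servers in the process.
static final int HEADER_SIZE
The size of the header.
See also: readABlockHeader(byte[]), Constant Field Values
private ServerSocket socket
The server socket listening on port IPport.
private InetAddress IPaddress
The local IP address of socket.
private int IPport
The port number the socket is listening on.
private boolean recoveryMode
True if we are lost in the channel and scanning for a valid header.
private ConcurrentLinkedQueue<Pair<SocketChannel,TCPChannel>> registerQueue
Used by registerChannel(SocketChannel, TCPChannel) and run() to safely do new registrations without generating an interrupt on the socket listener thread.
private LimitedSet<MLMessage> recentMessages
A buffer to remember recently received messages to detect possible endless loops of resending messages.
See also: queueMessage(MLMessage)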
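The hand-off described for registerQueue can be pictured with a small sketch. This is not the class's actual code: the class name RegisterQueueSketch, the methods requestRegistration and drainPending, and the use of Selector.wakeup() are all assumptions made for illustration. The documented queue holds Pair<SocketChannel,TCPChannel> entries; the sketch queues plain SelectableChannel objects and uses a java.nio Pipe so that it runs without a network connection.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical sketch of a queued registration hand-off; names are illustrative. */
class RegisterQueueSketch {
    private final Selector selector;
    // Channels waiting to be registered by the selector (listener) thread.
    private final ConcurrentLinkedQueue<SelectableChannel> pending = new ConcurrentLinkedQueue<>();

    RegisterQueueSketch(Selector selector) {
        this.selector = selector;
    }

    /** Safe to call from any thread: enqueue the channel, then nudge the selector. */
    void requestRegistration(SelectableChannel channel) {
        pending.add(channel);
        selector.wakeup(); // assumption: wakeup() is used instead of Thread.interrupt()
    }

    /** Called only by the selector thread between select() calls; returns how many were registered. */
    int drainPending() {
        int registered = 0;
        SelectableChannel channel;
        while ((channel = pending.poll()) != null) {
            try {
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ);
                registered++;
            } catch (IOException e) {
                // In this sketch a channel that closed before registration is skipped.
            }
        }
        return registered;
    }

    /** Registers one pipe end through the queue and reports the count. */
    static int demo() {
        try (Selector selector = Selector.open()) {
            RegisterQueueSketch sketch = new RegisterQueueSketch(selector);
            Pipe pipe = Pipe.open();
            try {
                sketch.requestRegistration(pipe.source());
                return sketch.drainPending();
            } finally {
                pipe.source().close();
                pipe.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("channels registered: " + demo());
    }
}
```

Because only the selector thread ever calls register, an external thread never has to interrupt the listener, which matches the motivation given for registerQueue.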
public SocketServerTCPIP(int IPport) throws IPSocketException
Constructs a new SocketServer listening on the specified port.
numberOfPortsToTry. When using this constructor the thread is started.
IPport - the port to listen on. A negative number indicates to try other ports.
IPSocketException - if (IPport is negative) a valid port in the range IPport -> IPport+numberOfPortsToTry is not found, or if (IPport is positive) IPport is not available.
public static SocketServerTCPIP[] get()
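The constructor's negative-port behaviour can be sketched as a simple probing loop. Everything here is an assumption made for illustration: the class name PortProbeSketch, the value 50 standing in for the private numberOfPortsToTry constant (its real value is not documented), the reading of a negative argument -n as "start probing at port n", and the use of IllegalStateException in place of IPSocketException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

/** Hypothetical sketch of port probing; not the documented implementation. */
class PortProbeSketch {
    // Stand-in for the private numberOfPortsToTry constant (actual value unknown).
    static final int NUMBER_OF_PORTS_TO_TRY = 50;

    /** Opens exactly ipPort when it is non-negative; otherwise probes upward from -ipPort. */
    static ServerSocket open(int ipPort) {
        if (ipPort >= 0) {
            try {
                return new ServerSocket(ipPort);
            } catch (IOException e) {
                throw new IllegalStateException("port " + ipPort + " is not available", e);
            }
        }
        int first = -ipPort; // assumption: -n means "start at n and try the following ports"
        int last = Math.min(first + NUMBER_OF_PORTS_TO_TRY, 65535);
        for (int port = first; port <= last; port++) {
            try {
                return new ServerSocket(port);
            } catch (IOException e) {
                // Port is busy; fall through and try the next one.
            }
        }
        throw new IllegalStateException("no free port in " + first + ".." + last);
    }

    /** Occupies one port, then checks that probing from it lands on a later port in range. */
    static boolean demo() {
        try (ServerSocket busy = new ServerSocket(0)) {
            int taken = busy.getLocalPort();
            try (ServerSocket next = open(-taken)) {
                int port = next.getLocalPort();
                return port > taken && port <= taken + NUMBER_OF_PORTS_TO_TRY;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("probing skipped the busy port: " + demo());
    }
}
```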
private void setPort(int IPport) throws IOException
Sets the port to listen on.
IPport - the port to listen on
IOException - when either 'e' is an instance of IOException or when getLocalHost() fails. In any of these cases, the port will fail to listen.
public String getHostAddress()
Retrieves the address of the listening socket.
getHostAddress in interface SocketServerInterface
public String getLocalPortAsString()
Retrieves the port of the listening socket as a String.
getLocalPortAsString in interface SocketServerInterface
public int getLocalPort()
Retrieves the port of the listening socket.
getLocalPort in interface SocketServerInterface
public void run()
The runnable obligation for this thread.
selector to ACCEPT connections to channels, as well as to READ and WRITE through the selected channels.
void registerChannel(SocketChannel channel, TCPChannel tcpChannel)
External threads may call this method to register the channel.
channel -
tcpChannel -
public void queueAWrite(SelectionKey key)
External threads may call this method to queue a write on the channel of key.
key -
private void writeAMessage(SelectionKey key) throws ClosedChannelException
Called when the key.channel is ready for writing, but there may be nothing there to write.
TCPChannel are written, and the key's interest is set back to OP_READ.
The message actually sent on the channel is in the following format:
SIGNATURE constant of value -1498131167.
ENCODING of
registerChannel(SocketChannel, TCPChannel) and queueAWrite(SelectionKey).
key -
ClosedChannelException
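The wire format mentioned for writeAMessage, and the header check performed by readABlockHeader, can be illustrated with a framing sketch. The SIGNATURE value and the 8-byte HEADER_SIZE come from this page; the rest is assumed: that the second half of the header is a 4-byte big-endian body length, that ENCODING is UTF-8, and that unchecked exceptions stand in for SignatureException. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical sketch of signature + length framing; layout details are assumptions. */
class FramingSketch {
    static final int SIGNATURE = -1498131167; // value quoted in the documentation
    static final int HEADER_SIZE = 8;         // value quoted in the documentation
    static final Charset ENCODING = StandardCharsets.UTF_8; // assumption: real charset not stated

    /** Builds signature (4 bytes) + body length (4 bytes) + encoded body. */
    static byte[] frame(String message) {
        byte[] body = message.getBytes(ENCODING);
        ByteBuffer out = ByteBuffer.allocate(HEADER_SIZE + body.length); // big-endian by default
        out.putInt(SIGNATURE);
        out.putInt(body.length);
        out.put(body);
        return out.array();
    }

    /** Checks the signature and returns the size in bytes of the body that follows. */
    static int readHeader(byte[] header) {
        if (header.length != HEADER_SIZE) {
            throw new IllegalArgumentException("header must be " + HEADER_SIZE + " bytes");
        }
        ByteBuffer in = ByteBuffer.wrap(header);
        if (in.getInt() != SIGNATURE) {
            throw new IllegalStateException("signature mismatch"); // stand-in for SignatureException
        }
        return in.getInt();
    }

    /** Decodes one complete frame back into its message text. */
    static String unframe(byte[] framed) {
        int size = readHeader(Arrays.copyOf(framed, HEADER_SIZE));
        if (size < 0 || size > framed.length - HEADER_SIZE) {
            throw new IllegalStateException("body size out of range: " + size);
        }
        return new String(framed, HEADER_SIZE, size, ENCODING);
    }

    public static void main(String[] args) {
        byte[] framed = frame("hello");
        System.out.println(framed.length + " bytes on the wire, body: " + unframe(framed));
    }
}
```

A fixed signature at the start of every frame is what makes the resynchronising scan in recover(...) possible after corrupt data.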
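private void removeConnection(SelectionKey key)
Removes the URL as an attachment from the key, cancels the key, and removes the key/channel from the URL.
registerChannel(SocketChannel, TCPChannel) and queueAWrite(SelectionKey).
key -
private void recover(InfiniteReadWriteByteBufferInterface buffer)
Attempts to recover from corrupt data by scanning for a legal header in the byte buffer.
HEADER_SIZE (8) bytes left in the buffer. This behaviour allows a recovery to span multiple channel reads.
This is a VERY inefficient implementation of recover(), but fairly robust.
buffer -
private void readMessages(SelectionKey key, Vector<SocketChannel> orphanedChannels) throws ClosedChannelException
Reads from the key's TCP/IP channel, tries to interpret messages, and queues them to the relevant agent's event queue.
URLDescriptor.
key -
orphanedChannels -
ClosedChannelException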
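The header scan described for recover(...) above can be sketched as a byte-by-byte search. This is a guess at the general technique, not the documented implementation: the names RecoverSketch and findHeader are hypothetical, a "legal header" is assumed to mean the signature followed by a non-negative 4-byte big-endian size, and the scan is assumed to stop once fewer than HEADER_SIZE bytes remain so that a later read can continue it.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch of resynchronising on a message signature. */
class RecoverSketch {
    static final int SIGNATURE = -1498131167; // value quoted in the documentation
    static final int HEADER_SIZE = 8;         // value quoted in the documentation

    /**
     * Returns the first index at or after 'from' where a complete, plausible header
     * starts within data[0..limit), or -1 if no such position exists yet.
     */
    static int findHeader(byte[] data, int from, int limit) {
        ByteBuffer view = ByteBuffer.wrap(data); // big-endian by default (assumption)
        // Stop while a full header still fits, so a partial header is left for the next read.
        for (int i = from; i + HEADER_SIZE <= limit; i++) {
            boolean signatureMatches = view.getInt(i) == SIGNATURE;
            if (signatureMatches && view.getInt(i + 4) >= 0) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Five junk bytes, then a header announcing a 3-byte body, then the body.
        byte[] data = ByteBuffer.allocate(16)
                .put(new byte[] {1, 2, 3, 4, 5})
                .putInt(SIGNATURE)
                .putInt(3)
                .put(new byte[] {7, 8, 9})
                .array();
        System.out.println("header found at index " + findHeader(data, 0, data.length));
    }
}
```

When the scan returns -1, a caller would keep only the trailing bytes that could still begin a header and discard the rest, which is one way a recovery could span multiple channel reads.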
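private int readData(SelectionKey key) throws IOException
Attempts to read an arbitrary number of bytes from key.channel() and places them in key.attachment().buffer, where key.attachment() is a TCPChannel object or an InfiniteReadWriteByteBuffer.
key - the key containing the channel and attachment objects.
IOException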
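private int readABlockHeader(byte[] buffer) throws SignatureException
Reads a header, checking the signature (first 4 bytes) and returning the size (in bytes) of the body that follows the header.
buffer - The bytes to read; must be HEADER_SIZE (8).
SignatureException - if the signature in the header doesn't match.
private boolean queueMessage(MLMessage message) throws ClosedChannelException
Queues a message that has been read to a (probably) local agent.
toURL -
message -
ClosedChannelException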
private TCPChannel addConnection(URLDescriptor url, SelectionKey key, InfiniteReadWriteByteBuffer initBuffer) throws IOException
Creates a new TCPChannel object, and links it to url. Also logs the creation through Trace.log(String, String) using the "msg" tag.
This method is synchronized to avoid interrupts (which would close the channel) from registerChannel(SocketChannel, TCPChannel) and queueAWrite(SelectionKey).
url - The URL under which to register the new connection. (The URL holds the connection object, since it is unique to the agent, whether local or remote.)
channel - The channel.
TCPChannel object.
IOException - if TCPChannel#TCPChannel(URLDescriptor, SelectionKey, SocketServerInterface) throws an exception.
public void closePort()
Closes the socket server's open port and returns it to the system.
closePort in interface SocketServerInterface
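public void exit()
This method is ignored.
exit in interface SocketServerInterface
See also: SocketServerInterface.exit()
private static String readyOpsToString(int code)
Utility method to print out the OP codes in a more legible manner in the log messages.
code - The OP code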
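A utility of the kind readyOpsToString describes might look like the sketch below. The output format (names joined by "|", and "none" for an empty set) is an assumption; only the SelectionKey constants are real java.nio API, and the class name ReadyOpsSketch is hypothetical.

```java
import java.nio.channels.SelectionKey;
import java.util.StringJoiner;

/** Hypothetical sketch of turning a ready-ops bit mask into readable text. */
class ReadyOpsSketch {
    static String readyOpsToString(int code) {
        StringJoiner names = new StringJoiner("|");
        // Each operation is a distinct bit, so test them independently.
        if ((code & SelectionKey.OP_ACCEPT) != 0) names.add("OP_ACCEPT");
        if ((code & SelectionKey.OP_CONNECT) != 0) names.add("OP_CONNECT");
        if ((code & SelectionKey.OP_READ) != 0) names.add("OP_READ");
        if ((code & SelectionKey.OP_WRITE) != 0) names.add("OP_WRITE");
        return names.length() == 0 ? "none" : names.toString();
    }

    public static void main(String[] args) {
        System.out.println(readyOpsToString(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
    }
}
```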