This is the official Java language binding for the Iris cloud messaging framework. Version v1 of the binding is compatible with Iris v0.3.0 and newer.
If you are unfamiliar with Iris, please read the next introductory section. It contains a short summary, as well as some valuable pointers on where you can discover more.
Iris is an attempt at bringing the simplicity and elegance of cloud computing to the application layer. Consumer clouds provide unlimited virtual machines at the click of a button, but leave it to the developer to wire them together. Iris ensures that you can forget about networking challenges and instead focus on solving your own domain problems.
It is a completely decentralized messaging solution for simplifying the design and implementation of cloud services. Among others, Iris features zero-configuration (i.e. start it up and it will do its magic), semantic addressing (i.e. applications use textual names to address each other), clusters as units (i.e. automatic load balancing between apps of the same name) and perfect secrecy (i.e. all network traffic is encrypted).
You can find further information on the Iris website and details of the above features in the core concepts section of the book of Iris. For the scientifically inclined, a small collection of papers featuring Iris is also available. Slides and videos of previously given public presentations are published on the talks page.
There is a growing community on Twitter @iriscmf, Google groups project-iris and GitHub project-iris.
To get the package, add the following Maven dependency (details for various build systems):
- Group ID: com.karalabe.iris
- Artifact ID: iris
- Version: 1.0.0
To import this package, add the following line to your code:
import com.karalabe.iris.*;
Iris uses a relaying architecture, where client applications do not communicate directly with one another, but instead delegate all messaging operations to a local relay process responsible for transferring the messages to the correct destinations. Hence, the first step to using Iris through any binding is setting up the local relay node. You can find detailed information in the Run, Forrest, Run section of the book of Iris, but a very simple way is to start a developer node.
> iris -dev
Entering developer mode
Generating random RSA key... done.
Generating random network name... done.
2014/06/13 18:13:47 main: booting iris overlay...
2014/06/13 18:13:47 scribe: booting with id 369650985814.
2014/06/13 18:13:57 main: iris overlay converged with 0 remote connections.
2014/06/13 18:13:57 main: booting relay service...
2014/06/13 18:13:57 main: iris successfully booted, listening on port 55555.
Since it generates random credentials, a developer node will not be able to connect with other remote nodes in the network. However, it provides a quick solution to start developing without needing to configure a network name and associated access key. Should you wish to interconnect multiple nodes, please provide the -net and -rsa flags.
After successfully booting, the relay opens a local TCP endpoint (port 55555 by default, configurable using -port) through which arbitrarily many entities may attach. Each connecting entity may also decide whether it becomes a simple client only consuming the services provided by other participants, or a full-fledged service, also making functionality available to others for consumption.
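Because every attachment goes through this local TCP endpoint, it can be handy to check that something is actually listening before trying to connect. The following is a minimal sketch using only the Java standard library; the RelayProbe class and its isListening helper are hypothetical names introduced here for illustration and are not part of the binding. It only verifies that the port accepts TCP connections, not that the listener is really an Iris relay.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of the Iris binding.
final class RelayProbe {
    /** Returns true if some process accepts TCP connections on the given local port. */
    static boolean isListening(int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // Only the loopback interface is probed, since the relay endpoint is local
            socket.connect(new InetSocketAddress(InetAddress.getLoopbackAddress(), port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing usable is listening
            return false;
        }
    }

    public static void main(String[] args) {
        final int port = 55555; // default relay port, adjust if -port was used
        if (isListening(port, 500)) {
            System.out.println("Relay endpoint reachable on port " + port);
        } else {
            System.out.println("Nothing listening on port " + port + ", try starting a node with: iris -dev");
        }
    }
}
```

The same check may be useful as a precondition in test setups that expect a developer node on the default port.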
Connecting as a client can be done trivially by invoking new Connection with the port number of the local relay's client endpoint. After the attachment is completed, a Connection instance is returned through which messaging can begin. A client cannot accept inbound requests, broadcasts and tunnels, only initiate them.
// Connect to the local relay (error handling omitted)
final Connection conn = new Connection(55555);
// Disconnect from the local relay
conn.close();
To provide functionality for consumption, an entity needs to register as a service. This is slightly more involved, as besides initiating a registration request, it also needs to specify a callback handler to process inbound events. First, the callback handler needs to implement the ServiceHandler interface. After creating the handler, registration can commence by invoking new Service with the port number of the local relay's client endpoint, the sub-service cluster this entity will join as a member, the handler itself to process inbound messages, and an optional resource cap.
// Implement all the methods defined by ServiceHandler (optional, defaults provided)
static class EchoHandler implements ServiceHandler {
@Override public void init(Connection connection) throws InitializationException { }
@Override public void handleBroadcast(byte[] message) { }
@Override public byte[] handleRequest(byte[] request) throws RemoteException { return request; }
@Override public void handleTunnel(Tunnel tunnel) { }
@Override public void handleDrop(Exception reason) { }
}
public static void main(String[] args) throws Exception {
// Register a new service to the relay (error handling omitted)
final Service service = new Service(55555, "echo", new EchoHandler());
// Unregister the service
service.close();
}
Upon successful registration, Iris invokes the handler's init method with the live Connection object - the service's client connection - through which the service itself can initiate outbound requests. The init method is called only once and is synchronized before any other handler method is invoked.
Iris supports four messaging schemes: request/reply, broadcast, tunnel and publish/subscribe. The first three schemes always target a specific cluster: send a request to one member of a cluster and wait for the reply; broadcast a message to all members of a cluster; open a streamed, ordered and throttled communication tunnel to one member of a cluster. The publish/subscribe is similar to broadcast, but any member of the network may subscribe to the same topic, hence breaking cluster boundaries.
<img src="https://dl.dropboxusercontent.com/u/10435909/Iris/messaging_schemes.png" style="height: 175px; display: block; margin-left: auto; margin-right: auto;" >
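To make the difference between addressing one member and all members of a cluster more tangible, here is a toy, in-memory sketch. It is not how Iris works internally and uses none of the binding's classes: ToyNetwork and Member are hypothetical names, the round-robin selection is an assumption standing in for whatever balancing Iris actually performs, and tunnels and publish/subscribe are left out entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical classes, no networking, no relation to Iris internals.
final class ToyNetwork {
    /** A cluster member; the method names loosely mirror ServiceHandler. */
    static final class Member {
        final String id;
        final List<String> inbox = new ArrayList<>();

        Member(String id) { this.id = id; }

        String handleRequest(String request) { return id + " echoes " + request; }
        void handleBroadcast(String message) { inbox.add(message); }
    }

    private final Map<String, List<Member>> clusters = new HashMap<>();
    private final Map<String, Integer> turns = new HashMap<>();

    /** Joins a member to the cluster registered under the given textual name. */
    void register(String cluster, Member member) {
        clusters.computeIfAbsent(cluster, name -> new ArrayList<>()).add(member);
    }

    /** Request/reply: exactly one member of the cluster handles the request. */
    String request(String cluster, String message) {
        List<Member> members = membersOf(cluster);
        int turn = turns.merge(cluster, 1, Integer::sum) - 1; // assumed round-robin
        return members.get(turn % members.size()).handleRequest(message);
    }

    /** Broadcast: every member of the cluster receives the message, no reply. */
    void broadcast(String cluster, String message) {
        for (Member member : membersOf(cluster)) {
            member.handleBroadcast(message);
        }
    }

    private List<Member> membersOf(String cluster) {
        List<Member> members = clusters.get(cluster);
        if (members == null) {
            throw new IllegalArgumentException("unknown cluster: " + cluster);
        }
        return members;
    }

    public static void main(String[] args) {
        ToyNetwork net = new ToyNetwork();
        Member a = new Member("a");
        Member b = new Member("b");
        net.register("echo", a);
        net.register("echo", b);

        System.out.println(net.request("echo", "ping")); // a echoes ping
        System.out.println(net.request("echo", "ping")); // b echoes ping
        net.broadcast("echo", "hello");
        System.out.println(a.inbox + " " + b.inbox);     // [hello] [hello]
    }
}
```

The caller only ever names the cluster ("echo"), never an individual member, which is the point of semantic addressing.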
Presenting each primitive is out of scope, but for illustrative purposes the request/reply was included. Given the echo service registered above, we can send it requests and wait for replies through any client connection. Iris will automatically locate, route and load balance between all services registered under the addressed name.
final byte[] request = "Some request binary".getBytes();
try {
final byte[] reply = conn.request("echo", request, 1000);
System.out.println("Reply arrived: " + new String(reply));
} catch (TimeoutException | RemoteException e) {
System.out.println("Failed to execute request: " + e.getMessage());
}
The binding uses the idiomatic Java error handling mechanisms of throwing checked exceptions whenever a failure occurs. However, there are a few common cases that need to be individually checkable, hence a few special exception types have been introduced.
Many operations - such as requests and tunnels - can time out. To allow checking for this particular failure, Iris throws a TimeoutException in such scenarios. Similarly, connections, services and tunnels may fail, in which case all pending operations terminate with a ClosedException.
Additionally, the request/reply pattern supports sending back an error instead of a reply to the caller. To enable the originating node to check whether a request failed locally or remotely, all remote errors are wrapped in a RemoteException.
try {
conn.request("echo", request, 1000);
} catch (TimeoutException e) {
// Request timed out
} catch (ClosedException e) {
// Connection terminated
} catch (RemoteException e) {
// Request failed remotely
} catch (IOException e) {
// Requesting failed locally
}
Lastly, if during the initialization of a registered service - while the init callback is running - the user wishes to abort the registration, they should throw an InitializationException.
To prevent the network from overwhelming an attached process, the binding places thread and memory limits on the broadcasts/requests inbound to a registered service as well as on the events received by a topic subscription. The thread limit defines the concurrent processing allowance, whereas the memory limit defines the maximal size of the pending queue.
The default values - listed below - can be overridden during service registration and topic subscription via ServiceLimits and TopicLimits. Any unset fields will default to the preset ones.
// Default limits of the threading and memory usage of a registered service.
public class ServiceLimits {
public int broadcastThreads = 4 * Runtime.getRuntime().availableProcessors();
public int broadcastMemory = 64 * 1024 * 1024;
public int requestThreads = 4 * Runtime.getRuntime().availableProcessors();
public int requestMemory = 64 * 1024 * 1024;
}
// Default limits of the threading and memory usage of a subscription.
public class TopicLimits {
public int eventThreads = 4 * Runtime.getRuntime().availableProcessors();
public int eventMemory = 64 * 1024 * 1024;
}
There is also a sanity limit on the input buffer of a tunnel, but it is not exposed through the API as tunnels are meant as structural primitives, not sensitive to load. This may change in the future.
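As a rough illustration of how a thread limit and a memory limit on the pending queue can interact, consider the sketch below. It is not the binding's implementation: LimitedDispatcher is a hypothetical class built on the standard java.util.concurrent executors, and it assumes that a message stops counting against the memory limit as soon as a worker thread starts processing it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical sketch, not the binding's actual limiter.
final class LimitedDispatcher {
    private final ExecutorService workers;  // thread limit: concurrent processing allowance
    private final long memoryLimit;         // memory limit: max bytes waiting in the queue
    private final AtomicLong pendingBytes = new AtomicLong();

    LimitedDispatcher(int threads, long memoryLimit) {
        this.workers = Executors.newFixedThreadPool(threads);
        this.memoryLimit = memoryLimit;
    }

    /** Queues the message for processing, or returns false if the queue is full. */
    boolean dispatch(byte[] message, Consumer<byte[]> handler) {
        // Reserve the space first and roll back if the limit would be exceeded
        if (pendingBytes.addAndGet(message.length) > memoryLimit) {
            pendingBytes.addAndGet(-message.length);
            return false;
        }
        workers.execute(() -> {
            // The message leaves the pending queue once processing starts
            pendingBytes.addAndGet(-message.length);
            handler.accept(message);
        });
        return true;
    }

    void shutdown() throws InterruptedException {
        workers.shutdown();
        workers.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        LimitedDispatcher dispatcher = new LimitedDispatcher(1, 10); // 1 thread, 10 bytes
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // Handler that blocks until released, keeping the single worker busy
        Consumer<byte[]> slowHandler = message -> {
            started.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println(dispatcher.dispatch(new byte[8], slowHandler)); // true, starts processing
        started.await();
        System.out.println(dispatcher.dispatch(new byte[8], slowHandler)); // true, waits in the queue
        System.out.println(dispatcher.dispatch(new byte[8], slowHandler)); // false, 16 bytes > 10

        release.countDown();
        dispatcher.shutdown();
    }
}
```

With a single worker occupied, the second message fits into the 10 byte queue while the third one does not, so it is rejected rather than buffered without bound.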
For logging purposes, the Java binding uses the Simple Logging Facade for Java (version 1.7.7). This library is only an abstract façade defining the API, underneath which every developer can place her own preferred logging framework. For demonstrative purposes, we present the use with the LOGBack project, but others are analogous.
The suggested configuration is to collect INFO level logs and print them to stdout. This level allows tracking life-cycle events such as client and service attachments, topic subscriptions and tunnel establishments. Further log entries can be requested by lowering the level to DEBUG, effectively printing all messages passing through the binding.
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %-5level %-40msg %mdc%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
Note that Iris attaches quite a lot of contextual attributes (Mapped Diagnostic Context) to each log entry, enabling detailed tracking of events. These are printed - as can be seen above - with the help of the %mdc field in the log output pattern. Additionally, these MDCs are automatically injected into the logger context whenever a callback method of the ServiceHandler or TopicHandler is invoked.
@Override public byte[] handleRequest(byte[] request) throws RemoteException {
LoggerFactory.getLogger("My Logger").info("My info log entry");
//...
}
11:48:49.504 INFO My info log entry remote_request=0, service=1
In certain cases, it might be desirable to have access to contextual attributes outside the callback functions (e.g. in the case of a simple client connection, there are no callbacks). For such scenarios, context injection can be specifically requested through an embedded logger (just don't forget to unload afterwards).
try (final Connection conn = new Connection(55555)) {
final Logger logger = LoggerFactory.getLogger("My Logger");
conn.logger().loadContext();
logger.debug("Debug entry, hidden by default");
logger.info("Info entry, client context included");
logger.warn("Warning entry");
logger.error("Critical entry");
conn.logger().unloadContext();
}
As you can see below, all log entries have been automatically tagged with the client attribute, set to the id of the current connection. Since the default log level is INFO, the logger.debug invocation has no effect. Additionally, arbitrarily many key-value pairs may be included in the entry using the SLF4J MDC API.
12:20:14.654 INFO Connecting new client client=1, relay_port=55555
12:20:14.691 INFO Client connection established client=1
12:20:14.692 INFO Info entry, client context included client=1
12:20:14.692 WARN Warning entry client=1
12:20:14.692 ERROR Critical entry client=1
12:20:14.692 INFO Detaching from relay client=1
12:20:14.692 INFO Successfully detached client=1
You can find a teaser presentation, touching on all the key features of the library through a handful of challenges and their solutions. The recommended version is the playground, containing modifiable and executable code snippets, but a read-only one is also available.
To run the tests, a local Iris node needs to be listening on port 55555. Please see the first paragraph of the Quickstart section if you need help starting a local Iris node in developer mode.
Although you could run the tests through your favorite IDE, they can also be executed from the command line through the bundled Gradle build system. A detailed report will be generated into build/reports/tests/index.html.
$ ./gradlew test
[...]
com.karalabe.iris.TunnelInterruptTest > interruptRemoteSend PASSED
com.karalabe.iris.TunnelInterruptTest > interruptLocalReceive PASSED
com.karalabe.iris.TunnelInterruptTest > interruptLocalSend PASSED
com.karalabe.iris.TunnelInterruptTest > interruptRemoteReceive PASSED
com.karalabe.iris.BroadcastTest > concurrentBroadcasts PASSED
com.karalabe.iris.BroadcastTest > threadLimiting PASSED
com.karalabe.iris.BroadcastTest > memoryLimiting PASSED
com.karalabe.iris.BroadcastTest > terminate PASSED
com.karalabe.iris.HandshakeTest > connection PASSED
com.karalabe.iris.HandshakeTest > service PASSED
com.karalabe.iris.RequestTest > timeout PASSED
com.karalabe.iris.RequestTest > expiration PASSED
com.karalabe.iris.RequestTest > fail PASSED
com.karalabe.iris.RequestTest > threadLimiting PASSED
com.karalabe.iris.RequestTest > memoryLimiting PASSED
com.karalabe.iris.RequestTest > concurrentRequests PASSED
com.karalabe.iris.RequestTest > terminate PASSED
com.karalabe.iris.PublishTest > concurrentPublishes PASSED
com.karalabe.iris.PublishTest > threadLimiting PASSED
com.karalabe.iris.PublishTest > memoryLimiting PASSED
com.karalabe.iris.PublishTest > terminate PASSED
com.karalabe.iris.TunnelGeneralTest > timeout PASSED
com.karalabe.iris.TunnelGeneralTest > concurrentTunnels PASSED
com.karalabe.iris.TunnelGeneralTest > overload PASSED
com.karalabe.iris.TunnelGeneralTest > chunking PASSED
com.karalabe.iris.common.BoundedThreadPoolTest > schedule PASSED
com.karalabe.iris.common.BoundedThreadPoolTest > capacity PASSED
BUILD SUCCESSFUL
Total time: 2 mins 49.985 secs
Benchmarking can be run similarly through Gradle, but since it requires a few additional dependencies and hacks, a separate build file is provided for it. Note that even on a powerful machine, the benchmarks can easily take 10-15 minutes to complete.
$ ./gradlew -b benchmark.gradle
[...]
Benchmark (threads) Mode Samples Score Score error Units
c.k.i.BroadcastLatencyBenchmark.timeLatency N/A avgt 10 44323.122 825.059 ns/op
c.k.i.BroadcastThroughputBenchmark.timeThroughput 1 avgt 10 5710.532 49.099 ns/op
c.k.i.BroadcastThroughputBenchmark.timeThroughput 2 avgt 10 5516.166 61.866 ns/op
c.k.i.BroadcastThroughputBenchmark.timeThroughput 4 avgt 10 5279.740 47.308 ns/op
c.k.i.BroadcastThroughputBenchmark.timeThroughput 8 avgt 10 5252.124 40.061 ns/op
c.k.i.BroadcastThroughputBenchmark.timeThroughput 16 avgt 10 5246.738 43.543 ns/op
c.k.i.BroadcastThroughputBenchmark.timeThroughput 32 avgt 10 5255.368 41.310 ns/op
c.k.i.BroadcastThroughputBenchmark.timeThroughput 64 avgt 10 5255.913 66.850 ns/op
c.k.i.BroadcastThroughputBenchmark.timeThroughput 128 avgt 10 5256.317 45.804 ns/op
c.k.i.PublishLatencyBenchmark.timeLatency N/A avgt 10 44902.253 691.909 ns/op
c.k.i.PublishThroughputBenchmark.timeThroughput 1 avgt 10 5772.169 77.503 ns/op
c.k.i.PublishThroughputBenchmark.timeThroughput 2 avgt 10 5554.562 44.429 ns/op
c.k.i.PublishThroughputBenchmark.timeThroughput 4 avgt 10 5356.768 55.335 ns/op
c.k.i.PublishThroughputBenchmark.timeThroughput 8 avgt 10 5320.963 40.598 ns/op
c.k.i.PublishThroughputBenchmark.timeThroughput 16 avgt 10 5311.546 59.996 ns/op
c.k.i.PublishThroughputBenchmark.timeThroughput 32 avgt 10 5326.008 54.033 ns/op
c.k.i.PublishThroughputBenchmark.timeThroughput 64 avgt 10 5343.181 45.880 ns/op
c.k.i.PublishThroughputBenchmark.timeThroughput 128 avgt 10 5357.519 45.299 ns/op
c.k.i.RequestLatencyBenchmark.timeLatency N/A avgt 10 87269.227 953.284 ns/op
c.k.i.RequestThroughputBenchmark.timeThroughput 1 avgt 10 89819.877 764.715 ns/op
c.k.i.RequestThroughputBenchmark.timeThroughput 2 avgt 10 55718.369 319.352 ns/op
c.k.i.RequestThroughputBenchmark.timeThroughput 4 avgt 10 38359.248 341.198 ns/op
c.k.i.RequestThroughputBenchmark.timeThroughput 8 avgt 10 29429.851 375.620 ns/op
c.k.i.RequestThroughputBenchmark.timeThroughput 16 avgt 10 23240.808 428.840 ns/op
c.k.i.RequestThroughputBenchmark.timeThroughput 32 avgt 10 19902.494 322.047 ns/op
c.k.i.RequestThroughputBenchmark.timeThroughput 64 avgt 10 19181.454 296.732 ns/op
c.k.i.RequestThroughputBenchmark.timeThroughput 128 avgt 10 19313.188 274.534 ns/op
c.k.i.TunnelLatencyBenchmark.timeLatency N/A avgt 10 93759.745 1179.995 ns/op
c.k.i.TunnelThroughputBenchmark.timeThroughput N/A avgt 10 14408.706 401.866 ns/op
Currently my development aims are to stabilize the project and its language bindings. Hence, although I'm open to and very happy about any and all contributions, the most valuable ones are tests, benchmarks and actual binding usage to reach a high enough quality.
Due to the already significant complexity of the project (Iris in general), I kindly ask anyone willing to pitch in to first file an issue with their plans to achieve the best possible integration :).
Additionally, to prevent copyright disputes and such, a signed contributor license agreement is required to be on file before any material can be accepted into the official repositories. These can be filled out online via either the Individual Contributor License Agreement or the Corporate Contributor License Agreement.