Basic RPCs in Node.js with a Java Backend
The RabbitMQ people provide a really nice library for AMQP interactions in Java, with well-developed remote procedure call (RPC) classes that work wonderfully if you’re using Java on all ends of your project. Unfortunately I’m not, so they are useless to me, and too complex for me to care enough about to reimplement in another language. My current project involves a Node.js frontend that handles API requests and ferries the heavy lifting off to Java backend services when necessary. With that in mind, and in keeping with the idea of simple, clean implementations, which has been a really popular theme within the JavaScript community (and a damn good one), I set out to make an RPC library that wasn’t quite as complex.
At 30,000 Feet
I recommend taking a look at the sixth tutorial in the RabbitMQ documentation for a really nice introduction to the mechanics of an RPC request. I’ll shamelessly steal their diagram and present it below:

The sequence looks something like this:
- The client, upon connecting to the broker, creates a queue with a server-generated name for receiving the eventual replies to RPCs. This queue is stored in a variable called responseQueue, and a receiving function is bound to it to handle RPC responses. We also create a variable called correlationId to store an incrementing counter, which allows us to later match up requests with responses.
- When an RPC request needs to be made, a message is published to a request queue. In the message properties we set the replyTo and correlationId parameters, using the values established above. We store a callback function in a map, keyed by the current correlationId, and also set a timeout timer to ensure that an action is taken if the RPC request isn’t processed in time.
- A server bound to the requestQueue receives the request, does any necessary processing, constructs an appropriate response to be sent to the responseQueue, which has been specified in the replyTo header, and finally acknowledges receipt of the request.
- Assuming the response is received in time, the client looks up the correlationId in the map of stored callbacks, disables the timeout timer, and fires off the callback function.
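The bookkeeping in the sequence above can be sketched without a broker at all. The snippet below is only an in-memory illustration: createRpcClient, transport and the upper-casing "server" are hypothetical names invented for this example, and a plain function call stands in for publishing to a queue.

```javascript
// In-memory sketch of the request/response bookkeeping described above.
// No broker is involved: `transport` is a hypothetical stand-in for publishing.
function createRpcClient(transport, timeoutMs) {
  var nextId = 0;
  var pending = new Map(); // correlationId -> { callback, timer }

  function call(payload, callback) {
    var id = String(nextId++);
    // if no response arrives in time, forget the request and report an error
    var timer = setTimeout(function () {
      pending.delete(id);
      callback('rpc timeout', null);
    }, timeoutMs);
    pending.set(id, { callback: callback, timer: timer });
    transport({ correlationId: id, body: payload });
  }

  function handleResponse(response) {
    var entry = pending.get(response.correlationId);
    if (!entry) {
      return false; // stray or late response, nothing is waiting for it
    }
    clearTimeout(entry.timer);
    pending.delete(response.correlationId);
    entry.callback(null, response.body);
    return true;
  }

  return { call: call, handleResponse: handleResponse };
}

// A fake "server" that answers immediately by upper-casing the request body.
var results = [];
var client = createRpcClient(function (request) {
  client.handleResponse({
    correlationId: request.correlationId,
    body: request.body.toUpperCase()
  });
}, 1000);

client.call('hello', function (err, data) { results.push([err, data]); });
client.call('world', function (err, data) { results.push([err, data]); });
console.log(results);
```

Each response finds its own callback through the correlationId, and a second response carrying an id that has already been answered is reported as stray.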
Node.js couples really nicely with this model. There is very little impedance mismatch between it and the typical callback pattern observed in Node, so implementation is really straightforward. Let’s take a look at how this is put together.
Connecting in Node to a RabbitMQ Server
First things first, let’s create a connection. I’ll be using the excellent node-amqp library for communication. Setting up a connection to a RabbitMQ broker involves a function with a ready callback:
var amqp = require('amqp'); // the node-amqp library

var responseQueue = null;
var correlationId = 0;
var rpcRequestMap = {};
var connection = null;

function connect() {
    // Set up the connection object, with associated callbacks for connection events.
    connection = amqp.createConnection();
    connection.on('ready', function() {
        console.log('message.connect: amqp connection established');
        // by not specifying a queue name, the server will assign us one randomly
        // by specifying the exclusive option we ensure the queue will be cleaned up
        // upon application exit
        connection.queue('', {exclusive: true}, function(queue) {
            console.log('message.connect: rpc queue created: ' + queue.name);
            queue.subscribe(handleRpcResponse);
            responseQueue = queue;
        });
    });
    connection.on('error', function() {
        console.log('message.connect: connection error');
    });
}
The node-amqp library provides default connection parameters for a standard RabbitMQ installation. You’ll notice that upon creating the queue we call its subscribe method with a callback function. We’ll come back to that later; first let’s send a request off to an RPC server:
function doRpc(requestQueue, payload, callback) {
    var thisId = correlationId;
    correlationId = correlationId + 1;
    // setup the object in the callback map for
    // storing our callback function and the unique timer id
    // node assigns
    rpcRequestMap[thisId] = {};
    rpcRequestMap[thisId].callback = callback;
    // setTimeout is part of the node core and will return
    // a unique timer handle that we can use to control
    // the created timer. we set up this timer in case the
    // rpc server isn't running or has crashed
    rpcRequestMap[thisId].timer = setTimeout(function() {
        console.log('rpc timeout');
        var fn = rpcRequestMap[thisId].callback;
        delete rpcRequestMap[thisId];
        fn('rpc timeout', null);
    }, 5000);
    // publish through the connection (default exchange), using the request
    // queue name as the routing key, and point replies at our response queue
    connection.publish(requestQueue, payload,
        {replyTo: responseQueue.name, correlationId: String(thisId), mandatory: true});
    console.log('message.doRpc: message published, requestMap binding created, id:' +
        String(thisId));
}
This function takes a request queue name, which can be thought of as the RPC function name, a set of function arguments in the payload variable, and a callback function, which must follow the typical Node pattern of function(err, data). It sends the request off to the broker, which, assuming an RPC server is running, will deliver the message.
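Since the goal is to sling JSON back and forth, a thin wrapper around any function with doRpc's signature can take care of serializing the arguments and parsing the reply. This is only a sketch: callJson and stubDoRpc are hypothetical names, the stub answers locally so the example runs without a broker, and it assumes the reply arrives as a string or Buffer of JSON text, which may not match what your AMQP library actually hands to the response handler.

```javascript
// Wraps any doRpc-style function (requestQueue, payload, callback) so callers
// pass plain objects in and get parsed objects back, error-first as usual.
function callJson(doRpc, requestQueue, args, callback) {
  doRpc(requestQueue, JSON.stringify(args), function (err, reply) {
    if (err) {
      return callback(err, null);
    }
    var parsed;
    try {
      // assumption: the reply is a string or Buffer containing JSON text
      parsed = JSON.parse(String(reply));
    } catch (e) {
      return callback('invalid json reply', null);
    }
    callback(null, parsed);
  });
}

// Hypothetical stand-in for doRpc: it replies immediately instead of
// publishing to a broker, and fails for queues it does not know.
function stubDoRpc(requestQueue, payload, callback) {
  if (requestQueue === 'image') {
    callback(null, '{"received":' + payload + '}');
  } else if (requestQueue === 'broken') {
    callback(null, 'not json');
  } else {
    callback('rpc timeout', null);
  }
}

var outcomes = {};
['image', 'broken', 'missing'].forEach(function (queueName) {
  callJson(stubDoRpc, queueName, { width: 100 }, function (err, data) {
    outcomes[queueName] = { err: err, data: data };
  });
});
console.log(outcomes);
```

Keeping the wrapper separate from doRpc leaves the messaging code unaware of the payload format, so a different encoding could be swapped in later without touching it.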
Take notice of the implied and explicit queue and message parameters. Our queue is set up with the exclusive option, ensuring that only this instance of our client can consume messages from it and that it will be deleted when the connection is closed. Our messages are sent without persistence flags and with the mandatory flag set. If the RPC server is unavailable it’s unlikely we’d want the message to be processed later, so there is no need for it to survive: non-persistent messages are not written to disk, and with the mandatory flag a message that cannot be routed to any queue is returned to the publisher by the broker rather than silently discarded.
Using the official Java API for Rabbit, let’s build the server now:
package fusao.tangifo.backend;

import java.io.IOException;

import com.rabbitmq.client.*;

public class Processor {
    public static void main(String[] args) {
        System.out.println("Processor: initializing");

        // Our main AMQP connection, we'll open
        // a channel per thread later with a threadpool.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);

        // Let's connect and setup our basic queues.
        System.out.println("connecting to AMQP server...");
        Connection conn = null;
        final Channel channel;
        try {
            conn = factory.newConnection();
            channel = conn.createChannel();
        } catch (Exception e) {
            // Exception rather than IOException: newer client versions also
            // declare TimeoutException on newConnection()
            System.out.println("failed to create channel or connect to server");
            e.printStackTrace();
            return;
        }
        System.out.println("connected.");

        try {
            // Setup the queue, if it's not already declared this will
            // create it.
            // durable - false
            // exclusive - false
            // autoDelete - true
            // arguments - none
            channel.queueDeclare("image", false, false, true, null);

            // Add a callback for when messages arrive at the queue
            // autoAck - false
            channel.basicConsume("image", false,
                new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope,
                                               AMQP.BasicProperties properties,
                                               byte[] body)
                        throws IOException
                    {
                        String routingKey = envelope.getRoutingKey();
                        String contentType = properties.getContentType();
                        // the two properties the client set when publishing
                        String correlationId = properties.getCorrelationId();
                        String responseQueue = properties.getReplyTo();
                        long deliveryTag = envelope.getDeliveryTag();
                        String message = new String(body, "UTF-8");

                        System.out.println("message received");
                        System.out.println("correlationId: " +
                                           correlationId +
                                           " responseQueue: " +
                                           responseQueue);
                        System.out.println(message);

                        // echo the correlationId back so the client can
                        // match this response to its request
                        AMQP.BasicProperties b = (new AMQP.BasicProperties.Builder())
                            .correlationId(correlationId)
                            .build();
                        channel.basicPublish("", responseQueue, b, "{}".getBytes("UTF-8"));
                        channel.basicAck(deliveryTag, false);
                    }
                });
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("Something went horribly wrong.");
            return;
        }
    }
}
The Java implementation isn’t terribly exciting, but take a look at where we extract the correlationId and responseQueue in the DefaultConsumer delivery handler: both come from the message properties. We publish the response to the default exchange, using the reply queue name as the routing key, and just serialize an empty JSON object as the body.
This server will display the parameters to the console, and issue a response. The final piece of the puzzle is the response handler in Node:
var handleRpcResponse = function (message, headers, deliveryInfo) {
    console.log(headers);
    console.log(deliveryInfo);
    if (!deliveryInfo.hasOwnProperty('correlationId') ||
        !rpcRequestMap.hasOwnProperty(deliveryInfo.correlationId) ||
        rpcRequestMap[deliveryInfo.correlationId] === null) {
        console.log('message.handleRpcResponse: stray rpc message received');
        return;
    }
    var thisId = deliveryInfo.correlationId;
    clearTimeout(rpcRequestMap[thisId].timer);
    var cb = rpcRequestMap[thisId].callback;
    delete rpcRequestMap[thisId];
    cb(null, message);
};
We delete the map item once we’ve received the response, call the callback, and log some debug parameters to the console.
So there you have it: a really trimmed-down version of an RPC pattern for Node.js and Java. I’ve been using it in testing for a few days now, with good results, to sling JSON requests back and forth between my frontend and backend server. I really like Node.js for building RESTful APIs, and with a solid messaging layer and backend it forms a really nice stack that has a fast response time and can scale horizontally, with a nice decoupling between the heavy-lifting backend services and the web-facing frontend.
Hi
Nice code. It is better than the previous samples which run on an endless while loop server side.
I think you have an error in the doRpc function. You have the line:
client.publish(requestQueue, payload,
{replyTo: rpcQueue.name, 'correlationId': String(thisId), mandatory: true});
You have not defined client or rpcQueue.
Otherwise looks great
John
John
28 May 12 at 7:51 am
Hi,
Looks quite nice. I’m currently looking for a connection between Java and Node.js, because I’ve got some stuff to do with Java while I’ve got a frontend which should support push notifications (using socket.io there because I did not find any other implementation with various fallbacks). This seems to be the best variant to connect Java and Node. Do you have any other suggestions for how to connect them? The best other variant I have found so far was with plain sockets.
Keep up the good work!
Denis
Denis
30 May 12 at 7:48 am
Hi,
thanks for showing how to use node.js with a java backend with rabbitmq. It looks great!
I am wondering if it is necessary to have a layer of rabbitmq as a bridge to the java backend. I wanted to build something similar but I just don’t feel like maintaining multiple layers of different applications just to find a solution.
I chanced upon an npm module that could directly bridge between nodejs and java, but I’m not clear on how to use it, even with the examples given:
https://github.com/nearinfinity/node-java
Any other suggestions of how to bridge between nodejs and java backend?
Thanks!
Jonathan
Jonathan
15 Oct 12 at 5:53 am