Robert A Decker Programming Repository


Notes and articles that will reduce the pain

Message Queues Over Websockets With JavaScript

Robert Decker - Saturday, June 18, 2016

Here I present a brief introduction to message queues and show how they can be used over WebSockets, with JavaScript and the Message Queuing Telemetry Transport (MQTT) protocol, to provide notifications between multiple JavaScript clients.

1. Introduction

When developing web applications, developers often need actions or underlying data changes made by one client or server application to be communicated to all other clients live, without user interaction.

One simple way of doing this is to have clients poll the server for changes. However, this quickly leads to performance issues because every client hits the server periodically, whether or not there are changes to communicate.

A better solution is to use a messaging system to push action/change notifications between the clients. With messaging, the client initiating a change pushes a notification to all other clients through a message broker. When a client receives a notification from another client, it can then decide whether it needs to access the server to retrieve updated information.

Here I provide a short introduction to message queuing and a simple example of using JavaScript to send and receive messages over WebSockets using the Message Queuing Telemetry Transport (MQTT) protocol.

1.1 Broker

A message broker is simply a middleware that validates, transforms, and routes messages between clients. Each client is decoupled from the other clients and is only aware of the broker, simplifying the overall system. 


fig. 1, broker with clients

Apache ActiveMQ is a powerful open source message broker that supports multiple protocols and communication channels. It has client libraries for Java, JavaScript, .NET, Ruby, and many other languages and platforms. ActiveMQ is an ideal broker when you have multiple software modules written in multiple languages communicating over multiple protocols and communication channels.

1.2 Point-to-Point and Publish-Subscribe Messaging

There are two approaches to messaging: point-to-point and publish-subscribe. In point-to-point messaging a message is sent from one application (producer) to one other application (consumer) via a queue. Point-to-point messaging is best for passing a unit of work between applications.

Usually in point-to-point messaging your queues will have names that include their function, and clients will subscribe to these queues to pick up the next unit of work. For example, using the pattern application_name/version_number/function:
text_processor/1/pre_processed_a_text
text_processor/1/post_processed_a_text

Because point-to-point messaging is queue based you are able to run any number of clients at each step in order to keep the system running at full capacity.
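To make the queue behaviour concrete, here is a minimal in-memory sketch. It is not ActiveMQ code: WorkQueue and its round-robin delivery are hypothetical stand-ins I made up for illustration, and a real broker may distribute work differently. The point it shows is that each message is handed to exactly one consumer, so adding consumers spreads the work.

```javascript
// In-memory stand-in for a broker queue (illustration only, not ActiveMQ code).
function WorkQueue(name) {
	this.name = name;
	this.pending = [];    // messages waiting for a consumer
	this.consumers = [];  // registered consumer callbacks
	this.next = 0;        // counts deliveries, used to pick the next consumer
}

// Register a consumer; any waiting messages are handed out immediately.
WorkQueue.prototype.subscribe = function(consumer) {
	this.consumers.push(consumer);
	this.drain();
};

// Add a unit of work to the queue.
WorkQueue.prototype.send = function(message) {
	this.pending.push(message);
	this.drain();
};

// Hand each pending message to exactly one consumer, round-robin.
WorkQueue.prototype.drain = function() {
	while (this.pending.length > 0 && this.consumers.length > 0) {
		var consumer = this.consumers[this.next % this.consumers.length];
		this.next += 1;
		consumer(this.pending.shift());
	}
};

// Usage: two workers share one queue.
var handledByA = [];
var handledByB = [];
var queue = new WorkQueue("text_processor/1/pre_processed_a_text");
queue.subscribe(function(msg) { handledByA.push(msg); });
queue.subscribe(function(msg) { handledByB.push(msg); });
queue.send("text 1");
queue.send("text 2");
queue.send("text 3");
// handledByA is ["text 1", "text 3"], handledByB is ["text 2"]
```

No message is seen by both workers, which is what distinguishes a queue from the topics described next.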

fig. 2, point-to-point messaging



In publish-subscribe messaging a message is sent from one application (producer) to multiple applications (subscribers) through a topic. All subscribers to a topic will receive any messages published to that topic. Publish-subscribe messaging is best used for sending logging type information and notifications of actions/changes.

fig. 3, publish-subscribe messaging



In publish-subscribe messaging your topics will have more generic names and the message itself will contain the type of action or change. For example:
topic: /web_app/1/general
payload: {"message_type": 10, "client_id": "web_app-13422"}
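The same idea can be sketched in memory. Again this is not broker code: Topic, makeClient, and the meaning I give to message_type 10 ("data changed") are assumptions for illustration, and I assume the payload is JSON-encoded. The sketch shows that every subscriber receives every message and then uses the type inside the payload to decide what to do.

```javascript
// In-memory stand-in for a broker topic (illustration only, not ActiveMQ code).
function Topic(name) {
	this.name = name;
	this.subscribers = [];
}

Topic.prototype.subscribe = function(subscriber) {
	this.subscribers.push(subscriber);
};

// Every subscriber receives every payload published to the topic.
Topic.prototype.publish = function(payload) {
	for (var i = 0; i < this.subscribers.length; i++) {
		this.subscribers[i](payload);
	}
};

// Hypothetical meaning for message_type 10: "data changed on the server".
var MESSAGE_TYPE_DATA_CHANGED = 10;

// Build a subscriber that records what it would do for each message.
function makeClient(name, actions) {
	return function(payload) {
		var msg = JSON.parse(payload);
		if (msg.message_type === MESSAGE_TYPE_DATA_CHANGED) {
			actions.push(name + " reloads after change by " + msg.client_id);
		} else {
			actions.push(name + " ignores type " + msg.message_type);
		}
	};
}

// Usage: two clients subscribed to the same general topic.
var actions = [];
var general = new Topic("/web_app/1/general");
general.subscribe(makeClient("client A", actions));
general.subscribe(makeClient("client B", actions));
general.publish(JSON.stringify({ message_type: 10, client_id: "web_app-13422" }));
// actions now holds one entry per subscriber
```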
 

1.3 Message Queuing Telemetry Transport (MQTT)

Message Queuing Telemetry Transport (MQTT) is a well-established (ISO/IEC 20922) publish-subscribe messaging protocol designed to be extremely simple and lightweight, for use where a small code footprint is required. It is therefore well suited to JavaScript clients and to Machine-to-Machine (M2M) / Internet of Things (IoT) devices. The Paho project at the Eclipse Foundation has released a JavaScript client library that uses the MQTT protocol over the WebSocket communication channel and supports all modern browsers.

Using the Paho JavaScript library you are able to connect your JavaScript clients to a message broker using a publish-subscribe messaging model.  

2. Simple JavaScript Example

 

  1. Download the latest Apache ActiveMQ here.
  2. Within the ActiveMQ directory, review the file conf/activemq.xml to confirm that the ws (WebSocket) protocol is active and on port 61614. For now, leave the other protocols active as well, because I will show examples of connecting from Java and Ruby in a future post. The transportConnectors section should look like:
            <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            </transportConnectors>
    
  3. Launch ActiveMQ. You can find the appropriate launcher for your operating system in the bin/ directory within the ActiveMQ folder you downloaded in step 1.

    There is no need to do any further configuration of ActiveMQ. You can log into the admin console of your local ActiveMQ here:
    http://127.0.0.1:8161/admin/

    The default login is admin/admin, but you can confirm this by looking in conf/jetty-realm.properties.
  4. Download the Eclipse Paho project's MQTT JavaScript client, mqttws31.js, from here. (Scroll down to the download section and get only the js file, not all of the Paho files.) I also include this file in my example files in step 5.
  5. Download my example files here and unzip the directory.
  6. Open the mqtt_example/example_mq.html webpage in two or more browser windows.

    In each window you should see a text area and some debug information like the following:

  7. Now type text into each window and view the output in the other window. Here is an example - window 1 is first and window 2 is second and I am typing in window 2.

    window 1:

    window 2:
     

    Window 2 shows the payload before it is sent to the broker. Window 1 shows the 'message' field of the payload after it is received from the broker and decoded from the JSON payload.
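The round trip between the two windows can be sketched on its own, without a broker. The key names (clientid, type, message) and the type value 10 are taken from the full code in section 4; the function names encodePayload and decodeMessage are hypothetical helpers of mine, and I use the browser's built-in JSON.stringify so that quotes and newlines typed into the text area survive the trip.

```javascript
var TYPE_USER_SENT_MESSAGE = 10; // same value as in the full code

// What the sending window does: wrap the typed text in a JSON payload.
function encodePayload(clientId, text) {
	return JSON.stringify({
		clientid: clientId,
		type: TYPE_USER_SENT_MESSAGE,
		message: text
	});
}

// What a receiving window does: parse the payload, skip its own messages
// and unknown types, and return only the 'message' field (or null).
function decodeMessage(payloadString, ownClientId) {
	var json = JSON.parse(payloadString);
	if (json.clientid === ownClientId) {
		return null; // sent by this window, nothing to display
	}
	if (json.type !== TYPE_USER_SENT_MESSAGE) {
		return null; // a type this sketch does not understand
	}
	return json.message;
}

// Usage: window 2 sends, window 1 receives.
var payload = encodePayload("mqexample-72178", "hi there");
// payload is {"clientid":"mqexample-72178","type":10,"message":"hi there"}
var shownInWindow1 = decodeMessage(payload, "mqexample-11111"); // "hi there"
var shownInWindow2 = decodeMessage(payload, "mqexample-72178"); // null
```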

 
Review the full code below and the comments to get a better understanding of what is happening. 

Visit http://127.0.0.1:8161/admin/topics.jsp to see your broker topic and the number of messages that have been transferred.

 

3. Conclusion

Here I presented an introduction to message queues and brokers and provided an example of using a publish-subscribe messaging system to send information between multiple JavaScript clients with the MQTT protocol over WebSockets through a broker.

I chose to use ActiveMQ as the broker in my example because next I will provide additional examples in Ruby and Java which will require communication with the broker over protocols other than MQTT over Websockets, specifically STOMP and OpenWire.

Later I will also provide more detail on how this system would be configured and used in a secure production environment.

4. Full Code



HTML:
<html>
	<head>
		<script type="text/javascript">
			//<![CDATA[
			// our broker runs on our local machine and without ssl
			var mqHost = 'ws://localhost:61614/';
			var mqUseSSL = false;
			var isWebsocketsSupported = false;
			//]]>
		</script>
	</head>
	<body>
		<!-- load html content first so no appearance of delay to user -->
		
		<!-- messages from other clients appear here -->
		<div id="client_messages" border="1">
			
		</div>
		
		<!-- messages we want to send to other clients are typed here -->
		<textarea id="textarea"></textarea>
		
		<!-- a div to present debugging information -->
		<div>
			debug:
			<pre id="mq_debug"></pre>
		</div>
		
		<script type='text/javascript'>
		//<![CDATA[
			// turn on the javascript for the textarea so that it sends any changes in the textarea through our MYMQ library
			var textarea = document.getElementById("textarea");
			textarea.oninput = function() {
				if (isWebsocketsSupported) {
					MYMQ.send(textarea.value);
				}
			}
			
			// make our websocket connection
			window.onload = function() {
				if( !window.WebSocket) {
					// websockets are not supported by this client
					isWebsocketsSupported = false;
				} else {
					isWebsocketsSupported = true;
					// connect
					MYMQ.connect(mqHost, mqUseSSL);
					// when leaving the page, close the connection first
					window.onbeforeunload = function() {
					    MYMQ.disconnect();
					};
				}
			}
		//]]>
		</script>
		<script type="text/javascript" src="mqttws31.js"></script>			
		<script type="text/javascript" src="example_mq.js"></script>			
	</body>
</html>

        


JavaScript:
/**
* MYMQ encapsulates communication with message queues over a Paho MQTT client. 
* Connects to a general topic for the application
* Automatically reconnects the client if disconnected
*
* public functions
*    connect: connect to the message broker
*      mqhost - the message broker host
*      useSSL - boolean for if ssl enabled
*      returns: a PAHO mqtt client
*
*    disconnect: disconnect the client from the broker
*
*    send: send a message to the broker. wraps the message into a json object before sending
*      txt - the text message to send to the other clients
*/
var MYMQ = (function() {
	
	// private vars
	var appName = "mqexample"; // to separate different application topics
	var mq_v = 1; // message queue version. should only use whole numbers because '.' can be escaped by different libraries
	var client; // the mqttws client that does all of the work
	
	// we hold these vars so that we can reconnect automatically when a connection is lost
	var host; // the message broker location
	var uSSL;
	var destination; // the topic being subscribed to
	
	// the message types
	var TYPE_USER_SENT_MESSAGE = 10;
	
	// the json keys in the payload
	var PAYLOAD_KEY_TYPE = "type";
	var PAYLOAD_KEY_MESSAGE = "message";
	var PAYLOAD_KEY_CLIENTID = "clientid";
	
	// private functions. scroll to bottom to see the public functions
	
	// we have a private connect that is called from the public connect
	// method and is also used to reconnect automatically when the
	// connection is lost
	function internalConnect(mqHost, useSSL) {
		host = mqHost;
		uSSL = useSSL;
		// set up our topic/destination that we subscribe to
		destination = 'jms/astra/mq/' + mq_v + '/general'; 
		
		// need to create a unique clientId for each client
		var clientId = appName + "-" + (Math.floor(Math.random() * 100000)); 
		
		debug("opening " + (uSSL ? "ssl" : "non-ssl") + " websocket to host:" + host + " as clientId:" + clientId);

		client = new Paho.MQTT.Client(host, clientId);
		
		// set our callbacks. the connect callback is not a client property;
		// it is passed as onSuccess to client.connect() below
		client.onMessageArrived = onMessageArrived;
		client.onConnectionLost = onConnectionLost;
		
		// now do our connection to the broker
		client.connect({
			useSSL:uSSL,
			onSuccess:onConnect, 
			onFailure:onFailure
		}); 
		return client;
	}
	
	// the client is notified when it is connected to the server. 
	// Once connected we subscribe to our topic
	function onConnect(frame) {
		debug("connected to activemq. client:" + client.clientId + " subscribing to " + destination);
		client.subscribe(destination);
	}
	
	// just print the error message
	function onFailure(failure) {
	  debug("failure: " + failure.errorMessage);
	}
	
	// the following error codes come from mqttws31.js
	//		OK: {code:0, text:"AMQJSC0000I OK."},
	//		CONNECT_TIMEOUT: {code:1, text:"AMQJSC0001E Connect timed out."},
	//		SUBSCRIBE_TIMEOUT: {code:2, text:"AMQJS0002E Subscribe timed out."}, 
	//		UNSUBSCRIBE_TIMEOUT: {code:3, text:"AMQJS0003E Unsubscribe timed out."},
	//		PING_TIMEOUT: {code:4, text:"AMQJS0004E Ping timed out."},
	//		INTERNAL_ERROR: {code:5, text:"AMQJS0005E Internal error. Error Message: {0}, Stack trace: {1}"},
	//		CONNACK_RETURNCODE: {code:6, text:"AMQJS0006E Bad Connack return code:{0} {1}."},
	//		SOCKET_ERROR: {code:7, text:"AMQJS0007E Socket error:{0}."},
	//		SOCKET_CLOSE: {code:8, text:"AMQJS0008I Socket closed."},
	//		MALFORMED_UTF: {code:9, text:"AMQJS0009E Malformed UTF data:{0} {1} {2}."},
	//		UNSUPPORTED: {code:10, text:"AMQJS0010E {0} is not supported by this browser."},
	//		INVALID_STATE: {code:11, text:"AMQJS0011E Invalid state {0}."},
	//		INVALID_TYPE: {code:12, text:"AMQJS0012E Invalid type {0} for {1}."},
	//		INVALID_ARGUMENT: {code:13, text:"AMQJS0013E Invalid argument {0} for {1}."},
	//		UNSUPPORTED_OPERATION: {code:14, text:"AMQJS0014E Unsupported operation."},
	//		INVALID_STORED_DATA: {code:15, text:"AMQJS0015E Invalid data in local storage key={0} value={1}."},
	//		INVALID_MQTT_MESSAGE_TYPE: {code:16, text:"AMQJS0016E Invalid MQTT message type {0}."},
	//		MALFORMED_UNICODE: {code:17, text:"AMQJS0017E Malformed Unicode string:{0} {1}."},
	function onConnectionLost(responseObject) {
		var errorCode = responseObject.errorCode;
		if (errorCode !== 0) {
			debug("connection lost: " + client.clientId + " errorCode:" + errorCode + " msg:" + responseObject.errorMessage);
			switch(errorCode) {
				case 5:
					/* This is probably an unsupported browser version. Don't try to reconnect.
					Provide a message to your user like:
						var msg = "Unable to connect to message queue server. You will not receive real-time automatic UI updates.";
						msg = msg + "<br/>Minimum browser versions to use this application are:";
						msg = msg + "<br/>	Internet Explorer 10";
						msg = msg + "<br/>	Firefox 21";
						msg = msg + "<br/>	Chrome 21";
						msg = msg + "<br/>	Safari 6";
						msg = msg + "<br/>	Opera 12.1";
						msg = msg + "<br/>	iOS (Safari) 6";
						msg = msg + "<br/>	Android OS 4.4";
						msg = msg + "<br/>The exact error is:" + responseObject.errorMessage;
					*/
					break;
				default:
					// reconnect to broker
					internalConnect(host, uSSL);
					break;
			}
		}
	}
	
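	// this allows us to display debug logs directly on the web page.
	// each log line is inserted as a text node so that message contents
	// are never parsed as html
	var debug = function(str) {
		var mq_debug = document.getElementById("mq_debug");
		if (mq_debug) {
			var line = Math.floor(Date.now() / 1000) + " " + str + "\n";
			mq_debug.insertBefore(document.createTextNode(line), mq_debug.firstChild);
		}
	};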
	
	function onMessageArrived(jsonMessage) {
		debug("jsonMessage arrived:" + jsonMessage.payloadString);
		try {
			// the payload comes in json format. See public send method.
			var json = JSON.parse(jsonMessage.payloadString);
			
			// ignore messages coming from our own client
			var clientid = json[PAYLOAD_KEY_CLIENTID];
			if (clientid === client.clientId) {
				debug(".onMessageArrived ignoring message from self clientId:" + clientid);
				return;
			}
			
			// find the message type and do the appropriate action based on the type
			var type = json[PAYLOAD_KEY_TYPE];
			switch(type) {
				case TYPE_USER_SENT_MESSAGE:
					// a user sent a message that we've received
					var message = json[PAYLOAD_KEY_MESSAGE];
					messageReceived(message, clientid);
					break;
				default:
					debug(".onMessageArrived cannot understand type:" + type);
					break;
			}
		} catch(err) {
		  	var str = "error processing jms message err:" + err;
			str = str + "\ndestination/source:" + jsonMessage.destinationName;
			str = str + "\npayload:" + jsonMessage.payloadString;
		  	debug(str);
		}
	}
	
	// called from onMessageArrived callback based on the message type TYPE_USER_SENT_MESSAGE
	function messageReceived(message, clientid) {
		debug(".messageReceived message:" + message);
		var message_box = document.getElementById(clientid);
		if (message_box == null) {
			// insert a labelled message box for the sending client because it doesn't exist on the page yet.
			// it is built with DOM methods rather than html strings so that text received
			// from other clients is never interpreted as markup
			var clients_box = document.getElementById("client_messages");
			message_box = document.createElement("pre");
			message_box.id = clientid;
			clients_box.appendChild(document.createTextNode(clientid + ":"));
			clients_box.appendChild(message_box);
			clients_box.appendChild(document.createElement("br"));
		}
		// replace the contents of the message box of this client with the message
		message_box.textContent = message;
	}

	// public methods and vars go here
	return {
		connect: function(mqHost, useSSL) {
			// we have a private internalConnect method that we call from here.
			return internalConnect(mqHost, useSSL);
		},
		disconnect: function() {
			client.disconnect();
		},
		send: function(txt) { 
			// build the json string for the payload with the browser's built-in
			// JSON.stringify so that quotes, backslashes and newlines typed into
			// the textarea are escaped correctly
			var payload = {};
			payload[PAYLOAD_KEY_CLIENTID] = client.clientId;
			payload[PAYLOAD_KEY_TYPE] = TYPE_USER_SENT_MESSAGE;
			payload[PAYLOAD_KEY_MESSAGE] = txt;
			var jsonPayload = JSON.stringify(payload);
			// {"clientid":"mqexample-72178","type":10,"message":"hi there"}
			
			debug("sending payload:" + jsonPayload);
			var message = new Paho.MQTT.Message(jsonPayload);
			message.destinationName = destination; // the topic/queue
			client.send(message); // sends to the destination/topic on the broker
		}
	};
	
	
})();
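
The onConnectionLost handler in the listing above reconnects immediately. One possible refinement, sketched here under my own assumptions, is to wait a little longer before each retry so that a broker that is down is not hit repeatedly. The helper names reconnectDelayMs and scheduleReconnect, the 1 second base, and the 30 second cap are all hypothetical; they are not part of the Paho library or of the example files.

```javascript
// Hypothetical helper: how long to wait before reconnect attempt number
// `attempt` (0 for the first retry). The delay doubles each time, up to maxMs.
function reconnectDelayMs(attempt, baseMs, maxMs) {
	var delay = baseMs * Math.pow(2, attempt);
	return Math.min(delay, maxMs);
}

// Number of retries scheduled so far. Reset this to 0 after a successful
// connect (for example inside onConnect) so the next outage starts small.
var attempts = 0;

// Hypothetical wiring: schedule connectFn after the computed delay.
// Returns the timer id so the caller can cancel the retry if needed.
function scheduleReconnect(connectFn) {
	var wait = reconnectDelayMs(attempts, 1000, 30000);
	attempts += 1;
	return setTimeout(connectFn, wait);
}

// reconnectDelayMs(0, 1000, 30000) is 1000
// reconnectDelayMs(1, 1000, 30000) is 2000
// reconnectDelayMs(5, 1000, 30000) is 30000 (32000 capped at the maximum)
```

Inside MYMQ this could replace the direct internalConnect(host, uSSL) call in the default branch of onConnectionLost, for example scheduleReconnect(function() { internalConnect(host, uSSL); }).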