How to Describe, Publish & Consume Real-Time Data that you've discovered within your systems and applications.
In the first post in the series we covered discovering real-time data within your systems and applications. In part two we went through the use cases for your real-time data. In this final post we'll cover the how: how to describe, publish & consume real-time data from your systems and expose the data so that you can build real-time features.
The main steps we're going to cover are doing the following with the real-time event data:

- Analyse and process it
- Describe it
- Publish it
- Consume it
The amount of analysis and processing your data requires really depends on whether the data is already in a form you can use. If it's very simple data and you have everything you need, you can share it immediately for use within apps. If it's complex data, or requires some additional analysis, then you need to do that work before you share it.
Unless the amount of work you're doing on the data is minimal, it's recommended that you do that work asynchronously. If you got the data from a system interaction, such as a web request, then doing the work there will delay the response to that web request. The same rule applies if you need to put the data into a database in order to perform a query; don't perform the query within the same context (thread) in which you received the event and associated data. Instead, do the work in another process.
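As a minimal sketch of the "do it in another process" approach, assuming an Express app (the `/score` endpoint and `points-worker.js` worker script are hypothetical names for illustration):

```javascript
var express = require('express');
var fork = require('child_process').fork;

var app = express();
app.use(express.json()); // built-in body parsing in Express 4.16+

// points-worker.js is a hypothetical worker script that performs
// the analysis/processing and publishes the result when done.
var worker = fork('./points-worker.js');

app.post('/score', function(req, res) {
  // Hand the work to the worker process and respond immediately,
  // so the web request isn't delayed by the processing.
  worker.send({playerId: req.body.playerId, points: req.body.points});
  res.status(202).send('Accepted');
});

app.listen(3000);
```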
Here’s one approach to doing this correctly.
RethinkDB is an open source database built for the real-time web. The reason it's such a good fit is due to a feature called changefeeds. In the following example, where `r` is the RethinkDB reference, we first update the number of `points` that a player has:
1r.table("players") 2 .get(playerId) 3 .update({points: newPoints}) 4 .run(conn, callback);
We have `playerId` to identify the player to update, and we're setting the `points` value to `newPoints`.
Elsewhere in code (another file or even another process entirely) we could have the following:
```javascript
r.table('players')
  .orderBy({index: r.desc('points')})
  .limit(5)
  .changes().run(conn)
  .then(top5pointsUpdated);
```
We're querying the `players` table, ordering the results descending by `points`, and limiting the results to 5. There's nothing particularly different in this approach from any other database (other than maybe the ReQL syntax). The interesting bit is that the `top5pointsUpdated` function passed to `then` is called any time the query result changes. That's because we've identified this query as a changefeed through the use of `changes()`. So, any time the `update` code we covered earlier is executed, the `top5pointsUpdated` function will be called.
In a more traditional database these could be considered triggers. The main benefits are that it's evented and that it nicely separates the updating code from the changefeed (trigger) handling code. The asynchronous nature of this code allows it to be executed outside of the incoming event thread/context, or in a different process entirely. This is just one example, and other solutions may allow you to achieve similar functionality.
If you're interested in RethinkDB, check out our post on Using RethinkDB with the Evented Web.
Systems, applications, and developers building real-time applications and features need a way of registering their interest in data. One of the simplest and most commonly used approaches is to provide a name or identifier for the data that can be used when registering interest. Publish-Subscribe fits very well here, where data is identified by a `channel`, `topic` or `subject`.
In addition to being able to identify what the data is, it’s very useful – especially within evented systems – to know what is happening to the data. What event has taken place that has resulted in the new or changed data? We call this Evented Publish-Subscribe.
To summarise:

- Use a channel (or topic/subject) name to identify what the data is
- Use an event name to identify what has happened to the data
If you've got relatively simple data within your systems then deciding how to identify the data can be easy. There may be a database schema or document that already exists with a name that you can also use as the channel name. Or, if you're using an ORM solution, the model name may be a good name for the channel. So, for an activity stream where there's a database table called `activities`, or an ORM model called `Activity`, then `activities` is a good choice of channel name.
When it comes to naming events – what has happened to the data – the standard CRUD terminology may be the right solution: `created`, `read`, `updated` and `deleted`/`destroyed`. It may be that `read` is used much less than the other operations, although building features that inform other users that somebody is accessing data can be really useful.
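As a rough sketch, a small helper could keep these event names consistent across your models (`publishChange` is a hypothetical name, and `pusher` is assumed to be a configured Pusher HTTP client):

```javascript
// Hypothetical helper that standardises CRUD event names.
var CRUD_EVENTS = ['created', 'read', 'updated', 'deleted'];

function publishChange(channel, event, data) {
  if (CRUD_EVENTS.indexOf(event) === -1) {
    throw new Error('Unknown CRUD event: ' + event);
  }
  pusher.trigger(channel, event, data);
}

// Usage: publish a 'created' event on the 'activities' channel.
publishChange('activities', 'created', {text: 'Phil is talking'});
```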
For more complex data it's probably best to name the channel based on the analysis or query that's taking place. In the RethinkDB example we covered earlier we were creating a query representing the top 5 players in a game. So, `top-5-players` would be a good channel name, and `updated` may be an appropriate event name.
Once you’ve described your data you need a mechanism for distributing that data to interested parties (we’ll cover how those parties register their interest in the next section).
In the following example using the node-orm2 ORM package and the Pusher HTTP Node library we’ll see how this can be achieved for a simple data change:
```javascript
Activity.create(
  {text: "Phil is talking"},
  function(err, activity) {
    if(err) throw err;

    pusher.trigger('activities', 'created', activity);
  });
```
In this code a new activity has been created, so we've triggered an event called `created` on the `activities` channel, passing the newly created `activity` as the event data payload.
```javascript
Activity.get(activityId, function (err, activity) {
  activity.text = "Phil is still talking";
  activity.save(function (err) {
    pusher.trigger('activities', 'updated', activity);
  });
});
```
The code above shows that updating is simply a matter of changing the event name to `updated`. You can work out how `deleted` and any other event could easily be represented in code and communicated.
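For instance, here's a sketch of the `deleted` case, assuming node-orm2's `remove` callback behaves like `save` above:

```javascript
Activity.get(activityId, function (err, activity) {
  activity.remove(function (err) {
    // Send just the id, since the record no longer exists.
    pusher.trigger('activities', 'deleted', {id: activityId});
  });
});
```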
For the more complex RethinkDB example we looked at earlier, the code would look as follows:
```javascript
r.table('players')
  .orderBy({index: r.desc('points')})
  .limit(5)
  .changes().run(conn)
  .then(top5pointsUpdated);

function top5pointsUpdated(cursor) {
  var top5 = [];
  cursor.each(function(err, item) {
    top5.push(item);
  });
  pusher.trigger('top-5-players', 'updated', top5);
}
```
Note: in the above code we're assuming the function passed to `cursor.each` is called synchronously.
In the above code we've implemented the `top5pointsUpdated` function. It loops over the updated results, adds them to an array and triggers an event via Pusher. The name of the channel is `top-5-players`, since that's the query being made and it clearly identifies what the data is. The name of the event is `updated`, since the result value has been updated.
In this example we’re using RethinkDB for the processing and analysis, and Pusher for the publishing of the data. Other solutions will offer similar abstractions to help you process and then publish your real-time data.
For completeness, here are examples of publishing data for some of the other scenarios that were mentioned in the discovering real-time data blog post.
In this example, using the Pusher Ruby HTTP library, the code triggers an event any time the `/interact` web endpoint is accessed: an event named `new_interaction` on the `endpoints` channel. The event data is the set of parameters used when accessing the endpoint.
```ruby
post '/interact' do
  Pusher.trigger('endpoints',
                 'new_interaction',
                 {
                   endpoint: 'interact',
                   post_data: params
                 })
end
```
You could create some middleware that intercepts web requests and instead share the endpoint that was interacted with, along with the HTTP parameters, as the event data. Using this you could build a real-time analytics dashboard.
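Here's a minimal sketch of that middleware idea, assuming an Express app and a configured `pusher` client; the channel and event names are illustrative:

```javascript
var express = require('express');
var app = express();

// Publish every incoming request so a dashboard can consume it
// in real time, then hand off to the normal request handling.
app.use(function(req, res, next) {
  pusher.trigger('endpoints', 'new_interaction', {
    endpoint: req.path,
    method: req.method,
    params: req.query
  });
  next();
});
```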
Setting up a request:
```javascript
var request = require('request');
var url = 'http://www.google.com';
pusher.trigger('web-request', 'outgoing', {url: url});
```
Making the request, handling the response and sharing the data:
```javascript
request(url, function (error, response, body) {
  if(error) {
    pusher.trigger('web-request',
                   'incoming-error',
                   {url: url, error: error});
  } else {
    pusher.trigger('web-request',
                   'incoming-success',
                   {url: url, body: body});
  }
});
```
This example may be useful for some kind of real-time analytics, but could also be useful if you wanted to make a call to a process to kick off the request and then let your real-time framework – Pusher in this case – inform you when the result is ready (background process completion notifications). In the code above we've used the event names `incoming-success` and `incoming-error` to identify whether or not the request was successful. This separation also makes it clear that the event data we get for each will be different.
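On the consuming side, a client could bind to each of those events separately – a sketch using the Pusher JavaScript library, mirroring the consuming pattern shown later in this post:

```javascript
var channel = pusher.subscribe('web-request');

channel.bind('incoming-success', function(data) {
  // data.url and data.body are available here
});

channel.bind('incoming-error', function(data) {
  // data.url and data.error are available here
});
```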
Logging for custom system analytics:
```javascript
var winston = require('winston');

function log(level, data) {
  winston.log(level, data);
  pusher.trigger('logging', level, data);
}

log('info', {'some': 'data'});
```
This example just uses a `log` function wrapper to both log using Winston and trigger an event via Pusher. In real life you may instead create a `realtime-log-listener` and configure your logging to use that. In this example the channel is hard-coded to `logging` and the event name is the log level. But using the level as the channel name (e.g. `logging-{level}`) may be a nicer approach to partitioning the data.
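As a sketch of that per-level partitioning (the `logged` event name is an assumption made for illustration):

```javascript
var winston = require('winston');

function log(level, data) {
  winston.log(level, data);
  // One channel per level, e.g. logging-info, logging-error,
  // so consumers can subscribe only to the levels they care about.
  pusher.trigger('logging-' + level, 'logged', data);
}

log('error', {message: 'Something went wrong'});
```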
Now that it's clear how the data is published, how do you go about consuming it within an application? Well, we've already covered how to identify and describe the data, so we use that same mechanism when consuming it. Again, we'll show an Evented Publish-Subscribe example using the Pusher libraries.
First, let’s take a look at the code for publishing the data from the server:
```javascript
pusher.trigger('activities', 'created', activity);
pusher.trigger('activities', 'updated', activity);
pusher.trigger('activities', 'deleted', activity);
```
The corresponding code uses the Pusher JavaScript library in the client (the consumer) to identify the data (using channels) and then register interest in what’s happening to the data (using events):
```javascript
var channel = pusher.subscribe('activities');

channel.bind('created', function(activity) {
  // Add to the UI
});

channel.bind('updated', function(activity) {
  // Update the UI
});

channel.bind('deleted', function(activity) {
  // Remove from the UI
});
```
First we `subscribe` to the `activities` channel and then we bind to each event we're interested in (`created`, `updated` and `deleted`). In each case we have a separate callback to handle each of those events, which nicely separates our code into functional blocks.
The code really speaks for itself due to its simplicity. Although the activities example is used here, I'm sure you can see how it maps to the web endpoints, request/response and logging examples shown earlier.
This post has discussed whether you need to analyse and process your real-time data, and where you should do that work. We've then looked at the importance of describing your data and how you can do that. Finally, we've provided examples of publishing and then consuming your real-time data. From there it's really up to you to start adding the real-time features – covered in the real-time features and use cases post – to your applications.
In this series we've covered identifying the real-time data within your apps and systems, the real-time features you can build with your data and, in this post, how to describe, publish & consume real-time data.
If you're interested in digging into this a bit further, I've recorded a screencast that covers some of the information from these posts, entitled You have real-time data. You just don't know it. You can also find out more about strategies for dealing with real-time data and real-time development best practices in a talk I gave at FOWA London, Tools, Tips & Techniques for Developing Real-Time Apps.
I’m really pleased to have finally put together a series on some of the fundamentals of building real-time apps. I’d love to hear your thoughts and experiences on the topics I’ve covered and more. So please get in touch.