Data collection and storage using Gozirra, Flume and HBase Monday 20th January
The aim of the project was to develop and demonstrate capabilities around technologies emerging in the “big data” technology space, using Network Rail data feeds as the data source. The Network Rail train movement data feed produces several hundred thousand messages per day in JSON format. Our aim was to store the train movement data and to develop ways to visualise it in real-time and historic time. This blog focuses on accessing the feeds and storing the incoming data stream.
Selecting a “big data” application stack
Focusing on big data specifically, we were drawn to the technology bundles of some of the major organisations in the emerging big data space, such as Cloudera and Hortonworks, and eventually decided to develop using components contained within the Cloudera technology stack. The Cloudera CDH and Cloudera Manager products group together Apache Hadoop and other related technologies into an integrated and managed environment, providing user interfaces for managing configuration changes, starting, stopping, and monitoring services, and more. We developed using Cloudera’s “QuickStart VM”. Their VM is designed to allow the user to learn big data techniques usually applied to clusters of machines, using only a single machine (aka “single node” or “pseudo cluster”). The Cloudera VM consists of a preinstalled big data application stack on top of CentOS. The application stack consists of Cloudera Manager, Hadoop (HDFS, MapReduce), Hive, Pig, HBase, Hue, Solr, Impala, Oozie, Sqoop, Yarn, and Zookeeper. Most of these technologies are open source Apache products, and several are part of the Apache Hadoop project. Impala, and Cloudera Manager are Cloudera products.
Subscribing to the Network Rail data feed
One of the first problems we faced was how to subscribe to the Network Rail data feeds and obtain train movement messages. Network Rail have made the data feeds available over the Stomp protocol. A Google search suggested two java Stomp clients we could use to connect and subscribe to the data feeds, namely Gozirra and Stampy. Gozirra was chosen because it appeared a little more straightforward to get started with (plus it sounds a bit like Godzilla). Using the Gozirra library it was possible with a few lines of code to connect and subscribe to the Network Rail data feeds and receive messages.
HBase for data storage and real-time retrieval
We initially experimented with small example datasets stored on HDFS, using Hive for ad hoc querying, but found that our queries were taking many seconds to return. If this was the case with small amounts of data, it was clear that real-time visual presentation of current or historical train movement data would not be possible with Hive. It turns out that Hive is geared towards batch-oriented rather than real-time querying, while the distributed NoSQL database HBase, part of the Apache Hadoop project, and also included as part of the Cloudera stack, is suitable for real-time querying of large quantities of data (see e.g., stackoverflow.com/questions/24179/how-does-hive-compare-to-hbase). As the project evolved, the use cases involving the train messaging data evolved towards a focus on the visual presentation of real-time data, hence HBase, with its real time querying capability, was chosen for persistence. An introduction to HBase is beyond the scope of this blog - an excellent series of introductory articles have been written by Scott Leberknight covering various HBase topics. These include an Introduction, an architectural overview, an article on using the HBase shell, and another on the HBase Java API. Two of our other blogs in this series also focus on HBase topics. HBase row key design becomes central to efficient data retrieval since rows are stored in HBase by ascending order of row key values. Mark Waldron explores how row keys can be designed to facilitate storing geospatial and temporal data in such a way as to facilitate queries across space and time, a topic fundamental to the efficient storing and querying of the train movement feeds. A second blog by Andy Palmer focuses on how to extract data from HBase. HBase is a NoSQL database, and SQL is not used for querying (but see note on Phoenix below). Instead, the querying role is taken by HBase scans and filters, which Andy has written and optimised for geospatial queries – ouch!
Note: If we had discovered it sooner, we might have preferred Phoenix for querying, as it provides low latency, SQL querying on top of HBase, and implements the convenience of JDBC interface semantics for connections, statements, and resultsets. We did not evaluate other NoSQL competitors to HBase such as Cassandra and MongoDB.
Getting the train movement data into HBase using Apache Flume with custom source and sink
Apache Flume architecture consists of ‘sources’, ‘channels’, and ‘sinks’. A source corresponds to an entry point for data into flume. A sink is an exit point for data out of Flume, which is often, but not necessarily a database. A Flume source puts ‘events’ (in the current context events correspond to JSON message blocks arriving into the system via Gozzira), into a Flume channel. A Flume sink is passed events from a given channel and is responsible for doing something with them, such as storing them in a database. Sources, channels, and sinks are configured to work together as ‘agents’ in a Flume configuration file.
Out of the box, Flume comes with a variety of sources and sinks. For example it is possible with just a little configuration to achieve the task of file transfer from a specified directory on one machine to an HDFS directory on a different machine (by configuring a spooldir source and an HDFS sink in flume.conf). For our purposes, however, we needed to develop a custom Flume source to subscribe to the Network Rail messaging service, and listen for new JSON messages, posting each message as an event to a flume channel. We also developed a custom sink. The sink gets passed messages from the Flume channel. It parses geospatial and timestamp information from these incoming messages to construct HBase row keys, and constructs HBase ‘put’ requests to store the data into HBase.
We could have written our own code to consume and store the Network Rail data and store it into HBase using the HBase Java API, so why did we use Flume? There are several reasons. Firstly, Apache Flume is integrated into Cloudera Manager as a service. This means that for the data ingestion and storage part of the application, we could configure the application using the Cloudera Manager Flume configuration GUI, and we could control the stopping and starting of our application by stopping and starting the Flume service using the Cloudera Manager GUI. Secondly, Flume provides a level of recovery from data outages that we could utilise. For example, if there is an HBase failure, and messages are still flowing into the system, then in principle these messages can be stored in a temporary storage area (file channel) until such time as they can be resubmitted to HBase for permanent storage (at least that’s the theory). A third reason is that this was an R&D project and we were actively seeking to use new technologies to help expand the company’s knowledge reach.
The path of data travelling from the Network Rail server to our single node HBase installation may appear rather more complex than it need be. But the effort expended in integrating the data storage component of the application with Cloudera manager via Flume yielded some application benefits in terms of integration with Cloudera Manager, and some recovery features, and also interesting knowledge gains. Perhaps also it illustrates how the choice of an application stack promotes the adoption of certain technologies rather than others. For a blog outlining how the data was used to visualise the train traffic in real and historical time, see Dave Hampton’s forthcoming data visualisation blog.comments powered by Disqus