Hazelcast – Low Latency Datastores for IoT
DataMountaineer recently published a blog post entitled, “Hazelcast – Low Latency Datastores for IOT“. In the post, DataMountaineer presents a strategy for building low latency datastores for Internet of Things using Kafka and Hazelcast.
The Internet of Things is on the rise; it was certainly one of the buzzwords of 2016. Gartner agrees: it predicts there will be 20 billion devices online by 2020, all of them transmitting (streaming) data. These are not limited to new devices. More and more, we see clients wanting to connect to manufacturing control systems such as SCADA. Take a utility company, for example: it might want to collect and analyse wind turbine or other asset data and perform forecasting or real-time steering in combination with smart home meter data.
A streaming solution built on Kafka is the ideal platform to feed this never-ending flow of data into, and Kafka Connect makes connecting these sources and sinks easy. So DataMountaineer built connectors for IoT: both CoAP (Constrained Application Protocol) and MQTT.
While being able to ingest this data simply by passing a config file to Connect is great, we still need to process the incoming messages. We could use a stream processor like Kafka Streams, or we could simply configure a sink to write to an in-memory data grid like Hazelcast, or both. At DataMountaineer we support Hazelcast with a dedicated sink connector.
What does this architecture look like for IoT? We need to be able to capture, process and persist the deluge of sensor data, and combining Kafka with Hazelcast makes this simple. You need four components to achieve this:
- Gateways: Kafka Connect simplifies loading data into and out of Kafka.
- Pipeline persistence: Kafka, a scalable, durable commit log, can ingest the flood of data, support consumers tapping into the flow and act as a buffer for the long-term storage layer. It handles back pressure (fast producers, slow consumers) out of the box.
- Processing: in-flight processing, the heart of any stream reactor, covering cleansing, aggregations and real-time analytics with Kafka Streams, Spark or Flink.
- Storage: longer-term storage for further processing.
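To make the buffering role of the pipeline-persistence layer concrete, here is a toy in-memory model of a commit log. It is not Kafka's API; the class and method names are hypothetical. Producers only append, and each consumer keeps its own offset and pulls at its own pace, so a burst from a fast producer simply waits in the log instead of overwhelming a slow consumer.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a commit log (hypothetical, NOT Kafka's API): the log keeps every
// record and each consumer tracks its own read offset.
final class ToyCommitLog {
    private final List<String> records = new ArrayList<>();

    // Producers only append; they never wait for consumers.
    synchronized long append(String record) {
        records.add(record);
        return records.size() - 1; // offset of the new record
    }

    // Consumers pull up to maxRecords starting at their own offset.
    synchronized List<String> poll(long offset, int maxRecords) {
        int from = (int) Math.min(offset, records.size());
        int to = Math.min(from + maxRecords, records.size());
        return new ArrayList<>(records.subList(from, to)); // copy, not a live view
    }

    synchronized long endOffset() {
        return records.size();
    }

    public static void main(String[] args) {
        ToyCommitLog log = new ToyCommitLog();
        // A fast producer writes five readings in one burst.
        for (int i = 1; i <= 5; i++) {
            log.append("reading-" + i);
        }
        // A slow consumer reads two records at a time and advances its own offset.
        long offset = 0;
        while (offset < log.endOffset()) {
            List<String> batch = log.poll(offset, 2);
            System.out.println("offset " + offset + " -> " + batch);
            offset += batch.size();
        }
    }
}
```

Because the consumer, not the log, decides how far to read, the producer never has to slow down; Kafka layers partitioning, replication and retention on top of this basic idea.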
[Figure: Kafka Connect sources and sinks around Kafka, with stream processors]
Hazelcast
Hazelcast, the leading open source in-memory data grid, provides a rich architecture and feature set that allow it to receive high-velocity writes for data storage, perform distributed processing and act as an excellent sink. For example, Hazelcast supports distributed events, execution callbacks, entry processors and a wide array of data structures such as maps, queues, reliable topics, ringbuffers and JCache.
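Since the KCQL example later in this post stores data in a RingBuffer, it helps to see the idea behind that structure. The following is a toy, standalone model, not Hazelcast's Ringbuffer class, although the names headSequence, tailSequence and readOne loosely mirror it: a fixed-capacity buffer in which every item gets an increasing sequence number and, in this toy, each add to a full buffer overwrites the oldest item.

```java
// Toy fixed-capacity ring buffer (standalone, NOT Hazelcast's Ringbuffer class).
// Every item gets a monotonically increasing sequence number; once the buffer
// is full, each add overwrites the oldest item.
final class ToyRingbuffer<T> {
    private final Object[] slots;
    private long head = 0;  // sequence of the oldest item still stored
    private long tail = -1; // sequence of the newest item (-1 while empty)

    ToyRingbuffer(int capacity) {
        slots = new Object[capacity];
    }

    long add(T item) {
        tail++;
        slots[(int) (tail % slots.length)] = item;
        if (tail - head >= slots.length) {
            head++; // the oldest item was just overwritten
        }
        return tail;
    }

    @SuppressWarnings("unchecked")
    T readOne(long sequence) {
        if (sequence < head) {
            throw new IllegalStateException("sequence " + sequence + " was overwritten");
        }
        if (sequence > tail) {
            throw new IllegalArgumentException("sequence " + sequence + " not written yet");
        }
        return (T) slots[(int) (sequence % slots.length)];
    }

    long headSequence() { return head; }

    long tailSequence() { return tail; }

    public static void main(String[] args) {
        ToyRingbuffer<Double> temps = new ToyRingbuffer<>(3);
        for (double t : new double[] {21.5, 22.0, 22.4, 23.1}) {
            temps.add(t);
        }
        // Capacity 3 and four adds: sequence 0 has been overwritten.
        System.out.println("head=" + temps.headSequence() + " tail=" + temps.tailSequence());
        System.out.println("latest=" + temps.readOne(temps.tailSequence()));
    }
}
```

Readers address items by sequence, so a slow reader can detect that it fell behind (its sequence is older than the head) rather than silently missing data.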
Hazelcast also has a variety of use cases in many sectors such as IoT, finance, gaming and media. These include:
- Caching
- NoSQL
- In-Memory Grid
- Web Session Clustering
One of its key differentiators from other data stores, besides its incredible ease of use, is the ability to use Hazelcast as a caching layer, for example for database caching, caching as a service, or as a Memcached replacement or plugin. It is very versatile and easy to use, to say the least! Keep your materialized views and caches up to date, continuously fed from Kafka.
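A materialized view kept fresh from Kafka boils down to applying each incoming keyed event to a map. The sketch below uses java.util.concurrent.ConcurrentHashMap as a local stand-in for a Hazelcast IMap (IMap also implements ConcurrentMap). The Reading type and the last-write-wins-by-timestamp rule are illustrative assumptions, not part of the Hazelcast sink.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Cache kept up to date by replaying a stream of keyed events.
// ConcurrentHashMap is a local stand-in for a Hazelcast IMap (an assumption for this sketch).
final class LatestReadingView {
    // Hypothetical event shape, loosely based on the sensor payloads shown later in this post.
    static final class Reading {
        final String sensorId;
        final long timestamp;
        final double temperature;

        Reading(String sensorId, long timestamp, double temperature) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return sensorId + "@" + timestamp + "=" + temperature;
        }
    }

    private final ConcurrentMap<String, Reading> latestBySensor = new ConcurrentHashMap<>();

    // Upsert per sensor, ignoring late events older than the cached one.
    void apply(Reading r) {
        latestBySensor.merge(r.sensorId, r,
            (current, incoming) -> incoming.timestamp >= current.timestamp ? incoming : current);
    }

    Reading get(String sensorId) {
        return latestBySensor.get(sensorId);
    }

    public static void main(String[] args) {
        LatestReadingView view = new LatestReadingView();
        List<Reading> stream = List.of(
            new Reading("Sensor4", 1000L, 22.7),
            new Reading("Sensor5", 1001L, 28.3),
            new Reading("Sensor4", 1005L, 24.5),
            new Reading("Sensor4", 1003L, 99.9)); // late event, ignored by apply()
        stream.forEach(view::apply);
        System.out.println(view.get("Sensor4"));
    }
}
```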
Another use case is as an Oracle Coherence replacement. At DataMountaineer we are working on a source connector for this: stream all your data live out of Coherence and into Hazelcast? Just a thought.
Building a flow
Now on to the flow we want to set up.
We’ll use the CoAP Source to subscribe to an observable CoAP server resource and publish the messages into Kafka. The CoAP Source automatically converts the incoming CoAP response to Avro or JSON and registers the schema with the Schema Registry.
Finally we’ll use the DataMountaineer Hazelcast Sink to subscribe to the CoAP topic and stream events to a Queue in the Hazelcast Cluster.
CoAP Source
The Internet of Things has several protocols, the most notable being MQTT and CoAP, and DataMountaineer has connectors for both. CoAP is the Constrained Application Protocol from the IETF CoRE (Constrained RESTful Environments) working group. More information and a comparison of MQTT vs CoAP is available here. The CoAP Source connector supports observable CoAP resources and secure DTLS clients; we have blogged in more detail about our CoAP and MQTT sources here.
Hazelcast Sink
The Hazelcast sink supports the following features:
KCQL
KCQL is DataMountaineer's SQL-like connector syntax. It simplifies the mappings and features of our connectors while keeping the configuration clean and stopping it from becoming too verbose. I’m a big fan of Flume NG and Morphlines, but their configurations quickly become verbose and I go blind writing them.
For Hazelcast, KCQL supports:
- Topic to Hazelcast data structure mapping.
- Field selection from the topic (requires the payload to be Avro or JSON with a schema).
- Choice of the storage structure type.
- Format types: JSON or Avro.
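Conceptually, field selection plus a JSON format amounts to projecting each record onto the selected fields and serializing the result. The sketch below illustrates that with a plain Map standing in for a record; FieldProjection and its methods are hypothetical names, and the real sink operates on Kafka Connect records with schemas rather than maps.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch (hypothetical names) of what "SELECT a, b ... WITHFORMAT JSON"
// does to one record. The real sink works on Kafka Connect records and schemas.
final class FieldProjection {
    // Keep only the selected fields, in SELECT order; a lone "*" keeps everything.
    static Map<String, Object> project(Map<String, Object> record, List<String> fields) {
        if (fields.size() == 1 && fields.get(0).equals("*")) {
            return new LinkedHashMap<>(record);
        }
        Map<String, Object> out = new LinkedHashMap<>();
        for (String f : fields) {
            if (!record.containsKey(f)) {
                throw new IllegalArgumentException("unknown field: " + f);
            }
            out.put(f, record.get(f));
        }
        return out;
    }

    // Minimal JSON writer for flat maps of strings, numbers, booleans and nulls.
    // It escapes quotes and backslashes only, which is enough for this illustration.
    static String toJson(Map<String, Object> record) {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, Object> e : record.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append(quote(e.getKey())).append(':');
            Object v = e.getValue();
            sb.append(v instanceof String ? quote((String) v) : String.valueOf(v));
        }
        return sb.append('}').toString();
    }

    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("sensor_id", "Sensor4");
        record.put("timestamp", 1487166346000L);
        record.put("temperature", 22.77);
        record.put("message", "Sensor message 2");
        System.out.println(toJson(project(record, List.of("sensor_id", "timestamp", "temperature"))));
    }
}
```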
For example, to select the sensor_id, timestamp and temperature fields from the coap_sensor_topic and store them in a RingBuffer called sensor_ringbuffer with the payload as JSON, the KCQL statement would look like this:
INSERT INTO sensor_ringbuffer SELECT sensor_id, timestamp, temperature FROM coap_sensor_topic WITHFORMAT JSON STOREAS RING_BUFFER
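A KCQL statement reads left to right as target, projection, source and options. To make that mapping concrete, here is a hypothetical regex-based parser for the subset of KCQL used in this post: INSERT INTO ... SELECT ... FROM ..., optionally followed by WITHFORMAT and STOREAS. It is not DataMountaineer's KCQL parser and ignores everything else the real grammar supports.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical regex-based parser for the small KCQL subset used in this post.
// NOT DataMountaineer's parser: it only shows how each clause maps to a sink feature.
final class MiniKcql {
    private static final Pattern STATEMENT = Pattern.compile(
        "INSERT\\s+INTO\\s+(\\w+)\\s+SELECT\\s+(.+?)\\s+FROM\\s+(\\w+)"
            + "(?:\\s+WITHFORMAT\\s+(\\w+))?(?:\\s+STOREAS\\s+(\\w+))?\\s*",
        Pattern.CASE_INSENSITIVE);

    final String target;       // Hazelcast data structure name
    final List<String> fields; // selected fields; ["*"] means all fields
    final String sourceTopic;  // Kafka topic to read from
    final String format;       // e.g. JSON, null if the clause is absent
    final String storeAs;      // e.g. QUEUE, null if the clause is absent

    private MiniKcql(String target, List<String> fields, String sourceTopic,
                     String format, String storeAs) {
        this.target = target;
        this.fields = fields;
        this.sourceTopic = sourceTopic;
        this.format = format;
        this.storeAs = storeAs;
    }

    static MiniKcql parse(String kcql) {
        Matcher m = STATEMENT.matcher(kcql.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("unsupported statement: " + kcql);
        }
        List<String> fields = Arrays.asList(m.group(2).trim().split("\\s*,\\s*"));
        return new MiniKcql(m.group(1), fields, m.group(3), upper(m.group(4)), upper(m.group(5)));
    }

    private static String upper(String s) {
        return s == null ? null : s.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        MiniKcql q = parse(
            "INSERT INTO dev SELECT * FROM coap_sensor_topic WITHFORMAT JSON STOREAS QUEUE");
        System.out.println(q.target + " <- " + q.sourceTopic + " " + q.fields
            + " format=" + q.format + " storeAs=" + q.storeAs);
    }
}
```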
First, download and unpack the Confluent Platform:
# make the confluent home folder
mkdir ~/confluent
# download confluent
wget http://packages.confluent.io/archive/3.2/confluent-3.2.0-2.11.tar.gz
# extract the archive to the confluent folder
tar -xvf confluent-3.2.0-2.11.tar.gz -C ~/confluent
# set up variables
export CONFLUENT_HOME=~/confluent/confluent-3.2.0
cd $CONFLUENT_HOME
Start the Confluent Platform; we need Kafka, ZooKeeper and the Schema Registry:
bin/zookeeper-server-start etc/kafka/zookeeper.properties &
sleep 10
bin/kafka-server-start etc/kafka/server.properties &
sleep 10
bin/schema-registry-start etc/schema-registry/schema-registry.properties &
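The sleep 10 pauses are a rough guess at start-up time. As a sketch of a sturdier alternative, the helper below polls until a TCP port accepts connections. It assumes the default ports from the stock property files (2181 for ZooKeeper, 9092 for Kafka, 8081 for the Schema Registry), and an open port only shows that a process is listening, not that it is fully ready.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Sketch: poll a TCP port until it accepts connections or a timeout expires.
final class WaitForPort {
    static boolean waitFor(String host, int port, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 500);
                return true; // something is listening
            } catch (IOException notYetListening) {
                Thread.sleep(250); // back off briefly before retrying
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed defaults: ZooKeeper 2181, Kafka 9092, Schema Registry 8081.
        int[] ports = {2181, 9092, 8081};
        for (int port : ports) {
            boolean up = waitFor("localhost", port, 30_000);
            System.out.println("port " + port + (up ? " is accepting connections" : " did not open in time"));
        }
    }
}
```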
Additionally, we need to start Kafka Connect. We will run it in distributed mode, which is straightforward, but the CoAP Source and Hazelcast Sink need to be on the CLASSPATH. Download the Stream Reactor, unpack the archive and start Kafka Connect in distributed mode with the connectors on the CLASSPATH:
wget https://github.com/datamountaineer/stream-reactor/releases/download/v0.2.5/stream-reactor-0.2.5-3.2.0.tar.gz
mkdir stream-reactor
tar xvf stream-reactor-0.2.5-3.2.0.tar.gz -C stream-reactor
cd stream-reactor/stream-reactor-0.2.5-3.2.0
bin/start-connect.sh
We can use Kafka Connect’s REST API to confirm that our connector classes are available. In a new terminal, check the available plugins:
bin/cli.sh plugins
Class name: ConnectorPlugins(com.datamountaineer.streamreactor.connect.coap.source.CoapSourceConnector)
Class name: ConnectorPlugins(com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastSinkConnector)
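The same check can be made directly against the Kafka Connect REST endpoint GET /connector-plugins. This Java 11+ sketch assumes the worker started by bin/start-connect.sh listens on Connect's default REST port 8083 on localhost; the JSON handling is a dependency-free regex, fine for a quick check but not a substitute for a JSON library.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Lists connector plugin classes via GET /connector-plugins.
// Assumption: the Connect worker's REST interface is on localhost:8083 (the default).
final class ListConnectorPlugins {
    private static final Pattern CLASS_FIELD = Pattern.compile("\"class\"\\s*:\\s*\"([^\"]+)\"");

    // Extracts every "class" value from the JSON array returned by the endpoint.
    static List<String> pluginClasses(String json) {
        List<String> classes = new ArrayList<>();
        Matcher m = CLASS_FIELD.matcher(json);
        while (m.find()) {
            classes.add(m.group(1));
        }
        return classes;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        HttpClient http = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest
            .newBuilder(URI.create("http://localhost:8083/connector-plugins"))
            .GET()
            .build();
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        for (String cls : pluginClasses(response.body())) {
            System.out.println(cls);
        }
    }
}
```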
CoAP
We built a small test CoAP server, which can be downloaded here, or you can use an Eclipse test server. There is also a Firefox add-on called Copper that you can use to inspect servers and their resources. If you want to use our test CoAP server, download it and start it in a new terminal window:
wget https://github.com/datamountaineer/coap-test-server/releases/download/v1.0/start-server.sh
chmod +x start-server.sh
./start-server.sh
Hazelcast
Hazelcast, while extremely powerful and flexible, is also really easy to use, and setting up a cluster takes no time at all. This is all the code you need to start a cluster node; start multiple instances and they find each other via multicast to form a cluster.
import com.hazelcast.core.*;
import com.hazelcast.config.*;
import java.util.Queue;

public class GettingStarted {
    public static void main(String[] args) {
        // Default configuration: members discover each other via multicast
        Config cfg = new Config();
        // Start an embedded cluster member in this JVM
        HazelcastInstance instance = Hazelcast.newHazelcastInstance(cfg);
        // Distributed queue shared by every member of the cluster
        Queue<String> queueCustomers = instance.getQueue("customers");
        queueCustomers.offer("Tom");
        queueCustomers.offer("Mary");
        queueCustomers.offer("Jane");
        System.out.println("First customer: " + queueCustomers.poll());
        System.out.println("Second customer: " + queueCustomers.peek());
        System.out.println("Queue size: " + queueCustomers.size());
    }
}
Even less for the Java client!
package com.hazelcast.test;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;

public class GettingStartedClient {
    public static void main(String[] args) {
        // Connect to a member already running on the default port
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.addAddress("127.0.0.1:5701");
        HazelcastInstance client = HazelcastClient.newHazelcastClient(clientConfig);
        // Proxy to the distributed "customers" map held by the cluster
        IMap<String, String> map = client.getMap("customers");
        System.out.println("Map Size:" + map.size());
        // Disconnect so the JVM can exit
        client.shutdown();
    }
}
But we don’t need to write code to get this flow going!
Let’s download Hazelcast and start a server node and a client. By default the server sets its socket address as its public address, so you can either modify the hazelcast.xml in hazelcast-3.7.4/bin/ or update the connect.hazelcast.sink.cluster.members configuration option in the sink configuration later. If you want to change the public address of the node, add the following to the network section of the hazelcast.xml file:
<public-address>localhost</public-address>
Then start a Hazelcast server node:
# start the Hazelcast server node
hazelcast-3.7.4/bin/start.sh
In a new terminal start the console app.
# start the console app (client node)
export CLASSPATH=~/Downloads/hazelcast-3.7.4/bin/hazelcast.xml
cd hazelcast-3.7.4/demo
./clientConsole.sh
Now, in the terminal where you started the server node, you should see something like this:
INFO: [127.0.0.1]:5701 [dev] [3.7.4] TcpIpConnectionManager configured with Non Blocking IO-threading model: 3 input threads and 3 output threads
Jan 19, 2017 11:21:20 AM com.hazelcast.internal.cluster.impl.MulticastJoiner
INFO: [127.0.0.1]:5701 [dev] [3.7.4]
Members [1] {
	Member [127.0.0.1]:5701 - 2cc01773-6525-48a0-a9a3-09ba943fd478 this
}
Jan 19, 2017 11:21:20 AM com.hazelcast.core.LifecycleService
The Hazelcast client console allows you to interact with the cluster. For this demo we will listen on a queue for events written from Kafka. In the Hazelcast client console terminal, switch to the dev namespace and listen to the queue:
hazelcast[default] > ns dev
namespace: dev
hazelcast[dev] > q.take
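The q.take command blocks until an element is available, which is why the console sits waiting until the sink starts writing. Hazelcast's IQueue extends java.util.concurrent.BlockingQueue, so the same take() contract can be sketched locally with a LinkedBlockingQueue standing in for the distributed queue; the delayed producer thread is a hypothetical stand-in for the Hazelcast sink.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Local model of "q.take": LinkedBlockingQueue stands in for the distributed queue
// and a background thread plays the (hypothetical) role of the Hazelcast sink.
final class QueueTakeDemo {
    static String takeFromDelayedProducer(String message, long delayMillis) throws InterruptedException {
        BlockingQueue<String> dev = new LinkedBlockingQueue<>();
        Thread sink = new Thread(() -> {
            try {
                Thread.sleep(delayMillis); // the consumer is already blocked by now
                dev.offer(message);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sink.start();
        // take() blocks the calling thread until an element is available, then removes it.
        String taken = dev.take();
        sink.join();
        return taken;
    }

    public static void main(String[] args) throws InterruptedException {
        String json = "{\"sensorId\":\"Sensor4\",\"temperature\":22.77444653747327}";
        System.out.println("took " + takeFromDelayedProducer(json, 500));
    }
}
```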
Starting the flow
Now that we have a Connect cluster and a Hazelcast cluster up and running, let’s give Connect the configurations for the CoAP Source and Hazelcast Sink. Let’s create a properties file for the sink called coap-hazelcast-sink.properties.
Add the following configuration:
name=hazelcast-sink
connector.class=com.datamountaineer.streamreactor.connect.hazelcast.sink.HazelCastSinkConnector
tasks.max=1
topics=coap_sensor_topic
connect.hazelcast.sink.cluster.members=localhost
connect.hazelcast.sink.group.name=dev
connect.hazelcast.sink.group.password=dev-pass
connect.hazelcast.sink.kcql=INSERT INTO dev SELECT * FROM coap_sensor_topic WITHFORMAT JSON STOREAS QUEUE
This configuration is straightforward:
- Defines the name of the sink as hazelcast-sink.
- Sets the connector class to use, which must be on the CLASSPATH of all workers in the Connect cluster.
- Sets the maximum number of tasks the connector is allowed to spawn across the cluster.
- Subscribes the sink to the coap_sensor_topic topic.
- Points the sink at the Hazelcast cluster members, here localhost.
- Sets the Hazelcast group name to use, the same group our Hazelcast console app on the other end belongs to.
- Sets the password for the group.
- Sets the KCQL statement. This selects all fields from the coap_sensor_topic topic and writes them as JSON to a queue called dev. We use dev here because we haven’t added a new group to the hazelcast.xml config and we are listening via the demo console app.
For completeness, let’s create the CoAP Source config. Here we define the CoAP server and the sensor resource to subscribe to. Create a file called coap-hazelcast-source.properties with the following contents:
name = coap-source
connector.class = com.datamountaineer.streamreactor.connect.coap.source.CoapSourceConnector
tasks.max = 1
connect.coap.source.uri = coap://localhost:5633
connect.coap.kcql = INSERT INTO coap_sensor_topic SELECT * FROM sensors
Pushing configurations to Connect
DataMountaineer has a CLI for interacting with Kafka Connect. This is a tiny command line interface (CLI) around the Kafka Connect REST interface for managing connectors. It is used in a git-like fashion, where the first program argument indicates the command:
./cli.sh [ps|get|rm|create|run|status|plugins|describe|validate|restart|pause|resume]
The CLI is meant to behave as a good Unix citizen: input from stdin, output to stdout, out-of-band info to stderr, and a non-zero exit status on error. Commands dealing with configuration expect or produce data in .properties style: key=value lines, with comments starting with #.
We will use the CLI to push the configurations to the Connect cluster, at which point you should see the connectors starting up. If you’re using Confluent Control Center, you can also add the connectors that way.
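For readers curious what such a push looks like at the REST level, one standard way to create a connector is POST /connectors with a JSON body of the form {"name": ..., "config": {...}}. The sketch below converts a .properties file into that body using java.util.Properties, which handles both key=value and key = value lines and skips # comments. It is not the DataMountaineer CLI's code and does not claim the CLI uses this exact call.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Sketch: build the JSON body for the Kafka Connect REST call POST /connectors
// from a connector .properties file. Not the DataMountaineer CLI's implementation.
final class PropertiesToConnectorJson {
    static String toCreateRequest(Reader propertiesSource) throws IOException {
        Properties props = new Properties();
        props.load(propertiesSource); // parses key=value / key = value, skips # comments
        String name = props.getProperty("name");
        if (name == null) {
            throw new IllegalArgumentException("missing 'name' property");
        }
        Map<String, String> config = new TreeMap<>(); // sorted keys for stable output
        for (String key : props.stringPropertyNames()) {
            config.put(key, props.getProperty(key));
        }
        StringBuilder sb = new StringBuilder("{\"name\":").append(quote(name)).append(",\"config\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
        }
        return sb.append("}}").toString();
    }

    // Escapes quotes and backslashes only; enough for typical connector settings.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) throws IOException {
        String source = String.join("\n",
            "name = coap-source",
            "connector.class = com.datamountaineer.streamreactor.connect.coap.source.CoapSourceConnector",
            "tasks.max = 1",
            "# comments are ignored",
            "connect.coap.source.uri = coap://localhost:5633");
        System.out.println(toCreateRequest(new StringReader(source)));
    }
}
```

The printed body could then be sent to http://localhost:8083/connectors with Content-Type: application/json, again assuming Connect's default REST port.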
bin/cli.sh create coap-source < conf/coap-hazelcast-source.properties
This will start the CoAP Source connector feeding entries into Kafka. Next, start the Hazelcast Sink:
bin/cli.sh create hazelcast-sink < conf/coap-hazelcast-sink.properties
If you check the logs of the terminal where you started the Kafka Connect cluster you should see both Connectors start loading data into Kafka and Hazelcast.
Let’s check the data in Kafka via the Avro console consumer:
./kafka-avro-console-consumer --zookeeper localhost --topic coap_sensor_topic
{"message_id":{"int":62517},"type":{"string":"NON"},"code":"2.05","raw_code":{"int":69},"rtt":{"long":998},"is_last":{"boolean":true},"is_notification":{"boolean":true},"source":{"string":"localhost:5633"},"destination":{"string":""},"timestamp":{"long":0},"token":{"string":"d77d"},"is_duplicate":{"boolean":false},"is_confirmable":{"boolean":false},"is_rejected":{"boolean":false},"is_acknowledged":{"boolean":false},"is_canceled":{"boolean":false},"accept":{"int":-1},"block1":{"string":""},"block2":{"string":""},"content_format":{"int":0},"etags":[],"location_path":{"string":""},"location_query":{"string":""},"max_age":{"long":2},"observe":{"int":0},"proxy_uri":null,"size_1":null,"size_2":null,"uri_host":null,"uri_port":null,"uri_path":{"string":""},"uri_query":{"string":""},"payload":{"string":"{\"sensorId\":\"Sensor4\",\"timestamp\":1486997194029,\"temperature\":23.60440030439427,\"message\":\"Sensor message 98\"}"}}
{"message_id":{"int":62525},"type":{"string":"NON"},"code":"2.05","raw_code":{"int":69},"rtt":{"long":999},"is_last":{"boolean":true},"is_notification":{"boolean":true},"source":{"string":"localhost:5633"},"destination":{"string":""},"timestamp":{"long":0},"token":{"string":"d77d"},"is_duplicate":{"boolean":false},"is_confirmable":{"boolean":false},"is_rejected":{"boolean":false},"is_acknowledged":{"boolean":false},"is_canceled":{"boolean":false},"accept":{"int":-1},"block1":{"string":""},"block2":{"string":""},"content_format":{"int":0},"etags":[],"location_path":{"string":""},"location_query":{"string":""},"max_age":{"long":2},"observe":{"int":0},"proxy_uri":null,"size_1":null,"size_2":null,"uri_host":null,"uri_port":null,"uri_path":{"string":""},"uri_query":{"string":""},"payload":{"string":"{\"sensorId\":\"Sensor5\",\"timestamp\":1486997199029,\"temperature\":25.7178981915713,\"message\":\"Sensor message 106\"}"}}
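Note that the sensor reading itself is carried as a JSON string inside the record's payload field, so downstream code has to decode it a second time. As a dependency-free sketch, the helper below pulls the sensorId and temperature fields (names taken from the test server's output above) out of an already-unescaped payload string with regular expressions; a real consumer would use a JSON library instead.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: extract fields from the nested payload JSON string of a CoAP record.
// Field names come from the test server output; regexes keep this dependency-free.
final class PayloadFields {
    private static final Pattern SENSOR_ID = Pattern.compile("\"sensorId\"\\s*:\\s*\"([^\"]*)\"");
    private static final Pattern TEMPERATURE = Pattern.compile("\"temperature\"\\s*:\\s*(-?[0-9.eE+-]+)");

    static String sensorId(String payload) {
        return group(SENSOR_ID, payload);
    }

    static double temperature(String payload) {
        return Double.parseDouble(group(TEMPERATURE, payload));
    }

    private static String group(Pattern pattern, String text) {
        Matcher m = pattern.matcher(text);
        if (!m.find()) {
            throw new IllegalArgumentException("field not found: " + pattern.pattern());
        }
        return m.group(1);
    }

    public static void main(String[] args) {
        // The payload value from the first record above, after decoding the outer JSON string.
        String payload = "{\"sensorId\":\"Sensor4\",\"timestamp\":1486997194029,"
            + "\"temperature\":23.60440030439427,\"message\":\"Sensor message 98\"}";
        System.out.println(sensorId(payload) + " " + temperature(payload));
    }
}
```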
Back in our Hazelcast console app we should see data arriving:
hazelcast[default] > ns dev
namespace: dev
hazelcast[dev] > q.take
{"ingest_time":1487166342000,"message_id":34158,"type":"ACK","code":"2.05","raw_code":69,"rtt":65,"is_last":true,"is_notification":true,"source":"localhost:5633","destination":"","timestamp":0,"token":"6b3e66","is_duplicate":false,"is_confirmable":false,"is_rejected":false,"is_acknowledged":false,"is_canceled":false,"accept":-1,"block1":"","block2":"","content_format":0,"etags":[],"location_path":"","location_query":"","max_age":2,"observe":0,"proxy_uri":null,"size_1":null,"size_2":null,"uri_host":null,"uri_port":null,"uri_path":"","uri_query":"","payload":"{\"sensorId\":\"Sensor5\",\"timestamp\":1487166341936,\"temperature\":28.33645994241467,\"message\":\"Sensor message 1\"}"}
hazelcast[dev] > q.take
{"ingest_time":1487166346002,"message_id":34159,"type":"ACK","code":"2.05","raw_code":69,"rtt":3,"is_last":true,"is_notification":true,"source":"localhost:5633","destination":"","timestamp":0,"token":"6b3e66","is_duplicate":false,"is_confirmable":false,"is_rejected":false,"is_acknowledged":false,"is_canceled":false,"accept":-1,"block1":"","block2":"","content_format":0,"etags":[],"location_path":"","location_query":"","max_age":2,"observe":0,"proxy_uri":null,"size_1":null,"size_2":null,"uri_host":null,"uri_port":null,"uri_path":"","uri_query":"","payload":"{\"sensorId\":\"Sensor4\",\"timestamp\":1487166346000,\"temperature\":22.77444653747327,\"message\":\"Sensor message 2\"}"}
hazelcast[dev] > q.iterator
1 {"ingest_time":1487166350005,"message_id":34160,"type":"ACK","code":"2.05","raw_code":69,"rtt":2,"is_last":true,"is_notification":true,"source":"localhost:5633","destination":"","timestamp":0,"token":"6b3e66","is_duplicate":false,"is_confirmable":false,"is_rejected":false,"is_acknowledged":false,"is_canceled":false,"accept":-1,"block1":"","block2":"","content_format":0,"etags":[],"location_path":"","location_query":"","max_age":2,"observe":0,"proxy_uri":null,"size_1":null,"size_2":null,"uri_host":null,"uri_port":null,"uri_path":"","uri_query":"","payload":"{\"sensorId\":\"Sensor4\",\"timestamp\":1487166350004,\"temperature\":24.549225553626442,\"message\":\"Sensor message 4\"}"}
2 {"ingest_time":1487166354011,"message_id":34161,"type":"ACK","code":"2.05","raw_code":69,"rtt":3,"is_last":true,"is_notification":true,"source":"localhost:5633","destination":"","timestamp":0,"token":"6b3e66","is_duplicate":false,"is_confirmable":false,"is_rejected":false,"is_acknowledged":false,"is_canceled":false,"accept":-1,"block1":"","block2":"","content_format":0,"etags":[],"location_path":"","location_query":"","max_age":2,"observe":0,"proxy_uri":null,"size_1":null,"size_2":null,"uri_host":null,"uri_port":null,"uri_path":"","uri_query":"","payload":"{\"sensorId\":\"Sensor4\",\"timestamp\":1487166354009,\"temperature\":24.316168206573426,\"message\":\"Sensor message 7\"}"}
Conclusion
Kafka Connect provides a common framework to load and unload data to and from Kafka. It takes care of the hard parts of data ingestion for you:
- Delivery semantics
- Offset management
- Serialization / de-serialization
- Partitioning / scalability
- Fault tolerance / failover
- Data model integration
- CI/CD
- Metrics / monitoring
DataMountaineer has IoT covered with both CoAP and MQTT connectors. Coupled with Hazelcast as a processing engine and storage layer, this makes it easy to construct simple, reliable and scalable dataflows that handle IoT stream processing and analytics.