Announcing the First Release of Hazelcast Jet Extension Modules

We are excited to announce the new repository for Hazelcast Jet extension modules. This repository will be home to connectors (both sources and sinks), custom aggregations, and context factories. All of these modules make it easier to integrate third-party products into your pipelines. The main idea behind keeping these extensions in a separate repository is a faster software development lifecycle.

This approach gives us more flexibility to prototype, develop, and release new extensions. Any contributions, ideas, or implementations are always appreciated and encouraged!

The goal of this repository is to act as an incubator for extensions to the main project. In order for an extension to graduate, it must meet the standards of the main project, which include a standalone code sample, a documentation/reference manual, and a test suite that verifies stability and performance characteristics over an extended period of time (soak tests). Once an extension graduates, it becomes part of the main distribution.

Today, I’m happy to announce the first release (v0.1) of the following connectors and aggregations:

InfluxDB Connector

The Hazelcast Jet connector for InfluxDB enables Jet pipelines to read data points from and write them to InfluxDB. A time-series database, InfluxDB is frequently used in conjunction with Grafana to visualize time-series data using charts. Jet pipelines are used to ingest raw streams of data points and aggregate them before storing them in InfluxDB.

Usage as a Source

The InfluxDB batch source, InfluxDbSources.influxDb(), executes the query and emits the results as they arrive.

The following is an example pipeline that queries InfluxDB, maps the first and second column values in each row to a tuple, and logs them.

Pipeline p = Pipeline.create();
p.drawFrom(
    InfluxDbSources.influxDb("SELECT * FROM db..cpu_usages",
        DATABASE_NAME,
        INFLUXDB_URL,
        USERNAME,
        PASSWORD,
        // map each result row to a tuple of its first two column values
        (name, tags, columns, row) -> tuple2(row.get(0), row.get(1))))
 .drainTo(Sinks.logger());

Usage as a Sink

The InfluxDB sink, InfluxDbSinks.influxDb(), is used to write data points from a Hazelcast Jet pipeline to InfluxDB.

The following is an example pipeline that reads measurements from a Hazelcast list, maps them to Point instances, and writes them to InfluxDB.

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(measurements))
 .map(measurement -> Point.measurement("mem_usage")
                          .time(System.nanoTime(), TimeUnit.NANOSECONDS)
                          .addField("value", measurement)
                          .build())
 .drainTo(InfluxDbSinks.influxDb(DB_URL, DATABASE_NAME, USERNAME, PASSWORD));

Elasticsearch Connectors

The Hazelcast Jet connectors for Elasticsearch (v5 – v7) let Jet pipelines query objects from and index objects to Elasticsearch.

We support three major versions of Elasticsearch: 5, 6, and 7. Practically, the usage flow is similar for all of them.

Usage as a Source

The Elasticsearch source, ElasticsearchSources.elasticsearch(), executes the query and retrieves the results using the Scroll API. Please refer to the Elasticsearch documentation on scrolling for details.

The Jet connector can be used to join and analyze the data stored in the index.
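
As a rough sketch, a pipeline that queries a hypothetical users index and logs each hit could look like the following. The exact factory signatures differ between the v5, v6, and v7 modules, so treat the parameter shape here (a client supplier, a SearchRequest supplier, and a hit-mapping function) as an assumption rather than the definitive API:

Pipeline p = Pipeline.create();
p.drawFrom(ElasticsearchSources.elasticsearch(
        // client supplier (assumed parameter shape)
        () -> new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200))),
        // the search request to execute
        () -> new SearchRequest("users"),
        // map each SearchHit to its JSON source string
        SearchHit::getSourceAsString))
 .drainTo(Sinks.logger());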

Usage as a Sink

The Elasticsearch sink, ElasticsearchSinks.elasticsearch(), is used to index objects from a Hazelcast Jet pipeline into Elasticsearch. Jet pipelines are mostly used to join, enrich, and pre-aggregate data before storing it in the index.
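
For illustration, indexing items from a hypothetical Hazelcast list named users might look roughly like this; again, the client supplier and the item-to-IndexRequest mapping function are assumptions about the connector's shape:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.list(users))
 .drainTo(ElasticsearchSinks.elasticsearch(
        // client supplier (assumed parameter shape)
        () -> new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200))),
        // turn each pipeline item into an index request for the "users" index
        user -> new IndexRequest("users").source("value", user.toString())));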

The configuration and usage of these connectors differ slightly from each other. Please refer to the corresponding connector page for the version of your Elasticsearch deployment.

Probabilistic Aggregations

The new repository will also host a collection of probabilistic aggregations. Currently, it contains an implementation of the HyperLogLog++ aggregation.

HyperLogLog is a probabilistic data structure for estimating cardinality. In plain English: you feed items into HyperLogLog and it can tell you approximately how many unique items it has received. Imagine you are processing a web server access log stream and you want to know how many unique IPs visited your site. HyperLogLog does this in constant space.

Usage

The aggregation is typically used in two stages:

  1. A mapping step to transform your stream entries into 64-bit hashes
  2. An aggregation step to estimate the number of unique hashes

In practice it looks like this:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.mySource())
 .mapUsingContext(HashingSupport.hashingContextFactory(), HashingSupport.hashingFn()) // hash the items
 .aggregate(ProbabilisticAggregations.hyperLogLog()) // estimate the number of unique hashes
 .drainTo(Sinks.mySink()); // write the cardinality estimate to the sink

Wrapping Up

We are actively working on delivering new extension modules for Hazelcast Jet. Please let us know if there is a particular module you would like to see in the next batch. We are also looking forward to your contributions: with the Source and Sink Builder API, you can easily implement a connector for your favorite tool.
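
As a taste of how little code that takes, here is a minimal batch source built with SourceBuilder, following the file-reading example from the Jet reference manual (the file name is illustrative):

BatchSource<String> fileSource = SourceBuilder
    .batch("file-source", ctx -> new BufferedReader(new FileReader("lines.txt")))
    .<String>fillBufferFn((reader, buf) -> {
        // emit one line per call; close the buffer once the file is exhausted
        String line = reader.readLine();
        if (line != null) {
            buf.add(line);
        } else {
            buf.close();
        }
    })
    .destroyFn(BufferedReader::close)
    .build();

Write your module now and be an active member of the open source community. Stay tuned for more updates!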