Hazelcast Jet 4.0 is Released!

We’re happy to introduce Hazelcast Jet 4.0 and its new features. This release was a significant effort, with 230 PRs merged, making it one of our biggest releases in terms of new features.

Distributed Transactions

Jet has long had first-class support for fault tolerance through its implementation of the Chandy-Lamport distributed snapshotting algorithm, which requires participation from the whole pipeline, including sources and sinks. Previously, the at-least-once and exactly-once processing guarantees were limited to replayable sources such as Kafka. Jet 4.0 comes with a full two-phase commit (2PC) implementation, which makes end-to-end exactly-once processing possible with acknowledgment-based sources such as JMS. Jet can now also work with transactional sinks to avoid duplicate writes: this version adds transactional file and Kafka sinks, with transactional JMS and JDBC sinks utilizing XA transactions coming in the next release.
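To give an idea of how this looks from the API side, here is a minimal sketch of a Kafka-to-Kafka job with the exactly-once guarantee enabled (jet is an existing JetInstance; the client properties and topic names are illustrative):

Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.kafka(consumerProps, "input-topic"))
 .withoutTimestamps()
 .writeTo(KafkaSinks.kafka(producerProps, "output-topic")); // transactional Kafka sink

// With EXACTLY_ONCE, Jet coordinates its snapshots with the sink's Kafka
// transactions via two-phase commit.
JobConfig config = new JobConfig()
    .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
    .setSnapshotIntervalMillis(10_000);
jet.newJob(p, config);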

We will have additional posts about this topic in the future detailing the mechanism and the results of our tests with 2PC for various message brokers and databases.

Python User-Defined Functions

Python is a popular language with a massive ecosystem of libraries, and it has become especially prominent in the domains of data processing and machine learning. Jet itself is a data processing framework for both streams and batches of data, but the API for defining pipelines was previously limited to Java and Java functions only.

In this version, we have added a native way to execute Python code within a Jet pipeline. Jet spawns separate Python processes on each node, which communicate back over gRPC. The processes are fully managed by Jet and benefit from techniques such as smart batching of events.

The user defines a mapping stage that takes an input item and transforms it using a supplied Python function. The function can make use of libraries such as scikit-learn, NumPy and others, making it possible to use Jet for deploying ML models in production. For example, given this pipeline:

Pipeline p = Pipeline.create();
p.readFrom(TestSources.itemStream(10, (ts, seq) -> bigRandomNumberAsString()))
 .withoutTimestamps()
 .apply(mapUsingPython(new PythonServiceConfig()
         .setBaseDir(baseDir)
         .setHandlerModule("take_sqrt")))
 .writeTo(Sinks.observable(RESULTS));

The user only has to supply the following Python function:

import numpy as np

def transform_list(input_list):
    """
    Uses NumPy to transform a list of numbers into a list of their square
    roots.
    """
    num_list = [float(it) for it in input_list]
    sqrt_list = np.sqrt(num_list)
    return [str(it) for it in sqrt_list]

For a more in-depth discussion on this topic, I recommend viewing Jet Core Engineer Marko Topolnik’s presentation, Deploying ML Models at Scale.

Observables

When you submit a Jet pipeline, it typically reads data from a source and writes to a sink (such as an IMap). When the submitter of the pipeline wants to read the results, the sink must be read outside of the pipeline, which is not very convenient.

In Jet 4.0, a new sink type called Observable has been added, which publishes messages directly to the caller. It uses a Hazelcast Ringbuffer as the underlying data store, which decouples the producer from the consumer.

Pipeline p = Pipeline.create();
Observable<SimpleEvent> o = jet.newObservable();
o.addObserver(event -> System.out.println(event));
p.readFrom(TestSources.itemStream(10))
 .withoutTimestamps()
 .writeTo(Sinks.observable(o));
jet.newJob(p).join();

The Observable can also be used to notify you of a job’s completion and any errors that may occur during processing.
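For instance, a full observer with completion and error callbacks can be registered like this (a sketch, assuming the three-argument Observer.of factory):

o.addObserver(Observer.of(
    event -> System.out.println("Received: " + event),   // called for each published item
    error -> System.err.println("Job failed: " + error), // called if the job fails
    () -> System.out.println("Job completed")            // called on successful completion
));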

Custom Metrics

Over the last few releases we’ve been improving metrics support in Jet, for example by making it possible to get metrics directly from running or completed jobs through Job.getMetrics(). In this release, you can also add your own custom metrics to a pipeline through a simple API:

p.readFrom(TestSources.itemStream(10))
 .withoutTimestamps()
 .map(event -> {
     // count even-numbered events with a user-defined metric
     if (event.sequence() % 2 == 0) {
         Metrics.metric("numEvens").increment();
     }
     return event;
 }).writeTo(Sinks.logger());

These custom metrics will then be available as part of Job.getMetrics() or through JMX along with the rest of the metrics.
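For example, the submitter can read the custom metric back from a running or completed job (a sketch; jet is an existing JetInstance and p is the pipeline above):

Job job = jet.newJob(p);
// ... later, while the job is running or after it has completed:
List<Measurement> numEvens = job.getMetrics().get("numEvens");
numEvens.forEach(m -> System.out.println(m.value()));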

Debezium, Kafka Connect and Twitter Connectors

As part of Jet 4.0, we’re also releasing three new connectors:

Debezium

Debezium is a Change Data Capture (CDC) platform and the new Debezium connector for Jet allows you to stream changes directly from databases, such as MySQL and PostgreSQL, without requiring any other dependencies.

Although Debezium typically requires the use of Kafka and Kafka Connect, the native Jet integration means you can stream changes directly without having to run Kafka. The integration also supports fault tolerance, so that when a Jet job is scaled up or down, old changes do not need to be replayed.

This makes it suitable for building an end-to-end solution where, for example, an in-memory cache backed by an IMap is always kept up to date with the latest changes in the database.

Configuration configuration = Configuration.create()
 .with("name", "mysql-inventory-connector")
 .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
 /* begin connector properties */
 .with("database.hostname", mysql.getContainerIpAddress())
 .with("database.port", mysql.getMappedPort(MYSQL_PORT))
 .with("database.user", "debezium")
 .with("database.password", "dbz")
 .with("database.server.id", "184054")
 .with("database.server.name", "dbserver1")
 .with("database.whitelist", "inventory")
 .with("database.history.hazelcast.list.name", "test")
 .build();

Pipeline p = Pipeline.create();
p.readFrom(DebeziumSources.cdc(configuration))
 .withoutTimestamps()
 .map(record -> Values.convertToString(record.valueSchema(), record.value()))
 .writeTo(Sinks.logger());
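To keep an IMap up to date instead of just logging the changes, the same stream can be written to a map sink. A sketch, where extractKey and extractValue are hypothetical helpers that pull the record key and row state out of the Debezium SourceRecord:

p.readFrom(DebeziumSources.cdc(configuration))
 .withoutTimestamps()
 // extractKey/extractValue are illustrative placeholders, not part of the API
 .map(record -> Util.entry(extractKey(record), extractValue(record)))
 .writeTo(Sinks.map("inventory-cache"));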

The Debezium connector is currently available in the hazelcast-jet-contrib repository, along with a demo application.

Kafka Connect

The Kafka Connect source allows you to use any existing Kafka Connect source natively with Jet, without requiring the presence of a Kafka cluster. The records are streamed as Jet events instead, which can be processed further with full support for fault tolerance and replaying. A complete list of connectors is available through the Confluent Hub.
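A minimal sketch of reading from a Kafka Connect source via the hazelcast-jet-contrib module (the connector class and its properties are illustrative placeholders, not a real connector):

Properties properties = new Properties();
properties.setProperty("name", "example-source-connector");                      // connector instance name
properties.setProperty("connector.class", "com.example.ExampleSourceConnector"); // hypothetical connector
properties.setProperty("tasks.max", "1");

Pipeline p = Pipeline.create();
p.readFrom(KafkaConnectSources.connect(properties))
 .withoutTimestamps()
 .writeTo(Sinks.logger());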

Twitter

We’ve also released a simple Twitter source that uses the Twitter Hosebird Client (HBC) to process a stream of tweets.

Properties credentials = new Properties();
credentials.setProperty("consumerKey", "???");    // OAuth1 Consumer Key
credentials.setProperty("consumerSecret", "???"); // OAuth1 Consumer Secret
credentials.setProperty("token", "???");          // OAuth1 Token
credentials.setProperty("tokenSecret", "???");    // OAuth1 Token Secret
List<String> terms = Arrays.asList("term1", "term2");
StreamSource<String> streamSource = TwitterSources.stream(credentials,
    () -> new StatusesFilterEndpoint().trackTerms(terms)
);
Pipeline p = Pipeline.create();
p.readFrom(streamSource)
 .withoutTimestamps()
 .writeTo(Sinks.logger());

These connectors are currently under incubation and will be part of a future release.

Improved Jet Installation

We’ve also made many improvements to the Jet installation package. It has been cleaned up to reduce its size and now supports the following:

  • The default config format is now YAML, and many common options are included in the default configuration
  • A rolling file logger which writes to the log folder is now the default logger
  • Support for daemon mode through the jet-start -d switch
  • An improved readme and a new “hello world” application which can be submitted right after installation
  • Improved JDK 9+ support to avoid illegal reflective access warnings

Hazelcast IMDG 4.0

Another change worth noting is that Jet is now based on Hazelcast IMDG 4.0, which was itself a major release that brought many new features and technical improvements, including better performance, Intel Optane DC support and encryption at rest.

Breaking Changes and Migration Guide

As part of 4.0, we’ve also done some house cleaning, which means some classes and packages have moved. All the changes are listed in the migration guide in the reference manual.

Going forward, we are committed to backwards compatibility, and any interfaces or classes that are subject to change will be marked as @Beta or @EvolvingApi.

Wrapping Up

Hazelcast Jet 4.0 is a big release, and we have many more exciting features in the pipeline (pun intended), including SQL support, extended support for 2PC, improved serialization support, even more connectors, Kubernetes operators and much more. We are also aiming to make shorter, more frequent releases to bring new features to users more quickly.