Bridging Between Java 8 Streams and Hazelcast Jet

Hazelcast Jet allows you to distribute stream processing over several cluster nodes. While it comes with several out-of-the-box sources to read from (Hazelcast IMap, JMS, JDBC, etc.), and sinks to write to, there’s no Java 8 streams source. In this post, we are going to create an adapter to bridge this gap.

A Look at the SourceBuilder API

The com.hazelcast.jet.pipeline.Sources class contains the available sources mentioned above. That should be your first stop because nobody wants to reinvent the wheel.

To create a custom source, the entry point is the SourceBuilder class. It contains all the necessary plumbing to create one such source.

(Figure: SourceBuilder class diagram)

The usage is quite straightforward:

SourceBuilder
    .batch("name", new CreateFunction())
    .fillBufferFn(new FillBufferFunction())
    .build();

This deserves some explanation. The batch() method requires two arguments: a name, and a create function that returns the object holding the source’s state. Remember, Jet is distributed; hence the state cannot be kept in places that Jet doesn’t know about. That rules out storing it in the fields of “standard” objects.

The fillBufferFn() method requires a single argument: a function that reads data from the state object and adds it to a buffer for Jet’s consumption. Jet calls this function repeatedly.
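The contract between the two functions can be sketched in plain Java, without Jet on the classpath. In this simplified sketch, a driver calls the create function once to obtain the state object, then calls the fill function repeatedly with that same state, consuming the buffer after each call, until the fill function closes the buffer. DemoBuffer and MiniBatchDriver are hypothetical classes written for illustration, not Jet classes; Jet's real processor is more involved, and its create function receives a Processor.Context, which the Supplier used here omits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical stand-in for Jet's SourceBuffer: it only collects items and can be closed.
class DemoBuffer<T> {
    final List<T> items = new ArrayList<>();
    boolean closed;

    void add(T item) { items.add(item); }

    void close() { closed = true; }
}

// Hypothetical driver mimicking the batch-source contract described above; not Jet code.
class MiniBatchDriver {

    static <S, T> List<T> run(Supplier<S> createFn, BiConsumer<S, DemoBuffer<T>> fillBufferFn) {
        S state = createFn.get();                 // called once: all state lives in this object
        DemoBuffer<T> buffer = new DemoBuffer<>();
        List<T> emitted = new ArrayList<>();
        while (!buffer.closed) {
            fillBufferFn.accept(state, buffer);   // called repeatedly with the same state
            emitted.addAll(buffer.items);         // the driver consumes what was added
            buffer.items.clear();
        }
        return emitted;
    }

    // Example: the state is a one-element counter; the fill function emits one
    // number per call and closes the buffer once three numbers have been emitted.
    static List<Integer> demo() {
        return MiniBatchDriver.<int[], Integer>run(
                () -> new int[] {0},
                (counter, buffer) -> {
                    if (counter[0] < 3) {
                        buffer.add(counter[0]++);
                    } else {
                        buffer.close();
                    }
                });
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [0, 1, 2]
    }
}
```

Keeping the counter inside the object returned by the create function, rather than in a field of the lambda's enclosing class, is the point the paragraph above makes about where state may live.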

Finally, notice the slight difference between the names used in the API and the names of the standard Java 8 functional interfaces, e.g., BiConsumerEx instead of BiConsumer. The latter cannot be used, as regular functional interfaces are not serializable. Most parameters in Jet methods need to be sent over the wire; hence they need to implement Serializable.
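The serializability difference is easy to observe with plain Java serialization. The sketch below assumes a hypothetical SerializableSupplier interface that, like Jet's SupplierEx, extends both Supplier and Serializable: a lambda targeting it can be serialized, while the same lambda targeting a plain Supplier cannot.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical analogue of Jet's SupplierEx: a Supplier that also extends Serializable.
interface SerializableSupplier<T> extends Supplier<T>, Serializable { }

class LambdaSerialization {

    // Returns true if Java serialization accepts the object, false if it is not serializable.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Supplier<String> plain = () -> "hello";
        SerializableSupplier<String> serializable = () -> "hello";
        System.out.println(isSerializable(plain));        // false
        System.out.println(isSerializable(serializable)); // true
    }
}
```

The lambda bodies are identical; only the target type decides whether the compiler makes the lambda serializable.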

It’s easy to move from the batch API to the streaming API. It requires two changes:

  1. Use the stream() method instead of batch().
  2. Specify how elements get their timestamps. Several methods are available, e.g., withIngestionTimestamps() on the stage that drawFrom() returns.

Implementing the Adapter

With that understanding, let’s implement the adapter.

The create function

The create function is a FunctionEx that accepts a Processor.Context and returns an object of any type.

Remember that this returned object holds the state. We could implement a cursor over the stream to keep track of the last element read. Yet this is the textbook definition of an Iterator!

The first draft looks like the following:

public class Java8StreamSource<T> implements FunctionEx<Processor.Context, Iterator<T>> {

    private final Stream<T> stream;

    public Java8StreamSource(Stream<T> stream) {
        this.stream = stream;
    }

    @Override
    public Iterator<T> applyEx(Processor.Context context) {
        return stream.iterator();
    }
}

Unfortunately, this is not good enough. The main issue with the code is that Java’s Stream is not Serializable! Hence, Jet won’t be able to send it over the wire. Even when Jet runs in embedded mode on a single node, with no network hop involved, it throws an exception at runtime to avoid any unwanted surprise later on.
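The failure can be reproduced with plain Java serialization, independently of Jet. In this sketch, StreamHolder is a hypothetical stand-in for the first draft: the class itself is Serializable, as any FunctionEx is, but its Stream field is not, so writing it fails. Jet's own check may report the problem differently.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.stream.Stream;

// Hypothetical stand-in for the first draft: the class is Serializable, its Stream field is not.
class StreamHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Stream<Integer> stream;

    StreamHolder(Stream<Integer> stream) {
        this.stream = stream;
    }

    Stream<Integer> stream() {
        return stream;
    }
}

class StreamSerializationCheck {

    // Attempts plain Java serialization; returns false on NotSerializableException.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        var holder = new StreamHolder(Stream.iterate(1, i -> i + 1));
        System.out.println(canSerialize(holder)); // false: the Stream field cannot be written
    }
}
```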

We first have to collect the elements in a serializable collection, such as ArrayList. But this is impossible if the stream is unbounded, so we need to cap its size. This results in the following improved code:

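import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.function.FunctionEx;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Java8StreamSource<T> implements FunctionEx<Processor.Context, Iterator<T>> {

    // Serializable snapshot of at most `limit` elements of the stream
    private final List<T> elements;

    public Java8StreamSource(Stream<T> stream, int limit) {
        // Collectors.toList() makes no serializability guarantee, so request an ArrayList
        this.elements = stream.limit(limit).collect(Collectors.toCollection(ArrayList::new));
    }

    @Override
    public Iterator<T> applyEx(Processor.Context context) {
        // The iterator is the state: a cursor over the snapshot
        return elements.iterator();
    }
}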
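The effect of the limit can be checked in isolation. This sketch uses a hypothetical snapshot() helper that mirrors the constructor above: limit() is what allows collect() to terminate on an infinite stream, and the resulting ArrayList is Serializable.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class BoundedSnapshot {

    // Mirrors the constructor above: cap the (possibly infinite) stream, then copy it
    // into an ArrayList, which implements Serializable.
    static <T> ArrayList<T> snapshot(Stream<T> stream, int limit) {
        return stream.limit(limit).collect(Collectors.toCollection(ArrayList::new));
    }

    public static void main(String[] args) {
        // Without limit(), collecting this infinite stream would never terminate.
        ArrayList<Integer> elements = snapshot(Stream.iterate(1, i -> i + 1), 5);
        System.out.println(elements);                         // [1, 2, 3, 4, 5]
        System.out.println(elements instanceof Serializable); // true
    }
}
```

The trade-off is visible here: everything past the limit is silently dropped.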

The fill function

The fill function is a bi-consumer of the state object (the iterator) and of the buffer to fill, which Jet provides.

Let’s try that:

public class Java8StreamFiller<T> implements BiConsumerEx<Iterator<T>, SourceBuffer<T>> {

    @Override
    public void acceptEx(Iterator<T> iterator, SourceBuffer<T> buffer) {
        while (iterator.hasNext()) {
            buffer.add(iterator.next());
        }
    }
}

That works well if the iterator contains a limited number of items. Otherwise, the loop never ends and we could overflow the buffer. To prevent that, let’s cap the number of elements added in one call:

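import com.hazelcast.jet.function.BiConsumerEx;
import com.hazelcast.jet.pipeline.SourceBuilder.SourceBuffer;
import java.util.Iterator;

public class Java8StreamFiller<T> implements BiConsumerEx<Iterator<T>, SourceBuffer<T>> {

    @Override
    public void acceptEx(Iterator<T> iterator, SourceBuffer<T> buffer) {
        for (var i = 0; i < Byte.MAX_VALUE && iterator.hasNext(); i++) {
            buffer.add(iterator.next());
        }
        // A batch source signals completion by closing the buffer; without this, the job never ends
        if (!iterator.hasNext()) {
            buffer.close();
        }
    }
}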

Byte.MAX_VALUE, that is 127, has the benefit of being quite low.
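To see how the capped filler behaves over successive calls, here is a plain-Java simulation. FakeBuffer is a hypothetical stand-in for Jet's SourceBuffer with only add() and close(), and chunkSizes() is a hypothetical driver that calls the fill logic until the buffer is closed, recording how many items each call emitted. The sketch also closes the buffer once the iterator is exhausted, which is how a Jet batch source signals completion.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.IntStream;

// Hypothetical stand-in for Jet's SourceBuffer (add/close only).
class FakeBuffer<T> {
    final List<T> items = new ArrayList<>();
    boolean closed;

    void add(T item) { items.add(item); }

    void close() { closed = true; }
}

class ChunkedFill {

    // Same loop as the capped filler, plus close() once the iterator is exhausted.
    static <T> void fill(Iterator<T> iterator, FakeBuffer<T> buffer) {
        for (var i = 0; i < Byte.MAX_VALUE && iterator.hasNext(); i++) {
            buffer.add(iterator.next());
        }
        if (!iterator.hasNext()) {
            buffer.close();
        }
    }

    // Calls fill() until the buffer is closed and records the size of each chunk.
    static List<Integer> chunkSizes(int totalItems) {
        Iterator<Integer> it = IntStream.range(0, totalItems).boxed().iterator();
        FakeBuffer<Integer> buffer = new FakeBuffer<>();
        List<Integer> sizes = new ArrayList<>();
        while (!buffer.closed) {
            fill(it, buffer);
            sizes.add(buffer.items.size());
            buffer.items.clear();
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(chunkSizes(300)); // [127, 127, 46]
    }
}
```

Because the iterator is the state, each call resumes exactly where the previous one stopped.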

Putting it all together

The final code looks like the following:

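import com.hazelcast.jet.Jet;
import com.hazelcast.jet.config.JetConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.SourceBuilder;
import java.util.stream.Stream;

public class Main {

    public static void main(String[] args) {
        var stream = Stream.iterate(1, i -> i + 1);
        var pipeline = Pipeline.create();
        // Java8StreamSource requires a limit; 1_000 is an arbitrary value
        var batch = SourceBuilder
                .batch("java-8-stream", new Java8StreamSource<>(stream, 1_000))
                .fillBufferFn(new Java8StreamFiller<>())
                .build();
        pipeline.drawFrom(batch)
                .drainTo(Sinks.logger());
        var jet = Jet.newJetInstance(new JetConfig());
        try {
            jet.newJob(pipeline).join();
        } finally {
            jet.shutdown();
        }
    }
}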

Improving the Draft Implementation

While it works in general, the above code suffers from a huge limitation: because Stream is not Serializable, it doesn’t transfer the stream itself over the wire, but a list of the elements that belong to the stream. With this approach, it’s not possible to cope with infinite streams, which is the reason for the limit parameter in the constructor of Java8StreamSource. Surely, we can do better!

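Actually, we can. As in many similar cases, we can wrap the stream in a SupplierEx<Stream<T>>. SupplierEx is a Hazelcast-specific Supplier that also extends Serializable. Jet then serializes the supplier rather than the stream, and the stream is only created when the create function calls get() at runtime, so it no longer needs to be bounded.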

The updated code looks like this:

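import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.function.FunctionEx;
import com.hazelcast.jet.function.SupplierEx;
import java.util.Iterator;
import java.util.stream.Stream;

public class Java8StreamSource<T> implements FunctionEx<Processor.Context, Iterator<T>> {

    // Only this supplier is serialized, never the stream itself
    private final SupplierEx<Stream<T>> supplier;

    public Java8StreamSource(SupplierEx<Stream<T>> supplier) {
        this.supplier = supplier;
    }

    @Override
    public Iterator<T> applyEx(Processor.Context context) {
        // Create the stream where the source runs and iterate over it lazily
        return supplier.get().iterator();
    }
}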

The pipeline just needs to be updated accordingly:

SourceBuilder
    .batch("java-8-stream", new Java8StreamSource<>(() -> Stream.iterate(1, i -> i + 1)))
    .fillBufferFn(new Java8StreamFiller<>())
    .build();

Notice how the stream is wrapped in a lambda passed to the Java8StreamSource constructor.
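Why this works can be shown without Jet: what travels over the wire is the lambda, not the stream. The sketch below uses a hypothetical ShippableSupplier interface standing in for SupplierEx, serializes and deserializes the supplier as a stand-in for sending it to another member, and only then creates the infinite stream and reads its first few elements.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical analogue of Jet's SupplierEx.
interface ShippableSupplier<T> extends Supplier<T>, Serializable { }

class ShipTheRecipe {

    // Serializes and deserializes a value, as a stand-in for sending it to another member.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    static List<Integer> firstItemsAfterShipping(int n) {
        // Only the lambda is serialized; no stream exists at this point.
        ShippableSupplier<Stream<Integer>> supplier = () -> Stream.iterate(1, i -> i + 1);
        ShippableSupplier<Stream<Integer>> received = roundTrip(supplier);
        // The infinite stream is created on the receiving side and consumed lazily.
        return received.get().limit(n).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstItemsAfterShipping(5)); // [1, 2, 3, 4, 5]
    }
}
```

The limit() here only bounds the demo's output; the supplier itself describes an unbounded stream.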

Conclusion

In this post, we took a quick look at the extensibility of Jet by creating a custom source. The source wraps a Java 8 stream and makes it available for Jet to consume. The above example can easily be adapted to your context so that you can create your own sources (and sinks). Happy integration!

The source code for this post is available on GitHub.