Streaming with Spring

Hazelcast provides a library of connectors for streaming data in and out of the platform, but you might wish to write your own if you have some bespoke needs. In this blog, we’ll learn how to write your own using the example of Spring Data and Cassandra.

Our processing

We have two Spring repositories. One holds the mapping between letter and the Greek alphabet, so “B” is for “Beta.” The other holds the radio phonetic alphabet, so now “B” is for “Bravo.” We want to run a join on these two repositories to confirm that “Beta” and “Bravo” pair up.

The challenge here is that one set of data is in Cassandra, and the other set is somewhere else.

Cassandra and Spring Data

In Cassandra, we define a table in the keyspace “hazelcast”:

CREATE TABLE hazelcast.letter (
  id TEXT,
  greek TEXT,
  PRIMARY KEY (id)
);

INSERT INTO hazelcast.letter (id, greek)  VALUES ('a', 'Alpha');
INSERT INTO hazelcast.letter (id, greek)  VALUES ('b', 'Beta');
INSERT INTO hazelcast.letter (id, greek)  VALUES ('d', 'Delta');
INSERT INTO hazelcast.letter (id, greek)  VALUES ('o', 'Omega');

This table, “letter”, records the Greek word for certain letters: “b” is for “Beta”.

Then, with Spring Data, we need to define the domain model and the repository:

@Table(value = "letter")
public class Letter {
    @PrimaryKey
    private String id;
    @Column
    private String greek;
    // Getters and setters omitted for brevity
}

@Repository
public interface LetterRepository extends CassandraRepository<Letter, String> {
}

Spring then makes a repository bean available, bound to the relevant Cassandra table, providing CRUD operations and querying in the standard Spring style.
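For instance, once injected, the repository gives you finders and counts with no implementation code. Here is a minimal sketch, assuming Spring Boot’s Cassandra auto-configuration is in place (the service class and method are ours, purely for illustration):

@Service
public class LetterService {
    @Autowired
    private LetterRepository letterRepository;

    public void demo() {
        // Standard CRUD methods come for free from the repository interface
        this.letterRepository.findById("b")
                .ifPresent(letter -> System.out.println(letter.getGreek())); // prints "Beta"
        System.out.println(this.letterRepository.count()); // prints 4
    }
}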

Hazelcast and Spring Data

As mentioned, the data that isn’t in Cassandra is somewhere else. There are several technology choices for Spring repositories, such as Mongo and JPA, but we’ll use Hazelcast as the “somewhere else!”

As before, we define a model and a repository. It’s very similar to the Cassandra repository, but there are minor differences due to the underlying technologies. Nevertheless, Spring hides most of it.

@KeySpace(value = "phonetic")
public class Phonetic implements Serializable {
    @Id
    private String id;
    private String radio;
    // Getters and setters omitted for brevity
}

@Repository
public interface PhoneticRepository extends KeyValueRepository<Phonetic, String> {
}
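For Spring to generate this repository, the Spring Data Hazelcast module must be enabled. Here is a minimal configuration sketch, assuming the spring-data-hazelcast dependency is on the classpath (the class name is ours; in the full application, the Config built later with a SpringManagedContext would be passed to the instance):

@Configuration
@EnableHazelcastRepositories
public class ApplicationConfig {
    // The Hazelcast member that backs the key-value repository
    @Bean
    public HazelcastInstance hazelcastInstance() {
        return Hazelcast.newHazelcastInstance();
    }

    // Spring Data's key-value support routes operations through a KeyValueTemplate
    @Bean
    public KeyValueOperations keyValueTemplate(HazelcastInstance hazelcastInstance) {
        return new KeyValueTemplate(new HazelcastKeyValueAdapter(hazelcastInstance));
    }
}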

And this is how we insert test data:

Phonetic alfa = new Phonetic();
alfa.setId("A");
alfa.setRadio("Alfa");
Phonetic bravo = new Phonetic();
bravo.setId("B");
bravo.setRadio("Bravo");
Phonetic charlie = new Phonetic();
charlie.setId("C");
charlie.setRadio("Charlie");
            
this.phoneticRepository.save(alfa);
this.phoneticRepository.save(bravo);
this.phoneticRepository.save(charlie);
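As an aside, since “PhoneticRepository” is ultimately a Spring Data CrudRepository, the three saves could equally be one batched call:

this.phoneticRepository.saveAll(List.of(alfa, bravo, charlie));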

The processing pipeline

The essence of the processing pipeline is this:

static Pipeline buildPipeline() {
    return Pipeline
            .create()
            .readFrom(mySource())
            .mapUsingService(phoneticRepositoryService, myMapLetterFn())
            .writeTo(Sinks.logger())
            .getPipeline();
}

A custom source is passed into “readFrom”. The data from the source passes through a “mapUsingService” stage that does the join. The output of the join passes to “writeTo(Sinks.logger())”, which writes the result to the screen.

An ETL!

The actual output:

[127.0.0.1]:5701 [dev] [5.1.2] [MyCassandraJob/loggerSink#0] (Omega, )
[127.0.0.1]:5701 [dev] [5.1.2] [MyCassandraJob/loggerSink#0] (Delta, )
[127.0.0.1]:5701 [dev] [5.1.2] [MyCassandraJob/loggerSink#0] (Beta, Bravo)
[127.0.0.1]:5701 [dev] [5.1.2] [MyCassandraJob/loggerSink#0] (Alpha, Alfa)

But we need to understand how this works in practice, so let’s examine the custom coding.

How this is actually run

The first thing to understand is what happens when you do “this.hazelcastInstance.getJet().newJob(pipeline)”.

Hazelcast is clustered. Typically this means several Hazelcast member processes rather than just one. So the “pipeline” has to be serialized to send to all of them and deserialized to execute on all of them in parallel.
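For example, the submission might look like this sketch, where the job name matches the one in the log output shown earlier:

Pipeline pipeline = buildPipeline();

JobConfig jobConfig = new JobConfig();
jobConfig.setName("MyCassandraJob"); // the name that appears in the logger output

this.hazelcastInstance.getJet().newJob(pipeline, jobConfig);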

Typically in Hazelcast, we think of sending data to the cluster. A domain model has fields and methods (getters, setters); we might focus on the fields, but really it’s just a Java object. Well, so is a pipeline.

As a consequence, the pipeline can’t carry any state that can’t be serialized, and here that means the Spring context. Specifically, we don’t want to capture the Spring ApplicationContext from where the job is submitted; we need to retrieve the local application context at each place where the job runs.
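For the “@SpringAware” mechanics used below to work, each member’s Hazelcast configuration needs a “SpringManagedContext” wired in. A minimal sketch (the configuration class name is ours):

@Configuration
public class HazelcastConfiguration {
    // Connect Hazelcast's ManagedContext to the Spring application context,
    // so that objects passed through it can have Spring beans injected
    @Bean
    public SpringManagedContext springManagedContext() {
        return new SpringManagedContext();
    }

    @Bean
    public Config config(SpringManagedContext springManagedContext) {
        Config config = new Config();
        config.setManagedContext(springManagedContext);
        return config;
    }
}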

The Custom Source

As we saw above, the pipeline uses a custom source, “readFrom(mySource())”. How does this work?

static BatchSource<Letter> mySource() {
    return SourceBuilder
            .batch("letter",
                    jobContext -> new MyLetterSource(jobContext.hazelcastInstance().getConfig().getManagedContext()))
            .fillBufferFn(MyLetterSource::fillBufferFn)
            .build();
}

@SpringAware
static class MyLetterSource implements ApplicationContextAware {
    private ApplicationContext applicationContext;

    MyLetterSource(ManagedContext managedContext) {
        SpringManagedContext springManagedContext = (SpringManagedContext) managedContext;
        springManagedContext.initialize(this);
    }

    void fillBufferFn(SourceBuilder.SourceBuffer<Letter> buffer) {
        LetterRepository letterRepository = this.applicationContext.getBean(LetterRepository.class);
        List<Letter> letters = letterRepository.findAll();
        letters.forEach(letter -> buffer.add(letter));
        buffer.close();
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

We use “SourceBuilder” to build the pipeline source from an instance of the “MyLetterSource” class, which has a “fillBufferFn” method to add data to Jet’s data buffer. This source instance is built when the job executes.

As this is “@SpringAware”, the Spring application context is plugged into the instance, which can then use it to retrieve the “LetterRepository” bean and call its “findAll()” method.

Once we have the “LetterRepository” bean, the business logic really is just:

        List<Letter> letters = letterRepository.findAll();
        letters.forEach(letter -> buffer.add(letter));

The sort of Spring simplicity we like. Read from Cassandra with one line. Iterate with another. This could even be a one-liner.
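For instance, the same logic compressed:

letterRepository.findAll().forEach(buffer::add);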

If you didn’t want to use Spring, you could pass the database login, password, etc. to the “MyLetterSource” constructor and build the connection yourself. That’s the boilerplate coding that Spring tries to handle for you.
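A rough sketch of that hand-rolled alternative, using the DataStax Java driver directly (the contact point, credentials, and datacenter name here are assumptions):

static class MyLetterSource {
    private final CqlSession session;

    MyLetterSource(String host, int port, String username, String password) {
        // Hand-rolled connection instead of a Spring-managed repository
        this.session = CqlSession.builder()
                .addContactPoint(new InetSocketAddress(host, port))
                .withAuthCredentials(username, password)
                .withLocalDatacenter("datacenter1")
                .withKeyspace("hazelcast")
                .build();
    }

    void fillBufferFn(SourceBuilder.SourceBuffer<Letter> buffer) {
        // Query and map each row by hand
        this.session.execute("SELECT id, greek FROM letter")
                .forEach(row -> {
                    Letter letter = new Letter();
                    letter.setId(row.getString("id"));
                    letter.setGreek(row.getString("greek"));
                    buffer.add(letter);
                });
        buffer.close();
    }
}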

Look Aside

We could code the pipeline as a full join with two input sources, one from the Greek alphabet and one from the phonetic alphabet. But two inputs would shed no more light than one.

Instead, let’s make it a left join using a look aside into another Spring repository. As it happens, this is a Hazelcast instance wrapped as a Spring repository, but here we won’t look inside.

In our processing stage, “mapUsingService(phoneticRepositoryService, myMapLetterFn())”, we use a service to apply a bi-function.

ServiceFactory<?, MyPhoneticService> phoneticRepositoryService =
        ServiceFactories.sharedService(context
                -> new MyPhoneticService(context.hazelcastInstance().getConfig().getManagedContext()));

The above service provides an instance of the class below at runtime. Here, “MyPhoneticService” takes the same approach to obtain Spring, by being “@SpringAware” and capturing a reference to the (Hazelcast!) phonetic data repository.

@SpringAware
static class MyPhoneticService implements ApplicationContextAware {
    private PhoneticRepository phoneticRepository;

    MyPhoneticService(ManagedContext managedContext) {
        SpringManagedContext springManagedContext = (SpringManagedContext) managedContext;
        springManagedContext.initialize(this);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.phoneticRepository = applicationContext.getBean(PhoneticRepository.class);
    }
        
    public PhoneticRepository getPhoneticRepository() {
        return this.phoneticRepository;
    }
}

The last piece of the puzzle is the bi-function that implements the mapping stage.
   
private static BiFunctionEx<MyPhoneticService, Letter, Tuple2<String, String>>
        myMapLetterFn() {
    return (myPhoneticService, letter) -> {
        // Cassandra keys are lower case ("b"), phonetic keys upper case ("B")
        String id = letter.getId().toUpperCase();
        Optional<Phonetic> optional = myPhoneticService.getPhoneticRepository().findById(id);
        if (optional.isEmpty()) {
            return Tuple2.tuple2(letter.getGreek(), "");
        } else {
            return Tuple2.tuple2(letter.getGreek(), optional.get().getRadio());
        }
    };
}

As a bi-function, it takes the instance of “MyPhoneticService” and is called for each letter emitted by “MyLetterSource”. It attempts to look up the record with the matching primary key, and returns the radio word if found, or an empty String if not.

Summary

If you need to, you can write your own sources and sinks for processing jobs, using the “SourceBuilder” and “SinkBuilder” classes that build class instances at runtime, where the processing job runs.

If helpful, these can utilize Spring by passing in a SpringManagedContext to gain access to other beans.
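For completeness, a custom sink follows the same pattern as the custom source. Below is a minimal sketch of a “SinkBuilder” equivalent to “Sinks.logger()” (the names are ours; the per-processor state here is just a prefix string, but this is where a connection, or the Spring context as above, would live):

static Sink<Tuple2<String, String>> mySink() {
    return SinkBuilder
            .sinkBuilder("my-sink", context -> "my-prefix") // createFn runs where the job runs
            .receiveFn((String prefix, Tuple2<String, String> item)
                    -> System.out.println(prefix + ": " + item))
            .build();
}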