Hazelcast Platform and CDC: A Perfect Pair
In a world of constant and rapid change, everyone has to react to events faster and faster. However, many services weren’t built with streaming capabilities in mind, leaving their owners with a difficult choice: lose potential revenue to slower responses, or risk rewriting otherwise perfectly working software? This choice becomes far less painful with Change Data Capture (CDC), a technology that lets you stream changes out of your database almost as if it were a streaming system. Starting with Hazelcast Platform 5.5.2, we are happy to announce that the Hazelcast Platform Enterprise CDC connector is now generally available, which makes using CDC in your application even more effortless.
In this article, I will explain CDC and show you a real-world (okay, okay, simplified) application of this technology. Let’s get started!
What is CDC?
CDC is a technique that reads the database’s Write-Ahead Log, or WAL (or its equivalent, such as the binlog in MySQL), and streams changes from it instead of polling the database in batches. An important thing to remember is that since CDC reads the low-level WAL, it does not issue typical SQL queries, which makes it much lighter on database resources. Reading the WAL also means CDC tools follow database transactions: changes are detected only once a transaction is committed, and no row-level locks are acquired. What a great option!
To achieve at-least-once delivery guarantees, CDC libraries like Debezium remember the last read position in the WAL (e.g., the LSN in PostgreSQL) and can resume reading from that position after any communication interruption.
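The Hazelcast connector manages these offsets for you, but to make the mechanism concrete, here is a rough sketch of the equivalent setup with a raw embedded Debezium Engine; the connector class, file path, and remaining connection properties are placeholders for illustration:

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

import java.util.Properties;
import java.util.concurrent.Executors;

public class OffsetSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("name", "engine");
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        // Debezium periodically persists the last processed WAL position (the LSN in
        // PostgreSQL) and resumes from it after a restart: at-least-once delivery.
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
        props.setProperty("offset.flush.interval.ms", "10000");
        // ...plus database.hostname, database.port, database.user, and so on.

        try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(event -> System.out.println(event.value()))
                .build()) {
            Executors.newSingleThreadExecutor().execute(engine); // the engine implements Runnable
            Thread.sleep(60_000); // stream for a minute, then close (which flushes offsets)
        }
    }
}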
More information on the Debezium 1.9 CDC connector is available in a previous blog post.
Hazelcast ♥️ CDC
Hazelcast has provided a CDC connector since Hazelcast Platform 5.0. The latest release, Hazelcast Platform 5.5.2 Enterprise Edition, also contains a new CDC connector based on Debezium 2.7.x and the brand-new Debezium Engine.
Why should you care about the new connector? Here are just a few reasons:
- A refreshed API with more options
- Small QoL improvements, like a dedicated nonNullValue function in ChangeRecord (see the short sketch after this list)
- Access to the whole source field
- Ability to map records to ChangeRecords, JSON, or to provide custom mapping straight from SourceRecord, which will avoid the overhead of double conversion
- Ability to use the standard Debezium Engine (default) and the new Asynchronous Engine
- We’ve opened SourceExtractor SPI – you can (officially) provide your own SourceExtractor implementation, giving every event a distinct sequence number. Previously it was a hidden interface used by MySQL and PostgreSQL facades, but now we’re opening it to our users.
- New databases and versions are supported with the Debezium 2.7.x engine
- Specifically, newer MySQL versions, PostgreSQL versions, DB2, and more…
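For example, the nonNullValue accessor from the QoL bullet above keeps mapping code free of null checks. A minimal sketch (assuming ChangeRecord lives in the same com.hazelcast.enterprise.jet.cdc package as Operation, and using the Payment record introduced later in this post):

import com.hazelcast.enterprise.jet.cdc.ChangeRecord;

final class MappingSketch {
    // nonNullValue() fails fast if the record carries no value (it always has one for
    // INSERT/SYNC/UPDATE), so the IDE stops warning about a possible null.
    static Payment toPayment(ChangeRecord changeRecord) {
        return changeRecord.nonNullValue().toObject(Payment.class);
    }
}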
The internals were heavily refactored to make them better. Just kidding, we know that users do not care about internals, and they shouldn’t! One promise we make is that, despite the many internal changes, the new connector is mostly a drop-in replacement for the old one. We kept the visible changes minimal; you will mostly notice just different artifact and package names (with the same API class structure inside).
General Availability
No more @Beta annotations. We do not plan any further wholesale changes to the API, and it’s tested like never before. To graduate from @Beta, the connector had to pass not only the usual unit and integration tests but also robust soak tests and stress tests. Our test suites now also cover DB2, including temporal tests, which are painful to do (handling dates is painful, let’s be honest), all to make it work flawlessly in production!
Let’s start coding
As the author, I could talk and talk about this feature; you can probably do the same with the features you develop, especially when they make you proud. However, I think that a small example would be a better way of showing why it is a great addition.
Example scenario
One of the more prominent industries using the Hazelcast Platform is finance and banking. Therefore, my example will involve a bank payment processing system. Many banks work on a mix of modern and legacy systems, which cannot be easily upgraded due to their complexity and uptime guarantees.
Here, we will show you how to add streaming of payment details to other services without disrupting the source database’s workload.
As it’s the year of AI, we will use another Hazelcast Platform 5.5 feature: Vector Collections. Streamed payments will be sent to a service that compares them with historical records to find the 10 most similar entries. If there is a match, we will consider it a fraudulent payment and mark the transaction. Thieves, beware: we are coming!
Prerequisites
Let’s make sure we start at the same point. In this example, we will use:
- Java 23
- Maven 3.9
- Hazelcast Platform 5.5.2
- Docker with MySQL image
- Hazelcast Platform license
The database will be based on Debezium’s example-mysql container. We won’t use its original tables; we use this image simply because it comes preconfigured for CDC.
If you don’t have an existing license, you can request a trial license here and set it as the HZ_LICENSEKEY environment variable or the -Dhz.licensekey system property.
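If you prefer wiring the license in code instead of the environment, the programmatic Config API accepts it too. A minimal sketch (reading the key back from HZ_LICENSEKEY here is just one option):

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class LicensedInstance {
    public static void main(String[] args) {
        Config config = Config.load();
        // Equivalent to exporting HZ_LICENSEKEY before startup.
        config.setLicenseKey(System.getenv("HZ_LICENSEKEY"));
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
    }
}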
Database and data model setup
We will use the following table:
CREATE TABLE inventory.payments (
    paymentId       BIGINT       NOT NULL PRIMARY KEY,
    sourceAccountNo VARCHAR(255) NOT NULL,
    targetAccountNo VARCHAR(255) NOT NULL,
    title           VARCHAR(255) NOT NULL
);
This table will have the following Java representation:
record Payment(
        long paymentId,
        String sourceAccountNo,
        String targetAccountNo,
        String title
) { }
The additional model classes will be:
enum Classifier { LEGIT, SUSPICIOUS }

record ProcessedPayment(Classifier classifier, Payment payment) {
    @Override
    public String toString() {
        return String.format("%-10s :: %s", classifier, payment);
    }
}
Our source will be created using the MySqlCdcSources builder:
var source = MySqlCdcSources
        .mysql("payments stream")
        .setDatabaseAddress(container.getHost(), container.getMappedPort(MYSQL_PORT))
        .setDatabaseCredentials("debezium", "dbz")
        .setDatabaseName("mysql")
        .setTableIncludeList("inventory.payments")
        .setSnapshotMode(INITIAL)
        .setProperty("heartbeat.interval.ms", 1000)
        .changeRecord();
We name the source, declare the database address and credentials, and specify which table will be streamed; the snapshot mode and the Debezium heartbeat interval are set as well.
Now, we must think more about our use case. How will we compare the similarities in the payments? The answer is simple: vector similarity search. The first thing to consider in such a case is the vector representation of the payment.
To make things easier, this example uses AllMiniLmL6V2EmbeddingModel from the Langchain4J library. We transform the toString() representation of the Payment object into an embedding. That’s far from ideal for real-world applications, where you should use a more sophisticated approach, but for this example it’s good enough.
Let’s add the following method to the Payment record:
VectorValues toVector() {
    return VectorValues.of(embeddingModel.embed(this.toString()).content().vector());
}
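For completeness, the embeddingModel referenced above is a shared Langchain4J model instance declared inside the Payment record. A minimal sketch of that declaration (the exact package of AllMiniLmL6V2EmbeddingModel varies between Langchain4J versions, so check your dependency):

// Package name assumed; adjust to your langchain4j-embeddings-all-minilm-l6-v2 version.
import dev.langchain4j.model.embedding.onnx.allminilml6v2.AllMiniLmL6V2EmbeddingModel;

// Loading the ONNX model is relatively expensive, so we share one instance.
static final AllMiniLmL6V2EmbeddingModel embeddingModel = new AllMiniLmL6V2EmbeddingModel();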
NOTE: In this demo, I am using the Testcontainers MySQL module. It’s very simple to set up: you add the dependency, and then in your main method you go with:
try (var container = new MySQLContainer<>(DEBEZIUM_MYSQL_IMAGE)
        .withDatabaseName("mysql")
        .withUsername("mysqluser")
        .withPassword("mysqlpw")
        .withExposedPorts(3306)
        .withNetworkAliases("inputDatabase")) {
    container.start();
The code is available for download here.
Now it’s time to build our pipeline. In Hazelcast Platform terminology, a pipeline can be seen as a set of commands arranged in connected stages. In many ways it’s very similar to the Java Stream API, so looking at it will be easier than reading about it:
var classifiedPayments = pipeline
        .readFrom(source)                                                          // (1)
        .withIngestionTimestamps()                                                 // (2)
        .filter(changeRecord -> changeRecord.operation() != Operation.UNSPECIFIED) // (3)
        .map(changeRecord -> changeRecord.nonNullValue().toObject(Payment.class))  // (4)
        .apply(mapUsingVectorSearch("processedPayments",                           // (5)
                SearchOptions.of(10, true, true),
                Payment::toVector, Main::classify));
In this pipeline:
1. We read from the previously defined CDC source.
2. We specify which timestamps will be used as event time. Since this is a streaming source, we must provide timestamps to enable future aggregations (see the windowing sketch right after this list); in this example, we could use withoutTimestamps too.
3. We filter out events that do not represent typical data operations. The Debezium connector also delivers messages such as heartbeats as ChangeRecords; filtering out the UNSPECIFIED operation drops these non-data-change events.
4. The ChangeRecord’s value holds the state of the changed row: the new values for INSERT, SYNC, and UPDATE, and the old values for DELETE. We fetch it with nonNullValue (which adds the null checks, so your IDE won’t complain) and use toObject to map the underlying JSON into the Payment record automatically.
5. This step is the most critical transformation for us. We specify processedPayments as the name of the VectorCollection containing classified objects, then set the SearchOptions and the function that maps Payment objects into vectors (creates the embedding). Lastly, we specify the result-mapping function, in our case classify, which takes an incoming Payment and matches it with the most similar vectors in the VectorCollection.
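As a taste of what step (2) enables, here is a minimal windowed-aggregation sketch on top of the classified stream, counting suspicious payments in 10-second tumbling windows (classifiedPayments, ProcessedPayment, and Classifier come from this example; the windowing calls are the standard Jet pipeline API):

import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.WindowDefinition;

// Counting per window only works because the stage carries timestamps,
// which withIngestionTimestamps() in step (2) provides.
classifiedPayments
        .filter(processed -> processed.classifier() == Classifier.SUSPICIOUS)
        .window(WindowDefinition.tumbling(10_000))
        .aggregate(AggregateOperations.counting())
        .writeTo(Sinks.logger());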
The classify function is quite simple: we create initial scores for both options (1), then iterate over the results, adding each result’s score to its classifier’s total (2). Lastly, we pick the option with the higher total score and return that classification (3).
private static ProcessedPayment classify(Payment payment, SearchResults<Long, ProcessedPayment> searchResults) {
    float[] scores = { 0, 0 }; // (1)
    searchResults.results().forEachRemaining(result -> {
        ProcessedPayment value = result.getValue();
        assert value != null;
        scores[value.classifier().ordinal()] += result.getScore(); // (2)
    });
    Classifier classifier = scores[0] > scores[1] ? Classifier.LEGIT : Classifier.SUSPICIOUS; // (3)
    return new ProcessedPayment(classifier, payment);
}
That’s basically it! Some additional code is needed to start the Hazelcast Platform instance and insert the initial data into the processedPayments collection; if you want to check out the full code, it’s available here.
Now we can launch our example, insert some data, and watch the logs:
[0c66-802b-08c0-0001/loggerSink#0] SUSPICIOUS :: 1014 | 51268mafia -> 74517mafia | extortion
[0c66-802b-08c0-0001/loggerSink#0] SUSPICIOUS :: 1012 | 23764mafia -> 0794791404 | extortion
[0c66-802b-08c0-0001/loggerSink#0] LEGIT :: 1013 | 9026410217 -> 8563318726 | pizza
[0c66-802b-08c0-0001/loggerSink#0] LEGIT :: 1015 | 3114649574 -> 5601091690 | pizza
[0c66-802b-08c0-0001/loggerSink#0] SUSPICIOUS :: 1016 | 29666mafia -> 9272863155 | extortion
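The pipeline also writes each classification to the results map (see the Main class below), so any Hazelcast client can look up a verdict by payment id. A minimal sketch, with the id value borrowed from the log output above:

import com.hazelcast.map.IMap;

// hz is the HazelcastInstance started for this example.
IMap<Long, ProcessedPayment> results = hz.getMap("results");
System.out.println(results.get(1014L)); // e.g. "SUSPICIOUS :: 1014 | 51268mafia -> 74517mafia | extortion"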
Main Class
To make things easier, here is the full Main class:
package com.hazelcast;

import com.hazelcast.config.Config;
import com.hazelcast.config.vector.*;
import com.hazelcast.core.*;
import com.hazelcast.enterprise.jet.cdc.Operation;
import com.hazelcast.enterprise.jet.cdc.mysql.MySqlCdcSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.vector.*;
import org.apache.commons.lang3.RandomUtils;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.utility.DockerImageName;

import java.sql.*;
import java.time.Duration;
import java.util.List;

import static com.hazelcast.jet.datamodel.Tuple2.tuple2;
import static com.hazelcast.vector.jet.VectorTransforms.mapUsingVectorSearch;

@SuppressWarnings("SqlResolve")
public class Main {

    public static final DockerImageName DEBEZIUM_MYSQL_IMAGE =
            DockerImageName.parse("debezium/example-mysql:2.7.1.Final")
                    .asCompatibleSubstituteFor("mysql");

    enum Classifier { LEGIT, SUSPICIOUS }

    record ProcessedPayment(Classifier classifier, Payment payment) {
        @Override
        public String toString() {
            return String.format("%-10s :: %s", classifier, payment);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //noinspection resource
        try (var container = new MySQLContainer<>(DEBEZIUM_MYSQL_IMAGE)
                .withDatabaseName("mysql")
                .withUsername("mysqluser")
                .withPassword("mysqlpw")
                .withExposedPorts(3306)) {
            container.start();
            createTable(container);

            HazelcastInstance hz = startHazelcast();
            VectorCollection<Long, ProcessedPayment> processedPayments =
                    VectorCollection.getCollection(hz, "processedPayments");
            insertTrainingData(processedPayments);

            Pipeline pipeline = Pipeline.create();
            var source = MySqlCdcSources
                    .mysql("payments")
                    .setDatabaseAddress(container.getHost(), container.getMappedPort(MySQLContainer.MYSQL_PORT))
                    .setDatabaseCredentials("debezium", "dbz")
                    .setTableIncludeList("inventory.payments")
                    .build();

            var classifiedPayments = pipeline
                    .readFrom(source)
                    .withIngestionTimestamps()
                    .filter(changeRecord -> changeRecord.operation() != Operation.UNSPECIFIED)
                    .map(changeRecord -> changeRecord.nonNullValue().toObject(Payment.class))
                    .apply(mapUsingVectorSearch("processedPayments", SearchOptions.of(10, true, true),
                            Payment::toVector, Main::classify));

            classifiedPayments.writeTo(Sinks.logger());
            classifiedPayments
                    .map(procPay -> tuple2(procPay.payment.paymentId(), procPay))
                    .writeTo(Sinks.map("results"));

            hz.getJet().newJob(pipeline);
            Thread.sleep(Duration.ofSeconds(3));

            insertPayment(container, Payment.generatePayment(Classifier.SUSPICIOUS));
            insertPayment(container, Payment.generatePayment(Classifier.LEGIT));

            RandomUtils randomUtils = RandomUtils.insecure();
            //noinspection InfiniteLoopStatement
            while (true) {
                Classifier classifier = randomUtils.randomBoolean() ? Classifier.LEGIT : Classifier.SUSPICIOUS;
                insertPayment(container, Payment.generatePayment(classifier));
                Thread.sleep(Duration.ofSeconds(2));
            }
        }
    }

    private static ProcessedPayment classify(Payment payment, SearchResults<Long, ProcessedPayment> searchResults) {
        float[] scores = { 0, 0 };
        searchResults.results().forEachRemaining(result -> {
            ProcessedPayment value = result.getValue();
            assert value != null;
            scores[value.classifier().ordinal()] += result.getScore();
        });
        Classifier classifier = scores[0] > scores[1] ? Classifier.LEGIT : Classifier.SUSPICIOUS;
        return new ProcessedPayment(classifier, payment);
    }

    private static HazelcastInstance startHazelcast() {
        Config config = Config.load();
        VectorCollectionConfig vcc = new VectorCollectionConfig("processedPayments")
                .addVectorIndexConfig(new VectorIndexConfig("processedPayments-fraud",
                        Metric.COSINE, 384, 40, 100, false));
        config.addVectorCollectionConfig(vcc);
        config.getJetConfig().setEnabled(true);
        config.setProperty(ClusterProperty.PARTITION_COUNT.getName(), "7");
        config.setProperty(ClusterProperty.LOGGING_TYPE.getName(), "log4j2");
        return Hazelcast.newHazelcastInstance(config);
    }

    private static void createTable(MySQLContainer<?> container) {
        try (Connection conn = DriverManager.getConnection(container.getJdbcUrl(),
                container.getUsername(), container.getPassword());
             Statement stmt = conn.createStatement()) {
            stmt.execute("""
                    CREATE TABLE inventory.payments (
                        paymentId BIGINT NOT NULL primary key,
                        sourceAccountNo VARCHAR(255) NOT NULL,
                        targetAccountNo VARCHAR(255) NOT NULL,
                        title VARCHAR(255) NOT NULL
                    );
                    """);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    private static void insertPayment(MySQLContainer<?> container, Payment payment) {
        try (Connection conn = DriverManager.getConnection(container.getJdbcUrl(),
                container.getUsername(), container.getPassword());
             PreparedStatement stmt = conn.prepareStatement("""
                     INSERT INTO inventory.payments(paymentId, sourceAccountNo, targetAccountNo, title)
                     VALUES (?, ?, ?, ?);
                     """)) {
            stmt.setLong(1, payment.paymentId());
            stmt.setString(2, payment.sourceAccountNo());
            stmt.setString(3, payment.targetAccountNo());
            stmt.setString(4, payment.title());
            stmt.execute();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    private static void insertTrainingData(VectorCollection<Long, ProcessedPayment> processedPayments) {
        var sus = List.of(
                new Payment("0001mafia", "0002mafia", "totally not from robbery"),
                new Payment("0002mafia", "0001mafia", "robbery"),
                new Payment("000111111", "0002mafia", "extortion"),
                new Payment("00011458974813", "0001mafia", "extortion jan-jul"),
                new Payment("54867324894123", "0003mafia", "extortion jan-jul"),
                new Payment("3229462", "0002mafia", "mafia")
        );
        var legit = List.of(
                new Payment("98894561567894231", "8423178492631", "invoice 1234"),
                new Payment("612894123189562", "1854513589", "salary"),
                new Payment("000111111", "0002mafia", "extortion"),
                new Payment("00011458974813", "54867324894123", "pizza"),
                new Payment("54867324894123", "00011458974813", "my tribute")
        );
        for (Payment payment : sus) {
            processedPayments.putAsync(payment.paymentId(),
                    VectorDocument.of(new ProcessedPayment(Classifier.SUSPICIOUS, payment), payment.toVector()));
        }
        for (Payment payment : legit) {
            processedPayments.putAsync(payment.paymentId(),
                    VectorDocument.of(new ProcessedPayment(Classifier.LEGIT, payment), payment.toVector()));
        }
    }
}
Conclusion
The new Hazelcast Platform Enterprise CDC connector provides an easy way to modernize an existing batch-based database system into a real-time streaming solution.
In addition:
- We did not have to set up Kafka / Pulsar / other queues and send copies of events there (= lower maintenance cost, lower complexity, more robust)
- We can easily “integrate” into existing databases without needing to change existing systems (= faster time to market)
- CDC, with Hazelcast Platform in particular, is a great way to enable the Transactional Outbox pattern in your system without significant overhead (= performance); see the sketch below
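A minimal sketch of that outbox idea, reusing the connector from this post (the inventory.outbox table and the OutboxEvent record are hypothetical, and the address is a placeholder):

// Services INSERT events into inventory.outbox in the same transaction as their
// business writes; CDC then streams them out, avoiding the dual-write problem.
record OutboxEvent(long id, String type, String payload) { }

var outboxSource = MySqlCdcSources
        .mysql("outbox stream")
        .setDatabaseAddress("localhost", 3306) // placeholder address
        .setDatabaseCredentials("debezium", "dbz")
        .setTableIncludeList("inventory.outbox")
        .changeRecord();

var outboxPipeline = Pipeline.create();
outboxPipeline.readFrom(outboxSource)
        .withIngestionTimestamps()
        .filter(record -> record.operation() == Operation.INSERT) // outbox rows are append-only
        .map(record -> record.nonNullValue().toObject(OutboxEvent.class))
        .writeTo(Sinks.logger()); // or any downstream map/sink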
With everybody talking about AI these days, our example shows that you can quickly enable AI features on top of your existing data model. A few lines, and we had (very basic) vector-search-based classification of events streamed in real time from an external database. Neat!
Important and helpful links: