Everyone is jumping on the real-time bandwagon, especially by deploying streaming data technologies like Apache Kafka. But many of us are not taking advantage of real-time data in the fullest ways available. It’s worth assessing what you currently have in place for your real-time initiatives to explore what else can be done to create more value from your data.

Many of us use Kafka (or a similar message bus) to capture data on business events that ultimately can be stored in an analytics database. Information updates on business artifacts such as sales orders, product inventory levels, website clickstreams, etc., all can be transferred to Kafka and then transformed into a format optimized for analytics. This lets analysts quickly and easily find actionable insights from the data that they can pursue. Example actions include creating new promotions, replenishing stock, and adding webpage calls-to-action.

The typical real-time data journey is depicted in the diagram below, which represents a pattern that has long been known as the Kappa Architecture. While that is not a term that’s used as much these days as when it was first defined, it is still an important concept for designing systems that leverage real-time data (i.e., data that was just created).

Real-Time Analytics Diagram
The Kappa Architecture for Real-Time Analytics


The Real-Time Analytics Data Flow – A Quick Review

There are three data stages between the components in this pattern, labeled by the circled numerals, that together determine the “real-timeness” of the data usage. Arrow #1 represents the transmission of data from the source to the message bus (e.g., Apache Kafka). In many cases, the data in this leg of the journey is not significantly modified, so the data loading process typically occurs very quickly. This stage involves a simple form of serialization, in which the data is formatted as comma-separated values (CSV) or JSON before insertion into the message bus, to simplify or standardize downstream processing. Sometimes the data is filtered or aggregated before loading to reduce storage requirements, but in general, keeping all data points intact is a common practice that lets subsequent processing steps work on the raw data at a granular level.
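As a concrete illustration of this first leg, here is a minimal, hypothetical sketch of serializing a business event to JSON and producing it to Kafka (the broker address, topic name, and event fields are placeholders, and the Jackson library is assumed to be available):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Map;
import java.util.Properties;

public class OrderEventProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // A hypothetical sales-order event; in practice this comes from the source system.
        Map<String, Object> order = Map.of("orderId", "order-42", "sku", "SKU-123", "quantity", 3);

        // Serialize to JSON and publish it unmodified, keeping the event at full granularity.
        String json = new ObjectMapper().writeValueAsString(order);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders", "order-42", json));
        }
    }
}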

Arrow #2 represents the most significant step in this pattern, where data must be processed to turn it into a queryable format to store in a database (or any analytical store) and thus make it more usable by human analysts. In this stage, data can be filtered, transformed, enriched, and aggregated, and then inserted into an analytics platform like a database or data warehouse. We know this step to be the extract/transform/load (ETL) step, and many “streaming ETL” technologies designed for data-in-motion can be used to perform this step. Traditional batch-oriented tools can also be used for this step, with the caveat that delays are introduced due to the time intervals of periodic ETL processing, versus continuous ETL processing. Once the queryable data is in the analytics platform, additional processing could be done, including creating indexes and materialized views to speed up queries.
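As a rough sketch of what a continuous ETL job can look like (assuming Hazelcast with its Kafka connector on the classpath, and using a hypothetical “orders” topic and “orders_by_id” map), the job below reads events as they arrive, applies a trivial transformation, and writes them to a queryable map:

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;

import java.util.Properties;

public class StreamingEtlJob {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(KafkaSources.<String, String>kafka(props, "orders"))
                .withoutTimestamps()
                // Filter, enrich, and aggregate here; this sketch only trims the payload.
                .map(entry -> Util.entry(entry.getKey(), entry.getValue().trim()))
                // Write to a map that the analytics layer can query continuously.
                .writeTo(Sinks.map("orders_by_id"));

        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        hz.getJet().newJob(pipeline);
    }
}

A batch-oriented tool would run an equivalent transformation on a schedule; the streaming version removes that interval from the end-to-end latency.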

One great thing about step #2 is that you aren’t limited to a single flow. In other words, you can run as many processing jobs as you want on the raw data. This is why the use of Kafka as intermediary storage is important here. Each of your processing jobs can independently read data from Kafka and format the data in its own way. This allows not only the ability to support many distinct types of analytical patterns by the analysts, but also the opportunity to easily fix any data errors caused by bugs in the processing code (also referred to as “human fault tolerance”).

Arrow #3 represents the last mile that entails quickly delivering query results to analysts. The delivery speed is mostly a function of the analytics database, so many technologies in the market today tout their speed of delivering query results.

The faster each of these steps runs, the sooner data reaches the analysts, and thus the sooner you can take action. Certainly, the combined speed of the three steps is a limiting factor in achieving real-time action, but the main bottleneck is the speed of the human analysts. Fortunately, in most real-time analytics use cases, this is not a major problem, as the relatively slow responsiveness of human analysts is well within any service-level agreement (SLA) that the business has defined.

Adding Real Real-Time Action

At this point, you likely know where we’re going in this blog—that there is a complementary pattern that can add new uses for your real-time data that cannot wait for human responses. This pattern involves the use of stream processing engines to automate a real-time response to your real-time data stored in Kafka.

Real-Time Analytics with Hazelcast Diagram
Real-Time Stream Processing Complements Your Existing Infrastructure

In addition to arrows #2 and #3, you have arrows #2A and #3A in which real-time applications take care of the things that should be completed quickly and automatically without waiting for human intervention. Consider real-time offers while your customers are interacting with you (via shopping, browsing, banking), versus delivering an offer later in the day after they’ve left. Or how about sending your customer an immediate SMS about a potentially fraudulent transaction on their credit card? If the transaction is legitimately initiated by the customer, you won’t inadvertently deny the card swipe, and they won’t use a different credit card, causing you to lose the transaction fee. Or what about any process that involves machine learning (e.g., predictive maintenance, real-time route optimization, logistics tracking, etc.) that needs to deliver a prediction immediately so that quick corrective action can avoid a bigger disaster?

You Already Have the Data, Now Make More Use of It

If you already have a real-time data analytics system that leverages real-time data in Kafka, think about all the other real-time use cases that can help give your business additional advantage. This is where Hazelcast can help. You don’t have to throw out what you’ve already built. You simply tap into the data streams you already have, and automate the actions to create real-time responses.

If you’d like to explore what other real-time opportunities you might have that you are not currently handling, please contact us and we’d be happy to discuss these with you.

 

Why did I join Hazelcast?

  1. Hazelcasters: Culture is critical to me and it starts with leadership. At Hazelcast, I have the opportunity to work with a super-intelligent, strategic and collaborative team passionate about helping our customers on their real-time journey. We attract strong talent, empower them to succeed, and guide their career development.
  2. Enabling Real-Time Innovation: The golden thread throughout my career has been educating people about breakthrough innovations in data and analytics to improve results. At Hazelcast, I have the opportunity to educate business leaders and developers who want help to enable real-time automated actions. By harnessing the fresh, new data streaming 24/7 and instantly joining that with deep contextual data hidden in databases, businesses are getting measurable business results, including growing revenue by acquiring and retaining customers, cutting costs, and mitigating risk. In addition, I want to help business leaders get better results by seizing the moment in real time.
  3. Hazelcast Customers: Hazelcast works with many Fortune 500 brands around the globe and we’re building a community of real-time innovators in financial services, banking, retail/e-commerce, transportation, and other industries.

Let’s dive into what’s happening in the market.

Are You Ready to Compete in the Real-Time Economy?

While many business leaders doubled down to accelerate their digital transformation efforts during the pandemic, business customers and consumers raised their expectations – again.

No one wants to wait. Time is precious. People want instant gratification while they are interacting with a business, regardless of whether they are browsing, shopping, ordering, banking, traveling, or having a bad experience.

That means the bar is rising for business leaders. Offering a personalized product or service, resolving a problem, or taking action one day, one hour, or one minute later is too late.

Think about the travel disruption caused by a national airline in December 2022. More than 16,700 flights were canceled, leaving families and staff stranded over the holidays. Ultimately, the airline needed to resolve customer and employee problems faster. So how did this happen?

It turns out outdated IT infrastructure was to blame. Although some parts of the airline’s infrastructure could automate actions in real time (alerting people to canceled flights), it did not have a fully enabled real-time business. And it is not alone. Although business leaders have worked with IT to update some of their infrastructure for real-time automated action, it takes just one bottleneck, or weak link in the chain, for things to start falling apart.

Moments matter. Opportunities pass in an instant.

Which companies are leading in this real-time economy? Look no further than Meta/Facebook, Apple, Amazon, Netflix and Alphabet/Google (FAANGs). They act in real time by unlocking the superpower of combining new and historical data and leveraging ML to automate actions. The result? The ability to respond to customers’ expectations in real-time, delivering highly personalized experiences and adapting in the moment to drive more revenue, cut costs, and mitigate risk.

Why has the bar been raised for non-FAANG executives? Because the people who use the FAANG services also use banks, healthcare providers, telcos, grocery stores, transportation, etc., and they expect the same real-time service level. As a result, the FAANGs are making it harder for companies in other industries to keep up.

Use Real-Time as a Strategic Weapon to Seize the Moment

All these successful digital businesses have one thing in common: They use real-time as a strategic weapon.

They present highly customized options while customers are online or when they need a service. Consumers receive shortlists that uniquely fit them and change in real time as their behavior changes. Carefully curated choices of search topics and ads, prospective friends, movie titles, social topics, and retail products happen in the background as best-fit options.

With each new click, the personalization is refined to the individual; choices change in real-time—and real-time innovators seize the moment.

But, on its own, fresh streaming data needs more context. So, to unlock its power, it must be put into the same data processing environment as historical or stored data, providing the context that informs the best action.

Don’t Let Legacy Infrastructure Hold You Back from Seizing the Moment

While the FAANGs have raised the bar of customer expectations, many business leaders face considerable challenges in pulling off the right action at the right time.

According to McKinsey’s “The Data-Driven Enterprise of 2025” report, “only a fraction of data from connected devices is ingested, processed, queried, and analyzed in real-time due to the limits of legacy technology structures, the challenges of adopting more modern architectural elements, and the high computational demands of intensive, real-time processing jobs.”

As a result, “companies often must choose between speed and computational intensity, which can delay more sophisticated analyses and inhibit the implementation of real-time use cases.”

Systems need to act in the moments that matter most.

But, most enterprise IT architectures are saddled with systems designed for batch processing of stored data, not real-time processing of fresh data that can enable automated action in the moment.

How do Facebook, Netflix, and Google overcome this problem? They have armies of engineers building solutions that meet real-time needs. Companies without those armies need technology to help.

Start Your Journey to Become a Real-Time Innovator

Many business leaders across industries are trying to figure out how to become real-time innovators to seize the moment in the real-time economy. They want to enable their business to act faster by automating action on a massive scale.

The secret sauce requires using real-time data streaming 24/7, combining it with historical data, and leveraging machine learning (ML) to automate real-time actions. But this isn’t easy to do when starting with legacy infrastructure, a constraint that doesn’t burden the cloud-first, “Silicon Valley” companies. And the thought of ripping and replacing is as painful as it is time-consuming. The good news is that just about any company can take steps to evolve from batch processing to solutions that support real-time automated action.

The Benefits of Real-Time Automated Action

Real-time innovators are working with Hazelcast to extend their current infrastructure with a modern layer that enables real-time automated action. As a result, they are seizing the moment and improving outcomes, such as:

  1. Boosting revenue and improving customer experience during every interaction with personalized real-time offers. For example, BNP Paribas captured a 400% increase in conversion rates on loans offered in the moment of need.
  2. Cutting costs and mitigating risk with more robust fraud detection during every transaction. For example, a top 10 global bank improved fraud detection while improving customer satisfaction.
  3. Gaining a competitive edge and saving developer time by simplifying the development and maintenance of real-time apps. Here is an example of a bank using real-time data to meet strict customer SLAs.

Why Wait? Seize the Moment

The Real-Time Economy is here, and there will be winners and losers across all industries. Real-time presents an opportunity to acquire and retain customers, cut costs and mitigate risk. Real-time innovators are embarking on a real-time journey to act in the moments that matter most to business customers and consumers. The good news is that newer technologies can quickly bring real-time responsiveness to companies across industries, and any company can begin its real-time journey now.

Don’t wait. I hope you take the opportunity to seize the moment in real time.

For more information on leveraging real-time data to drive your business forward, I recommend starting with these four use cases: What is real-time?


Introduction

One of the most useful aspects of real-time stream processing is the ability to combine the strengths and advantages of various technologies to provide a unique developer experience and an efficient way of processing data in real time at scale. Hazelcast is a real-time distributed computation and storage platform that delivers consistently low-latency queries, aggregations, and stateful computation over real-time event streams and traditional data sources. Apache Pulsar is a real-time, multitenant, geo-replicated, distributed pub-sub messaging and streaming platform for real-time workloads handling millions of events per hour.

However, real-time stream processing is not an easy task, especially when combining multiple live streams with large volumes of data stored in external data stores to provide context and instant results. Hazelcast can be used for stateful processing of real-time streaming data, data at rest, or a combination of both; for querying streaming and batch data sources directly using SQL; for distributed coordination of microservices; and for replicating data from one region to another or between data centres in the same region. Apache Pulsar, meanwhile, can be used for both messaging and streaming use cases, taking the place of multiple products and providing a superset of their features: it is a cloud-native, multitenant, unified messaging platform that can replace Apache Kafka, RabbitMQ, MQTT brokers, and legacy messaging platforms. In this architecture, Pulsar provides an effectively unbounded message bus that Hazelcast can use as an instant source and sink for any and all data.

Prerequisites

We’re building an application where we ingest data from Apache Pulsar into Hazelcast and then process it in real-time. To run this application, make sure your system has the following components:

  • Hazelcast installed on your system (we’re using the CLI)
  • Pulsar installed on your system (we’re using Docker)

If you have macOS & Homebrew, you can install Hazelcast using the following command:

brew tap hazelcast/hz

brew install hazelcast@5.2.1

 Check if Hazelcast is installed:

hz -V

Then start a local cluster:

hz start

You should see the following in the console:

INFO: [192.168.1.164]:5701 [dev] [5.2.1]

Members {size:1, ver:1} [

  Member [192.168.1.164]:5701 - 4221d540-e34e-4ff2-8ad3-41e060b895ce this

]

You can start Pulsar in Docker using the following command:

docker run -it -p 6650:6650 -p 8080:8080 \

    --mount source=pulsardata,target=/pulsar/data \

    --mount source=pulsarconf,target=/pulsar/conf \

    apachepulsar/pulsar:2.11.0 bin/pulsar standalone

To install Management Center using Homebrew:

 brew tap hazelcast/hz

brew install hazelcast-management-center@5.2.1

 Check that Management Center is installed:

hz-mc -V

Data collection

For our application, we want to ingest air quality readings from around the United States via the AirNow data provider. If you’d like to learn more about air quality, check out the information at AirNow.

Source: https://docs.airnowapi.org/

A simple Java application makes REST calls to the AirNow API, which provides air quality readings for major ZIP codes around the United States. The Java application sends the JSON-encoded AirNow data to the “airquality” Pulsar topic, from which a Hazelcast application can read it.

Source:   https://github.com/tspannhw/spring-pulsar-airquality 

We also have a Java Pulsar function that receives each event from the “airquality” topic and routes it to a different topic based on the type of air quality reading: PM2.5, PM10, or ozone.

Source: https://github.com/tspannhw/pulsar-airquality-function 
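For reference, a routing function like this can be written in a few lines with the Pulsar Functions Java API. The sketch below is a simplified illustration only; the output topic names and the string matching are placeholders, not the actual code in the repository above:

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class AirQualityRouter implements Function<String, Void> {
    @Override
    public Void process(String json, Context context) throws Exception {
        // Naive routing on the raw JSON payload; a production function would parse it properly.
        String topic;
        if (json.contains("\"parameterName\":\"PM2.5\"")) {
            topic = "persistent://public/default/aq-pm25";
        } else if (json.contains("\"parameterName\":\"PM10\"")) {
            topic = "persistent://public/default/aq-pm10";
        } else {
            topic = "persistent://public/default/aq-ozone";
        }
        context.newOutputMessage(topic, Schema.STRING).value(json).sendAsync();
        return null;
    }
}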

Example AirQuality Data

{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","latitude":39.95,"longitude":-75.151,"parameterName":"PM10","aqi":19,"category":{"number":1,"name":"Good","additionalProperties":{}},"additionalProperties":{}}

Example Ozone Data

{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","parameterName":"O3","latitude":39.95,"longitude":-75.151,"aqi":8}

Example PM10 Data

{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","parameterName":"PM10","latitude":39.95,"longitude":-75.151,"aqi":19}

Example PM2.5 Data

{"dateObserved":"2023-01-19 ","hourObserved":12,"localTimeZone":"EST","reportingArea":"Philadelphia","stateCode":"PA","parameterName":"PM2.5","latitude":39.95,"longitude":-75.151,"aqi":54}

Data processing

To process the collected data, we used the Hazelcast Pulsar connector module to ingest data from Pulsar topics (note: you can use the same connector to write to Pulsar topics). Hazelcast lets us compute various aggregation functions (sum, average, etc.) in real time over a specified window of stream items. The Pulsar connector uses the Pulsar client library, which offers two ways of reading messages from a Pulsar topic: the Consumer API and the Reader API, both of which use the builder pattern (see the Pulsar documentation for more information).
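For orientation, both APIs follow the same builder style. The snippet below is a minimal, standalone illustration (the topic and subscription names are placeholders) of how each one is created with the Pulsar client library:

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;

public class PulsarReadApis {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Consumer API: the broker tracks the reading position via a named subscription.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("airquality")
                .subscriptionName("aq-subscription")
                .subscribe();

        // Reader API: the application picks the start position and manages it itself.
        Reader<byte[]> reader = client.newReader()
                .topic("airquality")
                .startMessageId(MessageId.earliest)
                .create();

        reader.close();
        consumer.close();
        client.close();
    }
}

The Hazelcast Pulsar connector used below builds on the Reader API, which is why each processing job can manage its own position in the topic.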

Add the following dependencies to your pom.xml file:

<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast</artifactId>
    <version>5.1.4</version>
</dependency>
<dependency>
    <groupId>com.hazelcast.jet.contrib</groupId>
    <artifactId>pulsar</artifactId>
    <version>0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.10.1</version>
</dependency>

We create a PulsarSources.pulsarReaderBuilder instance to connect to the previously started Pulsar cluster at pulsar://localhost:6650:

StreamSource<Event> source = PulsarSources.pulsarReaderBuilder(
        topicName,
        () -> PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(),
        () -> Schema.JSON(Event.class),
        Message::getValue).build();

We then create a pipeline that reads from the source, applies a sliding window and a count aggregation, and writes the results to a logger:

Pipeline p = Pipeline.create();
p.readFrom(source)
 .withNativeTimestamps(0)
 .groupingKey(Event::getUser)
 .window(sliding(SECONDS.toMillis(60), SECONDS.toMillis(30)))
 .aggregate(counting())
 .writeTo(Sinks.logger(wr -> String.format(
      "At %s Pulsar got %,d messages in the previous minute from %s.",
      TIME_FORMATTER.format(LocalDateTime.ofInstant(
              Instant.ofEpochMilli(wr.end()), ZoneId.systemDefault())),
      wr.result(), wr.key())));

JobConfig cfg = new JobConfig()
     .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
     .setSnapshotIntervalMillis(SECONDS.toMillis(1))
     .setName("pulsar-airquality-counter");

HazelcastInstance hz = Hazelcast.bootstrappedInstance();
hz.getJet().newJob(p, cfg);

You can run the previous code from your IDE (in which case it creates its own Hazelcast member and runs the job on it), or you can run it on the previously started Hazelcast member (in which case you need to build a runnable JAR that includes all required dependencies):

mvn package

bin/hz-cli submit target/pulsar-example-1.0-SNAPSHOT.jar

To cancel the job and shut down the Hazelcast cluster:

bin/hz-cli cancel pulsar-airquality-counter

hz-stop

Conclusion

In this blog post, we demonstrated how you can combine the strengths and advantages of various technologies to provide a unique developer experience and an efficient way of processing data in real time at scale. We streamed air quality data from Apache Pulsar into Hazelcast and processed it in real time. The rising trend in cloud technologies, the need for real-time intelligent applications, and the urgency to process data at scale have brought us to a new chapter of real-time stream processing, where latencies are measured not in minutes but in milliseconds or less.

Hazelcast allows you to quickly build resource-efficient, real-time applications. You can deploy it at any scale, from small edge devices to a large cluster of cloud instances. A cluster of Hazelcast nodes shares both the data storage and the computational load and can dynamically scale up and down. When you add new nodes to the cluster, the data is automatically rebalanced, and currently running computational tasks (known as jobs) snapshot their state and scale with processing guarantees. Pulsar lets you use your choice of messaging protocols to quickly distribute events between many types of consumers and producers, acting as a universal message hub, and it separates compute from storage, allowing for dynamic scaling and efficient handling of fast data. StreamNative, the company founded by the original creators of Apache Pulsar and Apache BookKeeper, provides a full enterprise experience for Apache Pulsar in the cloud and on-premises.

 

More on Hazelcast

  • Join us on our Real-Time Stream Processing Unconference (#RTSPUnconf): https://hazelcast.com/lp/unconference/
  • Learn the Hazelcast Fundamentals: Start a Local Cluster with the CLI or Docker.
  • Start a Viridian Serverless Cluster: Serverless is a managed cloud service that offers a pay-as-you-go pricing model. Serverless clusters auto-scale to provide the resources that your application needs. You pay only for the resources that your application consumes.
  • Join the Hazelcast Slack and Hazelcast Github repository.

More on Apache Pulsar

More on the Authors


Tim Spann

Developer Advocate at StreamNative

https://twitter.com/paasdev 

https://github.com/tspannhw/SpeakerProfile/blob/main/README.md


Fawaz Ghali

Principal Developer Advocate at Hazelcast

https://www.linkedin.com/in/fawazghali/

https://twitter.com/FawazGhali

Introduction

The popularity of Kubernetes makes it one of the most widely used platforms for deploying applications, and there are many tools to simplify Kubernetes deployments. But two deployment solutions stand out: Helm and the Operator.

Why do we need an Operator?

Hazelcast’s platform is cloud-native, and for this purpose, we have a well-supported Helm chart that allows for easy installation and configuration of the Hazelcast Platform and Management Center. The Hazelcast Platform is a powerful product, but its rich feature set brings some complexity. We therefore aimed to make the users’ cloud-native experience with Hazelcast as smooth as possible. With the Helm chart, it is pretty straightforward to install a Hazelcast cluster, but more hands-on work may be required to use some of the Platform’s advanced features. So, instead of trying to script Helm to do what it’s not designed for, the decision was to write a Kubernetes Operator.

One of the decisions made during operator development was to provide an abstraction over the Hazelcast Platform configuration. This means you can’t provide a hazelcast.yaml to configure the Hazelcast cluster. Instead, we deliver a smooth API that focuses on features rather than the powerful yet complex Hazelcast configuration.

At the moment, Hazelcast Platform Operator gives us full control of the application and features lifecycle, which makes it a Level III operator. But besides that, having an operator makes it easier to grow and add additional features such as metrics or scaling in the future.

The problems that the operator solves

The table below summarizes the Operator and Helm capabilities that are described in detail further down.

Capability                                                        Operator   Helm
Simple one-liner cluster installation                                ✓         ✓
Makes Hazelcast Platform Kubernetes-native                           ✓
Automatic platform-dependent configuration                           ✓
Dynamic change of the configuration without cluster restart          ✓
Full control over all the Hazelcast Platform configs                           ✓
Automatic feature lifecycle control (no additional REST calls)       ✓
Simple API abstraction over complex configuration                    ✓
Feature completeness                                                           ✓

Custom Resource Definition (CRD)

With the operator, we can make the Hazelcast Platform truly Kubernetes-native with the help of CRD. With this, we can create a custom resource called Hazelcast and manage it as any other Kubernetes resource.

apiVersion: hazelcast.com/v1alpha1 
kind: Hazelcast 
metadata:   
  name: my-hazelcast

Both Helm and the Operator use an opinionated configuration, so the user doesn’t need to configure anything to install a basic cluster. With the Operator, however, we can take the runtime into account and decide which configuration is best to apply. For example, we use a different SecurityContext configuration for the StatefulSet if the cluster runs on the OpenShift platform. With the Helm chart, the user must manually configure the SecurityContext for a specific platform.

And, of course, Hazelcast CR would not be truly Kubernetes-native if you could not observe its status.

$ kubectl get hazelcast my-hazelcast 
NAME         STATUS  MEMBERS EXTERNAL-ADDRESSES 
my-hazelcast Running 3/3

Dynamic configuration

One might not use some Hazelcast features from the beginning but decide to enable them later. Let’s use WAN Replication as an example of a feature we want to add. It can be configured and enabled with the Helm chart; however, such a change requires a cluster restart. The Operator, on the other hand, can update the configuration without a cluster restart, simply by creating a WanReplication CR.

apiVersion: hazelcast.com/v1alpha1
kind: WanReplication 
metadata:   
  name: wanreplication-sample 
spec:   
  mapResourceName: my-map   
  targetClusterName: dev   
  endpoints: "203.0.113.61"

API

A crucial part of every product is its API. With the operator, we want to ease the usage of the Hazelcast Platform, and a great experience requires a great API that abstracts away the complexity and is intuitive for the user. One example is connecting to Hazelcast running on Kubernetes from outside the cluster with a smart client. Users would need to create a service per pod manually, which makes scaling more difficult, or use a third-party tool such as metacontroller. With the Operator, users can declaratively specify the exposeExternally configuration in the Hazelcast CRD.

apiVersion: hazelcast.com/v1alpha1
kind: Hazelcast
metadata:
  name: hazelcast
spec:
  licenseKeySecret: hazelcast-license-key
  exposeExternally:
    type: Smart
    discoveryServiceType: LoadBalancer
    memberAccess: NodePortExternalIP

And the operator will take care of creating the service per pod.

Feature lifecycle

Hazelcast Platform is rich in features. However, some of these features are advanced and may require multiple manual steps. For example, let’s say the user wants to create a backup of the cluster and copy it to external storage. The Helm chart allows you to mount a volume for the members, but since Helm does not manage the application lifecycle, you have to trigger the backup process manually using any available API and then copy the backup to the external storage of your choice. The operator simplifies these operations: all you need is to enable persistence and apply the HotBackup CRD.

apiVersion: hazelcast.com/v1alpha1
kind: Hazelcast
metadata:
  name: hazelcast
spec:
  licenseKeySecret: hazelcast-license-key
  persistence:
    baseDir: "/data/hot-restart/"
---
apiVersion: hazelcast.com/v1alpha1
kind: HotBackup
metadata:
  name: hot-backup
spec:
  hazelcastResourceName: my-hazelcast
  bucketURI: "s3://operator-backup"
  secret: "br-secret-s3"

Moreover, the backup process can be scheduled, allowing it to run periodically without any intervention.

Conclusions

We have seen how users can benefit from the Operator and how it can simplify the usage of the Hazelcast Platform. However, this doesn’t mean that the Helm chart is no longer needed; it serves a different purpose. The Hazelcast Helm chart is the preferred option among experienced Hazelcast users, as it gives them complete control over the Hazelcast configuration. But for users who prefer simplicity, automation, and a Kubernetes-native experience, the Operator is the way to go. It might seem that the Hazelcast Operator supports fewer Hazelcast Platform features than the Helm chart; however, it is still in active development, and in the near future we plan to achieve feature parity so you can choose based on your needs, not on the supported features. Don’t hesitate to share your experience with us in our community Slack or GitHub repository.

In this blog post, we are happy to work with our community member Martin W. Kirst, a leading software architect at Oliver Wyman, a global strategy consultancy. He has been crafting software and building large cloud platforms for three decades, gaining experience in roles spanning software engineering, software architecture, and product ownership across the energy retail, logistics/transport, finance/accounting, and meteorology sectors. He’s passionate about technologies, tools, and automation that drive the business. While developing the infrastructure for a large cloud-native workflow automation and orchestration platform, he got in touch with Hazelcast. It’s our pleasure to welcome Martin’s contribution of Ringbuffer support in Hazelcast’s Go Client SDK. You can connect with Martin on LinkedIn: https://www.linkedin.com/in/martin-w-kirst/

Introduction

Hazelcast Go Client v1.4 brings ringbuffer support to the Go language. Hazelcast has offered the ringbuffer feature for a while; it enables a broad range of integration options for distributed applications, and with the latest release the Go community can benefit from it as well. Hazelcast’s ringbuffer is a replicated, but not partitioned, data structure that stores its data in a ring-like structure. You can think of it as a circular array with a given capacity. Each ringbuffer has a tail and a head: the tail is where items are added, and the head is where items are overwritten or expired. Each client connected to a ringbuffer can independently read items between the head and tail, which makes the ringbuffer an ideal candidate for publish/subscribe or efficient data replication use cases.

Example code

The following example will illustrate how to use a Hazelcast ringbuffer in a scenario with distributed Go applications that forward/replicate message events. For simplicity, imagine there are three microservices: Publisher, Logger, and Alerter. As the names already indicate, the Publisher will emit message events only. The Logger will only log each message event to the console, and the Alerter will wait for a certain message event to create an alert. The Logger and Alerter services will demonstrate that each subscriber to the ringbuffer operates independently.

Let’s have a look at the Publisher code first.

Creating a new Hazelcast client connection and attaching to a ringbuffer can be done in just two lines (errors are omitted for brevity)

client, _ := hazelcast.StartNewClient(ctx)

rb, _ := client.GetRingbuffer(ctx, "my-ringbuffer")

 

Our trivial Publisher is supposed to send a message every second and with a 10% chance send an alert.

for {
    msg := fmt.Sprintf("Hello World, sender's time is %s", time.Now())
    if rand.Intn(100) < 10 {
        // a 10% chance of sending an alert
        msg = fmt.Sprintf("Alert at %s", time.Now())
    }
    sequence, _ := rb.Add(ctx, msg, hazelcast.OverflowPolicyOverwrite)
    log.Printf("Published with sequence=%d, msg=%s", sequence, msg)
    time.Sleep(1 * time.Second)
}

 

Let’s have a look at the Logger, which will simply log every message to the console by reading one item at a time.

for sequence := int64(0); true; sequence++ {
    item, _ := rb.ReadOne(ctx, sequence)
    log.Printf("Received item idx=%d: %s", sequence, item)
}

 

Last but more interestingly, the Alerter will read many events but only print alert messages to the console.

for sequence := int64(0); true; {
    resultSet, _ := rb.ReadMany(ctx, sequence, 5, 10, nil)
    for i := 0; i < resultSet.Size(); i++ {
        if msg, err := resultSet.Get(i); err == nil && strings.HasPrefix(msg.(string), "Alert") {
            msgSequence, _ := resultSet.GetSequence(i)
            log.Printf("Received alert sequence=%d msg=%s", msgSequence, msg)
        }
    }
    sequence = resultSet.GetNextSequenceToReadFrom()
}

Looking at the Alerter’s code, you’ll notice that multiple message items are read from the ringbuffer at once. The ReadMany() function blocks until the minimum number of messages (5 in this case) is available in the ringbuffer. This microservice controls its own sequence number, so it reads from the ringbuffer independently of the Logger.

The following screenshot from the console shows the Publisher’s output.

While running Logger in one console and the Alerter in another one, you can observe how each service reading from the ringbuffer will receive the same sequence numbers, as sent by Hazelcast, but has independent control over when and what to read.

By using ringbuffers, clients are able to read items from the past, as long as the capacity is not exceeded; once it is, the oldest items are overwritten (when using OverflowPolicyOverwrite). This helps integrate services, including those that have short service interruptions during deployments. In our scenario, the Logger deployment could take multiple minutes to start up without losing messages, because once the new Logger instance is deployed and ready, it picks up the messages from the past.
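Capacity and expiration are controlled on the Hazelcast member rather than in the Go client. As a rough illustration only, assuming programmatic member configuration in Java and using placeholder values, the settings might look like this:

import com.hazelcast.config.Config;
import com.hazelcast.config.RingbufferConfig;
import com.hazelcast.core.Hazelcast;

public class RingbufferMemberConfig {
    public static void main(String[] args) {
        Config config = new Config();

        // Placeholder values: keep up to 10,000 items and expire them after 60 seconds.
        RingbufferConfig rbConfig = new RingbufferConfig("my-ringbuffer")
                .setCapacity(10_000)
                .setTimeToLiveSeconds(60);
        config.addRingBufferConfig(rbConfig);

        Hazelcast.newHazelcastInstance(config);
    }
}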

Summary

This blog post explained how to use a Hazelcast ringbuffer in a scenario with distributed Go applications that forward/replicate message events. The example code gives you a first impression of how simple implementing Hazelcast ringbuffers in Go can be. Depending on your use case, you might be interested in fine-tuning the capacity or setting a time-to-live (TTL) value for your ringbuffer. These settings are made on the Hazelcast server side and are well documented. Don’t hesitate to share your experience with us in our community Slack or GitHub repository.

Do you remember the early days of Big Data and the three V’s: Volume, Variety, and Velocity? Some people got a bit carried away with the alliteration and came up with even longer lists, but those three were at the heart of most of them. Data management vendors have made considerable efforts to tackle those problems because they were, and still are, a driving factor in new projects for their customers and prospects (and thus a wonderful revenue opportunity). But while many companies have succeeded in overcoming their volume and variety challenges, some still contend with velocity issues.

Big Data Volume and Variety have been addressed

The “Big” in Big Data started out meaning “large” (the first V, Volume), which really meant “too large for us to adequately handle right now”. There was a massive amount of new data that was being generated by and on the Internet and the thinking was (and still is) that it all should be stored for analysis later. Storage is always getting bigger, improving in both total amount able to be stored and data density, so dealing with volume didn’t require too much of a shift in technology.

Variety was a bit more of a challenge – it required a new way of thinking. Much of the new data being generated was not necessarily structured (relational) data that fit easily into existing traditional relational database management systems, but rather included unknown types of data or unstructured data. Companies wanted a way to store it all very quickly without analyzing types or doing transformations first, which would have allowed it to fit into their RDBMSs. That problem was addressed by creating new kinds of databases, non-relational or NoSQL databases, that could store any data, especially unstructured data, very quickly. But that introduced another problem, which I’ll touch on in the next section.

 

 

Big Data Velocity

Big Data Velocity has been the most challenging V to conquer, and it remains a hurdle for many companies. It is especially tricky, and important, since it has a compounding effect on the other Vs. Storing large amounts of data isn’t necessarily a big challenge for some companies. But storing it as quickly as it arrives, and more importantly, being able to analyze it in real time, the moment it arrives, as opposed to running the typical batch analytics jobs over terabytes or petabytes of data that take hours or even days to complete, is still just a hope and a dream for many companies.

By creating NoSQL data stores to handle lots of data and different types of data (volume and variety), companies like Google, Meta (formerly Facebook), and LinkedIn created tools that were fast enough to handle the ingestion of lots of different data quickly: the Big Data trifecta, all three V’s. But something had to be sacrificed. Most of the latency was really just moved downstream, reappearing when someone wanted to make use of that data, perhaps by analyzing it. Now they couldn’t analyze it quickly or easily, as they could with their relational data, because there was no stored metadata (schema) to tell them exactly what each piece of data was and where it was. They had prioritized storing the data quickly over retrieving and analyzing it quickly, and so were willing to sacrifice the metadata.

The key takeaway here is that these new NoSQL data stores were not able to eliminate latency but rather they moved it further downstream in the data processing pipeline – to the point of analysis. The tradeoff to extremely fast ingestion was analysis that was slow and not real-time.

Why is speed (velocity) so critical in data management?

  • People don’t like to wait. If your application takes too long, the user might just stop using it and find one of your competitors who has faster response times.
  • Less time on one thing leaves more time for others. If there is a multi-step process that is barely meeting a critical SLA, cutting some time out of any one of the steps gives you more leeway for meeting that SLA. And many of today’s business services are measured against SLAs – violations of which can be very costly.
  • Faster speeds give you time to iterate. Sometimes actions are best guesses and responses are needed in order to fine-tune the action in subsequent steps. Think of your data scientist modeling data for a new algorithm. The process involves extracting data from a source system, developing a model, testing the model against a different test set of data, fine tuning the model, testing again, etc.  The faster each of those steps can be, the more accurate the data scientist can get the model in a given amount of time. Take the data scientist out of the loop, as many new machine learning applications do now, and you get similar benefits from algorithms that can run and improve more quickly.
  • Your company wants to make its actions, like sales and marketing activities, more accurate (and valuable), which may cause you to add even more data sources to your processes. Initiatives like 360° Customer Views attempt to pull together multiple disparate data sources, each of which may add a little data that can improve the resolution of your company’s view of the customer, which then makes it possible to make better, more timely promotions to the customer or to improve the customer’s satisfaction when they have a problem they need resolved.  Adding new data sources means more data coming at you that you need to deal with in the same amount of time, increasing the speed of data ingestion required. 

What is driving this “need for speed”?

The World is going “Ops”

The data management world is in a transition due to velocity. Companies are driven to improve their top lines and bottom lines – increasing revenues and decreasing costs – which implies increasing efficiencies. Time is a critical factor in efficiency (do more or do better with fewer resources in less time). Following on, information technology (IT) groups are driven to improve the time to value and the return on investment of their projects, to match similar efforts of other parts of the company, like engineering/development. It isn’t sufficient for a company to develop new applications in the vacuum of a pristine, fully controllable and never failing development environment and then throw it over the wall to the production people in IT to make it work in the real world (which is not fully controllable and never failing).

DevOps, MLOps, AIOps, DataOps. Ops, of course, stands for operations. Development Operations (DevOps) strives to have all code and applications being developed be able to easily and seamlessly transition from development and test into production. Agile development attempts to break down the long, slow waterfall development process into much more manageable modular function blocks that can be worked on and deployed independently, without the need for long upgrade processes and downtime. DevOps is so popular that it has spawned different Ops variants across the spectrum of IT and data management functions, broadly ITOps and DataOps but more specifically as things like AIOps and MLOps. 

The Cloud

The Cloud is a contributing factor for operationalizing IT. Applications deployed in the cloud need to stay up and running, even when new functionality is added or bugs are fixed. Maybe more importantly, deployment infrastructures like Kubernetes are used to let these applications scale up and down seamlessly, so IT teams need to make sure that all of their applications are “cloud-native”, and can be properly managed by automatic systems like Kubernetes. 

Result: The Rise of Real-Time

Acting in real-time means acting in the moment, reacting to newly created data (fresh data) as soon as possible. In order to achieve it, we need to examine every source of latency in our current systems. We have been writing much about real-time because we think we offer a solution that enables true real-time operations, but we see many companies still stuck at “near real-time” since that is the best that past technologies could deliver.

What can you do?

Simplify your solutions

Complex IT architectures can hide latency. Each interface, each dependency on a previous component adds a potential time sink. Reexamine the overall architecture to determine if there are any ways to simplify or streamline it. Maybe one of the components has added new capabilities or functionality since the system was originally designed, making one of the other components now redundant so it might be eliminated, simplifying the solution, eliminating interfaces, reducing latency and thus, improving velocity.

Reexamine and challenge your assumptions, “standards”, and defaults

Sometimes we do what we’ve always done, use what we’ve always used. Companies sometimes have standard platforms or tools that their development teams “must” use. There are good reasons for standards, like making sure platforms and tools are reliable, scalable, and secure through rigorous certification processes. But those standards can put severe limitations on the capabilities of new systems. If you assume you need to make use of your company’s standard relational database system, your solution will look very different than if you could use a non-relational database or a message queue.

If velocity remains a challenge for your company, reexamine your existing solutions. Are you still using the fastest, most efficient data pipeline? Are all of the components being used still the fastest in their categories? Or maybe they are fast enough?  Build in periodic system reviews so these kinds of assessments can be made. There are other factors that are just as important as speed (like reliability, for example), so the fastest isn’t always the best, but it is good to be aware of possible improvements for performance.

Technology changes quickly and our aversion to changing our beloved system designs can blind us to the availability of better, faster, more efficient components. On the other hand, it would be better to be able to use as much of what is in place now as possible, tweaking rather than replacing. Ripping and replacing is fraught with risk and is often not necessary – your current production system is working (I assume), so it is usually better to figure out how you can make it better rather than implementing a brand new, unproven, system.

Think in terms of “Best If Used By” for your new data

And finally, keep in mind that new “fresh” data gets stale very quickly: the value of that data drops dramatically minutes, seconds, or sometimes even milliseconds after it is first produced. After that, it just gets added to all of the other old data in the massive collections stored by companies, most of which will never be used.

In the realm of real-time data, you “use it or lose it” when it comes to the opportunity to make the right offer to the right person at the right time. That fresh new data is what tells you and your system when the time is right, but it is a fleeting moment that you need to be able to react to quickly. Don’t squander the opportunity: make sure your systems can support acting on real-time data. Seeing how much data is being wasted (not being acted upon) and how much value is being lost might help you justify new systems that can overcome your company’s data velocity challenges.

Here’s some other velocity and real-time content we’ve created recently that you may find interesting:

It does not make sense to drive your real-time operations with your database. It does not compute, and by that, I mean your database lacks powerful computational capabilities (see my earlier blog on how databases are really just about reads and writes). And thus, it cannot really do much with your real-time data other than store it and retrieve it when asked.

A 2022 report on the data-driven enterprise by McKinsey & Company asserts the real-time processing and delivery of data as a key characteristic of companies that gain the most value from data-focused initiatives. It also called out the “high computational demands” of real-time processing as one inhibitor to the implementation of real-time use cases. To address those demands, businesses must choose the right technologies that provide the computational power to enable real-time advantages. The right technologies include not only hardware and cloud instances, but also newer classes of software that are designed for real-time applications.

Near Real-Time? Good Enough Real-Time?

You might argue that your database already gives you real-time capabilities, especially for end-user analytics. You take your real-time data feeds, transform them into an optimized format for end-user querying, and then write them to a database. When end users run queries, they get nearly instantaneous results, and you consider this sufficient for your real-time operations. That’s good enough, right?

It’s good, but not good enough for a true real-time enterprise. The limitation here is that the focus is getting the queries to run as quickly as possible. Database vendors tell you that’s all you need today, which is the same thing they told you in the past. The problem is that there still is a dependency on writing to the database first, along with the slow human intervention in the critical path for identifying actionable insights. This store-then-analyze paradigm precludes the ability to serve real-time use cases that instantly drive the “action” in “actionable insights.” Databases simply don’t give you a framework for writing applications that handle the computationally intensive work inherent in real-time data processing systems. Therefore, you miss the advantage of immediately responding to business activities and events.

Take Immediate Action

Real-time enterprises place heavy emphasis on recognizing time-sensitive opportunities and responding to them right now. They recognize that data has much more value “in the moment.” Delays result in lost value, especially when storing data that captures specific business events for later analysis. But to be clear, these enterprises also recognize the collective value of historical data, so they don’t see historical data analysis as a bad thing, but rather, as one component of their data strategy. They gain their significant advantage by responding to data when it’s most valuable, well before it gets relegated to the “historical” bin.

The process of hot data turning cold.
Hot Data Will Turn Cold. Take Action While the Data Is Most Valuable.

Think about real-time use cases that require immediate action. Fraud detection is a popular example that most of us understand because we know we want to take action, i.e., stop the fraud, as the transaction is taking place. We’d rather flag a transaction as potentially fraudulent now, than to figure it out later and potentially even miss it. Fraud detection is a specific example of the broader practice of anomaly detection, where an outlier event is worthy of scrutiny because it may represent some undesirable activity or status that needs immediate investigation. Patient monitoring, predictive maintenance, and quality assurance testing are just a few examples that leverage anomaly detection through data analysis.

Another popular example of a real-time use case is real-time recommendations, where you make highly relevant recommendations to your customers as they are interacting with your business, not after they’ve left. Up-sell and cross-sell opportunities are especially valuable to customers while they are shopping and are especially effective when the promotions are related to the items the shopper is already prepared to purchase. You cannot accurately predict the night before what customers will put into their shopping carts, so you need real-time.

Also, many real-time applications can leverage machine learning algorithms, and if you are asking your computers to do the fast thinking for you, wouldn’t it make sense for them to do the thinking now versus later? By sending real-time data through machine learning algorithms, you can get instant feedback on hidden insights in your data such as unusual characteristics of a given transaction, customer interactions that suggest future churn, subtle signs of impending equipment failure, etc. These are just examples of insights that you want to uncover as soon as possible so you can get alerts or take automated action so that you don’t miss the opportunity (or the potential failure).

Stream Processing on Apache Kafka Enables Real-Time

If you are using Apache Kafka, Apache Pulsar, AWS Kinesis, or any message bus for storing event data, then you likely are taking advantage of real-time data. But if you are merely turning that real-time data (i.e., “data in motion”) into data at rest by first storing it in a database, then you are adding an unnecessary delay while also relying on a batch-oriented paradigm to process your data. Specifically, your database applications will have to continually and inefficiently poll your database to try to uncover the real-time actionable insights buried in your data. Anyone who has tried this type of continual polling knows that it is a very clunky way to analyze data, and it is not very real-time.

The alternative is to leverage a stream processing engine to read event data sets as they are created and then immediately identify the trends, patterns, or anomalies that are otherwise difficult to uncover in databases. Stream processing engines are designed to work with data as a flow, so you can write applications that take immediate action on data. Don’t rely on your database as the foundation of a real-time strategy—explore the technologies that were built explicitly for getting the most out of real-time data.
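As a sketch of the difference, the hypothetical job below (the topic name, the risk flag, and the sink are placeholders, and the Hazelcast Kafka connector is assumed to be available) reacts to each event as it arrives instead of polling a table:

import com.hazelcast.core.Hazelcast;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;

import java.util.Properties;

public class FraudAlertJob {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(KafkaSources.<String, String>kafka(props, "transactions"))
                .withoutTimestamps()
                // Placeholder rule: flag any transaction whose payload marks it as high risk.
                // A real job would call an ML model or join with reference data here.
                .filter(entry -> entry.getValue().contains("\"riskScore\":\"high\""))
                .writeTo(Sinks.logger(entry -> "ALERT: possible fraud -> " + entry.getValue()));

        Hazelcast.bootstrappedInstance().getJet().newJob(pipeline);
    }
}

No polling loop is involved: the job is notified of each new event and can trigger an alert, an SMS, or a downstream workflow within milliseconds of the event being produced.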

If the idea of leveraging stream processing to handle your real-time data sounds like a good idea, take a look at the Hazelcast Platform, a real-time stream processing platform that offers the simplicity and performance (among other advantages) you seek to gain a true real-time advantage. “How simple” and “how fast,” you ask? With our free offerings, you can see for yourself. Check out our software, our cloud-managed service, or even just contact us so we can discuss the implementation challenges you face and how we can help you overcome them.

 

In the previous blog post, we introduced Compact Serialization and showed how to use it.

In this post, we will examine the details of the Compact Serialization to give you a better idea of how things are working “under the hood.”

We will start by showing how Compact Serialized objects are laid out in binary form, then talk about schemas and how Hazelcast uses them to read and query data and to support class evolution. We will also briefly show how Hazelcast replicates schemas across the cluster and touch on zero-configuration support.

Binary Representation

Layout of Compact serialized objects

As the image above shows, the binary representation of Compact Serialized objects consists of three sections. Note that in the image, dashed borders indicate sections that are optional and may be absent from the binary representation, depending on the schema.

The endianness of the data is configurable on both the client and the member, and the two must be consistent.

Header Section

This section consists of two numbers back to back.

Schema id is an 8-byte number that uniquely identifies the schema. It is used to associate the data with the actual schema so that we can avoid duplicating it in every serialized object but still know the structure of the binary data.

Data length is an optional 4-byte number representing the size of the data section in bytes. If the schema does not have any variable-size fields, this number is excluded from the binary form, because the length of the data section can then be determined from the length of the fixed-size data, which is information available in the schema. It is used to determine where the offsets section starts.

Data Section

This section is where the actual data sits in the binary representation. It consists of two subsections, both of which are optional depending on whether at least one fixed-size or variable-size field exists in the schema.

Fixed-size fields subsection contains the data of the fixed-size fields listed in the previous blog post, sorted in descending order of the number of bytes each field occupies, with ties broken by the ascending alphabetical order of the field names. A special optimization is made for boolean fields: we pack 8 boolean fields into 1 byte, spending only a single bit per boolean field.

The fixed-size fields cannot be null, hence they are always present in the binary data.
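
For illustration, the ordering rule above could be expressed as a comparator like the one below. The Field type and its accessors are hypothetical; this is not Hazelcast API.

import java.util.Comparator;
import java.util.List;

public class FixedSizeFieldOrder {

    record Field(String name, int sizeInBytes) {}

    // Larger fields come first; ties are broken by ascending alphabetical field name.
    static void sortForLayout(List<Field> fields) {
        fields.sort(Comparator.<Field>comparingInt(Field::sizeInBytes).reversed()
                .thenComparing(Field::name));
    }
}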

Variable-size fields subsection contains the data of the variable-size fields, sorted in ascending alphabetical order of the field names. A special optimization is made for arrays of booleans: we pack 8 boolean items into a single 1-byte item, spending only a single bit per boolean item.

Arrays of fixed-size fields are serialized as shown below: the number of items is a 4-byte integer, and each item occupies a fixed number of bytes according to its type.

Layout of array of fixed size items
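
A minimal sketch of writing an int32 array in this layout might look like the following; it is illustrative only, not the actual writer code.

import java.nio.ByteBuffer;

public class FixedSizeArraySketch {

    // A 4-byte item count followed by the items back to back, each a fixed 4 bytes.
    static void writeInt32Array(ByteBuffer buf, int[] items) {
        buf.putInt(items.length);
        for (int item : items) {
            buf.putInt(item);
        }
    }
}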

Arrays of variable-size fields are serialized as shown below: the data length and the number of items are two 4-byte integers describing the length of the array’s data section and the number of items in the array, respectively. The offsets section of the array has the same properties as the offsets section described below for Compact Serialized objects.

Layout of array of var size items

The variable-size fields can be null, hence they can be missing from the binary data.

Offsets Section

As the size of the variable-size fields cannot be known beforehand, an extra level of indirection is needed to determine the actual offsets of those fields within the binary data.

The offsets section consists of one offset per variable-size field. The order of the offsets is determined from the schema, according to the ascending alphabetical order of the field names.

For variable-size fields that are null, a special value, -1, is used as the offset.

The size of each offset can be 1, 2, or 4 bytes, depending on the size of the data section. This is an optimization to avoid wasting more space on offsets than necessary when the actual data fits into 254 bytes (1-byte offsets) or 65534 bytes (2-byte offsets).
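
In code, the choice of offset width could look like this sketch. The assumption that the top values are reserved for the -1 null marker is ours, inferred from the 254 and 65534 thresholds above.

public class OffsetWidth {

    // Picks the number of bytes used per offset entry, based on the data section size.
    // The top values (255 and 65535) are presumably reserved for the -1 null marker,
    // hence the 254-byte and 65534-byte thresholds.
    static int offsetSizeInBytes(int dataLength) {
        if (dataLength <= 254) {
            return 1;
        }
        if (dataLength <= 65534) {
            return 2;
        }
        return 4;
    }
}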

This section is also optional and is only present in the binary data if there is at least one variable-size field in the schema.

Schema

The schema is the central structure of Compact Serialization: it describes the binary data and is used during serialization and deserialization to put data into, and read it from, the correct places.

A schema consists of two pieces: the schema id and the fields.

The schema id is calculated from the fields and the type name given by the user. So the type name, the number of fields, and the names and types of the fields together identify the schema. When any one of them changes even slightly, so does the schema id. It is a unique fingerprint of the user classes or types.

The id is calculated from the information described above using the Rabin Fingerprint algorithm. We use 64-bit fingerprints, which should result in no collisions for schema caches of up to a million entries, far beyond the number of schemas expected in a running Hazelcast system.
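
To illustrate the idea of deriving a fingerprint from the type name and the fields, here is a stand-in sketch that uses a 64-bit FNV-1a hash. It is not the Rabin Fingerprint that Hazelcast actually uses; it only shows how the same inputs deterministically produce the same id.

import java.nio.charset.StandardCharsets;
import java.util.List;

public class SchemaIdSketch {

    record FieldInfo(String name, String type) {}

    // Illustrative only: combines the type name and the field descriptions into a
    // 64-bit FNV-1a hash. The real schema id uses the Rabin Fingerprint algorithm.
    static long schemaId(String typeName, List<FieldInfo> fields) {
        long hash = 0xcbf29ce484222325L;          // FNV-1a offset basis
        hash = mix(hash, typeName);
        for (FieldInfo field : fields) {
            hash = mix(hash, field.name());
            hash = mix(hash, field.type());
        }
        return hash;
    }

    private static long mix(long hash, String value) {
        for (byte b : value.getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xFF);
            hash *= 0x100000001b3L;               // FNV-1a prime
        }
        return hash;
    }
}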

The other important piece of the schema is the fields, which consist of the following information:

  • Name
  • Type
  • Offset (for fixed-size fields)
  • Offset index (for variable-size fields)
  • Bit offset (for boolean fields)

With that information, the schema can be considered a lookup table and used to deserialize individual fields.
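
Conceptually, that lookup table could be modeled like the sketch below. The FieldDescriptor shape is hypothetical and only mirrors the list above; it is not the actual Hazelcast class.

import java.util.Map;

public class SchemaLookupSketch {

    // Only the entries relevant to a field's kind are meaningful: offset for fixed-size
    // fields, offsetIndex for variable-size fields, bitOffset for booleans.
    record FieldDescriptor(String name, String type, int offset, int offsetIndex, int bitOffset) {}

    private final Map<String, FieldDescriptor> fieldsByName;

    SchemaLookupSketch(Map<String, FieldDescriptor> fieldsByName) {
        this.fieldsByName = fieldsByName;
    }

    // Returns the descriptor for a field, or null if this schema version has no such field.
    FieldDescriptor field(String name) {
        return fieldsByName.get(name);
    }
}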

Let’s see the schema in action by reading a fixed-size field and a variable-size field from a schema consisting of the following fields:

Original offsets of the fields

Let’s start by reading a fixed-size field, “id”. First, we consult the schema to learn the offset at which the data for that field begins. We do that by asking for the fixed-size field offset with the field name and type. The schema can return the offset as long as a field with that name and type exists in it. In this case, such a field exists, and it starts at offset “0”. With that information, we can go to that offset, read 4 bytes, and reinterpret them as an int32.

As the image below shows, we can read fixed-size fields with just one lookup, which is another improvement over Portable serialization.

Reading fixed size fields

We will not walk through the deserialization procedure for boolean fields, to avoid repeating ourselves; it is very similar to reading other fixed-size fields. We first ask the schema for the offset and bit offset, read a single byte at the given offset in the fixed-size fields subsection, and mask out and return the bit at the returned bit offset.
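
That bit masking boils down to something like this sketch, assuming the offsets come from the schema:

public class BooleanReadSketch {

    // `byteOffset` and `bitOffset` are what the schema returns for the field;
    // `fixedSizeStart` is where the fixed-size fields subsection begins.
    static boolean readBoolean(byte[] binary, int fixedSizeStart, int byteOffset, int bitOffset) {
        byte packed = binary[fixedSizeStart + byteOffset];  // up to 8 booleans live in this byte
        return ((packed >>> bitOffset) & 1) != 0;           // mask out the single bit we need
    }
}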

Reading variable-size fields involves an extra lookup, because the schema cannot pinpoint their locations: they differ for each piece of binary data. For some data, a variable-size field might take 10 bytes; for another, it might take 100 bytes. That is why we always encode the offsets of the variable-size fields into the binary itself, in the offsets section described above. We again consult the schema to learn the index at which the offset for the given field resides in the offsets section. For the string field named “name”, the schema returns the offset index “1”. With that information, we can jump to the offsets section and read the offset at the given index. We know exactly where that index sits in the data because the data size tells us how many bytes a single offset occupies. After reading the actual offset, we can read the data as a string. This mechanism is the same as in Portable serialization, but we optimize the offset sizes depending on the data size.

Reading var size fields
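
Putting the pieces together, reading a variable-size string field could look like the sketch below. The offset-width rule mirrors the one described earlier; the 4-byte length prefix for the string bytes and all names are our assumptions, not the actual wire format.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class VarSizeReadSketch {

    static String readStringField(ByteBuffer buf, int dataStart, int dataLength,
                                  int offsetsStart, int offsetIndex) {
        int offsetSize = dataLength <= 254 ? 1 : (dataLength <= 65534 ? 2 : 4);
        int offset = readOffset(buf, offsetsStart + offsetIndex * offsetSize, offsetSize);
        if (offset == -1) {                       // -1 marks a null field
            return null;
        }
        int pos = dataStart + offset;
        int length = buf.getInt(pos);             // assumed 4-byte length prefix
        byte[] bytes = new byte[length];
        for (int i = 0; i < length; i++) {
            bytes[i] = buf.get(pos + Integer.BYTES + i);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Offsets are stored as 1-, 2-, or 4-byte values; -1 is preserved as the null marker.
    private static int readOffset(ByteBuffer buf, int pos, int offsetSize) {
        if (offsetSize == 1) {
            byte b = buf.get(pos);
            return b == -1 ? -1 : b & 0xFF;
        }
        if (offsetSize == 2) {
            short s = buf.getShort(pos);
            return s == -1 ? -1 : s & 0xFFFF;
        }
        return buf.getInt(pos);
    }
}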

Schema Evolution

User classes and types are expected to evolve as time passes. New fields might be added, some fields might be removed, or the types of fields might change. The Compact Serialization format supports all of those use cases by design.

As stated above, the schema id is calculated from the user-provided type name and the fields. If the fields change in any way, by adding or removing fields, the schema id changes along with them. The result is a completely new schema that is tied to earlier schema versions of the user class only by the type name; there is no other relationship between the old and new schemas.

Hazelcast clients and members implement ways to store and retrieve all versions of the schemas in a running cluster. Therefore, any deserializer can access all the schemas available in the cluster and use them.

When Hazelcast is confronted with the data, it first checks the schema id and finds a schema associated with it.

After finding the schema, the deserializers do nothing special. They again consult the schema for the offsets, offset indexes, or bit offsets of the fields while deserializing the data. Using the schema as the source of truth allows the deserializers to dynamically use different schemas to read evolved versions of the data by changing nothing on their side.

In the section above, we showed how we use the schemas to get the information necessary to read the actual data. From the point of view of the deserializer, using schema#0 or schema#1 makes no difference at all. As long as a field with that name and type exists, the new or the old schema will return the correct offset, offset index, or bit offset for that field.

For example, assume that in the next version of our application we have added an integer “age” field and a string “department” field to the schema; there would be no need for any change in the old applications. When the old applications asked for the offset of the “id” field, they were reading the 4 bytes starting at offset “0”, because the schema told them so. Now, they would read the 4 bytes starting at offset “1”, again because the new schema tells them so. The same holds true for the “name” field. They were reading the actual offset from offset index “1”, but now they will use index “2” for the new data and still be able to read the data written by the new application without any change. As for the newly added fields, an old application simply ignores them, since it was never aware of them.

New offsets after a few fields are added

Now, what happens when you try to read data written by an old application with the new application? The answer is that you can keep reading the fields that exist in the old schema. For the newly added fields, you have to use the APIs shown in the previous blog post to conditionally skip reading them, because those fields won’t exist in the old schemas.

Partial Deserialization

Hazelcast is a real-time data platform. Apart from storing data, running queries, aggregations, and indexing operations on that data is quite common.

However, most of the time we only need part of the data and don’t necessarily need to deserialize all of it. Deserialization is quite costly, so avoiding any part of it is a win. That’s why the new Compact Serialization format supports partial deserialization.

For example, when an index is added in Hazelcast for the “department” field mentioned above, only that string field is deserialized while the indexes are updated.
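
For instance, a filtered query like the sketch below, which assumes an Employee class extended with a “department” field (as in the previous post), only needs that one field while the predicate is evaluated and the index is maintained on the members.

import com.hazelcast.config.IndexType;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.query.Predicates;

import java.util.Collection;

public class PartialDeserializationExample {

    // Assumes an Employee class with a "department" field, serialized with Compact.
    static Collection<Employee> engineers(HazelcastInstance instance) {
        IMap<Integer, Employee> employees = instance.getMap("employees");
        employees.addIndex(IndexType.HASH, "department");
        // Only the "department" field has to be deserialized to evaluate this predicate
        // and to maintain the index; the rest of each Employee stays in binary form.
        return employees.values(Predicates.equal("department", "engineering"));
    }
}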

The benefit of partial deserialization is especially noticeable when an application switches from IdentifiedDataSerializable to Compact Serialization.

Schema Replication

As shown above, the schema and the data are now separate entities. However, Hazelcast nodes need to know the schema to be able to work on the data associated with it. Without a schema, data is just a regular byte array; nothing can be known about it.

That’s why we have to ensure that any schema associated with data about to be put into Hazelcast is replicated across the Hazelcast cluster.

A few properties of the schemas make replication easier and remove the need for a consensus protocol.

Schemas are unique: there is only a single schema for a given schema id, and there is no relationship between different “versions” of the schemas. The order in which schemas are delivered to the system is not important. In short, schema delivery is a commutative, associative, and idempotent operation.
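
These properties are what make a simple replication scheme safe. As a rough illustration (not the actual implementation), a member-side schema registry can be as simple as an idempotent put keyed by the schema id:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SchemaRegistrySketch {

    // Placeholder for the schema structure described above.
    record Schema(long schemaId /*, fields... */) {}

    private final ConcurrentMap<Long, Schema> schemas = new ConcurrentHashMap<>();

    // Receiving the same schema twice, or receiving schemas in any order, leaves the
    // registry in the same state: delivery is commutative, associative, and idempotent.
    void onSchemaReplicated(Schema schema) {
        schemas.putIfAbsent(schema.schemaId(), schema);
    }

    Schema get(long schemaId) {
        return schemas.get(schemaId);
    }
}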

Hazelcast achieves reliable replication of the schemas to the cluster by making sure that:

  • Before sending any data associated with a schema, clients ensure that the schema is replicated across the cluster by waiting for an acknowledgment from one of the cluster members; this happens only once per schema.
  • For any schema that is not yet known to be replicated across the cluster, the cluster members initiate the replication procedure. This procedure resembles the two-phase commit protocol and ensures that the schema is available on every cluster node before declaring it replicated.

Hazelcast also supports schema persistence and replication over WAN, so Compact Serialization can be used with Hot Restart persistence and WAN Replication.

Note that it is currently not possible to delete schemas after they are replicated. There are plans to integrate schema management capabilities into the Management Center in the future.

Zero Configuration

As described in the previous blog post, it is possible to use Compact Serialization without any configuration at all, currently only in the Java and C# clients.

Under the hood, Hazelcast creates a serializer at runtime for a class the first time an instance of it is serialized. It uses reflection to determine the fields of the class, both their types and names, and registers a serializer that uses reflection to read and write those fields.

The type name used for this runtime serializer is the fully qualified class name.

The supported field types differ from language to language, but Hazelcast supports all the types offered in the public APIs of the CompactReader/Writer.

On top of that, it supports some extra types for different languages to make it more usable for a wide range of classes. Please take a look at the reference manual for the exact list of field types supported for different languages.

Since the zero-config serializers use reflection to read and write fields, they have some overhead compared to explicit Compact serializers. We recommend using explicit serializers for performance-critical applications in production.

Conclusion

That is all we wanted to cover. We hope that the new Compact Serialization format will improve the performance and overall experience of serialization within Hazelcast. We are looking forward to your feedback and comments about the new format. Don’t hesitate to try it and share your experience with us in our community Slack or GitHub repository.

Hazelcast offers various serialization mechanisms to convert user classes into a series of bytes that Hazelcast can understand.

With those in-house serialization mechanisms, Hazelcast can run queries, perform indexing, execute user code, and more on top of the data stored in its distributed data structures, more efficiently than with user-provided custom serialization.

Each of those serialization mechanisms, however, has its own advantages and disadvantages. Some are optimized for binary size (IdentifiedDataSerializable), some for query speed (Portable), and some for ease of use (HazelcastJsonValue).

With those in mind, we wanted to create a new serialization mechanism that would combine all the good things offered by the existing Hazelcast serialization mechanisms and would be:

  • As compact as possible in the serialized form, so that less data is transferred over the network and stored in memory
  • Efficient for all sorts of queries and indexing
  • Easy to use

This post will introduce the new format and show how to use it.

Overview of Compact Serialization

The core idea behind the new format is to have a schema that describes the serialized data, rather than duplicating that description in every serialized object. With this format, serialized data carries only an 8-byte fingerprint that uniquely identifies the schema. The schema and the data are separate entities connected by that fingerprint. That, in a nutshell, is Compact Serialization.

Compact is a massive improvement over Portable serialization, where each Portable binary carries the “schema” with it. Depending on the schema and the data, you can save up to 40% of the memory occupied in Hazelcast, or of the bytes transferred over the network, just by switching to the new Compact Serialization format for some workloads.

Similar to Portable, having a schema that describes the data allows Hazelcast to deserialize only the fields used in queries or indexing operations, resulting in massive performance gains compared to formats like IdentifiedDataSerializable, which do not have this feature.

Having a schema also allows you to evolve it by adding or removing fields and still be able to read and write data between old and new applications.

The new format currently supports the following types, which can be used as building blocks to serialize all sorts of data:

  • Fixed-size types like boolean, int8, int16, int32, int64, float32, float64
  • Nullable versions of the fixed-size types
  • Variable-size types like string, decimal, time, date, timestamp, timestamp with timezone
  • Arrays of the types listed above
  • Nested Compact serializable objects containing the types listed above, and arrays of them

With the exception of the fixed-size types, all of the above are considered variable-size.

Also, the new format is entirely language-independent and can be used from different client languages while working on the same data.

API

Let’s see how you can use Compact Serialization for your classes.

We will use Java for the code snippets below for demonstration purposes. We tried to mimic the public APIs across all the client languages, so it should be trivial to port these examples to other languages.

Assuming you have the following class:

import java.time.LocalDate;

public class Employee {

   private final int id;
   private final String name;
   private final boolean isActive;
   private final LocalDate hiringDate;

   public Employee(int id, String name, boolean isActive, LocalDate hiringDate) {
       this.id = id;
       this.name = name;
       this.isActive = isActive;
       this.hiringDate = hiringDate;
   }

   public int getId() {
       return id;
   }

   public String getName() {
       return name;
   }

   public boolean isActive() {
       return isActive;
   }

   public LocalDate getHiringDate() {
       return hiringDate;
   }
}

You can write a serializer for it without touching a single line of the class above. The Compact Serialization APIs do not require your classes to implement special interfaces like Portable or IdentifiedDataSerializable, allowing you to use existing legacy classes in your application.

The serializer, not the class, must implement the CompactSerializer interface, as shown below.

public class EmployeeSerializer implements CompactSerializer<Employee> {
   @Override
   public Employee read(CompactReader reader) {
       int id = reader.readInt32("id");
       String name = reader.readString("name");
       boolean isActive = reader.readBoolean("isActive");
       LocalDate hiringDate = reader.readDate("hiringDate");
       return new Employee(id, name, isActive, hiringDate);
   }

   @Override
   public void write(CompactWriter writer, Employee employee) {
       writer.writeInt32("id", employee.getId());
       writer.writeString("name", employee.getName());
       writer.writeBoolean("isActive", employee.isActive());
       writer.writeDate("hiringDate", employee.getHiringDate());
   }

   @Override
   public String getTypeName() {
       return "employee";
   }

   @Override
   public Class<Employee> getCompactClass() {
       return Employee.class;
   }
}

The serializer consists of four parts, and we will go over them one by one.

The read method is the one Hazelcast calls while deserializing streams of bytes back into your classes. In this method, you are expected to read the fields of your object using the same field names and types you used in the write method, and to return an instance of your class built from those fields. You don’t deal with schemas in the public API; Hazelcast takes care of that logic, and all you do is specify the fields you want to read.

In the write method, you write the fields of your object to a writer, with the types and field names appropriate for your class. The field names provided to the writer methods don’t have to match the actual field names, but it is good practice to keep them consistent, since Hazelcast will only use the names provided to the writer methods when referencing those fields. This method is also used to generate a schema for your class.

The getTypeName method returns a type name, provided by you, that must be unique in the cluster. The type name is used to match serializers written in different client languages, as well as evolved versions of serializers written in the same language. It is part of the schema Hazelcast automatically generates from your serializer.

The getCompactClass method returns the class you have written the serializer for, so that Hazelcast can map instances of that class to the serializer.

After writing the serializer, you must register it in your configuration. We will show the programmatic way to configure the serializer, but the declarative configuration is very similar.

After the registration, you can start using instances of your class in everything supported by Hazelcast.

Config config = new Config();
config.getSerializationConfig()
        .getCompactSerializationConfig()
        .addSerializer(new EmployeeSerializer());

HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
IMap<Integer, Employee> employees = instance.getMap("employees");
employees.put(1, new Employee(1, "John Doe", true, LocalDate.of(2022, 1, 1)));

Employee employee = employees.get(1);

In the coming months, we will also be working on a code generator tool that automatically generates serializers in different languages from schema files, to make writing serializers even easier.

Let us also show how to achieve schema evolution support with the Compact Serialization APIs.

Imagine that we have added two new fields: a department string and an age byte.

private final String department;
private final byte age;

The only thing we need to do to support these fields is to change the read and write methods.

We need to update the write method with those new fields and write them with the correct types and field names.

@Override
public void write(CompactWriter writer, Employee employee) {
  writer.writeInt32("id", employee.getId());
  writer.writeString("name", employee.getName());
  writer.writeBoolean("isActive", employee.isActive());
  writer.writeDate("hiringDate", employee.getHiringDate());
  writer.writeString("department", employee.getDepartment());
  writer.writeInt8("age", employee.getAge());
}

The read method requires a bit more care, because we now need to read the new fields conditionally. That’s why the Compact reader provides an API to query the kind of a field with a given name, which also tells us whether the field exists. With its help, we can conditionally read the data from the reader or fall back to an appropriate default value.

@Override
public Employee read(CompactReader reader) {
  int id = reader.readInt32("id");
  String name = reader.readString("name");
  boolean isActive = reader.readBoolean("isActive");
  LocalDate hiringDate = reader.readDate("hiringDate");

  String department;
  if (reader.getFieldKind("department") == FieldKind.STRING) {
      department = reader.readString("department");
  } else {
      department = "N/A";
  }

  byte age;
  if (reader.getFieldKind("age") == FieldKind.INT8) {
      age = reader.readInt8("age");
  } else {
      age = -1;
  }

  return new Employee(id, name, isActive, hiringDate, department, age);
}

There is no explicit versioning of the serializer or the class. Hazelcast automatically knows this is a different version of the same class (through the type name) and assigns a new schema and schema id to it.

Zero-Config

One of the core requirements of Compact Serialization was the ease of use.

We tried to achieve that by requiring no changes to user classes and by simplifying schema evolution and versioning.

We went even further for some of the languages: we provide a Compact serializer even when the user supplies no configuration or serializer, currently for Java and C#. For our other clients, it can be hard to extract the correct type information from instances at runtime.

Previously, when Hazelcast couldn’t find a serializer for a type, it threw a HazelcastSerializationException, saying that the given object could not be serialized because there was no applicable serializer for it.

Now, we try one last time to create a serializer for your class, using different mechanisms in different languages. For Java, we rely on reflection.

If the fields of your object are of types supported by the zero-config serializers, we use a reflective Compact serializer that is generated automatically on the fly.

For Java, we even support Java records, so you can serialize/deserialize them without requiring a serializer!

Imagine you have the above Employee class or the following Employee record:

public record Employee(
      int id,
      String name,
      boolean isActive,
      LocalDate hiringDate) {
}

You can use it, with zero configuration!

HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<Integer, Employee> employees = instance.getMap("employees");
employees.put(1, new Employee(1, "John Doe", true, LocalDate.of(2022, 1, 1)));

Employee employee = employees.get(1);

Conclusion

We designed the Compact Serialization format with the majority of Hazelcast users in mind: those with long-living clients and members who use Hazelcast as a data platform where data is manipulated and queried in different ways. For this use case, Compact Serialization is your best bet among the serialization mechanisms in Hazelcast. The only cost the Compact format brings is replicating the schemas and making sure they are available in the cluster for clients, which happens only once per schema. We believe these operations are amortized in the long run, and the benefits of smaller data sizes and faster queries are worth it.

We are really looking forward to your feedback and comments about the new format. Don’t hesitate to try the new format and share your experience with us in our community Slack or GitHub repository.

We are pleased to announce the recent releases listed below. Please join us in the Hazelcast Community on Slack to raise questions, share tips, and provide feedback ♥. Also, check the community page for other happenings, including talks. Happy Hazelcasting! 🙂

Hazelcast Platform 5.2.0

New Features

  • SQL stream-to-stream joins: You can now enrich one stream’s data with another’s data by joining them together.
  • Generic MapStore (BETA): You no longer need to write Java code implementing the MapStore or MapLoader interfaces to get data from an external data store, such as a relational database, into Hazelcast.
  • JDBC Connector (BETA): SQL can now connect to and query any database that supports the JDBC interface.
  • Experimental User-Defined Types: Querying nested objects within Java, Compact, and Portable mappings is now supported using User-Defined Types (UDTs).
  • CP Leadership Priority: CP member leadership can now be transferred to another member to ensure the availability of the CP subsystem.

Links

Bonus 🙂

Hazelcast Platform 5.2 – Show Me the Code!

The Hazelcast Platform Rocketing to the Next Level

Hazelcast Management Center 5.2.0

New Features

  • Added the following for SQL Browser:
    • Separate workflows for batch and streaming queries
    • Editor autocomplete support
    • Connector wizard for the Kafka connector
    • Connector wizard for the File connector
  • Introduced customizable tables, with selectable columns and column order.
  • Introduced cluster connection diagnosis functionality.
  • Added a statistics table for map indexes.
  • Added a chart for Tiered Storage usage monitoring.
  • Added support for the Command-Line Client (CLC).
  • Added health checks for the number of connected clients and for Tiered Storage metrics and configuration.

Links

Hazelcast Go Client 1.3.1

This release of the Go client fixes various issues, including Portable array serializers for some types behaving differently from the other Platform clients.

Hazelcast Platform 5.1.4