Introducing Hazelcast Scala API with Aggregations

A pre-release alpha version of the Scala API for Hazelcast is now available on GitHub.

It’s a “soft” API, i.e. it extends the Java API rather than replacing it. The Scala API also adds built-in distributed aggregations and IMap join capability.

The Scala API targets version 3.6 of Hazelcast.

Key features

The following is a breakdown of some of the key features of the Scala API:

Configuration

Properties

Given a configuration instance, Config or ClientConfig, you can set property values in a type-safe and unit-agnostic manner.

E.g. in Java, to set the maximum wait time for graceful shutdown, you’d have to know what unit is expected. This is not necessarily hard, but it does require either remembering the unit or looking it up in the documentation or code:

conf.setProperty("hazelcast.graceful.shutdown.max.wait", "300") // <- seconds

In Scala, using import concurrent.duration._, you provide the unit instead:

conf.setGracefulShutdownMaxWait(5.minutes)

Likewise for setting socket receive buffer size, in Java:

conf.setProperty("hazelcast.socket.receive.buffer.size", "64") // <- kilobytes

And Scala:

conf.setSocketReceiveBufferSize(64.kilobytes)
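To illustrate the idea behind such unit-aware setters, here is a minimal sketch using only the standard library. RawConfig and TypedConfig are hypothetical stand-ins invented for this example; this is not how hazelcast-scala itself is implemented, only one way the pattern could look:

```scala
import scala.concurrent.duration._

object TypedProps {
  // Hypothetical stand-in for a config object that only accepts string properties.
  final class RawConfig {
    private var props = Map.empty[String, String]
    def setProperty(name: String, value: String): RawConfig = {
      props += (name -> value)
      this
    }
    def getProperty(name: String): Option[String] = props.get(name)
  }

  // Extension method (assumed name, mirroring the article) that accepts a
  // unit-carrying duration and converts it to the seconds the property expects.
  implicit class TypedConfig(conf: RawConfig) {
    def setGracefulShutdownMaxWait(wait: FiniteDuration): RawConfig =
      conf.setProperty("hazelcast.graceful.shutdown.max.wait", wait.toSeconds.toString)
  }
}
```

With this in scope, a caller can pass 5.minutes or 300.seconds interchangeably, and the conversion to the expected unit happens in one place.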

Serialization

The Scala API provides a number of serializers for commonly used Scala classes, which can be registered for better performance:

import com.hazelcast.Scala._

serialization.DefaultSerializers.register(conf.getSerializationConfig)

In an earlier blog post I described how a Java enum can be used as a container for the serialization type classes ByteArraySerializer and StreamSerializer. The Java implementation requires some boilerplate that we can avoid in Scala and the same pattern can be provided as a default implementation. Given a Person class:

package foo.domain
    
import java.time.LocalDate
    
case class Person(name: String, birthday: LocalDate)

And serializer:

package foo.serialization
    
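import java.time.LocalDate

import com.hazelcast.Scala.serialization._
import com.hazelcast.nio.{ObjectDataInput, ObjectDataOutput}
import foo.domain.Person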
    
object MySerializers extends SerializerEnum {
    
  val PersonSer = new StreamSerializer[Person] {
    def write(out: ObjectDataOutput, person: Person): Unit = {
      out.writeUTF(person.name)
      out.writeShort(person.birthday.getYear)
      out.writeByte(person.birthday.getMonthValue)
      out.writeByte(person.birthday.getDayOfMonth)
    }
    def read(inp: ObjectDataInput): Person = {
      val name = inp.readUTF()
      val year = inp.readShort()
      val month = inp.readByte()
      val day = inp.readByte()
      Person(name, LocalDate.of(year, month, day))
    }
  }
    
}

We can chain serializer containers (SerializerEnum objects) to logically separate serializers while maintaining serializer id sequencing, by indirect extension:

object OtherSerializers extends SerializerEnum(MySerializers) {
  // More serializers...
}

This ensures that every serializer has a unique, monotonically increasing serializer id.
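The id sequencing across chained containers can be sketched as follows. IdContainer is a hypothetical class made up for this illustration, not the actual SerializerEnum implementation, and it assumes the parent container is fully populated before another container extends it:

```scala
object ChainedIds {
  // Hypothetical container: each registered name gets the next id, continuing
  // after the last id handed out by the container it extends (if any).
  class IdContainer(parent: Option[IdContainer] = None) {
    // Captured once at construction, so the parent must already be complete.
    private val offset: Int = parent.map(_.nextId).getOrElse(1)
    private var names = Vector.empty[String]

    def nextId: Int = offset + names.size

    def register(name: String): Int = {
      val id = nextId
      names :+= name
      id
    }
  }
}
```

For example, registering two entries in a first container and one in a second container that extends it yields the ids 1, 2 and 3, with no overlap between the two containers.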

New instance

In Scala, you get the HazelcastInstance directly from the configuration object:

val member = conf.newInstance()
// or ClientConfig:
val client = clientConf.newClient()

Non-blocking operations

The Scala API puts all non-blocking operations into a separate interface, available by calling .async. All methods return a scala.concurrent.Future. They are adaptations of the async methods in the Java API, plus some convenience methods for performing fast in-place delta updates. For IMap, for example, these are available:

def upsertAndGet(key: K, insertIfMissing: V)(updateIfPresent: V => V): Future[V]
def updateAndGet(key: K)(updateIfPresent: V => V): Future[Option[V]]
def updateAndGetIf(cond: V => Boolean, key: K)(updateIfPresent: V => V): Future[Option[V]]
def upsert(key: K, insertIfMissing: V)(updateIfPresent: V => V): Future[UpsertResult]
def update(key: K)(updateIfPresent: V => V): Future[Boolean]
def updateIf(cond: V => Boolean, key: K)(updateIfPresent: V => V): Future[Boolean]
def getAndUpsert(key: K, insertIfMissing: V)(updateIfPresent: V => V): Future[Option[V]]
def getAndUpdate(key: K)(updateIfPresent: V => V): Future[Option[V]]
def getAndUpdateIf(cond: V => Boolean, key: K)(updateIfPresent: V => V): Future[Option[(V, Boolean)]]

This makes it syntactically easy to e.g. implement the equivalent of an AtomicDouble:

val doubles = hz.getMap[String, Double]("atomic-doubles")
val delta = 1.3333
// Insert value if "gauge" unknown, otherwise add value:
val gaugeValue: Future[Double] = doubles.upsertAndGet("gauge", delta)(_ + delta)
// Print updated value:
gaugeValue.foreach(println)
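For readers unfamiliar with upsert semantics, here is a local, single-JVM analogue built on java.util.concurrent.ConcurrentHashMap. It is only a sketch of the insert-or-update behaviour described above: it is synchronous, not distributed, and the LocalUpsert object is a name invented for this example:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiFunction

object LocalUpsert {
  // Insert the value if the key is missing, otherwise apply the update
  // function to the existing value. Returns the value now stored.
  def upsertAndGet[K, V](map: ConcurrentHashMap[K, V], key: K, insertIfMissing: V)(
      updateIfPresent: V => V): V =
    map.merge(key, insertIfMissing, new BiFunction[V, V, V] {
      // merge passes (existing value, supplied value); only the existing one is used.
      def apply(existing: V, ignored: V): V = updateIfPresent(existing)
    })
}
```

Calling upsertAndGet(doubles, "gauge", delta)(_ + delta) twice on an empty map first stores delta, then adds delta to it, which mirrors the gauge example above.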

Distributed computing

The Scala API for Hazelcast makes it easy to do distributed compute queries and aggregations, using the familiar syntax from the Scala collections.

It’s currently only implemented for IMap, but may be provided for MultiMap and List/Set in the future.

Weather

Given this trait:

import java.time._
import concurrent.Future

trait Stats {
  /**
   * Calculate the mean max temperature, per month, for 
   * a given year.
   * @param forYear The year
   */
  def monthlyMaxMeanTemp(forYear: Year): Future[Map[Month, Float]]
}

Let’s say we’d like to print the mean daily max by month for Milan, for the previous year:

val milanStats: Stats = ???

val lastYearMonthlyMeanMax = milanStats.monthlyMaxMeanTemp(Year.now minusYears 1)

lastYearMonthlyMeanMax.foreach { byMonth =>
  byMonth.toSeq.sortBy(_._1).foreach {
    case (month, meanMax) => println(s"$month: Mean max: $meanMax C")
  }
}

We can store weather stats in one or more Hazelcast IMaps, using this class to hold the values:

case class WeatherStats(
  minTempC: Float,
  maxTempC: Float,
  rainMm: Float,
  dewPointC: Float,
  humidity: Float
)

If each region holds daily weather stats, we can provide a Hazelcast implementation of Stats, like this:

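import com.hazelcast.Scala._
import com.hazelcast.core.HazelcastInstance
import concurrent.ExecutionContext

class HzStats(hz: HazelcastInstance, region: String)(implicit ec: ExecutionContext)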
    extends Stats {

  private val weather = hz.getMap[LocalDate, WeatherStats](region)

  def monthlyMaxMeanTemp(forYear: Year): Future[Map[Month, Float]] = {
    val begin = forYear.atMonth(Month.JANUARY).atDay(1)
    val end = forYear.atMonth(Month.DECEMBER).atEndOfMonth
    val weatherForYear = weather.filter(where.key.between(begin, end))
    val maxTempsByMonth = weatherForYear.groupBy(
      e => Month.from(e.key), // Group key: Month
      _.value.maxTempC // Group values: maxTempC
    )
    val meanMaxTempByMonth = maxTempsByMonth.mean() // Get the average per group
    meanMaxTempByMonth.map(_.toMap)
  }

}

We can then instantiate milanStats:

val hz: HazelcastInstance = ???
val milanStats: Stats = new HzStats(hz, "milan")
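To make clear what the distributed query computes, here is the same filter / group / mean pipeline written against a plain in-memory Map. This is only a local sketch for comparison: LocalMonthlyMean is a made-up name, and the map holds the max temperature directly rather than a full WeatherStats value:

```scala
import java.time.{LocalDate, Month}

object LocalMonthlyMean {
  // Keep one year's entries, group the max temperatures by month,
  // then average each group.
  def monthlyMaxMeanTemp(daily: Map[LocalDate, Float], year: Int): Map[Month, Float] =
    daily
      .filter { case (date, _) => date.getYear == year }
      .groupBy { case (date, _) => date.getMonth }
      .map { case (month, entries) =>
        val temps = entries.values.toSeq
        month -> (temps.sum / temps.size)
      }
}
```

The difference in the Hazelcast version is where the work happens: the grouping and averaging run on the nodes holding the data, and only the per-month results are returned.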

The mean calculation is one of many built-in aggregations provided by hazelcast-scala. More will presumably be added in the future, and submissions are welcome, provided they are of general use.

Built-in aggregations

The following aggregations are currently built in:

All types:
  1. distinct()
  2. distribution()
  3. count()
  4. mode()
Ordering types:
  1. max()
  2. min()
  3. minMax()
  4. medianValues()
Numeric types:
  1. sum()
  2. product()
  3. mean()
  4. range()
  5. median()
  6. variance()

Custom aggregations

Custom aggregations can be provided by calling the aggregate method, which is syntactically close to, and semantically identical to, the Scala collections aggregate method. Finer-grained control can be had by implementing the Aggregator trait and calling .submit() on the DDS (the Distributed Data Set), which is provided as an implicit conversion from IMap.
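As a reminder of what aggregate semantics look like, here is a small sketch over explicitly partitioned local data: each partition is folded with a sequential operator, and the partial results are then merged with a combining operator. LocalAggregate and its method names are invented for this example and do not show the hazelcast-scala signature:

```scala
object LocalAggregate {
  // Fold each partition with seqop starting from the zero value,
  // then merge the partial results with combop.
  def aggregate[A, B](partitions: Seq[Seq[A]])(zero: B)(
      seqop: (B, A) => B, combop: (B, B) => B): B =
    partitions.map(_.foldLeft(zero)(seqop)).foldLeft(zero)(combop)

  // Example: compute (sum, count) in one pass, from which a mean can be derived.
  def sumAndCount(partitions: Seq[Seq[Double]]): (Double, Int) =
    aggregate(partitions)((0.0, 0))(
      (acc, x) => (acc._1 + x, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )
}
```

In a distributed setting the partitions would correspond to the data held on each node, which is why the combining operator has to be able to merge partial results in any order.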

Performance

Aggregations are executed locally on every node that holds data matching the provided filter. Since Hazelcast holds data in memory, this allows for very fast and memory-efficient aggregations. Informal testing suggests a performance increase of 10-40x over the existing Map/Reduce framework.

What else

For more complete and up-to-date examples, visit the GitHub wiki.