Introducing Hazelcast Scala API with Aggregations
A pre-release alpha version of the Scala API for Hazelcast has been released and is available on GitHub.
It’s a “soft” API, i.e. it expands the Java API rather than replacing it. The Scala API also adds built-in distributed aggregations and IMap join capability.
The Scala API targets version 3.6 of Hazelcast.
Key features
The following is a breakdown of some of the key features of the Scala API.
Configuration
Properties
Given a configuration instance, Config or ClientConfig, you can set property values in a type-safe and unit-agnostic manner.
For example, in Java, to set the maximum wait time for graceful shutdown, you have to know which unit is expected. This is not necessarily hard, but it does require either remembering the unit or looking it up in the documentation or code:
conf.setProperty("hazelcast.graceful.shutdown.max.wait", "300") // <- seconds
In Scala, using import concurrent.duration._, you provide the unit instead:
conf.setGracefulShutdownMaxWait(5.minutes)
Likewise for setting socket receive buffer size, in Java:
conf.setProperty("hazelcast.socket.receive.buffer.size", "64") // <- kilobytes
And Scala:
conf.setSocketReceiveBufferSize(64.kilobytes)
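One way to picture these unit-agnostic setters is as thin extension methods that convert a typed value into the string the Java property expects. The following is a minimal sketch of that idea, not the library's actual implementation: MiniConfig and DurationProps are hypothetical stand-ins, and only the property name and its unit (seconds) are taken from the example above.

```scala
import scala.collection.mutable
import scala.concurrent.duration._

object TypeSafeProps {
  // Hypothetical stand-in for Hazelcast's Config: just a bag of string properties.
  final class MiniConfig {
    private val props = mutable.Map.empty[String, String]
    def setProperty(name: String, value: String): MiniConfig = {
      props(name) = value
      this
    }
    def getProperty(name: String): Option[String] = props.get(name)
  }

  // Hypothetical extension method: the caller supplies any duration unit,
  // and the conversion to seconds happens in exactly one place.
  implicit class DurationProps(conf: MiniConfig) {
    def setGracefulShutdownMaxWait(maxWait: FiniteDuration): MiniConfig =
      conf.setProperty("hazelcast.graceful.shutdown.max.wait", maxWait.toSeconds.toString)
  }

  def main(args: Array[String]): Unit = {
    val conf = new MiniConfig
    conf.setGracefulShutdownMaxWait(5.minutes)
    println(conf.getProperty("hazelcast.graceful.shutdown.max.wait")) // Some(300)
  }
}
```

Because the parameter type is FiniteDuration, passing a bare number no longer compiles, which is what removes the need to remember the expected unit.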
Serialization
The Scala API provides a number of serializers for commonly used Scala classes, which can be registered for better performance:
import com.hazelcast.Scala._
serialization.DefaultSerializers.register(conf.getSerializationConfig)
In an earlier blog post I described how a Java enum can be used as a container for the serialization type classes ByteArraySerializer and StreamSerializer. The Java implementation requires some boilerplate that we can avoid in Scala, and the same pattern can be provided as a default implementation. Given a Person class:
package foo.domain

import java.time.LocalDate

case class Person(name: String, birthday: LocalDate)
And serializer:
package foo.serialization

import java.time.LocalDate
import com.hazelcast.nio.{ObjectDataInput, ObjectDataOutput}
import com.hazelcast.Scala.serialization._
import foo.domain.Person

object MySerializers extends SerializerEnum {
  val PersonSer = new StreamSerializer[Person] {
    // Write the name, then the birthday as compact year/month/day fields
    def write(out: ObjectDataOutput, person: Person): Unit = {
      out.writeUTF(person.name)
      out.writeShort(person.birthday.getYear)
      out.writeByte(person.birthday.getMonthValue)
      out.writeByte(person.birthday.getDayOfMonth)
    }
    // Read the fields back in the same order they were written
    def read(inp: ObjectDataInput): Person = {
      val name = inp.readUTF()
      val year = inp.readShort()
      val month = inp.readByte()
      val day = inp.readByte()
      Person(name, LocalDate.of(year, month, day))
    }
  }
}
We can chain serializer containers (SerializerEnum objects) to logically separate serializers while maintaining serializer id sequencing, by indirect extension:
object OtherSerializers extends SerializerEnum(MySerializers) {
  // More serializers...
}
This ensures that all serializers have a unique, monotonically increasing, serializer id.
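To illustrate how chaining can keep ids in sequence, here is a small self-contained sketch. It does not use the library's SerializerEnum: IdEnum, register, and the starting id of 1 are all hypothetical, chosen only to show a chained container continuing where its parent stopped.

```scala
object SerializerIdSketch {
  // Hypothetical base class, not the library's SerializerEnum.
  abstract class IdEnum(firstId: Int) {
    def this() = this(1)                           // root container starts at id 1 (arbitrary)
    def this(parent: IdEnum) = this(parent.nextId) // chained container continues the sequence

    private var used = 0
    def nextId: Int = firstId + used

    // Hands out the next free id to a named entry.
    protected def register(name: String): (String, Int) = {
      val entry = name -> nextId
      used += 1
      entry
    }
  }

  object First extends IdEnum() {
    val PersonSer = register("Person")
    val AddressSer = register("Address")
  }

  // Referencing First forces it to initialize fully before Second reads nextId.
  object Second extends IdEnum(First) {
    val OrderSer = register("Order")
  }

  def main(args: Array[String]): Unit = {
    println(First.PersonSer)  // (Person,1)
    println(First.AddressSer) // (Address,2)
    println(Second.OrderSer)  // (Order,3)
  }
}
```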
New instance
In Scala, you get the HazelcastInstance directly from the configuration object:
val member = conf.newInstance()
// or ClientConfig:
val client = clientConf.newClient()
Non-blocking operations
The Scala API puts all non-blocking operations into a separate interface, which is available by calling .async. All methods return a scala.concurrent.Future and are adaptations of the async methods in the Java API, along with some convenience methods for performing fast in-place delta updates. For IMap, for example, these are available:
def upsertAndGet(key: K, insertIfMissing: V)(updateIfPresent: V => V): Future[V]
def updateAndGet(key: K)(updateIfPresent: V => V): Future[Option[V]]
def updateAndGetIf(cond: V => Boolean, key: K)(updateIfPresent: V => V): Future[Option[V]]
def upsert(key: K, insertIfMissing: V)(updateIfPresent: V => V): Future[UpsertResult]
def update(key: K)(updateIfPresent: V => V): Future[Boolean]
def updateIf(cond: V => Boolean, key: K)(updateIfPresent: V => V): Future[Boolean]
def getAndUpsert(key: K, insertIfMissing: V)(updateIfPresent: V => V): Future[Option[V]]
def getAndUpdate(key: K)(updateIfPresent: V => V): Future[Option[V]]
def getAndUpdateIf(cond: V => Boolean, key: K)(updateIfPresent: V => V): Future[Option[(V, Boolean)]]
This makes it syntactically easy to, for example, implement the equivalent of an AtomicDouble:
val doubles = hz.getMap[String, Double]("atomic-doubles")
val delta = 1.3333
// Insert value if "gauge" unknown, otherwise add value:
val gaugeValue: Future[Double] = doubles.upsertAndGet("gauge", delta)(_ + delta)
// Print updated value:
gaugeValue.foreach(println)
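The semantics of upsertAndGet and updateAndGet, as their signatures suggest, can be sketched on a plain local map. This is an illustration under assumptions, not the library's code: LocalMap is a hypothetical single-JVM class that returns values directly instead of a Future and uses a lock where the real operation would run on the member owning the key.

```scala
import scala.collection.mutable

object UpsertSketch {
  // Hypothetical local stand-in for the async IMap interface.
  final class LocalMap[K, V] {
    private val data = mutable.Map.empty[K, V]

    // Insert the given value if the key is missing, otherwise apply the update function.
    def upsertAndGet(key: K, insertIfMissing: V)(updateIfPresent: V => V): V = synchronized {
      val updated = data.get(key) match {
        case Some(existing) => updateIfPresent(existing)
        case None           => insertIfMissing
      }
      data(key) = updated
      updated
    }

    // Apply the update function only if the key exists; None means nothing was changed.
    def updateAndGet(key: K)(updateIfPresent: V => V): Option[V] = synchronized {
      data.get(key).map { existing =>
        val updated = updateIfPresent(existing)
        data(key) = updated
        updated
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val counters = new LocalMap[String, Int]
    println(counters.upsertAndGet("hits", 1)(_ + 1)) // 1 (inserted)
    println(counters.upsertAndGet("hits", 1)(_ + 1)) // 2 (updated)
    println(counters.updateAndGet("misses")(_ + 1))  // None (key unknown)
  }
}
```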
Distributed computing
The Scala API for Hazelcast makes it easy to do distributed compute queries and aggregations, using the familiar syntax from the Scala collections.
It’s currently only implemented for IMap, but may be provided for MultiMap and List/Set in the future.
Weather
Given this trait:
import java.time._
import concurrent.Future

trait Stats {
  /**
   * Calculate the mean max temperature, per month, for
   * a given year.
   * @param forYear The year
   */
  def monthlyMaxMeanTemp(forYear: Year): Future[Map[Month, Float]]
}
Let’s say we’d like to print the mean daily max temperature by month for Milan, for the previous year:
val milanStats: Stats = ???
val lastYearMonthlyMeanMax = milanStats.monthlyMaxMeanTemp(Year.now minusYears 1)
lastYearMonthlyMeanMax.foreach { byMonth =>
  byMonth.toSeq.sortBy(_._1).foreach {
    case (month, meanMax) => println(s"$month: Mean max: $meanMax C")
  }
}
We can store weather stats in one or more Hazelcast IMaps, using this class to hold the values:
case class WeatherStats(
  minTempC: Float,
  maxTempC: Float,
  rainMm: Float,
  dewPointC: Float,
  humidity: Float
)
If each region holds daily weather stats, we can provide a Hazelcast implementation of Stats, like this:
class HzStats(hz: HazelcastInstance, region: String)(implicit ec: ExecutionContext) extends Stats {

  private val weather = hz.getMap[LocalDate, WeatherStats](region)

  def monthlyMaxMeanTemp(forYear: Year): Future[Map[Month, Float]] = {
    val begin = forYear.atMonth(Month.JANUARY).atDay(1)
    val end = forYear.atMonth(Month.DECEMBER).atEndOfMonth
    val weatherForYear = weather.filter(where.key.between(begin, end))
    val maxTempsByMonth = weatherForYear.groupBy(
      e => Month.from(e.key), // Group key: Month
      _.value.maxTempC        // Group values: maxTempC
    )
    val meanMaxTempByMonth = maxTempsByMonth.mean() // Get the average per group
    meanMaxTempByMonth.map(_.toMap)
  }
}
We can then instantiate milanStats:
val hz: HazelcastInstance = ???
val milanStats: Stats = new HzStats(hz, "milan")
The mean calculation is one of many built-in aggregations provided by hazelcast-scala. More will presumably be added in the future, and submissions are welcome, provided they are of general use.
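For comparison, the same filter, group, and mean steps can be written against an ordinary in-memory Scala Map. This sketch only mirrors the logic of the weather query with standard collections; it involves no Hazelcast calls, the result is computed locally rather than returned as a Future, and the object name LocalMeanMax is made up for the example.

```scala
import java.time.{LocalDate, Month, Year}

object LocalMeanMax {
  final case class WeatherStats(
    minTempC: Float, maxTempC: Float, rainMm: Float, dewPointC: Float, humidity: Float)

  // Local equivalent of the distributed query: filter by year, group by month, average.
  def monthlyMaxMeanTemp(weather: Map[LocalDate, WeatherStats], forYear: Year): Map[Month, Float] = {
    val begin = forYear.atMonth(Month.JANUARY).atDay(1)
    val end = forYear.atMonth(Month.DECEMBER).atEndOfMonth
    weather.toSeq
      .filter { case (date, _) => !date.isBefore(begin) && !date.isAfter(end) } // inclusive range
      .groupBy { case (date, _) => Month.from(date) }                           // group key: Month
      .map { case (month, entries) =>
        val maxTemps = entries.map(_._2.maxTempC)                               // group values: maxTempC
        month -> (maxTemps.sum / maxTemps.size)                                 // mean per group
      }
  }

  def main(args: Array[String]): Unit = {
    val sample = Map(
      LocalDate.of(2015, 1, 1) -> WeatherStats(0f, 4f, 0f, 0f, 0f),
      LocalDate.of(2015, 1, 2) -> WeatherStats(0f, 6f, 0f, 0f, 0f),
      LocalDate.of(2015, 7, 1) -> WeatherStats(0f, 30f, 0f, 0f, 0f)
    )
    monthlyMaxMeanTemp(sample, Year.of(2015)).toSeq.sortBy(_._1.getValue).foreach(println)
  }
}
```

The local version has to pull every entry into one collection first, which is the step the distributed version avoids by aggregating where the data lives.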
Built-in aggregations
The following aggregations are currently built in:
All types:
distinct()
distribution()
count()
mode()
Ordering types:
max()
min()
minMax()
medianValues()
Numeric types:
sum()
product()
mean()
range()
median()
variance()
Custom aggregations
Custom aggregations can be provided by calling the aggregate method, which is syntactically close and semantically identical to the Scala collections aggregate method. Finer-grained control is available by implementing the Aggregator trait and calling .submit() on the DDS, the Distributed Data Set, which is provided as an implicit conversion from IMap.
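As a reminder of what aggregate semantics look like, the sketch below folds each partition with a sequence operator and then merges the partial results with a combine operator. It is written against plain Scala sequences, with the partitions standing in (by assumption) for the data held on separate nodes; the helper names aggregate and mean here are local to the example and are not the library's methods.

```scala
object AggregateSketch {
  // Fold every partition independently (seqop), then merge the partial results (combop).
  def aggregate[A, B](partitions: Seq[Seq[A]])(zero: => B)(
      seqop: (B, A) => B, combop: (B, B) => B): B =
    partitions.map(_.foldLeft(zero)(seqop)).foldLeft(zero)(combop)

  // A custom aggregation: the mean, carried as a (sum, count) pair so partials can be merged.
  def mean(partitions: Seq[Seq[Double]]): Option[Double] = {
    val (sum, count) = aggregate(partitions)((0.0, 0L))(
      (acc, x) => (acc._1 + x, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )
    if (count == 0) None else Some(sum / count)
  }

  def main(args: Array[String]): Unit = {
    println(mean(Seq(Seq(1.0, 2.0), Seq(3.0), Seq.empty))) // Some(2.0)
  }
}
```

The mean is carried as a (sum, count) pair rather than a running average because the pair can be merged exactly across partitions, whereas averaging the per-partition means would weight uneven partitions incorrectly.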
Performance
Aggregations are executed locally on every node that holds data within the provided filter. Since Hazelcast holds data in memory, this allows for very fast and memory-efficient aggregations. Informal testing shows a performance increase of 10-40x over the existing Map/Reduce framework.
What else
For more complete and up-to-date examples, visit the GitHub wiki.