Queries and Aggregations w/ Scala, part 1
In this post we’ll take a look at the built-in aggregations in the Scala API.

I was looking at publicly available data sets to run some analysis on, and the Chicago Crime statistics seemed like a good choice: a data set that includes all crime cases since 2001.
I chose the CSV download, which unfortunately is not compressed and exceeds 1 GB.
After (a lengthy) zip compression, the file became a slightly more manageable 311 MB, which helps when copying it around on the LAN.
The code
The CSV timestamps do not include timezone information, so first up I defined the timezone needed for parsing, in the crime.chicago package:
package crime

import java.time.ZoneId

package object chicago {
  val TimeZone = ZoneId.of("America/Chicago")
}
I then defined the class to hold each case. Since the dataset is fairly large (6+ million cases), I kept only the fields that seemed interesting:
package crime.chicago

import java.time.ZonedDateTime

case class CrimeCase(
  time: ZonedDateTime,
  block: String,
  primaryType: String,
  description: String,
  location: String,
  year: Short)
Parsing the CSV
For CSV parsing, I chose Apache Commons CSV, for no particular reason other than it seemed easy to get working:
package crime.chicago

import java.io.Reader
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

import scala.collection.JavaConverters._

import org.apache.commons.csv.{CSVParser, CSVFormat}

object CrimeCSVParser {

  // Timestamp parser function:
  private val parseTime: (String => ZonedDateTime) = {
    val timestampParser = DateTimeFormatter.ofPattern("MM/dd/yyyy h:m:s a").withZone(TimeZone)
    str => ZonedDateTime from timestampParser.parse(str)
  }

  /**
   * Parse through lazy transformation.
   */
  def parse(reader: Reader): Iterator[(Int, CrimeCase)] = {
    val parser = new CSVParser(reader, CSVFormat.DEFAULT)
    val records = parser.iterator.asScala
    val FieldNames = new Enumeration {
      private[this] val header = records.next.iterator.asScala.toIndexedSeq
      implicit def toId(fld: Value) = fld.id
      val ID = Value(header.indexOf("ID"))
      val Timestamp = Value(header.indexOf("Date"))
      val Block = Value(header.indexOf("Block"))
      val PrimaryType = Value(header.indexOf("Primary Type"))
      val Description = Value(header.indexOf("Description"))
      val LocationDescription = Value(header.indexOf("Location Description"))
      val Year = Value(header.indexOf("Year"))
    }
    records.map { rec =>
      import FieldNames._
      val id = rec.get(ID).toInt
      val cc = new CrimeCase(
        time = parseTime(rec.get(Timestamp)),
        block = rec.get(Block),
        primaryType = rec.get(PrimaryType),
        description = rec.get(Description),
        location = rec.get(LocationDescription),
        year = rec.get(Year).toShort)
      (id, cc)
    }
  }
}
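Before pointing it at the full file, a quick way to sanity-check the parser is to run it over a small in-memory CSV. This is a minimal sketch; the sample row is made up, but the header names match the columns the parser looks up:

import java.io.StringReader

// Hypothetical one-row sample, just to exercise the header lookup and the
// timestamp pattern; not real data from the Chicago Crime set.
val sample =
  """ID,Date,Block,Primary Type,Description,Location Description,Year
    |10000001,01/01/2015 01:30:00 PM,001XX N STATE ST,THEFT,POCKET-PICKING,STREET,2015""".stripMargin

val parsed = crime.chicago.CrimeCSVParser.parse(new StringReader(sample)).toList
parsed.foreach { case (id, cc) => println(s"$id -> ${cc.primaryType}, ${cc.year}") }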
With most of the plumbing in place, I wrote the code for a single node:
package crime

import java.io.InputStreamReader
import java.util.zip.ZipFile

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

import com.hazelcast.Scala._
import com.hazelcast.config.Config
import com.hazelcast.core.HazelcastInstance
import com.hazelcast.core.IMap
import com.hazelcast.logging.ILogger

object CrimeNode {

  def main(args: Array[String]): Unit = {
    val csvFile = args match {
      case Array(filename) => new ZipFile(filename)
      case _ =>
        println("Please provide Chicago Crime zip filename")
        return
    }
    val hz = newInstance()
    implicit val logger = hz.getLoggingService.getLogger(getClass)
    val crimeCases = getChicagoCrimes(hz)
    populateMap(hz, crimeCases, csvFile)
    logger.info("Ready!")
  }

  def populateMap(
      hz: HazelcastInstance,
      chicagoCrimeCases: IMap[Int, chicago.CrimeCase],
      csvFile: ZipFile)(implicit logger: ILogger): Unit = {
    val zipEntry = csvFile.entries().nextElement()
    val zipInput = csvFile.getInputStream(zipEntry)
    val crimeReader = new InputStreamReader(zipInput)
    try {
      val asyncMap = chicagoCrimeCases.async
      val allCases = chicago.CrimeCSVParser.parse(crimeReader)
      val finalCount = allCases.foldLeft(0) {
        case (lastCount, (id, crimeCase)) =>
          if (lastCount == 0) logger.info(s"Now processing ${csvFile.getName}")
          asyncMap.set(id, crimeCase).failed.foreach { e =>
            logger.warning(s"Failed to insert case $id", e)
          }
          val thisCount = lastCount + 1
          if (thisCount % 25000 == 0) logger.info(f"processed $thisCount%,d records")
          thisCount
      }
      logger.info(f"Done parsing CSV, total $finalCount%,d processed")
    } finally {
      crimeReader.close()
    }
  }

  def getChicagoCrimes(hz: HazelcastInstance) =
    hz.getMap[Int, chicago.CrimeCase]("chicago-crimes")

  private def newInstance(): HazelcastInstance = {
    val conf = new Config
    conf.getGroupConfig.setName("crime-cluster")
    serialization.Defaults.register(conf.getSerializationConfig)
    serialization.DynamicExecution.register(conf.getSerializationConfig)
    conf.newInstance
  }
}
Let’s quickly go through what happens here. The main
method expects the zip file as argument, which it will then proceed to parse and add to the IMap
(using set
). For every 25,000 record, it will log progress. In the newInstance()
method, we configure and return a HazelcastInstance
, which simply is named and have the Defaults
serializers added, as well as the DynamicExecution
, which will allow us to run ad hoc queries without pre-compiling and restarting.
Running a single node
We can compile this to a JAR or simply run against the IDE classpath:
java -Xmx12G -cp $CP crime.CrimeNode ./Crimes_-_2001_to_present.zip
On my laptop, it takes slightly less than 2 minutes to parse the file.
Connecting a client
With that done, I then connected a client through the Scala REPL (outlined here), so I could run ad hoc queries. Make sure the classpath includes all dependencies.
First, let's set up the imports:
Welcome to Scala 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_74).
Type in expressions for evaluation. Or try :help.

scala> :paste
// Entering paste mode (ctrl-D to finish)

import com.hazelcast.Scala._, client._
import com.hazelcast.client.config._
import concurrent._, duration._, ExecutionContext.Implicits.global

// Exiting paste mode, now interpreting.

import com.hazelcast.Scala._
import client._
import com.hazelcast.client.config._
import concurrent._
import duration._
import ExecutionContext.Implicits.global

scala>
Now, let’s configure and connect the client:
scala> :paste
// Entering paste mode (ctrl-D to finish)

val conf = new ClientConfig
conf.getGroupConfig.setName("crime-cluster")
conf.getNetworkConfig.addAddress("localhost")
serialization.Defaults.register(conf.getSerializationConfig)
serialization.DynamicExecution.register(conf.getSerializationConfig)
val hz = conf.newClient

// Exiting paste mode, now interpreting.

Jun 16, 2016 11:32:49 AM com.hazelcast.core.LifecycleService
INFO: HazelcastClient[hz.client_0_crime-cluster][3.6.2] is STARTING
Jun 16, 2016 11:32:50 AM com.hazelcast.core.LifecycleService
INFO: HazelcastClient[hz.client_0_crime-cluster][3.6.2] is STARTED
Jun 16, 2016 11:32:50 AM com.hazelcast.client.spi.impl.ClientMembershipListener
INFO:
Members [1] {
    Member [192.168.1.7]:5701
}

Jun 16, 2016 11:32:50 AM com.hazelcast.core.LifecycleService
INFO: HazelcastClient[hz.client_0_crime-cluster][3.6.2] is CLIENT_CONNECTED
conf: com.hazelcast.client.config.ClientConfig = com.hazelcast.client.config.ClientConfig@30839e44
hz: com.hazelcast.core.HazelcastInstance = com.hazelcast.client.impl.HazelcastClientInstanceImpl@5f174dd2
The client connected to the single node, and now it's down to business.
The query and aggregation
First, get the crime case IMap using the helper method:
scala> val crimeCases = crime.CrimeNode.getChicagoCrimes(hz)
crimeCases: com.hazelcast.core.IMap[Int,crime.chicago.CrimeCase] = IMap{name='chicago-crimes'}
Then, filter out 2016, since it’s incomplete.
scala> val crimesEntries2001_2015 = crimeCases.filter(where("year") < 2016)
crimesEntries2001_2015: com.hazelcast.Scala.dds.DDS[java.util.Map.Entry[Int,crime.chicago.CrimeCase]] = com.hazelcast.Scala.dds.MapDDS@405495e7
Notice that this returns a DDS, a Distributed Data Structure, which is a lazily evaluated collection type. Its content is java.util.Map.Entry, because that's what the IMap contains. However, we're not interested in the full Entry, only the CrimeCase value, so let's map:
scala> val crimes2001_2015 = crimesEntries2001_2015.map(_.value)
crimes2001_2015: com.hazelcast.Scala.dds.DDS[crime.chicago.CrimeCase] = com.hazelcast.Scala.dds.MapDDS@5ceca60
I wanted to look at the number of crimes, per year, per type, so let’s group the cases by that:
scala> val casesByTypeAndYear = crimes2001_2015.groupBy(cc => cc.primaryType -> cc.year)
casesByTypeAndYear: com.hazelcast.Scala.dds.GroupDDS[(String, Short),crime.chicago.CrimeCase] = com.hazelcast.Scala.dds.MapGroupDDS@429b336b
So far, nothing has executed; we're simply staging the transformations for the eventual aggregation.
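As a side note, the three steps above can also be chained into a single expression. This is only a sketch reusing the same calls already shown; it still stages the pipeline without touching the cluster, since no aggregation has been invoked yet:

// Same staged pipeline, chained: filter -> map -> groupBy.
// Nothing executes on the cluster until an aggregation such as count() runs.
val casesByTypeAndYear = crimeCases
  .filter(where("year") < 2016)                // drop the incomplete 2016 data
  .map(_.value)                                // keep only the CrimeCase values
  .groupBy(cc => cc.primaryType -> cc.year)    // group by (type, year)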
With the cases grouped, we’re ready to execute the count. However, I wanted to time this, so let’s write a quick timing function:
scala> :paste
// Entering paste mode (ctrl-D to finish)

def timeThis[T](thunk: => Future[T]): (T, Duration) = {
  val start = System.currentTimeMillis
  val result = Await.result(thunk, 10.minutes)
  result -> (System.currentTimeMillis - start).milliseconds
}

// Exiting paste mode, now interpreting.

timeThis: [T](thunk: => scala.concurrent.Future[T])(T, scala.concurrent.duration.Duration)
It's never a good idea to block forever, so I used a 10-minute timeout. Don't worry, it shouldn't take that long.
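If you'd rather not block at all, the same measurement can be taken by mapping over the future instead. This is a non-blocking variant sketched with the imports already in scope; it isn't used in the rest of the post:

// Non-blocking alternative: returns a Future of (result, elapsed time)
// instead of awaiting it; uses the implicit ExecutionContext imported above.
def timeThisAsync[T](thunk: => Future[T]): Future[(T, Duration)] = {
  val start = System.currentTimeMillis
  thunk.map(result => result -> (System.currentTimeMillis - start).milliseconds)
}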
Ok, let's execute and time the built-in count() aggregation, which will run on the cluster node, not on the client:
scala> val (countByTypeAndYear, execTime) = timeThis { casesByTypeAndYear.count() }
Zzzzzzz…..
scala> val (countByTypeAndYear, execTime) = timeThis { casesByTypeAndYear.count() }
countByTypeAndYear: scala.collection.Map[(String, Short),Int] = Map((STALKING,2009) -> 167, (MOTOR VEHICLE THEFT,2014) -> 9902, (THEFT,2015) -> 57219, (OBSCENITY,2014) -> 36, (WEAPONS VIOLATION,2008) -> 3877, (CRIMINAL TRESPASS,2008) -> 12310, (STALKING,2008) -> 190, (KIDNAPPING,2009) -> 293, (OBSCENITY,2006) -> 17, (HOMICIDE,2007) -> 448, (BATTERY,2005) -> 83964, (MOTOR VEHICLE THEFT,2008) -> 18881, (BATTERY,2010) -> 65400, (PUBLIC INDECENCY,2015) -> 14, (BURGLARY,2002) -> 25623, (WEAPONS VIOLATION,2004) -> 4297, (ASSAULT,2002) -> 31521, (INTERFERENCE WITH PUBLIC OFFICER,2005) -> 615, (DECEPTIVE PRACTICE,2010) -> 12377, (OFFENSE INVOLVING CHILDREN,2007) -> 2854, (OFFENSE INVOLVING CHILDREN,2015) -> 2193, (HUMAN TRAFFICKING,2013) -> 2, (OTHER OFFENSE,2009) -> 25601, (NON-CRIMINAL,2012) ...

scala> println(s"Took ${execTime.toSeconds} secs")
Took 155 secs
Ok, so crunching the data took about two and a half minutes on my laptop, with a 12 GB allocated heap.
Now that we have the data locally, let’s find the top 10 crimes across all years:
scala> val countByType = countByTypeAndYear.groupBy(_._1._1).mapValues(_.values.sum)
countByType: scala.collection.immutable.Map[String,Int] = Map(CRIMINAL TRESPASS -> 174206, OFFENSE INVOLVING CHILDREN -> 38309, NARCOTICS -> 674156, RITUALISM -> 23, INTIMIDATION -> 3505, OBSCENITY -> 358, MOTOR VEHICLE THEFT -> 282956, GAMBLING -> 13846, OTHER OFFENSE -> 369153, HOMICIDE -> 7449, NON-CRIMINAL (SUBJECT SPECIFIED) -> 3, SEX OFFENSE -> 21932, NON - CRIMINAL -> 33, PUBLIC INDECENCY -> 130, HUMAN TRAFFICKING -> 17, INTERFERENCE WITH PUBLIC OFFICER -> 11967, ROBBERY -> 223423, DECEPTIVE PRACTICE -> 206607, CRIMINAL DAMAGE -> 687105, ARSON -> 9873, CRIM SEXUAL ASSAULT -> 22231, THEFT -> 1236003, CONCEALED CARRY LICENSE VIOLATION -> 49, PUBLIC PEACE VIOLATION -> 43504, NON-CRIMINAL -> 42, KIDNAPPING -> 6127, OTHER NARCOTIC VIOLATION -> 108, BATTERY -> 1089044, DOMESTIC VIOLENC...

scala> val top10 = countByType.toSeq.sortBy(_._2).reverse.take(10)
top10: Seq[(String, Int)] = ArrayBuffer((THEFT,1236003), (BATTERY,1089044), (CRIMINAL DAMAGE,687105), (NARCOTICS,674156), (OTHER OFFENSE,369153), (ASSAULT,362605), (BURGLARY,350545), (MOTOR VEHICLE THEFT,282956), (ROBBERY,223423), (DECEPTIVE PRACTICE,206607))
Using this top 10, we can filter the data set:
scala> val top10Types = top10.map(_._1).toSet
top10Types: scala.collection.immutable.Set[String] = Set(NARCOTICS, MOTOR VEHICLE THEFT, OTHER OFFENSE, ROBBERY, DECEPTIVE PRACTICE, CRIMINAL DAMAGE, THEFT, BATTERY, BURGLARY, ASSAULT)

scala> val top10CountByTypeAndYear = countByTypeAndYear.filter(t => top10Types.contains(t._1._1))
top10CountByTypeAndYear: scala.collection.Map[(String, Short),Int] = Map((BURGLARY,2003) -> 25156, (MOTOR VEHICLE THEFT,2014) -> 9902, (THEFT,2015) -> 57219, (BATTERY,2012) -> 59132, (NARCOTICS,2001) -> 50567, (OTHER OFFENSE,2014) -> 16958, (ASSAULT,2010) -> 21535, (ROBBERY,2001) -> 18441, (BATTERY,2013) -> 54002, (THEFT,2008) -> 88430, (NARCOTICS,2009) -> 43543, (CRIMINAL DAMAGE,2012) -> 35852, (BATTERY,2005) -> 83964, (ROBBERY,2002) -> 18522, (MOTOR VEHICLE THEFT,2008) -> 18881, (BURGLARY,2012) -> 22844, (OTHER OFFENSE,2006) -> 27099, (DECEPTIVE PRACTICE,2001) -> 14890, (CRIMINAL DAMAGE,2005) -> 54548, (BATTERY,2010) -> 65400, (CRIMINAL DAMAGE,2009) -> 47724, (THEFT,2005) -> 85685, (OTHER OFFENSE,2015) -> 17484, (BURGLARY,2002) -> 25623, (THEFT,2006) -> 86238, (OTHER OFFENSE,2005) -> ...
And then plot it out:
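One simple way to get the numbers into a plotting tool is to dump the filtered counts as tab-separated values, one row per (type, year) pair. This is a minimal sketch, assuming the top10CountByTypeAndYear map from above; any charting library or spreadsheet will do from there:

// Print the top-10 counts as TSV: year, primary type, count.
top10CountByTypeAndYear.toSeq
  .sortBy { case ((tpe, year), _) => (tpe, year) }
  .foreach { case ((tpe, year), count) =>
    println(s"$year\t$tpe\t$count")
  }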
Conclusion
Ok, so there you have it. Very easy to crunch data in Hazelcast, and crime in Chicago is down too, so what’s not to like?
Well, but wait… more than 2 minutes to crunch that?? Come on, what is this, 1946?
Don’t worry, in part 2 we’ll be taking a look at how to make this perform better, much better.