CastMapR – The Hazelcast 3 MapReduce Framework

A few days ago while porting our current system to Hazelcast 3 Snapshots I finally decided to start a MapReduce implementation for Hazelcast which I was missing for a long time.

Whereas there always was a way to query IMaps in a distributed manner using Predicates I missed a solution for doing calculations / conversions on remote cluster nodes as possible with MapReduce and so I started it.

Thinking about the API to use I came to the solution to make it very similar to that of Redhat’s Infinispan which is widely known and known to work.

Currently the implementation features support for IMap but will be extended to IList, ISet, MultiMap and eventually the new NestedMaps (when arriving in the wild world).

The project itself is (as always) available licensed using Apache License 2.0 and is hosted on GitHub https://github.com/noctarius/castmapr

But now enough of the words and have a look what a simple example will look like:

public class CastMapRDemo
{
  public static void main(String[] args)
  {
    HazelcastInstance hazelcast = nodeFactory
        .newHazelcastInstance();
    IMap map = hazelcast.getMap( "PlayerLogins" );
     
    MapReduceTaskFactory factory = MapReduceTaskFactory
        .newInstance( hazelcast );
     
    MapReduceTask task = factory.build( map );
     
    Map loginCounts = task.mapper( new PlayerMapper() )
        .reducer( new LoginReducer() ).submit();
          
    for ( Entry entry : loginCounts.entrySet() )
    {
      System.out.println( "Player " + entry.getKey() 
        + " has " + entry.getValue() + " logins." );
    } 
  }
}
 
public class PlayerMapper extends Mapper
{
  public void map( Integer playerId, Long timestamp,
                   Collector collector )
  {
    // We are interested in the count of player logins so
    // we discard the timestamp information
    collector.emit( playerId, 1 );
  }
}
 
public class LoginReducer implements DistributableReducer
{
  public void reduce( Integer playerId, Iterator values )
  {
    int count = 0;
    while ( values.hasNext() )
    {
      values.next();
      count++;
    }
    return count;
  }
}