I can make Lambda Expressions distributed!

When I first learned about Lambda expressions, I felt an immediate desire for distributing them with Hazelcast Distributed Executor Service. The idea was to submit a lambda expression into Hazelcast Executor Service and let Hazelcast to execute it on a remote node.

Here is how I want it to look like:

HazelcastInstance h = Hazelcast.newHazelcastInstance(config);
ExecutorService ex = h.getExecutorService("ex");
ex.execute(()-> System.out.println("I am a lambda expression"));

However the serialization was a barrier here. Everything that you pass to Hazelcast needs to be serializable and there wasn’t a clean and elegant way of making a lambda expression serializable.

Been a while and I forgot this idea until couple of days ago when I realized that I could use a custom Serializer to serialize a lambda expression.

Pluggable Custom Serialization was introduced as a feature in Hazelcast 3. It enables plugging a custom Serializer and this way a user is not limited to what Hazelcast offers and can use any serialization framework.

I grabbed my laptop and here comes my favorite serialization framework, Kryo. Kryo can serialize an object even if it is not marked as java.io.Serializable.  A very simple custom Serializer that I implemented before is as follows. I just added the registration of a Runnable class and it is ready to go.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.io.UnsafeInput;
import com.esotericsoftware.kryo.io.UnsafeOutput;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.StreamSerializer;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class KryoSerializer implements StreamSerializer<Object> {

    Kryo kryo = new Kryo();
    final boolean unsafe;

    public SOKryoSerializer(boolean unsafe) {
        this.unsafe = unsafe;
        kryo.register(Runnable.class);
    }

    public int getTypeId() {
        return 5; //
    }

    public void write(ObjectDataOutput out, Object object)
            throws IOException {
        Output output = unsafe ?
                new UnsafeOutput((OutputStream) out) :
                new Output((OutputStream) out);
        kryo.writeClassAndObject(output, object);
        output.flush();
    }

    public Object read(ObjectDataInput in) throws IOException {
        Input input = unsafe ?
                new UnsafeInput((InputStream) in) :
                new Input((InputStream) in);
        return kryo.readClassAndObject(input);
    }

    public void destroy() {
    }
}

Creating a Config object and plugging the Serializer is easy.

SerializerConfig sc = new SerializerConfig().
                setImplementation(new KryoSerializer(false)).
                setTypeClass(Runnable.class);
Config config = new Config();
config.getSerializationConfig().addSerializerConfig(sc);

Finally, I love this simplicity

HazelcastInstance h = Hazelcast.newHazelcastInstance(config);
ExecutorService ex = h.getExecutorService("ex");
ex.execute(()-> System.out.println("I can make a lambda " +
                "expression distributed"));