I can make Lambda Expressions distributed!
When I first learned about lambda expressions, I immediately wanted to distribute them with the Hazelcast Distributed Executor Service. The idea was to submit a lambda expression to the Hazelcast Executor Service and let Hazelcast execute it on a remote node.
Here is how I wanted it to look:
    HazelcastInstance h = Hazelcast.newHazelcastInstance(config);
    ExecutorService ex = h.getExecutorService("ex");
    ex.execute(() -> System.out.println("I am a lambda expression"));
However, serialization was a barrier. Everything you pass to Hazelcast needs to be serializable, and there wasn't a clean and elegant way of making a lambda expression serializable.
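For context, the route the Java language itself offers is to cast the lambda to an intersection type such as `Runnable & Serializable`, and that cast has to be repeated at every call site. The following is a minimal sketch of that route using only the JDK. The class and helper names (`SerializableLambdaSketch`, `toBytes`, `roundTrip`, `isJavaSerializable`) are made up for illustration and are not part of Hazelcast.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableLambdaSketch {

    // Writes an object to a byte array with plain Java serialization.
    public static byte[] toBytes(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    // Returns true if Java serialization accepts the object.
    public static boolean isJavaSerializable(Object o) {
        try {
            toBytes(o);
            return true;
        } catch (IOException e) { // includes NotSerializableException
            return false;
        }
    }

    // Serializes and deserializes an object inside the same JVM.
    public static Object roundTrip(Object o) {
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(toBytes(o)))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // An ordinary lambda does not implement Serializable.
        Runnable plain = () -> System.out.println("plain lambda");

        // The intersection cast makes the compiler emit a serializable lambda.
        Runnable serializable = (Runnable & Serializable)
                () -> System.out.println("I am a lambda expression");

        System.out.println(isJavaSerializable(plain));        // false
        System.out.println(isJavaSerializable(serializable)); // true

        // The deserialized copy is a working Runnable.
        Runnable copy = (Runnable) roundTrip(serializable);
        copy.run();
    }
}
```

The round trip here stays inside one JVM. Deserializing such a lambda elsewhere also requires the class that defines it to be on the receiving side's classpath.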
Some time passed and I forgot about the idea until a couple of days ago, when I realized that I could use a custom Serializer to serialize a lambda expression.
Pluggable custom serialization was introduced as a feature in Hazelcast 3. It lets you plug in a custom Serializer, so you are not limited to what Hazelcast offers and can use any serialization framework.
I grabbed my laptop and reached for my favorite serialization framework, Kryo. Kryo can serialize an object even if it is not marked as java.io.Serializable. Below is a very simple custom Serializer that I had implemented before. I just added a registration for Runnable and it was ready to go.
    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import com.esotericsoftware.kryo.io.UnsafeInput;
    import com.esotericsoftware.kryo.io.UnsafeOutput;
    import com.hazelcast.nio.ObjectDataInput;
    import com.hazelcast.nio.ObjectDataOutput;
    import com.hazelcast.nio.serialization.StreamSerializer;

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public class KryoSerializer implements StreamSerializer<Object> {

        private final Kryo kryo = new Kryo();
        private final boolean unsafe;

        public KryoSerializer(boolean unsafe) {
            this.unsafe = unsafe;
            kryo.register(Runnable.class);
        }

        @Override
        public int getTypeId() {
            // Must be positive and unique among the registered serializers.
            return 5;
        }

        @Override
        public void write(ObjectDataOutput out, Object object) throws IOException {
            // The cast relies on Hazelcast's ObjectDataOutput implementation
            // also being an OutputStream.
            Output output = unsafe
                    ? new UnsafeOutput((OutputStream) out)
                    : new Output((OutputStream) out);
            kryo.writeClassAndObject(output, object);
            output.flush();
        }

        @Override
        public Object read(ObjectDataInput in) throws IOException {
            // Same assumption as in write(): the input is also an InputStream.
            Input input = unsafe
                    ? new UnsafeInput((InputStream) in)
                    : new Input((InputStream) in);
            return kryo.readClassAndObject(input);
        }

        @Override
        public void destroy() {
            // Nothing to clean up.
        }
    }
Creating a Config object and plugging in the Serializer is easy:
    SerializerConfig sc = new SerializerConfig()
            .setImplementation(new KryoSerializer(false))
            .setTypeClass(Runnable.class);
    Config config = new Config();
    config.getSerializationConfig().addSerializerConfig(sc);
Finally, I love this simplicity:
    HazelcastInstance h = Hazelcast.newHazelcastInstance(config);
    ExecutorService ex = h.getExecutorService("ex");
    ex.execute(() -> System.out.println("I can make a lambda "
            + "expression distributed"));