Cluster Leader Election with Spring Integration and Hazelcast

Tomas Kloucek recently published a Bitbucket post entitled "Cluster Leader Election with Spring Integration in Hazelcast". In the post, Tomas demonstrates a method for fine-grained control over how Microservices interact with a Hazelcast IMap using Spring.


Recently, while exploring the Spring Integration ecosystem, I noticed a pretty nice integration with the Hazelcast data grid. Check it out at:

https://github.com/spring-projects/spring-integration-extensions/tree/master/spring-integration-hazelcast

If you use Hazelcast, you can feed your Spring Integration channel infrastructure from various Hazelcast-backed distributed data structures, such as:

com.hazelcast.core.IMap
com.hazelcast.core.MultiMap
com.hazelcast.core.IList
com.hazelcast.core.ISet
com.hazelcast.core.IQueue
com.hazelcast.core.ITopic
com.hazelcast.core.ReplicatedMap
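Conceptually, feeding a channel from one of these structures means registering an entry listener on it and turning each change event into a message. The JDK-only sketch below models just that idea. `SketchMap`, `MapEvent` and `EventType` are hypothetical stand-ins (a local `HashMap`, not a distributed `IMap`), the "channel" is a plain `Consumer`, and the event-type filter only approximates what the Spring Integration producer does with its configured event types.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical event types, named after the ones used by the SI Hazelcast producer.
enum EventType { ADDED, UPDATED, REMOVED }

// Hypothetical change event: what a listener would turn into a channel message.
final class MapEvent {
    final EventType type;
    final String key;
    final String value;
    final String oldValue;

    MapEvent(EventType type, String key, String value, String oldValue) {
        this.type = type;
        this.key = key;
        this.value = value;
        this.oldValue = oldValue;
    }

    @Override
    public String toString() {
        return type + "[key=" + key + ", value=" + value + ", oldValue=" + oldValue + "]";
    }
}

// Local, single-JVM stand-in for a distributed map that publishes entry events.
final class SketchMap {
    private final Map<String, String> data = new HashMap<>();
    private final List<Consumer<MapEvent>> listeners = new ArrayList<>();

    // Registers a "channel" that only receives the accepted event types.
    void addListener(Set<EventType> acceptedTypes, Consumer<MapEvent> channel) {
        listeners.add(event -> {
            if (acceptedTypes.contains(event.type)) {
                channel.accept(event);
            }
        });
    }

    void put(String key, String value) {
        String old = data.put(key, value);
        publish(new MapEvent(old == null ? EventType.ADDED : EventType.UPDATED, key, value, old));
    }

    void remove(String key) {
        String old = data.remove(key);
        if (old != null) {
            publish(new MapEvent(EventType.REMOVED, key, null, old));
        }
    }

    // Delivers the event to every registered listener.
    private void publish(MapEvent event) {
        for (Consumer<MapEvent> listener : listeners) {
            listener.accept(event);
        }
    }
}
```

For instance, a listener registered with `EnumSet.of(EventType.ADDED)` would see new entries but never updates or removals.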

Sweet, but what really impressed me is the Spring Integration team’s implementation of cluster leader election via Hazelcast. Let’s take a look at it and test it. But first things first:

Demo task: We have two Spring Boot Microservices, both feeding a distributed IMap with random values every 10 seconds. Only ONE Microservice is allowed to consume data from the IMap at any given moment. To make it a little spicier, a Microservice should give up its leadership after consuming one message, handing the work over to the other nodes.
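The post doesn’t show the code that feeds the IMap. Judging from the keys in the log output further down (the node id followed by a random UUID, with a random UUID as the value), a feeder could look roughly like the sketch below. This is an assumption, not the author’s code: a plain `ConcurrentHashMap` stands in for the distributed IMap, and `RandomFeeder`, `feedOnce` and `start` are hypothetical names.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical feeder: every period, puts one random entry into the shared map.
final class RandomFeeder {
    private final String nodeId;
    private final Map<String, String> jobMap; // stand-in for the distributed IMap

    RandomFeeder(String nodeId, Map<String, String> jobMap) {
        this.nodeId = nodeId;
        this.jobMap = jobMap;
    }

    // Key = node id + random UUID (as seen in the demo log), value = random UUID.
    String feedOnce() {
        String key = nodeId + UUID.randomUUID();
        jobMap.put(key, UUID.randomUUID().toString());
        return key;
    }

    // Runs feedOnce() immediately and then once per period (the demo uses 10 seconds).
    ScheduledFuture<?> start(ScheduledExecutorService scheduler, long period, TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(this::feedOnce, 0, period, unit);
    }
}
```

In a service, a call along the lines of `feeder.start(scheduler, 10, TimeUnit.SECONDS)` would reproduce the demo’s cadence; with a real cluster, the map would be the IMap obtained from the Hazelcast instance.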

Solution via Spring Integration Cluster Leadership:

First, you need to add the Spring Boot Microservice to the leadership game:

@Bean
public Candidate nodeService1Candidate() {
    // This node competes for the role under the id "service1"
    final NodeCandidate candidate = new NodeCandidate("service1", HazelcastConfiguration.ROLE_JOB_MAP);
    return candidate;
}

@Bean
public LeaderInitiator initiator() {
    // Hazelcast-backed initiator that grants/revokes leadership to the candidate
    final LeaderInitiator leaderInitiator = new LeaderInitiator(hazelcastConfiguration.hazelcastInstance(), nodeService1Candidate());
    return leaderInitiator;
}

But that’s not all. Our goal is to start consuming data from the IMap when leadership is granted and to stop consuming when leadership is revoked. To do that, we listen for the onGranted and onRevoked callbacks in NodeCandidate, our subclass of org.springframework.integration.leader.DefaultCandidate. The first constructor parameter is the node id; the second is the role name.
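To make the onGranted/onRevoked contract concrete without a Hazelcast cluster, here is a single-JVM sketch. It assumes, as a simplification, that leadership for a role is exclusive and that a yielding leader goes to the back of a FIFO line of waiting candidates. The real Hazelcast-backed LeaderInitiator coordinates through the cluster instead, and `SketchCandidate`, `LocalInitiator` and `RecordingCandidate` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical candidate contract, modeled on the onGranted/onRevoked callbacks.
interface SketchCandidate {
    String id();
    void onGranted();
    void onRevoked();
}

// Single-JVM stand-in for a leader initiator: at most one leader per role.
final class LocalInitiator {
    private final Deque<SketchCandidate> waiting = new ArrayDeque<>();
    private SketchCandidate leader;

    // A candidate joins; it becomes leader immediately if nobody leads yet.
    synchronized void join(SketchCandidate candidate) {
        waiting.addLast(candidate);
        grantIfVacant();
    }

    synchronized boolean isLeader(SketchCandidate candidate) {
        return leader == candidate;
    }

    // Analogue of yielding: revoke, requeue, and grant to the next candidate in line.
    synchronized void yieldLeadership(SketchCandidate candidate) {
        if (leader != candidate) {
            return; // only the current leader can yield
        }
        leader = null;
        candidate.onRevoked();
        waiting.addLast(candidate);
        grantIfVacant();
    }

    private void grantIfVacant() {
        if (leader == null && !waiting.isEmpty()) {
            leader = waiting.pollFirst();
            leader.onGranted();
        }
    }
}

// A candidate that records its callbacks into a shared log.
final class RecordingCandidate implements SketchCandidate {
    private final String id;
    private final List<String> log;

    RecordingCandidate(String id, List<String> log) {
        this.id = id;
        this.log = log;
    }

    public String id() { return id; }
    public void onGranted() { log.add("granted:" + id); }
    public void onRevoked() { log.add("revoked:" + id); }
}
```

With two candidates, each yield produces the granted, revoked, granted-to-the-other sequence, which is the same hand-over pattern the terminal output later in the post shows.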

To be informed about data changes, the Spring Integration (SI) Hazelcast module offers HazelcastEventDrivenMessageProducer, which listens for changes to the distributed IMap and forwards the matching data change events to the Spring Integration channel infrastructure.

@Configuration
public class HazelcastConfiguration {
    // ...

    @Bean
    public IMap<String, String> getDistributedMapForJobInput() {
        return hazelcastInstance().getMap(INPUT_JOB_MAP);
    }

    @Bean
    public MessageChannel inputJobChannel() {
        return new DirectChannel();
    }

    @Bean
    public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
        final HazelcastEventDrivenMessageProducer producer =
                new HazelcastEventDrivenMessageProducer(getDistributedMapForJobInput());
        producer.setOutputChannel(inputJobChannel());
        // Only these IMap entry event types are forwarded to the channel
        producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
        producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
        // Stay stopped until this node is granted leadership
        producer.setAutoStartup(false);

        return producer;
    }
}

Notice the setAutoStartup(false) call. We want the producer to stay stopped initially and start only when leadership is granted. Let’s code that:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.leader.Context;
import org.springframework.integration.leader.DefaultCandidate;

/**
 * Created by tomask79 on 24.08.17.
 */
public class NodeCandidate extends DefaultCandidate {

    @Autowired
    private HazelcastConfiguration hazelcastConfiguration;

    public NodeCandidate(String nodeId, String role) {
        super(nodeId, role);
    }

    @Override
    public void onGranted(Context ctx) {
        super.onGranted(ctx);
        System.out.println("Leader granted to: " + ctx.toString());
        // Start consuming IMap events only while this node is the leader
        hazelcastConfiguration.hazelcastEventDrivenMessageProducer().start();
    }

    @Override
    public void onRevoked(Context ctx) {
        super.onRevoked(ctx);
        System.out.println("Leader revoked to: " + ctx.toString());
        // Stop consuming as soon as leadership is lost
        hazelcastConfiguration.hazelcastEventDrivenMessageProducer().stop();
    }
}
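The effect of starting and stopping the producer from the candidate callbacks can be sketched without Spring: events that arrive while the producer is stopped never reach the channel on this node. `GatedProducer` and `ProducerSwitchingCandidate` below are hypothetical, JDK-only stand-ins for a producer created with auto-startup disabled and for NodeCandidate; they are not the real Spring Integration classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical producer that forwards events to a channel only while running.
final class GatedProducer<T> {
    private final AtomicBoolean running = new AtomicBoolean(false); // like autoStartup=false
    private final Consumer<T> outputChannel;

    GatedProducer(Consumer<T> outputChannel) {
        this.outputChannel = outputChannel;
    }

    void start() { running.set(true); }
    void stop() { running.set(false); }
    boolean isRunning() { return running.get(); }

    // Invoked by the data-structure listener for every change event.
    void onEvent(T event) {
        if (running.get()) {
            outputChannel.accept(event);
        }
        // While stopped, the event is not delivered on this node.
    }
}

// Candidate callbacks wired to the producer lifecycle, in the spirit of NodeCandidate.
final class ProducerSwitchingCandidate {
    private final GatedProducer<?> producer;

    ProducerSwitchingCandidate(GatedProducer<?> producer) {
        this.producer = producer;
    }

    void onGranted() { producer.start(); }
    void onRevoked() { producer.stop(); }
}
```

Keeping the producer stopped by default means a freshly started node never consumes anything until the initiator explicitly grants it leadership.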

The last task is to consume the message from the distributed IMap and give up leadership, so that the other node can take over the work and enjoy some of the fun too. So let’s declare a ServiceActivator listening on the inputJobChannel DirectChannel:

@Bean
@ServiceActivator(inputChannel = "inputJobChannel")
public MessageHandler logger() {
    return new LogAndGiveInitiatorHandler();
}

The handler logs the message to standard output:

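import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * Created by tomask79 on 24.08.17.
 */
public class LogAndGiveInitiatorHandler implements MessageHandler {

    @Autowired
    private JobServices jobServices;

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        System.out.println(message.toString());
        System.out.println("Waiting for another node to take the work...!");
        jobServices.giveUp();
        try {
            // Pause so the hand-over to the other node is visible in the output
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        System.out.println("........");
    }
}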

and then tells the Microservice to give up its leadership:

/**
 * Created by tomask79 on 10.08.17.
 */
@Service
public class JobServices {

    @Autowired
    private LeaderInitiator initiator;

    // ...

    public void giveUp() {
        // Only the current leader has leadership to hand over
        if (initiator.getContext().isLeader()) {
            System.out.println("Giving up on leadership: " + initiator.getContext().toString());
            initiator.getContext().yield();
        }
    }
}
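The isLeader() check in giveUp() guards against yielding when the node is not (or is no longer) the leader, for example if an event was already in flight when leadership moved. The single-threaded sketch below models that guard together with the consume-one-then-give-up handler. `LeaderContext`, `SketchJobServices` and `ConsumeOnceHandler` are hypothetical stand-ins, not Spring Integration’s `Context` or the author’s classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the leadership context (single-threaded sketch).
final class LeaderContext {
    private boolean leader;
    private int yields;

    LeaderContext(boolean leader) { this.leader = leader; }

    boolean isLeader() { return leader; }

    void yieldLeadership() {
        leader = false;
        yields++;
    }

    int yieldCount() { return yields; }
}

// Mirrors giveUp(): yield only while actually holding leadership.
final class SketchJobServices {
    private final LeaderContext context;

    SketchJobServices(LeaderContext context) { this.context = context; }

    boolean giveUp() {
        if (context.isLeader()) {
            context.yieldLeadership();
            return true;
        }
        return false; // already a follower: nothing to give up
    }
}

// Mirrors the handler: process one message, then hand leadership over.
final class ConsumeOnceHandler {
    private final SketchJobServices jobServices;
    final List<String> processed = new ArrayList<>();

    ConsumeOnceHandler(SketchJobServices jobServices) { this.jobServices = jobServices; }

    void handleMessage(String payload) {
        processed.add(payload);
        jobServices.giveUp();
    }
}
```

A second message handled after the first yield is still processed, but it does not trigger another yield.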

And that’s it! Let’s test the whole package.

Testing the demo

git clone <this repo>
mvn clean install (in the directory containing the top-level pom.xml, to build all three projects)

The output should look like this:

[INFO] Reactor Summary:
[INFO] 
[INFO] spring-cloud-cluster-demo .......................... SUCCESS [  0.412 s]
[INFO] spring-microservice-hazelcast ...................... SUCCESS [  2.380 s]
[INFO] spring-microservice-service1 ....................... SUCCESS [  3.685 s]
[INFO] spring-microservice-service2 ....................... SUCCESS [  2.745 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 10.047 s
[INFO] Finished at: 2017-08-28T19:57:53+02:00
[INFO] Final Memory: 40M/532M
[INFO] ------------------------------------------------------------------------

Now open two terminals and run:

java -jar spring-microservice-service1/target/service1-0.0.1-SNAPSHOT.war (in the first terminal)
java -jar spring-microservice-service2/target/service2-0.0.1-SNAPSHOT.war (in the second terminal)

To verify that both Microservices form a valid Hazelcast cluster, you should see something like:

Members [2] {
    Member [192.168.1.112]:5702
    Member [192.168.1.112]:5701 this
}

Once the Hazelcast cluster has formed, you should see the following output alternating between the two terminals (service1 <-> service2):

Terminal 1 (service1 gets leadership, then gives it up to service2):

[st-leadership-0] com.example.hazelcast.NodeCandidate      : DefaultCandidate{role=leader, id=service1} has been granted leadership; context: HazelcastContext{role=leader, id=service1, isLeader=true}
Leader granted to: HazelcastContext{role=leader, id=service1, isLeader=true}
[st-leadership-0] .h.i.HazelcastEventDrivenMessageProducer : started hazelcastEventDrivenMessageProducer
GenericMessage [payload=EntryEventMessagePayload [key=service18eff005d-6da8-4fb8-b747-f977ad8e1544, value=a61b5f9a-1b96-493d-b240-61ccb549ba17, oldValue=null], headers={hazelcast_cacheName=randomInputDataMap, hazelcast_member=/192.168.1.112:5702, id=f9c5455b-b42d-3ab7-ec49-9bd33db9ec5f, hazelcast_eventType=ADDED, timestamp=1503945864993}]
Waiting for another node to take the work...!
Giving up on leadership: HazelcastContext{role=leader, id=service1, isLeader=true}

Terminal 2 (service2 gets leadership, then gives it up to service1):

Leader granted to: HazelcastContext{role=leader, id=service2, isLeader=true}
2017-08-28 20:47:08.001  INFO 1357 --- [st-leadership-0] .h.i.HazelcastEventDrivenMessageProducer : started hazelcastEventDrivenMessageProducer
2017-08-28 20:47:08.019  INFO 1357 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8082 (http)
2017-08-28 20:47:08.029  INFO 1357 --- [           main] c.e.SpringMicroserviceServiceComponent   : Started SpringMicroserviceServiceComponent in 12.807 seconds (JVM running for 13.507)
........
GenericMessage [payload=EntryEventMessagePayload [key=service249cde108-5045-4b77-84f7-cdc9f524df04, value=c4fe775f-e44d-4f10-ac43-0fe7157c0e67, oldValue=null], headers={hazelcast_cacheName=randomInputDataMap, hazelcast_member=/192.168.1.112:5701, id=df474178-1ff1-35e5-e1e1-d3f6f25d6d68, hazelcast_eventType=ADDED, timestamp=1503946037904}]
Waiting for another node to take the work...!
Giving up on leadership: HazelcastContext{role=leader, id=service2, isLeader=true}

…and so on and so on…

Summary

Just a few thoughts. If the number of messages flowing through your production system is only a few thousand per day (the rate in our production system at Embedit), then Hazelcast is certainly overkill. In that case, go with JMS/AMQP and distribute the data to your nodes in a round-robin manner. Don’t be a fool… :) But when you’re dealing with Big Data stored in memory, you shouldn’t miss the Spring Integration leader election algorithm backed by Hazelcast.

Regards

Tomas