Cluster Leader election with Spring Integration and Hazelcast
Tomas Kloucek recently published a bitbucket post entitled, Cluster Leader Election with Spring Integration in Hazelcast. In the post, Tomas demonstrates a method for fine-grained control of Microservice and IMap interaction with Hazelcast and Spring.
Recently when checking Spring Integration area I noticed of pretty nice integration with Hazelcast datagrid. Check this out at:
If you use Hazelcast then you can feed your Spring Integration channel infrastructure from various distributed datastructures backed by Hazelcast like:
com.hazelcast.core.IMap, com.hazelcast.core.MultiMap, com.hazelcast.core.IList, com.hazelcast.core.ISet, com.hazelcast.core.IQueue, com.hazelcast.core.ITopic, com.hazelcast.core.ReplicatedMap
Sweet, but what really impressed me is the implementation of cluster leader election by the Spring Integration team via Hazelcast. Let’s take a look on that and test it. But first things first:
Demo Task: We’re having two Spring Boot Microservices, both feeding distributed IMap with random values every 10 seconds. Now ONLY ONE Microservice is allowed to consume the data from IMap in one moment. To make it a little spicy, Microservice should give up its leadership after one message consuming to hand over the work to other nodes.
Solution via Spring Integration Cluster Leadership:
First what you need to do is to add Spring Boot MicroService into the leadership game:
@Bean public Candidate nodeService1Candidate() { final NodeCandidate candidate = new NodeCandidate("service1", HazelcastConfiguration.ROLE_JOB_MAP); return candidate; } @Bean public LeaderInitiator initiator() { final LeaderInitiator leaderInitiator = new LeaderInitiator(hazelcastConfiguration.hazelcastInstance(), nodeService1Candidate()); return leaderInitiator; }
But that’s not all. Our goal is to start the data consuming from IMap after leadership is granted and on the other hand stop the data consuming after leadership is revoked. To do that we need to listen to the onGranted and onRevoked events in the org.springframework.integration.leader.DefaultCandidate subclass, NodeCandidate. First constructor parameter is the node id, second is the role name.
To be informed about the data changes SI Hazelcast integration offers HazelcastEventDrivenMessageProducer which listens on the distributed IMap changes and delegates appropriate data change events to the Spring Integration channel infrastructure.
@Configuration public class HazelcastConfiguration { . . @Bean public IMap<String, String> getDistributedMapForJobInput() { return hazelcastInstance().getMap(INPUT_JOB_MAP); } @Bean public MessageChannel inputJobChannel() { return new DirectChannel(); } @Bean public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() { final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer( getDistributedMapForJobInput() ); producer.setOutputChannel(inputJobChannel()); producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL"); producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE); producer.setAutoStartup(false); return producer; } }
Notice of setAutostartup(false). We want to have this producer down initiatily and start it when leadership is granted. Lets code it then:
/** * Created by tomask79 on 24.08.17. */ public class NodeCandidate extends DefaultCandidate { @Autowired private HazelcastConfiguration hazelcastConfiguration; public NodeCandidate(String nodeId, String role) { super(nodeId, role); } @Override public void onGranted(Context ctx) { super.onGranted(ctx); System.out.println("Leader granted to: "+ctx.toString()); hazelcastConfiguration.hazelcastEventDrivenMessageProducer().start(); } @Override public void onRevoked(Context ctx) { super.onRevoked(ctx); System.out.println("Leader revoked to: "+ctx.toString()); hazelcastConfiguration.hazelcastEventDrivenMessageProducer().stop(); } }
Last task is consuming the message from distributed IMap and giving up the leadership so other node may take the work and enjoy some fun too. So lets declare ServiceActivator listening on the data from jobInputChannel DirectChannel:
@Bean @ServiceActivator(inputChannel = "inputJobChannel") public MessageHandler logger() { return new LogAndGiveInitiatorHandler(); }
logging the message to the standard output:
* Created by tomask79 on 24.08.17. */ public class LogAndGiveInitiatorHandler implements MessageHandler{ @Autowired private JobServices jobServices; @Override public void handleMessage(Message<?> message) throws MessagingException { System.out.println(message.toString()); System.out.println("Waiting for another node to take the work...!"); jobServices.giveUp(); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("........"); }
and commanding the Microservice to give up its leadership:
/** * Created by tomask79 on 10.08.17. */ @Service public class JobServices { @Autowired private LeaderInitiator initiator; . . public void giveUp() { if (initiator.getContext().isLeader()) { System.out.println("Giving up on leadership: "+initiator.getContext().toString()); initiator.getContext().yield(); } } }
And that’s it! Let’s test the whole package.
Testing the demo
git clone <this repo> mvn clean all (in the directory with top pom.xml to build all three projects)
Output should be:
[INFO] Reactor Summary: [INFO] [INFO] spring-cloud-cluster-demo .......................... SUCCESS [ 0.412 s] [INFO] spring-microservice-hazelcast ...................... SUCCESS [ 2.380 s] [INFO] spring-microservice-service1 ....................... SUCCESS [ 3.685 s] [INFO] spring-microservice-service2 ....................... SUCCESS [ 2.745 s] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 10.047 s [INFO] Finished at: 2017-08-28T19:57:53+02:00 [INFO] Final Memory: 40M/532M [INFO] ------------------------------------------------------------------------
Now open two terminals and run:
java -jar spring-microservice-service1/target/service1-0.0.1-SNAPSHOT.war (in the first terminal) java -jar spring-microservice-service2/target/service2-0.0.1-SNAPSHOT.war (in the second terminal)
To verify that both Microservices forms valid Hazelcast cluster you should see something like:
Members [2] { Member [192.168.1.112]:5702 Member [192.168.1.112]:5701 this }
After Hazelcast cluster setup is formed, you should see following output
changing in both terminals interchangeably (service1 < -> service 2):
Terminal 1 (getting and giving up Leadership to service 2):
[st-leadership-0] com.example.hazelcast.NodeCandidate : DefaultCandidate{role=leader, id=service1} has been granted leadership; context: HazelcastContext{role=leader, id=service1, isLeader=true} Leader granted to: HazelcastContext{role=leader, id=service1, isLeader=true} [st-leadership-0] .h.i.HazelcastEventDrivenMessageProducer : started hazelcastEventDrivenMessageProducer GenericMessage [payload=EntryEventMessagePayload [key=service18eff005d-6da8-4fb8-b747-f977ad8e1544, value=a61b5f9a-1b96-493d-b240-61ccb549ba17, oldValue=null], headers={hazelcast_cacheName=randomInputDataMap, hazelcast_member=/192.168.1.112:5702, id=f9c5455b-b42d-3ab7-ec49-9bd33db9ec5f, hazelcast_eventType=ADDED, timestamp=1503945864993}] Waiting for another node to take the work...! Giving up on leadership: HazelcastContext{role=leader, id=service1, isLeader=true}
Terminal 2 (getting the Leadership and giving up the Leadership to service 1):
Leader granted to: HazelcastContext{role=leader, id=service2, isLeader=true} 2017-08-28 20:47:08.001 INFO 1357 --- [st-leadership-0] .h.i.HazelcastEventDrivenMessageProducer : started hazelcastEventDrivenMessageProducer 2017-08-28 20:47:08.019 INFO 1357 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8082 (http) 2017-08-28 20:47:08.029 INFO 1357 --- [ main] c.e.SpringMicroserviceServiceComponent : Started SpringMicroserviceServiceComponent in 12.807 seconds (JVM running for 13.507) ........ GenericMessage [payload=EntryEventMessagePayload [key=service249cde108-5045-4b77-84f7-cdc9f524df04, value=c4fe775f-e44d-4f10-ac43-0fe7157c0e67, oldValue=null], headers={hazelcast_cacheName=randomInputDataMap, hazelcast_member=/192.168.1.112:5701, id=df474178-1ff1-35e5-e1e1-d3f6f25d6d68, hazelcast_eventType=ADDED, timestamp=1503946037904}] Waiting for another node to take the work...! Giving up on leadership: HazelcastContext{role=leader, id=service2, isLeader=true}
…and so on and so on…
Summary
Just a few thoughts. If number of messages which flows through your system at the production goes to just a few thousands per day (rate in our production system in Embedit) then Hazelcast is certainly an overkill. And always go with JMS/AMPQ to distribute the data to your nodes in round robin manner. Don’t be a fool…:) But when dealing with Big Data stored in-memory. You shouldn’t miss the Spring Integration Election agorithm backed by Hazelcast.
regards
Tomas