Is the Data Evenly Allocated?
Uniformity and balance are key principles for data grids.
All grid members should hold roughly the same amount of data, do the same amount of compute, and have the same amount of resources (CPU, memory, and so on) available.
At least approximately: it doesn’t have to be exactly even, but it should be close. A hot spot in the grid means an overloaded member; a cold spot means under-utilized resources.
So let’s look at the data spread.
A Recap on Sharding
Hazelcast uses a sharded approach for record storage. The potential range of data records is partitioned into subsections and each grid member is responsible for hosting a fair share of the partitions.
Those of us who remember offices may remember filing cabinets. The idea is the same for sharding. The filing cabinet might contain 26 folders, so we can very quickly determine that the “Stevenson” record would be in the “S” folder and, equally, quickly find that folder. If we add a second filing cabinet, we just rearrange the folders so each cabinet gets half.
The Problem
The above solution, a folder for “A”, a folder for “B”, and so on, has some appeal. It’s very easy to understand and implement.
But does it result in a fair data spread? Meaning, does each folder get approximately the same number of records? Well, it depends on what we put in it! If we insert dictionary words, the spread probably won’t be fair. A quick look at the size of the sections in a printed dictionary will easily confirm this.
But if we insert randomly generated letter sequences, the spread should be fair.
A good idea would be to measure it and see.
We might think we have randomly generated letter sequences, and therefore a fair balance, but there could be a flaw in our random letter generation algorithm.
Measurement is our backstop.
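As a quick illustration of that kind of check, here is a minimal sketch that counts how many words would land in each letter “folder”. The word-list path is just an example; any source of keys works the same way.

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;

public class FolderSpread {
    public static void main(String[] args) throws Exception {
        // Example word list, one word per line -- substitute any source of keys.
        Map<Character, Integer> countByLetter = new TreeMap<>();
        Files.lines(Paths.get("/usr/share/dict/words"))
             .filter(word -> !word.isEmpty())
             .map(word -> Character.toUpperCase(word.charAt(0)))
             .forEach(letter -> countByLetter.merge(letter, 1, Integer::sum));

        // An uneven spread of counts here means an uneven spread of folders.
        countByLetter.forEach((letter, count) ->
            System.out.println(letter + " -> " + count));
    }
}

Run against a dictionary, this shows the skew immediately; run against randomly generated letter sequences, it should show a near-flat distribution.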
How to Do It?
One of the concepts of Hazelcast is that the distributed nature of storage, spread across multiple member nodes, is an abstraction kept hidden from the end-user. You might specify that two entries should be kept together, but not where. This gives the system freedom to move them from place to place (but still together).
There are ways to determine where keys are, though.
Here we will use a method that returns the keys for entries on an individual member node, and another method to determine which partition shard each key belongs to.
For amusement, other ways might include listening for events on one node only. If you get an “INSERT” event for key “XYZ” appearing on node 6, you can infer that node 6 is where that key is currently stored.
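As a minimal sketch of that listener-based approach, run on a member, IMap.addLocalEntryListener(...) only fires for entries the local member owns. The map name “test” is illustrative and the package names assume Hazelcast 4.x or later.

import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.map.listener.EntryAddedListener;

public class WhereIsMyKey {
    public static void main(String[] args) {
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        IMap<String, String> map = hazelcastInstance.getMap("test"); // illustrative map name

        // A local listener only fires for entries this member owns, so any event
        // seen here means the key landed on this member.
        map.addLocalEntryListener((EntryAddedListener<String, String>) event ->
            System.out.println("Key " + event.getKey() + " was inserted on this member"));
    }
}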
Map.localKeySet()
The first part of the API used here is Map.localKeySet().
In a Hazelcast IMap, the data is spread across the nodes in the cluster.
When we do containsKey() we get true or false depending on whether the key is present, but where the entry is held is abstracted from us.
When we do keySet() we get all the keys currently in the map, but this is for all nodes and deliberately doesn’t confirm which entry is where. Be careful with this operation: if the map has many entries across many nodes, the set of all keys may be a large object and could use up all available memory.
However, there is a convenient operation available to run server-side.
The localKeySet() operation, when run on a server, returns only those keys for which that server holds the primary copy. In other words, the keys on that server.
Some caution still applies. The server already holds its share of the entries in memory, and keys take less memory than whole entries, but what is returned is a copy of those keys, so this still creates a potentially large object in memory.
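As a minimal member-side sketch (the map name “test” is illustrative, and package names assume Hazelcast 4.x or later):

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;

import java.util.Set;

public class LocalKeys {
    public static void main(String[] args) {
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        IMap<String, String> map = hazelcastInstance.getMap("test"); // illustrative map name

        // keySet() would gather every key in the cluster; localKeySet() returns
        // only the keys whose primary copy lives on this member.
        Set<String> localKeys = map.localKeySet();
        System.out.println("This member owns " + localKeys.size() + " keys");
    }
}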
PartitionService.getPartition(key)
Now that we know which keys we are interested in, we just need to find out which partition, or shard, each is in.
The method PartitionService.getPartition(key) allows us to obtain the partition for a key.
If we really wanted to, we could work it out ourselves, as shown here, but it’s a lot of work.
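As a sketch of the lookup (package names assume Hazelcast 4.x or later): conceptually the mapping is a hash of the serialized key modulo the partition count, but the PartitionService saves us from reimplementing Hazelcast’s hashing.

import com.hazelcast.cluster.Member;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.partition.Partition;
import com.hazelcast.partition.PartitionService;

public class WhichPartition {
    public static void main(String[] args) {
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        PartitionService partitionService = hazelcastInstance.getPartitionService();

        // Conceptually hash(serialized key) modulo the partition count, but the
        // PartitionService performs the real calculation for us.
        Partition partition = partitionService.getPartition("hello"); // any key will do
        int partitionId = partition.getPartitionId();
        Member owner = partition.getOwner(); // may be momentarily null during migration

        System.out.println("Key 'hello' maps to partition " + partitionId
            + ", currently owned by " + owner);
    }
}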
The code
In a Java Callable, we can define a method like this.
public Map<Integer, Integer> call() throws Exception {
    final Map<Integer, Integer> result = new HashMap<>();
    final PartitionService partitionService = this.hazelcastInstance.getPartitionService();

    this.hazelcastInstance.getDistributedObjects()
        .stream()
        .filter(distributedObject -> (distributedObject instanceof IMap))
        .filter(distributedObject -> !distributedObject.getName().startsWith("__"))
        .map(distributedObject -> ((IMap<?, ?>) distributedObject))
        .forEach(iMap -> {
            iMap.localKeySet()
                .forEach(key -> {
                    // Count this locally owned key against the partition it belongs to.
                    int partitionId = partitionService.getPartition(key).getPartitionId();
                    result.merge(partitionId, 1, Integer::sum);
                });
        });

    return result;
}
For all maps found, we ignore the system-internal maps whose names begin with “__” and look at the local keys. Then we just count each key against its partition, returning a map of partition ids to entry counts.
More code
The above code is a Callable. We need to use Hazelcast’s IExecutorService to run it on all nodes in the cluster.
Each node will return information about the partitions on that node, which we collate.
Callable<Map<Integer, Integer>> callable = new MyCallable();
final Map<Integer, Tuple2<Integer, String>> collatedResults = new TreeMap<>();

Map<Member, Future<Map<Integer, Integer>>> rawResults =
    this.hazelcastInstance.getExecutorService("default").submitToAllMembers(callable);

rawResults.entrySet()
    .stream()
    .forEach(memberEntry -> {
        try {
            String member = memberEntry.getKey().getAddress().getHost()
                + ":" + memberEntry.getKey().getAddress().getPort();

            // Wait for this member's result, then record (entry count, member) per partition.
            Map<Integer, Integer> result = memberEntry.getValue().get();

            result.entrySet().stream()
                .forEach(resultEntry -> collatedResults.put(resultEntry.getKey(),
                    Tuple2.tuple2(resultEntry.getValue(), member)));
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
This is a bit more complicated, but not much more.
Invoking the task on multiple nodes gets us a Future for each asynchronous call.
Each node where the task runs returns a map of partition ids and entry counts. We augment the entry count, turning it from a single integer into a pair (Tuple2) of the entry count and the member’s address. This is cosmetic, but it’s useful to display which member has which partition.
A real example
Here’s some of the output from a real run. 5,000,000 or so keys are stored in a 5-node cluster with 271 partitions.
The keys here were the toString() representation of randomly generated UUIDs.
Partition 0 - entry count 18508 - member 10.48.4.6:5701 Partition 1 - entry count 18647 - member 10.48.4.6:5701 Partition 2 - entry count 18514 - member 10.48.4.6:5701 Partition 3 - entry count 18274 - member 10.48.4.6:5701 Partition 4 - entry count 18410 - member 10.48.4.6:5701 Partition 5 - entry count 18288 - member 10.48.4.6:5701 Partition 6 - entry count 18622 - member 10.48.4.6:5701 Partition 7 - entry count 18671 - member 10.48.4.6:5701 Partition 8 - entry count 18351 - member 10.48.4.6:5701 Partition 9 - entry count 18603 - member 10.48.4.6:5701 Partition 10 - entry count 18416 - member 10.48.4.6:5701 Partition 11 - entry count 18805 - member 10.48.4.6:5701 Partition 12 - entry count 18331 - member 10.48.4.6:5701 Partition 13 - entry count 18518 - member 10.48.4.6:5701 Partition 14 - entry count 18305 - member 10.48.4.6:5701 Partition 15 - entry count 18540 - member 10.48.4.6:5701 Partition 16 - entry count 18522 - member 10.48.4.6:5701 Partition 17 - entry count 18482 - member 10.48.4.6:5701 Partition 18 - entry count 18330 - member 10.48.4.6:5701 Partition 19 - entry count 18572 - member 10.48.4.6:5701 Partition 20 - entry count 18646 - member 10.48.4.6:5701 Partition 21 - entry count 18401 - member 10.48.4.6:5701 Partition 22 - entry count 18666 - member 10.48.4.6:5701 Partition 23 - entry count 18549 - member 10.48.4.6:5701 Partition 24 - entry count 18344 - member 10.48.4.6:5701 Partition 25 - entry count 18440 - member 10.48.4.6:5701 Partition 26 - entry count 18449 - member 10.48.4.6:5701 Partition 27 - entry count 18765 - member 10.48.4.6:5701 Partition 28 - entry count 18774 - member 10.48.4.6:5701 Partition 29 - entry count 18403 - member 10.48.4.6:5701 Partition 30 - entry count 18363 - member 10.48.4.6:5701 Partition 31 - entry count 18534 - member 10.48.4.6:5701 Partition 32 - entry count 18444 - member 10.48.4.6:5701 Partition 33 - entry count 18502 - member 10.48.4.6:5701 Partition 34 - entry count 18235 - member 10.48.4.6:5701 Partition 35 - entry count 18360 - member 10.48.4.6:5701 Partition 36 - entry count 18600 - member 10.48.3.18:5701 Partition 37 - entry count 18552 - member 10.48.4.6:5701 Partition 38 - entry count 18588 - member 10.48.3.18:5701 Partition 39 - entry count 18656 - member 10.48.3.18:5701 Partition 40 - entry count 18647 - member 10.48.4.6:5701 Partition 41 - entry count 18606 - member 10.48.3.18:5701 Partition 42 - entry count 18484 - member 10.48.3.18:5701 Partition 43 - entry count 18439 - member 10.48.3.18:5701 Partition 44 - entry count 18411 - member 10.48.3.18:5701 Partition 45 - entry count 18668 - member 10.48.3.18:5701 Partition 46 - entry count 18460 - member 10.48.3.18:5701 Partition 47 - entry count 18465 - member 10.48.4.6:5701 Partition 48 - entry count 18499 - member 10.48.3.18:5701 Partition 49 - entry count 18264 - member 10.48.4.6:5701 Partition 50 - entry count 18656 - member 10.48.4.6:5701 Partition 51 - entry count 18520 - member 10.48.4.6:5701 Partition 52 - entry count 18553 - member 10.48.4.6:5701 Partition 53 - entry count 18485 - member 10.48.4.6:5701 Partition 54 - entry count 18300 - member 10.48.3.18:5701 Partition 55 - entry count 18499 - member 10.48.4.6:5701 Partition 56 - entry count 18599 - member 10.48.3.18:5701 Partition 57 - entry count 18436 - member 10.48.3.18:5701 Partition 58 - entry count 18757 - member 10.48.4.6:5701 Partition 59 - entry count 18710 - member 10.48.4.6:5701 Partition 60 - entry count 18612 - member 10.48.3.18:5701 Partition 61 - entry count 18565 - member 10.48.4.6:5701 Partition 62 - 
entry count 18389 - member 10.48.3.18:5701 Partition 63 - entry count 18666 - member 10.48.4.6:5701 Partition 64 - entry count 18545 - member 10.48.4.6:5701 Partition 65 - entry count 18303 - member 10.48.4.6:5701 Partition 66 - entry count 18632 - member 10.48.4.6:5701 Partition 67 - entry count 18396 - member 10.48.3.18:5701 Partition 68 - entry count 18605 - member 10.48.3.18:5701 Partition 69 - entry count 18579 - member 10.48.4.6:5701 Partition 70 - entry count 18516 - member 10.48.3.18:5701 Partition 71 - entry count 18320 - member 10.48.4.6:5701 Partition 72 - entry count 18489 - member 10.48.3.18:5701 Partition 73 - entry count 18065 - member 10.48.4.6:5701 - MINIMUM Partition 74 - entry count 18402 - member 10.48.4.6:5701 Partition 75 - entry count 18478 - member 10.48.3.18:5701 Partition 76 - entry count 18650 - member 10.48.4.6:5701 Partition 77 - entry count 18566 - member 10.48.4.6:5701 Partition 78 - entry count 18481 - member 10.48.3.18:5701 Partition 79 - entry count 18787 - member 10.48.3.18:5701 Partition 80 - entry count 18480 - member 10.48.3.18:5701 Partition 81 - entry count 18365 - member 10.48.4.6:5701 Partition 82 - entry count 18535 - member 10.48.4.6:5701 Partition 83 - entry count 18328 - member 10.48.4.6:5701 Partition 84 - entry count 18430 - member 10.48.4.6:5701 Partition 85 - entry count 18630 - member 10.48.4.6:5701 Partition 86 - entry count 18469 - member 10.48.4.6:5701 Partition 87 - entry count 18445 - member 10.48.4.6:5701 Partition 88 - entry count 18558 - member 10.48.4.6:5701 Partition 89 - entry count 18591 - member 10.48.0.15:5701 Partition 90 - entry count 18317 - member 10.48.0.15:5701 Partition 91 - entry count 18588 - member 10.48.0.15:5701 Partition 92 - entry count 18539 - member 10.48.0.15:5701 Partition 93 - entry count 18457 - member 10.48.4.6:5701 Partition 94 - entry count 18517 - member 10.48.0.15:5701 Partition 95 - entry count 18605 - member 10.48.0.15:5701 Partition 96 - entry count 18491 - member 10.48.0.15:5701 Partition 97 - entry count 18327 - member 10.48.0.15:5701 Partition 98 - entry count 18650 - member 10.48.0.15:5701 Partition 99 - entry count 18438 - member 10.48.0.15:5701 Partition 100 - entry count 18432 - member 10.48.0.15:5701 Partition 101 - entry count 18655 - member 10.48.0.15:5701 Partition 102 - entry count 18313 - member 10.48.0.15:5701 Partition 103 - entry count 18544 - member 10.48.0.15:5701 Partition 104 - entry count 18519 - member 10.48.0.15:5701 Partition 105 - entry count 18300 - member 10.48.0.15:5701 Partition 106 - entry count 18437 - member 10.48.0.15:5701 Partition 107 - entry count 18600 - member 10.48.0.15:5701 Partition 108 - entry count 18545 - member 10.48.0.15:5701 Partition 109 - entry count 18370 - member 10.48.0.15:5701 Partition 110 - entry count 18370 - member 10.48.0.15:5701 Partition 111 - entry count 18456 - member 10.48.0.15:5701 Partition 112 - entry count 18400 - member 10.48.0.15:5701 Partition 113 - entry count 18393 - member 10.48.0.15:5701 Partition 114 - entry count 18694 - member 10.48.0.15:5701 Partition 115 - entry count 18817 - member 10.48.0.15:5701 - MAXIMUM Partition 116 - entry count 18459 - member 10.48.0.15:5701 Partition 117 - entry count 18360 - member 10.48.0.15:5701 Partition 118 - entry count 18574 - member 10.48.0.15:5701 Partition 119 - entry count 18385 - member 10.48.0.15:5701 Partition 120 - entry count 18609 - member 10.48.0.15:5701 Partition 121 - entry count 18640 - member 10.48.0.15:5701 Partition 122 - entry count 18521 - member 10.48.0.15:5701 
Partition 123 - entry count 18528 - member 10.48.0.15:5701 Partition 124 - entry count 18548 - member 10.48.0.15:5701 Partition 125 - entry count 18544 - member 10.48.0.15:5701 Partition 126 - entry count 18080 - member 10.48.0.15:5701 Partition 127 - entry count 18370 - member 10.48.0.15:5701 Partition 128 - entry count 18408 - member 10.48.0.15:5701 Partition 129 - entry count 18498 - member 10.48.0.15:5701 Partition 130 - entry count 18547 - member 10.48.0.15:5701 Partition 131 - entry count 18695 - member 10.48.0.15:5701 Partition 132 - entry count 18494 - member 10.48.0.15:5701 Partition 133 - entry count 18464 - member 10.48.0.15:5701 Partition 134 - entry count 18492 - member 10.48.0.15:5701 Partition 135 - entry count 18701 - member 10.48.3.18:5701 Partition 136 - entry count 18555 - member 10.48.3.18:5701 Partition 137 - entry count 18476 - member 10.48.3.18:5701 Partition 138 - entry count 18709 - member 10.48.3.18:5701 Partition 139 - entry count 18457 - member 10.48.3.18:5701 Partition 140 - entry count 18556 - member 10.48.3.18:5701 Partition 141 - entry count 18251 - member 10.48.3.18:5701 Partition 142 - entry count 18623 - member 10.48.3.18:5701 Partition 143 - entry count 18451 - member 10.48.3.18:5701 Partition 144 - entry count 18605 - member 10.48.3.18:5701 Partition 145 - entry count 18563 - member 10.48.3.18:5701 Partition 146 - entry count 18690 - member 10.48.3.18:5701 Partition 147 - entry count 18547 - member 10.48.3.18:5701 Partition 148 - entry count 18373 - member 10.48.3.18:5701 Partition 149 - entry count 18502 - member 10.48.3.18:5701 Partition 150 - entry count 18357 - member 10.48.3.18:5701 Partition 151 - entry count 18542 - member 10.48.3.18:5701 Partition 152 - entry count 18385 - member 10.48.3.18:5701 Partition 153 - entry count 18706 - member 10.48.3.18:5701 Partition 154 - entry count 18637 - member 10.48.3.18:5701 Partition 155 - entry count 18386 - member 10.48.3.18:5701 Partition 156 - entry count 18700 - member 10.48.3.18:5701 Partition 157 - entry count 18578 - member 10.48.3.18:5701 Partition 158 - entry count 18323 - member 10.48.3.18:5701 Partition 159 - entry count 18466 - member 10.48.3.18:5701 Partition 160 - entry count 18535 - member 10.48.3.18:5701 Partition 161 - entry count 18495 - member 10.48.3.18:5701 Partition 162 - entry count 18445 - member 10.48.3.18:5701 Partition 163 - entry count 18419 - member 10.48.3.18:5701 Partition 164 - entry count 18246 - member 10.48.3.18:5701 Partition 165 - entry count 18608 - member 10.48.3.18:5701 Partition 166 - entry count 18374 - member 10.48.3.18:5701 Partition 167 - entry count 18370 - member 10.48.3.18:5701 Partition 168 - entry count 18442 - member 10.48.3.18:5701 Partition 169 - entry count 18553 - member 10.48.3.18:5701 Partition 170 - entry count 18492 - member 10.48.3.18:5701 Partition 171 - entry count 18467 - member 10.48.3.18:5701 Partition 172 - entry count 18463 - member 10.48.3.18:5701 Partition 173 - entry count 18455 - member 10.48.3.18:5701 Partition 174 - entry count 18631 - member 10.48.3.18:5701 Partition 175 - entry count 18345 - member 10.48.3.18:5701 Partition 176 - entry count 18338 - member 10.48.3.18:5701 Partition 177 - entry count 18437 - member 10.48.3.18:5701 Partition 178 - entry count 18465 - member 10.48.3.18:5701 Partition 179 - entry count 18307 - member 10.48.3.18:5701 Partition 180 - entry count 18482 - member 10.48.0.15:5701 Partition 181 - entry count 18384 - member 10.48.0.15:5701 Partition 182 - entry count 18329 - member 10.48.0.15:5701 Partition 183 
- entry count 18469 - member 10.48.0.15:5701 Partition 184 - entry count 18211 - member 10.48.0.15:5701 Partition 185 - entry count 18546 - member 10.48.1.10:5701 Partition 186 - entry count 18523 - member 10.48.1.10:5701 Partition 187 - entry count 18460 - member 10.48.1.10:5701 Partition 188 - entry count 18538 - member 10.48.0.15:5701 Partition 189 - entry count 18293 - member 10.48.0.15:5701 Partition 190 - entry count 18463 - member 10.48.0.15:5701 Partition 191 - entry count 18365 - member 10.48.0.15:5701 Partition 192 - entry count 18495 - member 10.48.0.15:5701 Partition 193 - entry count 18536 - member 10.48.0.15:5701 Partition 194 - entry count 18287 - member 10.48.1.10:5701 Partition 195 - entry count 18664 - member 10.48.0.15:5701 Partition 196 - entry count 18393 - member 10.48.0.15:5701 Partition 197 - entry count 18642 - member 10.48.1.10:5701 Partition 198 - entry count 18306 - member 10.48.1.10:5701 Partition 199 - entry count 18576 - member 10.48.1.10:5701 Partition 200 - entry count 18395 - member 10.48.1.10:5701 Partition 201 - entry count 18357 - member 10.48.1.10:5701 Partition 202 - entry count 18589 - member 10.48.1.10:5701 Partition 203 - entry count 18437 - member 10.48.0.15:5701 Partition 204 - entry count 18787 - member 10.48.1.10:5701 Partition 205 - entry count 18657 - member 10.48.0.15:5701 Partition 206 - entry count 18601 - member 10.48.0.15:5701 Partition 207 - entry count 18256 - member 10.48.1.10:5701 Partition 208 - entry count 18420 - member 10.48.1.10:5701 Partition 209 - entry count 18422 - member 10.48.0.15:5701 Partition 210 - entry count 18589 - member 10.48.0.15:5701 Partition 211 - entry count 18537 - member 10.48.0.15:5701 Partition 212 - entry count 18441 - member 10.48.0.15:5701 Partition 213 - entry count 18325 - member 10.48.0.15:5701 Partition 214 - entry count 18458 - member 10.48.0.15:5701 Partition 215 - entry count 18472 - member 10.48.0.15:5701 Partition 216 - entry count 18485 - member 10.48.1.10:5701 Partition 217 - entry count 18525 - member 10.48.1.10:5701 Partition 218 - entry count 18600 - member 10.48.1.10:5701 Partition 219 - entry count 18502 - member 10.48.1.10:5701 Partition 220 - entry count 18605 - member 10.48.1.10:5701 Partition 221 - entry count 18398 - member 10.48.1.10:5701 Partition 222 - entry count 18679 - member 10.48.1.10:5701 Partition 223 - entry count 18712 - member 10.48.1.10:5701 Partition 224 - entry count 18563 - member 10.48.1.10:5701 Partition 225 - entry count 18535 - member 10.48.1.10:5701 Partition 226 - entry count 18651 - member 10.48.1.10:5701 Partition 227 - entry count 18141 - member 10.48.1.10:5701 Partition 228 - entry count 18502 - member 10.48.1.10:5701 Partition 229 - entry count 18460 - member 10.48.1.10:5701 Partition 230 - entry count 18266 - member 10.48.1.10:5701 Partition 231 - entry count 18475 - member 10.48.1.10:5701 Partition 232 - entry count 18534 - member 10.48.1.10:5701 Partition 233 - entry count 18436 - member 10.48.1.10:5701 Partition 234 - entry count 18619 - member 10.48.1.10:5701 Partition 235 - entry count 18486 - member 10.48.1.10:5701 Partition 236 - entry count 18416 - member 10.48.1.10:5701 Partition 237 - entry count 18565 - member 10.48.1.10:5701 Partition 238 - entry count 18629 - member 10.48.1.10:5701 Partition 239 - entry count 18513 - member 10.48.1.10:5701 Partition 240 - entry count 18543 - member 10.48.1.10:5701 Partition 241 - entry count 18580 - member 10.48.1.10:5701 Partition 242 - entry count 18604 - member 10.48.1.10:5701 Partition 243 - entry count 
18385 - member 10.48.1.10:5701 Partition 244 - entry count 18371 - member 10.48.1.10:5701 Partition 245 - entry count 18428 - member 10.48.1.10:5701 Partition 246 - entry count 18442 - member 10.48.1.10:5701 Partition 247 - entry count 18537 - member 10.48.1.10:5701 Partition 248 - entry count 18231 - member 10.48.1.10:5701 Partition 249 - entry count 18385 - member 10.48.1.10:5701 Partition 250 - entry count 18657 - member 10.48.1.10:5701 Partition 251 - entry count 18461 - member 10.48.1.10:5701 Partition 252 - entry count 18805 - member 10.48.1.10:5701 Partition 253 - entry count 18553 - member 10.48.1.10:5701 Partition 254 - entry count 18500 - member 10.48.1.10:5701 Partition 255 - entry count 18619 - member 10.48.1.10:5701 Partition 256 - entry count 18409 - member 10.48.1.10:5701 Partition 257 - entry count 18478 - member 10.48.1.10:5701 Partition 258 - entry count 18716 - member 10.48.1.10:5701 Partition 259 - entry count 18346 - member 10.48.1.10:5701 Partition 260 - entry count 18663 - member 10.48.1.10:5701 Partition 261 - entry count 18510 - member 10.48.1.10:5701 Partition 262 - entry count 18405 - member 10.48.1.10:5701 Partition 263 - entry count 18486 - member 10.48.1.10:5701 Partition 264 - entry count 18323 - member 10.48.1.10:5701 Partition 265 - entry count 18560 - member 10.48.1.10:5701 Partition 266 - entry count 18647 - member 10.48.1.10:5701 Partition 267 - entry count 18625 - member 10.48.1.10:5701 Partition 268 - entry count 18363 - member 10.48.1.10:5701 Partition 269 - entry count 18673 - member 10.48.1.10:5701 Partition 270 - entry count 18478 - member 10.48.1.10:5701 ------------------------------------- Total 5012116, StdDev 130.69222311720472, Maximum 18817, Mininum 18065 -------------------------------------
As you’ll see at the bottom, the extra code has calculated the standard deviation, minimum and maximum. So we have good confirmation that all our partitions contain roughly the same amount of data. Result!
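That summary line comes from a little extra code, not shown above. A minimal sketch of how it could be derived from the collated results, assuming collatedResults as built earlier (partition id mapped to a Tuple2 of entry count and member, with Tuple2 exposing its first element via f0() as Hazelcast Jet’s Tuple2 does) and a java.util.IntSummaryStatistics import:

// Derive total, minimum and maximum from the per-partition entry counts.
IntSummaryStatistics stats = collatedResults.values()
        .stream()
        .mapToInt(tuple2 -> tuple2.f0())
        .summaryStatistics();

// Standard deviation of the per-partition counts around their mean.
double mean = stats.getAverage();
double stdDev = Math.sqrt(collatedResults.values()
        .stream()
        .mapToDouble(tuple2 -> Math.pow(tuple2.f0() - mean, 2))
        .average()
        .orElse(0.0));

System.out.println("Total " + stats.getSum()
        + ", StdDev " + stdDev
        + ", Maximum " + stats.getMax()
        + ", Minimum " + stats.getMin());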
Migration
The above code is a sample. For simplicity, production-strength concerns such as exception handling have been omitted.
Also, it assumes the partitions don’t move while the results are being calculated. If a node joins or leaves the cluster, the partitions will be moved around to rebalance the load.
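One pragmatic guard, sketched below, is to check that the cluster is in a safe state before trusting a measurement. Whether this is sufficient depends on how busy the cluster is, since a migration could still begin immediately afterwards; package names assume Hazelcast 4.x or later.

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.partition.PartitionService;

public class SafeToMeasure {
    public static void main(String[] args) {
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        PartitionService partitionService = hazelcastInstance.getPartitionService();

        // If the cluster is not "safe" (for example, while partitions are being
        // rebalanced), counts taken now may be inconsistent between members.
        if (partitionService.isClusterSafe()) {
            System.out.println("Cluster looks stable, measurement should be consistent");
        } else {
            System.out.println("Rebalancing or backup sync in progress, try again later");
        }
    }
}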
Summary
All members in a grid should contribute equally to the collective storage and processing workload. If the data isn’t spread evenly, that condition is unlikely to be met.
A rebalance moves the shards around, but if one shard is disproportionately large, all a rebalance does is move the problem from one member to another.
Counting records is a good start. But if record sizes vary wildly, an even count may not mean an even data load.
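If size matters more than count, one option, sketched below, is to compare per-member memory cost instead, using each map’s LocalMapStats. The figures depend on the map’s in-memory format, so treat them as indicative; the map name “test” is illustrative and package names assume Hazelcast 4.x or later.

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import com.hazelcast.map.LocalMapStats;

public class LocalSize {
    public static void main(String[] args) {
        HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
        IMap<String, String> map = hazelcastInstance.getMap("test"); // illustrative map name

        // Per-member statistics for the entries this member owns.
        LocalMapStats stats = map.getLocalMapStats();
        System.out.println("Owned entries: " + stats.getOwnedEntryCount()
            + ", owned entry memory cost: " + stats.getOwnedEntryMemoryCost() + " bytes");
    }
}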