Upgrading to Jet 4.0
As we have announced earlier, Jet 4.0 is out! In this blog post, we aim to give you the lower-level details needed for migrating from older versions.
Jet 4.0 is a major version release. Under the semantic versioning scheme we follow, this means that some of the API has changed in a breaking way in version 4.0, and code written for 3.x may no longer compile against it.
Jet on IMDG 4.0
Jet 4.0 uses IMDG 4.0, which is also a major release with its own breaking changes. For details see IMDG Release Notes and IMDG Migration Guides.
The most important IMDG changes that also affect Jet are as follows:
- We renamed many packages and moved classes around. For details see the IMDG Release Notes. The most obvious change is that many classes that used to be in the general com.hazelcast.core package are now in specific packages like com.hazelcast.map or com.hazelcast.collection.
- com.hazelcast.jet.function, the package containing serializable variants of java.util.function, is now merged into com.hazelcast.function: BiConsumerEx, BiFunctionEx, BinaryOperatorEx, BiPredicateEx, ComparatorEx, ComparatorsEx, ConsumerEx, FunctionEx, Functions, PredicateEx, SupplierEx, ToDoubleFunctionEx, ToIntFunctionEx, ToLongFunctionEx.
- EntryProcessor and several other classes and methods received a cleanup of their type parameters. See the relevant section in the IMDG Migration Guide.
- The term “group” in configuration was replaced with “cluster”. See the code snippet below for an example. This changes a Jet Command Line parameter as well (-g/--groupName renamed to -n/--cluster-name).

    clientConfig.setClusterName("cluster_name");
    //clientConfig.getGroupConfig().setName("cluster_name")

- EventJournalConfig moved from the top-level Config class to data structure-specific configs (MapConfig, CacheConfig):

    config.getMapConfig("map_name").getEventJournalConfig();
    //config.getMapEventJournalConfig("map_name")

- ICompletableFuture was removed and replaced with the JDK-standard CompletionStage. This affects the return type of async methods. See the relevant section in the IMDG Migration Guide.
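To make the CompletionStage change more concrete, here is a minimal sketch that uses only the JDK. The getAsync method below is a hypothetical stand-in for any async method that now returns CompletionStage; it is not a Hazelcast API, and the class and method names are made up for illustration. The sketch shows the two patterns calling code most likely needs: chaining a follow-up step, and blocking for the result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class CompletionStageMigration {

    // Hypothetical stand-in for an async method that returns CompletionStage
    // in 4.0. This is NOT a Hazelcast API, just a placeholder for illustration.
    static CompletionStage<String> getAsync(String key) {
        return CompletableFuture.supplyAsync(() -> "value-for-" + key);
    }

    // Chaining style: transform the result once it is available.
    static CompletionStage<Integer> valueLength(String key) {
        return getAsync(key).thenApply(String::length);
    }

    // Blocking style: CompletionStage itself has no get() or join(),
    // so convert it to a CompletableFuture first.
    static String getBlocking(String key) {
        return getAsync(key).toCompletableFuture().join();
    }

    public static void main(String[] args) {
        valueLength("k1")
                .whenComplete((len, err) -> {
                    // Exactly one of len / err is non-null here.
                    if (err != null) {
                        System.err.println("Lookup failed: " + err);
                    } else {
                        System.out.println("Length: " + len);
                    }
                })
                .toCompletableFuture()
                .join();
        System.out.println(getBlocking("k2"));
    }
}
```

Code that used to block on the returned future will most likely need the toCompletableFuture() step, while callback-based code can usually move to thenApply, thenAccept or whenComplete. Check the IMDG Migration Guide for the exact mapping of the methods you use.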
Jet API Changes
We made multiple breaking changes in Jet’s own APIs too:
- IMapJet, ICacheJet and IListJet, which used to be Jet-specific wrappers around IMDG’s standard IMap, ICache and IList, were removed. The methods that used to return these types now return the standard ones.
- Renamed Pipeline.drawFrom to Pipeline.readFrom and GeneralStage.drainTo to GeneralStage.writeTo:

    pipeline.readFrom(TestSources.items(1, 2, 3)).writeTo(Sinks.logger());
    //pipeline.drawFrom(TestSources.items(1, 2, 3)).drainTo(Sinks.logger());

- ContextFactory was renamed to ServiceFactory and we added support for instance-wide initialization. createFn now takes ProcessorSupplier.Context instead of just JetInstance. We also added convenience methods in ServiceFactories to simplify constructing the common variants:

    ServiceFactories.sharedService(ctx -> Executors.newFixedThreadPool(8), ExecutorService::shutdown);
    //ContextFactory.withCreateFn(jet -> Executors.newFixedThreadPool(8)).withLocalSharing();

    ServiceFactories.nonSharedService(ctx -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS"), ConsumerEx.noop());
    //ContextFactory.withCreateFn(jet -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS"))

- map/filter/flatMapUsingContext was renamed to map/filter/flatMapUsingService:

    pipeline.readFrom(TestSources.items(1, 2, 3))
            .filterUsingService(
                    ServiceFactories.sharedService(pctx -> 1),
                    (svc, i) -> i % 2 == svc)
            .writeTo(Sinks.logger());
    /*
    pipeline.drawFrom(TestSources.items(1, 2, 3))
            .filterUsingContext(
                    ContextFactory.withCreateFn(i -> 1),
                    (ctx, i) -> i % 2 == ctx)
            .drainTo(Sinks.logger());
    */

- filterUsingServiceAsync has been removed. Usages can be replaced with mapUsingServiceAsync, which behaves like a filter if it returns a null future or the returned future contains a null result:

    stage.mapUsingServiceAsync(serviceFactory,
            (executor, item) -> {
                CompletableFuture<Long> f = new CompletableFuture<>();
                executor.submit(() -> f.complete(item % 2 == 0 ? item : null));
                return f;
            });
    /*
    stage.filterUsingServiceAsync(serviceFactory,
            (executor, item) -> {
                CompletableFuture<Boolean> f = new CompletableFuture<>();
                executor.submit(() -> f.complete(item % 2 == 0));
                return f;
            });
    */

- flatMapUsingServiceAsync has been removed. Usages can be replaced with mapUsingServiceAsync followed by non-async flatMap:

    stage.mapUsingServiceAsync(serviceFactory,
            (executor, item) -> {
                CompletableFuture<List<String>> f = new CompletableFuture<>();
                executor.submit(() -> f.complete(Arrays.asList(item + "-1", item + "-2", item + "-3")));
                return f;
            })
            .flatMap(Traversers::traverseIterable);
    /*
    stage.flatMapUsingServiceAsync(serviceFactory,
            (executor, item) -> {
                CompletableFuture<Traverser<String>> f = new CompletableFuture<>();
                executor.submit(() -> f.complete(traverseItems(item + "-1", item + "-2", item + "-3")));
                return f;
            })
    */

- The methods withMaxPendingCallsPerProcessor(int) and withUnorderedAsyncResponses() were removed from ServiceFactory. These properties are relevant only in the context of asynchronous operations and were used in conjunction with GeneralStage.mapUsingServiceAsync(…). In Jet 4.0 the GeneralStage.mapUsingServiceAsync(…) method has a new variant with explicit parameters for the above settings:

    stage.mapUsingServiceAsync(
            ServiceFactories.sharedService(ctx -> Executors.newFixedThreadPool(8)),
            2,
            false,
            (exec, task) -> CompletableFuture.supplyAsync(() -> task, exec)
    );
    /*
    stage.mapUsingContextAsync(
            ContextFactory.withCreateFn(jet -> Executors.newFixedThreadPool(8))
                    .withMaxPendingCallsPerProcessor(2)
                    .withUnorderedAsyncResponses(),
            (exec, task) -> CompletableFuture.supplyAsync(() -> task, exec)
    );
    */

- com.hazelcast.jet.pipeline.Sinks#mapWithEntryProcessor got a new signature in order to accommodate the improved EntryProcessor, which became more lambda-friendly in IMDG (see the relevant section in the IMDG Migration Guide). The return type of EntryProcessor is now an explicit parameter in mapWithEntryProcessor’s method signature:

    FunctionEx<Map.Entry<String, Integer>, EntryProcessor<String, Integer, Void>> entryProcFn =
            entry ->
                    (EntryProcessor<String, Integer, Void>) e -> {
                        e.setValue(e.getValue() == null ? 1 : e.getValue() + 1);
                        return null;
                    };
    Sinks.mapWithEntryProcessor(map, Map.Entry::getKey, entryProcFn);
    /*
    FunctionEx<Map.Entry<String, Integer>, EntryProcessor<String, Integer>> entryProcFn =
            entry ->
                    (EntryProcessor<String, Integer>) e -> {
                        e.setValue(e.getValue() == null ? 1 : e.getValue() + 1);
                        return null;
                    };
    Sinks.mapWithEntryProcessor(map, Map.Entry::getKey, entryProcFn);
    */

- HDFS source and sink methods are now Hadoop.inputFormat and Hadoop.outputFormat.
- MetricsConfig is no longer part of JetConfig, but resides in the IMDG Config class:

    jetConfig.getHazelcastConfig().getMetricsConfig().setCollectionFrequencySeconds(1);
    //jetConfig.getMetricsConfig().setCollectionIntervalSeconds(1);

- Traverser type got a slight change in the flatMap lambda’s generic type wildcards. This change shouldn’t affect anything in practice.
- In sources and sinks we changed the method signatures so that the lambda becomes the last parameter, where applicable.
- JetBootstrap.getInstance() moved to Jet.bootstrappedInstance() and now it automatically creates an isolated local instance when not running through jet submit. If used from jet submit, the behaviour remains the same.
- JobConfig.addResource(…) is now addClasspathResource(…).
- ResourceType, ResourceConfig and JobConfig.getResourceConfigs() are now labeled as private API and we discourage their direct usage. We also renamed ResourceType.REGULAR_FILE to ResourceType.FILE, but this is now an internal change.
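Several of the renames above have no snippet of their own, so here is a minimal sketch of a job entry point that combines them. It assumes the Jet 4.0 JAR is on the classpath; the class name MigratedJob is made up, and the optional first command-line argument is a hypothetical path to a file the job needs on its classpath. Treat it as an illustration of the renames rather than a recommended job layout.

```java
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

public class MigratedJob {

    // Builds the same trivial pipeline used in the snippets above.
    static Pipeline buildPipeline() {
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(TestSources.items(1, 2, 3))   // 3.x: drawFrom
                .writeTo(Sinks.logger());               // 3.x: drainTo
        return pipeline;
    }

    public static void main(String[] args) {
        JobConfig jobConfig = new JobConfig();
        if (args.length > 0) {
            // args[0] is a hypothetical path supplied by the caller.
            jobConfig.addClasspathResource(args[0]);    // 3.x: addResource
        }

        // 3.x: JetBootstrap.getInstance()
        JetInstance jet = Jet.bootstrappedInstance();

        // Submit the job and wait for the batch to finish.
        jet.newJob(buildPipeline(), jobConfig).join();
    }
}
```

The same main method can be launched directly from an IDE, where it gets an isolated local instance, or packaged and run through jet submit.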
Further help
If you encounter any difficulties migrating to Jet 4.0, feel free to contact us at any time.