Upgrading to Jet 4.0
As we announced earlier, Jet 4.0 is out! In this blog post, we aim to give you the lower-level details needed for migrating from older versions.
Jet 4.0 is a major version release. Under the semantic versioning scheme we follow, this means that some of the API has changed in a breaking way in version 4.0, and code written for 3.x may no longer compile against it.
Jet on IMDG 4.0
Jet 4.0 uses IMDG 4.0, which is also a major release with its own breaking changes. For details see IMDG Release Notes and IMDG Migration Guides.
The most important IMDG changes that also affect Jet are as follows:
- We renamed many packages and moved classes around. For details see the IMDG Release Notes. The most obvious change is that many classes that used to be in the general com.hazelcast.core package are now in specific packages like com.hazelcast.map or com.hazelcast.collection.
- com.hazelcast.jet.function, the package containing serializable variants of java.util.function, is now merged into com.hazelcast.function: BiConsumerEx, BiFunctionEx, BinaryOperatorEx, BiPredicateEx, ComparatorEx, ComparatorsEx, ConsumerEx, FunctionEx, Functions, PredicateEx, SupplierEx, ToDoubleFunctionEx, ToIntFunctionEx, ToLongFunctionEx.
- EntryProcessor and several other classes and methods received a cleanup of their type parameters. See the relevant section in the IMDG Migration Guide.
- The term “group” in configuration was replaced with “cluster”. See the code snippet below for an example. This changes a Jet Command Line parameter as well (-g/--groupName renamed to -n/--cluster-name).

      clientConfig.setClusterName("cluster_name");
      //clientConfig.getGroupConfig().setName("cluster_name")

- EventJournalConfig moved from the top-level Config class to data structure-specific configs (MapConfig, CacheConfig):

      config.getMapConfig("map_name").getEventJournalConfig();
      //config.getMapEventJournalConfig("map_name")

- ICompletableFuture was removed and replaced with the JDK-standard CompletionStage. This affects the return type of async methods. See the relevant section in the IMDG Migration Guide.
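To illustrate the CompletionStage change, here is a minimal JDK-only sketch of how calling code can consume such a return value. The fetchAsync method is a hypothetical stand-in for any async method that now returns CompletionStage; it is not an IMDG or Jet API, and the class and method names are made up for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class CompletionStageSketch {

    // Hypothetical stand-in for an async call that returns CompletionStage.
    // It is NOT a real IMDG/Jet method; it only mimics the return type.
    static CompletionStage<String> fetchAsync(String key) {
        return CompletableFuture.supplyAsync(() -> "value-for-" + key);
    }

    // Chain a transformation on the stage without blocking the caller.
    static CompletionStage<Integer> valueLength(String key) {
        return fetchAsync(key).thenApply(String::length);
    }

    public static void main(String[] args) {
        // CompletionStage has no get() or join(); convert it to a
        // CompletableFuture when the caller really needs to block.
        int length = valueLength("k").toCompletableFuture().join();
        System.out.println(length);
    }
}
```

Code that used to block on the returned future can usually keep doing so through toCompletableFuture(), while callback-style code can move to the standard thenApply/whenComplete family.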
Jet API Changes
We made multiple breaking changes in Jet’s own APIs too:
- IMapJet, ICacheJet and IListJet, which used to be Jet-specific wrappers around IMDG’s standard IMap, ICache and IList, were removed. The methods that used to return these types now return the standard ones.
- Renamed Pipeline.drawFrom to Pipeline.readFrom and GeneralStage.drainTo to GeneralStage.writeTo:

      pipeline.readFrom(TestSources.items(1, 2, 3)).writeTo(Sinks.logger());
      //pipeline.drawFrom(TestSources.items(1, 2, 3)).drainTo(Sinks.logger());

- ContextFactory was renamed to ServiceFactory and we added support for instance-wide initialization. createFn now takes ProcessorSupplier.Context instead of just JetInstance. We also added convenience methods in ServiceFactories to simplify constructing the common variants:

      ServiceFactories.sharedService(ctx -> Executors.newFixedThreadPool(8), ExecutorService::shutdown);
      //ContextFactory.withCreateFn(jet -> Executors.newFixedThreadPool(8)).withLocalSharing();

      ServiceFactories.nonSharedService(ctx -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS"), ConsumerEx.noop());
      //ContextFactory.withCreateFn(jet -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS"))

- map/filter/flatMapUsingContext was renamed to map/filter/flatMapUsingService:

      pipeline.readFrom(TestSources.items(1, 2, 3))
              .filterUsingService(
                      ServiceFactories.sharedService(pctx -> 1),
                      (svc, i) -> i % 2 == svc)
              .writeTo(Sinks.logger());

      /*
      pipeline.drawFrom(TestSources.items(1, 2, 3))
              .filterUsingContext(
                      ContextFactory.withCreateFn(i -> 1),
                      (ctx, i) -> i % 2 == ctx)
              .drainTo(Sinks.logger());
      */

- filterUsingServiceAsync has been removed. Usages can be replaced with mapUsingServiceAsync, which behaves like a filter if it returns a null future or the returned future contains a null result:

      stage.mapUsingServiceAsync(serviceFactory,
              (executor, item) -> {
                  CompletableFuture<Long> f = new CompletableFuture<>();
                  executor.submit(() -> f.complete(item % 2 == 0 ? item : null));
                  return f;
              });

      /*
      stage.filterUsingServiceAsync(serviceFactory,
              (executor, item) -> {
                  CompletableFuture<Boolean> f = new CompletableFuture<>();
                  executor.submit(() -> f.complete(item % 2 == 0));
                  return f;
              });
      */

- flatMapUsingServiceAsync has been removed. Usages can be replaced with mapUsingServiceAsync followed by non-async flatMap:

      stage.mapUsingServiceAsync(serviceFactory,
              (executor, item) -> {
                  CompletableFuture<List<String>> f = new CompletableFuture<>();
                  executor.submit(() -> f.complete(Arrays.asList(item + "-1", item + "-2", item + "-3")));
                  return f;
              })
           .flatMap(Traversers::traverseIterable);

      /*
      stage.flatMapUsingServiceAsync(serviceFactory,
              (executor, item) -> {
                  CompletableFuture<Traverser<String>> f = new CompletableFuture<>();
                  executor.submit(() -> f.complete(traverseItems(item + "-1", item + "-2", item + "-3")));
                  return f;
              })
      */

- The methods withMaxPendingCallsPerProcessor(int) and withUnorderedAsyncResponses() were removed from ServiceFactory. These properties are relevant only in the context of asynchronous operations and were used in conjunction with GeneralStage.mapUsingServiceAsync(…). In Jet 4.0 the GeneralStage.mapUsingServiceAsync(…) method has a new variant with explicit parameters for the above settings:

      stage.mapUsingServiceAsync(
              ServiceFactories.sharedService(ctx -> Executors.newFixedThreadPool(8)),
              2,
              false,
              (exec, task) -> CompletableFuture.supplyAsync(() -> task, exec)
      );

      /*
      stage.mapUsingContextAsync(
              ContextFactory.withCreateFn(jet -> Executors.newFixedThreadPool(8))
                      .withMaxPendingCallsPerProcessor(2)
                      .withUnorderedAsyncResponses(),
              (exec, task) -> CompletableFuture.supplyAsync(() -> task, exec)
      );
      */

- com.hazelcast.jet.pipeline.Sinks#mapWithEntryProcessor got a new signature in order to accommodate the improved EntryProcessor, which became more lambda-friendly in IMDG (see the relevant section in the IMDG Migration Guide). The return type of EntryProcessor is now an explicit parameter in mapWithEntryProcessor’s method signature:

      FunctionEx<Map.Entry<String, Integer>, EntryProcessor<String, Integer, Void>> entryProcFn =
              entry ->
                      (EntryProcessor<String, Integer, Void>) e -> {
                          e.setValue(e.getValue() == null ? 1 : e.getValue() + 1);
                          return null;
                      };
      Sinks.mapWithEntryProcessor(map, Map.Entry::getKey, entryProcFn);

      /*
      FunctionEx<Map.Entry<String, Integer>, EntryProcessor<String, Integer>> entryProcFn =
              entry ->
                      (EntryProcessor<String, Integer>) e -> {
                          e.setValue(e.getValue() == null ? 1 : e.getValue() + 1);
                          return null;
                      };
      Sinks.mapWithEntryProcessor(map, Map.Entry::getKey, entryProcFn);
      */

- HDFS source and sink methods are now Hadoop.inputFormat and Hadoop.outputFormat.
- MetricsConfig is no longer part of JetConfig, but resides in the IMDG Config class:

      jetConfig.getHazelcastConfig().getMetricsConfig().setCollectionFrequencySeconds(1);
      //jetConfig.getMetricsConfig().setCollectionIntervalSeconds(1);

- Traverser type got a slight change in the flatMap lambda’s generic type wildcards. This change shouldn’t affect anything in practice.
- In sources and sinks we changed the method signatures so that the lambda becomes the last parameter, where applicable.
- JetBootstrap.getInstance() moved to Jet.bootstrappedInstance() and now it automatically creates an isolated local instance when not running through jet submit. If used from jet submit, the behaviour remains the same.
- JobConfig.addResource(…) is now addClasspathResource(…).
- ResourceType, ResourceConfig and JobConfig.getResourceConfigs() are now labeled as private API and we discourage their direct usage. We also renamed ResourceType.REGULAR_FILE to ResourceType.FILE, but this is now an internal change.
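When porting a filterUsingServiceAsync call, it can help to pull the mapping function out and check it on its own before wiring it into the pipeline. The following is a JDK-only sketch of such a function, assuming Long items and an ExecutorService as the service. The class and method names (AsyncFilterSketch, keepEven) are hypothetical, and nothing here runs Jet itself. It only shows that the future completes with the item for even numbers and with null for odd ones, which is the result mapUsingServiceAsync treats as "filtered out".

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncFilterSketch {

    // Hypothetical helper: completes with the item when it is even and
    // with null when it is odd. It has the same shape as the lambda
    // passed to mapUsingServiceAsync in the migration example above.
    static CompletableFuture<Long> keepEven(ExecutorService executor, Long item) {
        CompletableFuture<Long> f = new CompletableFuture<>();
        executor.submit(() -> f.complete(item % 2 == 0 ? item : null));
        return f;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            for (long i = 1; i <= 4; i++) {
                // join() blocks until the submitted task completes the future
                System.out.println(i + " -> " + keepEven(executor, i).join());
            }
        } finally {
            executor.shutdown();
        }
    }
}
```

In a pipeline, a method like this could be passed as AsyncFilterSketch::keepEven in place of the inline lambda, provided the stage's item type is Long and the service is an ExecutorService.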
Further help
If you encounter any difficulties migrating to Jet 4.0, feel free to contact us at any time.