Upgrading to Jet 4.0
As we announced earlier, Jet 4.0 is out! In this blog post, we aim to give you the lower-level details needed for migrating from older versions.
Jet 4.0 is a major version release. Under the semantic versioning scheme we follow, this means that some of the API has changed in a breaking way in version 4.0, and code written for 3.x may no longer compile against it.
Jet on IMDG 4.0
Jet 4.0 uses IMDG 4.0, which is also a major release with its own breaking changes. For details see IMDG Release Notes and IMDG Migration Guides.
The most important IMDG changes that also affect Jet are as follows:
- We renamed many packages and moved classes around. For details see the IMDG Release Notes. The most obvious change is that many classes that used to be in the general com.hazelcast.core package are now in specific packages like com.hazelcast.map or com.hazelcast.collection.
- com.hazelcast.jet.function, the package containing serializable variants of java.util.function, is now merged into com.hazelcast.function: BiConsumerEx, BiFunctionEx, BinaryOperatorEx, BiPredicateEx, ComparatorEx, ComparatorsEx, ConsumerEx, FunctionEx, Functions, PredicateEx, SupplierEx, ToDoubleFunctionEx, ToIntFunctionEx, ToLongFunctionEx.
- EntryProcessor and several other classes and methods received a cleanup of their type parameters. See the relevant section in the IMDG Migration Guide.
- The term “group” in configuration was replaced with “cluster”. See the code snippet below for an example. This changes a Jet Command Line parameter as well (-g/--groupName renamed to -n/--cluster-name).

      clientConfig.setClusterName("cluster_name");
      //clientConfig.getGroupConfig().setName("cluster_name")

- EventJournalConfig moved from the top-level Config class to data structure-specific configs (MapConfig, CacheConfig):

      config.getMapConfig("map_name").getEventJournalConfig();
      //config.getMapEventJournalConfig("map_name")

- ICompletableFuture was removed and replaced with the JDK-standard CompletionStage. This affects the return type of async methods. See the relevant section in the IMDG Migration Guide.
Jet API Changes
We made multiple breaking changes in Jet’s own APIs too:
- IMapJet, ICacheJet and IListJet, which used to be Jet-specific wrappers around IMDG’s standard IMap, ICache and IList, were removed. The methods that used to return these types now return the standard ones.
- Renamed Pipeline.drawFrom to Pipeline.readFrom and GeneralStage.drainTo to GeneralStage.writeTo:

      pipeline.readFrom(TestSources.items(1, 2, 3)).writeTo(Sinks.logger());
      //pipeline.drawFrom(TestSources.items(1, 2, 3)).drainTo(Sinks.logger());

- ContextFactory was renamed to ServiceFactory and we added support for instance-wide initialization. createFn now takes ProcessorSupplier.Context instead of just JetInstance. We also added convenience methods in ServiceFactories to simplify constructing the common variants:

      ServiceFactories.sharedService(ctx -> Executors.newFixedThreadPool(8), ExecutorService::shutdown);
      //ContextFactory.withCreateFn(jet -> Executors.newFixedThreadPool(8)).withLocalSharing();

      ServiceFactories.nonSharedService(ctx -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS"), ConsumerEx.noop());
      //ContextFactory.withCreateFn(jet -> DateTimeFormatter.ofPattern("HH:mm:ss.SSS"))

- map/filter/flatMapUsingContext was renamed to map/filter/flatMapUsingService:

      pipeline.readFrom(TestSources.items(1, 2, 3))
              .filterUsingService(
                      ServiceFactories.sharedService(pctx -> 1),
                      (svc, i) -> i % 2 == svc)
              .writeTo(Sinks.logger());
      /*
      pipeline.drawFrom(TestSources.items(1, 2, 3))
              .filterUsingContext(
                      ContextFactory.withCreateFn(i -> 1),
                      (ctx, i) -> i % 2 == ctx)
              .drainTo(Sinks.logger());
      */

- filterUsingServiceAsync has been removed. Usages can be replaced with mapUsingServiceAsync, which behaves like a filter if it returns a null future or the returned future contains a null result:

      stage.mapUsingServiceAsync(serviceFactory,
              (executor, item) -> {
                  CompletableFuture<Long> f = new CompletableFuture<>();
                  executor.submit(() -> f.complete(item % 2 == 0 ? item : null));
                  return f;
              });
      /*
      stage.filterUsingServiceAsync(serviceFactory,
              (executor, item) -> {
                  CompletableFuture<Boolean> f = new CompletableFuture<>();
                  executor.submit(() -> f.complete(item % 2 == 0));
                  return f;
              });
      */

- flatMapUsingServiceAsync has been removed. Usages can be replaced with mapUsingServiceAsync followed by non-async flatMap:

      stage.mapUsingServiceAsync(serviceFactory,
              (executor, item) -> {
                  CompletableFuture<List<String>> f = new CompletableFuture<>();
                  executor.submit(() -> f.complete(Arrays.asList(item + "-1", item + "-2", item + "-3")));
                  return f;
              })
           .flatMap(Traversers::traverseIterable);
      /*
      stage.flatMapUsingServiceAsync(serviceFactory,
              (executor, item) -> {
                  CompletableFuture<Traverser<String>> f = new CompletableFuture<>();
                  executor.submit(() -> f.complete(traverseItems(item + "-1", item + "-2", item + "-3")));
                  return f;
              })
      */

- The methods withMaxPendingCallsPerProcessor(int) and withUnorderedAsyncResponses() were removed from ServiceFactory. These properties are relevant only in the context of asynchronous operations and were used in conjunction with GeneralStage.mapUsingServiceAsync(…). In Jet 4.0 the GeneralStage.mapUsingServiceAsync(…) method has a new variant with explicit parameters for the above settings:

      stage.mapUsingServiceAsync(
              ServiceFactories.sharedService(ctx -> Executors.newFixedThreadPool(8)),
              2,
              false,
              (exec, task) -> CompletableFuture.supplyAsync(() -> task, exec)
      );
      /*
      stage.mapUsingContextAsync(
              ContextFactory.withCreateFn(jet -> Executors.newFixedThreadPool(8))
                      .withMaxPendingCallsPerProcessor(2)
                      .withUnorderedAsyncResponses(),
              (exec, task) -> CompletableFuture.supplyAsync(() -> task, exec)
      );
      */

- com.hazelcast.jet.pipeline.Sinks#mapWithEntryProcessor got a new signature in order to accommodate the improved EntryProcessor, which became more lambda-friendly in IMDG (see the relevant section in the IMDG Migration Guide). The return type of EntryProcessor is now an explicit parameter in mapWithEntryProcessor's method signature:

      FunctionEx<Map.Entry<String, Integer>, EntryProcessor<String, Integer, Void>> entryProcFn =
              entry ->
                      (EntryProcessor<String, Integer, Void>) e -> {
                          e.setValue(e.getValue() == null ? 1 : e.getValue() + 1);
                          return null;
                      };
      Sinks.mapWithEntryProcessor(map, Map.Entry::getKey, entryProcFn);
      /*
      FunctionEx<Map.Entry<String, Integer>, EntryProcessor<String, Integer>> entryProcFn =
              entry ->
                      (EntryProcessor<String, Integer>) e -> {
                          e.setValue(e.getValue() == null ? 1 : e.getValue() + 1);
                          return null;
                      };
      Sinks.mapWithEntryProcessor(map, Map.Entry::getKey, entryProcFn);
      */

- HDFS source and sink methods are now Hadoop.inputFormat and Hadoop.outputFormat.
- MetricsConfig is no longer part of JetConfig, but resides in the IMDG Config class:

      jetConfig.getHazelcastConfig().getMetricsConfig().setCollectionFrequencySeconds(1);
      //jetConfig.getMetricsConfig().setCollectionIntervalSeconds(1);

- Traverser type got a slight change in the flatMap lambda’s generic type wildcards. This change shouldn’t affect anything in practice.
- In sources and sinks we changed the method signatures so that the lambda becomes the last parameter, where applicable.
- JetBootstrap.getInstance() moved to Jet.bootstrappedInstance() and now it automatically creates an isolated local instance when not running through jet submit. If used from jet submit, the behaviour remains the same.
- JobConfig.addResource(…) is now addClasspathResource(…).
- ResourceType, ResourceConfig and JobConfig.getResourceConfigs() are now labeled as private API and we discourage their direct usage. We also renamed ResourceType.REGULAR_FILE to ResourceType.FILE, but this is now an internal change.
Further help
If you encounter any difficulties migrating to Jet 4.0, feel free to contact us at any time.