Jet vs java.util.Stream

Overview

Although Jet is a distributed stream processing framework, a community question prompted us to measure its performance on a single JVM against the built-in java.util.Stream implementation that ships with Java 8. Keep in mind that Jet remains a distributed implementation even when it runs on a single node.

Jet vs java.util.Stream Benchmark

The Jet DAG was benchmarked against JDK java.util.Stream on the Word Count task. This is the setup:

  1. On the local SSD there are 97 text files of various sizes, containing 97 MB of text in total. The entire dataset is hosted on our GitHub repository.
  2. There is a character input stream consisting of 97 lines of text, each equal to the name of one file.
  3. The input stream must be read, and each file it names must be ingested and tokenized into words.
  4. For each distinct word, the total number of occurrences across all files must be reported as the result of the computation.

Test Environment

The benchmark was executed on a single physical machine with the specs given below.

Specification   Description
Hardware        MacBook Pro 2015
CPU             2.5 GHz Intel Core i7-4870HQ, 4 cores
Storage         512 GB solid-state drive
OS              macOS Sierra
RAM             16 GB
Java            Oracle JDK 1.8.0_72

Test Parameters/Code

The full code of the benchmarks is in the hazelcast-jet-code-samples repository on GitHub.

Here’s a quick breakdown of the code:

1. Top-level code that drives the benchmark:

// Warmup: let the JIT compiler optimize the hot paths before measuring
measure();
measure();
measure();
// Timed runs
List<Long> timings = new ArrayList<>();
for (int i = 0; i < 9; i++) {
    timings.add(measure());
    System.gc();
}
System.out.println(timings.stream().collect(summarizingLong(x -> x)));
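
The three warm-up calls give the JIT compiler a chance to compile the hot code paths before the timed runs, and the explicit System.gc() between runs reduces the chance of a garbage collection pause landing in the middle of a measurement.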

2. Common methods that create the character streams (both the one containing the filenames, and the one with the contents of a text document):

private static Stream<String> docFilenames() {
    final ClassLoader cl = WordCount.class.getClassLoader();
    final BufferedReader r = new BufferedReader(
            new InputStreamReader(cl.getResourceAsStream("books"), UTF_8));
    return r.lines().onClose(() -> close(r));
}

private static void close(Closeable c) {
    try {
        c.close();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

private static Stream<String> bookLines(String name) {
    try {
        return Files.lines(Paths.get(
                WordCountSingleNode.class.getResource("books/" + name).toURI()));
    } catch (IOException | URISyntaxException e) {
        throw new RuntimeException(e);
    }
}
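
Note the onClose hook on the filename stream: because that stream is backed by an open BufferedReader, attaching close(r) ensures the reader is released whenever the stream itself is closed.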

3. The measure() function for JDK java.util.Stream:

private static long measure() {
    final Pattern delimiter = Pattern.compile("\\W+");
    long start = System.nanoTime();
    Map<String, Long> counts = docFilenames()
            // .collect(toList()).stream() -- see the Results section for explanation
            .parallel()
            .flatMap(WordCountJdk::bookLines)
            .flatMap(line -> Arrays.stream(delimiter.split(line.toLowerCase())))
            .filter(w -> !w.isEmpty())
            .collect(groupingBy(identity(), counting()));
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
}

4. The measure() function and supporting code for the Jet DAG:

private long measure() throws InterruptedException, ExecutionException {
    final Map<String, Long> counts = new ConcurrentHashMap<>();
    final Job job = jet.newJob(buildDag(counts));
    long start = System.nanoTime();
    job.execute().get();
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
}

private static DAG buildDag(Map<String, Long> counts) {
    final Pattern delimiter = Pattern.compile("\\W+");
    DAG dag = new DAG();
    Vertex source = dag.newVertex("source", DocLinesP::new);
    Vertex tokenize = dag.newVertex("tokenize",
            flatMap((String line) ->
                    traverseArray(delimiter.split(line.toLowerCase()))
                            .filter(word -> !word.isEmpty()))
    );
    Vertex reduce = dag.newVertex("reduce",
            groupAndAccumulate(() -> 0L, (count, x) -> count + 1)
    );
    Vertex sink = dag.newVertex("sink", () -> new MapSinkP(counts));
    // source -> tokenize -> reduce (partitioned by word) -> sink
    return dag.edge(between(source.localParallelism(1), tokenize))
              .edge(between(tokenize, reduce).partitioned(wholeItem(), HASH_CODE))
              .edge(between(reduce, sink));
}

private static class DocLinesP extends AbstractProcessor {
    private final Traverser<String> docLines =
            traverseStream(docFilenames().flatMap(WordCountSingleNode::bookLines));

    @Override
    public boolean complete() {
        return emitCooperatively(docLines);
    }

    @Override
    public boolean isCooperative() {
        // This processor performs blocking file I/O, so it must not run
        // on a cooperative (shared) thread
        return false;
    }
}

private static class MapSinkP extends AbstractProcessor {
    private final Map<String, Long> counts;

    MapSinkP(Map<String, Long> counts) {
        this.counts = counts;
    }

    @Override
    @SuppressWarnings("unchecked")
    protected boolean tryProcess(int ordinal, Object item) {
        final Entry<String, Long> e = (Entry<String, Long>) item;
        counts.put(e.getKey(), e.getValue());
        return true;
    }
}
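
For completeness, here is a minimal sketch of how the pieces above could be wired together. The excerpt does not show where the jet field used in measure() comes from; this sketch assumes the single-node bootstrap calls of the Jet API version the sample targets (Jet.newJetInstance() and Jet.shutdownAll()):

// Starts an embedded single-node Jet instance (assumed bootstrap, not shown in the sample)
JetInstance jet = Jet.newJetInstance();
try {
    Map<String, Long> counts = new ConcurrentHashMap<>();
    Job job = jet.newJob(buildDag(counts));
    job.execute().get();   // runs the DAG to completion
    System.out.println(counts.size() + " distinct words counted");
} finally {
    Jet.shutdownAll();     // tears down the embedded instance
}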

Results Summary

The JDK code, as given above, performs poorly because the stream engine cannot parallelize it: the character stream reports an unknown size, while the engine expects a known-size source that it can recursively split in half. The workaround is to first load the whole stream into RAM and then run the stream job on the resulting ArrayList. This is achieved by uncommenting the line .collect(toList()).stream(), and the table below shows the effect in the row "JDK java.util.Stream (pre-loaded)".
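A minimal, self-contained sketch of the underlying issue: a reader-backed stream exposes a spliterator of unknown size, which the Fork/Join-based stream machinery cannot split effectively, while a pre-loaded list is SIZED and splits cleanly (the class name SplitCheck is ours, purely for illustration):

import java.io.BufferedReader;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;

public class SplitCheck {
    public static void main(String[] args) {
        // A reader-backed stream reports an unknown size...
        Stream<String> lazy =
                new BufferedReader(new StringReader("a\nb\nc")).lines();
        Spliterator<String> sp = lazy.spliterator();
        System.out.println(sp.estimateSize());                          // Long.MAX_VALUE
        System.out.println(sp.hasCharacteristics(Spliterator.SIZED));   // false

        // ...while a pre-loaded list is SIZED and splits in half recursively
        List<String> loaded = Arrays.asList("a", "b", "c");
        Spliterator<String> sp2 = loaded.stream().spliterator();
        System.out.println(sp2.estimateSize());                         // 3
        System.out.println(sp2.hasCharacteristics(Spliterator.SIZED));  // true
    }
}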

Engine                              Time (seconds)           Throughput (MB/s)
                                    avg     min     max      avg
JDK java.util.Stream                3.97    3.91    4.03     24.4
Jet DAG                             1.39    1.29    1.44     69.8
JDK java.util.Stream (pre-loaded)   1.16    1.09    1.31     83.6
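
The throughput figures follow directly from the 97 MB dataset size divided by the average time: 97 MB / 3.97 s ≈ 24.4 MB/s, 97 MB / 1.39 s ≈ 69.8 MB/s, and 97 MB / 1.16 s ≈ 83.6 MB/s.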