
Benchmark: Jet vs java.util.stream Comparison

Overview

Jet is a distributed execution engine, but a community question prompted us to measure its performance on a single JVM against java.util.stream, the parallel streams implementation built into Java 8.

Benchmark

We benchmarked a Jet DAG against the JDK's java.util.stream (j.u.s) on the Word Count task. This is the setup:

  1. On the local SSD there are 97 text files of various sizes, containing 97 MB of text in total. See https://github.com/hazelcast/hazelcast-jet-code-samples/tree/master/books/src/main/resources/books
  2. There is a character input stream consisting of 97 lines of text, each being equal to the name of one file.
  3. The input stream must be read and each file named by it must be ingested and tokenized to words.
  4. For each distinct word its total number of occurrences across all files must be reported as the result of the computation.
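
As a minimal illustration of the task itself (not the benchmark code, which follows below), the same tokenize-and-count logic can be sketched with java.util.stream over an in-memory stand-in for the corpus:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static java.util.function.Function.identity;
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;

public class TinyWordCount {
    // Tokenize on non-word characters, lowercase, drop empty tokens, count occurrences
    static Map<String, Long> count(Stream<String> lines) {
        Pattern delimiter = Pattern.compile("\\W+");
        return lines
                .flatMap(line -> Arrays.stream(delimiter.split(line.toLowerCase())))
                .filter(w -> !w.isEmpty())
                .collect(groupingBy(identity(), counting()));
    }

    public static void main(String[] args) {
        // Stand-in for the 97-file corpus: two lines of text
        Map<String, Long> counts =
                count(Stream.of("To be or not to be", "that is the question"));
        System.out.println(counts.get("to")); // 2
        System.out.println(counts.get("be")); // 2
    }
}
```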

Test Environment

These tests were performed on a single physical machine with the following specification:

Specification  Description
Hardware       MacBook Pro 2015
CPU            2.5 GHz Intel Core i7-4870HQ, 4 cores
Storage        512 GB solid-state drive
OS             macOS Sierra
RAM            16 GB
Java           Oracle JDK 1.8.0_72

Test Parameters/Code

The full code of the benchmarks is in the hazelcast-jet-code-samples repository on GitHub.

Here’s a quick breakdown of the code:

1. Top-level code that drives the benchmark:

// Warmup
measure();
measure();
measure();
List<Long> timings = new ArrayList<>();
for (int i = 0; i < 9; i++) {
    timings.add(measure());
    System.gc();
}
System.out.println(timings.stream().collect(summarizingLong(x -> x)));

2. Common methods that create the character streams (both the one containing the filenames, and the one with the contents of a text document):

private static Stream<String> docFilenames() {
    final ClassLoader cl = WordCount.class.getClassLoader();
    final BufferedReader r = new BufferedReader(
            new InputStreamReader(cl.getResourceAsStream("books"), UTF_8));
    return r.lines().onClose(() -> close(r));
}

private static void close(Closeable c) {
    try {
        c.close();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

private static Stream<String> bookLines(String name) {
    try {
        return Files.lines(Paths.get(
                WordCountSingleNode.class.getResource("books/" + name).toURI()));
    } catch (IOException | URISyntaxException e) {
        throw new RuntimeException(e);
    }
}

3. The measure() function for JDK j.u.s:

private static long measure() {
    final Pattern delimiter = Pattern.compile("\\W+");
    long start = System.nanoTime();
    Map<String, Long> counts = docFilenames()
            // .collect(toList()).stream() -- see the Results section for explanation
            .parallel()
            .flatMap(WordCountJdk::bookLines)
            .flatMap(line -> Arrays.stream(delimiter.split(line.toLowerCase())))
            .filter(w -> !w.isEmpty())
            .collect(groupingBy(identity(), counting()));
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
}

4. The measure() function for Jet DAG:

private long measure() throws InterruptedException, ExecutionException {
    final Map<String, Long> counts = new ConcurrentHashMap<>();
    final Job job = jet.newJob(buildDag(counts));
    long start = System.nanoTime();
    job.execute().get();
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
}

private static DAG buildDag(Map<String, Long> counts) {
    final Pattern delimiter = Pattern.compile("\\W+");
    DAG dag = new DAG();
    Vertex source = dag.newVertex("source", DocLinesP::new);
    Vertex tokenize = dag.newVertex("tokenize",
            flatMap((String line) ->
                    traverseArray(delimiter.split(line.toLowerCase()))
                            .filter(word -> !word.isEmpty()))
    );
    Vertex reduce = dag.newVertex("reduce",
            groupAndAccumulate(() -> 0L, (count, x) -> count + 1)
    );
    Vertex sink = dag.newVertex("sink", () -> new MapSinkP(counts));
    return dag.edge(between(source.localParallelism(1), tokenize))
              .edge(between(tokenize, reduce).partitioned(wholeItem(), HASH_CODE))
              .edge(between(reduce, sink));
}

private static class DocLinesP extends AbstractProcessor {
    private final Traverser<String> docLines =
            traverseStream(docFilenames().flatMap(WordCountSingleNode::bookLines));

    @Override
    public boolean complete() {
        return emitCooperatively(docLines);
    }

    @Override
    public boolean isCooperative() {
        return false;
    }
}

private static class MapSinkP extends AbstractProcessor {
    private final Map<String, Long> counts;

    MapSinkP(Map<String, Long> counts) {
        this.counts = counts;
    }

    @Override
    protected boolean tryProcess(int ordinal, Object item) {
        final Entry<String, Long> e = (Entry<String, Long>) item;
        counts.put(e.getKey(), e.getValue());
        return true;
    }
}

Results Summary

The JDK code as given above performs poorly because the engine cannot parallelize it: the character stream reports an unknown size, while the engine expects a known-size source that it can recursively split in half. The workaround is to load the whole stream into RAM and run the stream job on the resulting ArrayList, achieved by uncommenting the line .collect(toList()).stream(). The table below shows the effect of this in the row "JDK j.u.s (pre-loaded)".
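The sizing difference is directly observable on the spliterators. In this sketch (using an in-memory reader as a stand-in for the benchmark's file streams), a reader-backed line stream lacks the SIZED characteristic and reports Long.MAX_VALUE as its estimated size, while a pre-loaded ArrayList reports an exact size that the fork/join machinery can split in half:

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class SpliteratorSizing {
    public static void main(String[] args) {
        // Reader-backed stream: size unknown up front, so no SIZED characteristic
        BufferedReader r = new BufferedReader(new StringReader("one\ntwo\nthree"));
        Spliterator<String> lazy = r.lines().spliterator();
        System.out.println(lazy.hasCharacteristics(Spliterator.SIZED)); // false
        System.out.println(lazy.estimateSize() == Long.MAX_VALUE);      // true

        // Pre-loaded ArrayList: exact size known, splits cleanly for parallelism
        List<String> loaded = new ArrayList<>(Arrays.asList("one", "two", "three"));
        Spliterator<String> sized = loaded.stream().spliterator();
        System.out.println(sized.hasCharacteristics(Spliterator.SIZED)); // true
        System.out.println(sized.estimateSize());                        // 3
    }
}
```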

Engine                  Time avg (s)  Time min (s)  Time max (s)  Throughput avg (MB/s)
JDK j.u.s               3.97          3.91          4.03          24.4
Jet DAG                 1.39          1.29          1.44          69.8
JDK j.u.s (pre-loaded)  1.16          1.09          1.31          83.6
(Chart: Word count throughput)
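The throughput column is simply the 97 MB corpus divided by the average time, e.g. 97 / 1.39 ≈ 69.8 MB/s. A quick sanity check of the table (corpus size and average timings taken from the figures above):

```java
public class ThroughputCheck {
    public static void main(String[] args) {
        double corpusMb = 97.0;
        // Average times in seconds, from the results table
        System.out.printf("%.1f%n", corpusMb / 3.97); // JDK j.u.s              -> 24.4
        System.out.printf("%.1f%n", corpusMb / 1.39); // Jet DAG                -> 69.8
        System.out.printf("%.1f%n", corpusMb / 1.16); // JDK j.u.s (pre-loaded) -> 83.6
    }
}
```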
