Java Stream Gatherers

Shazin Sadakath
2 min read · Sep 25, 2024

Java Streams, introduced in Java 8, let developers declare what needs to be done rather than how it needs to be done. As with SQL, this declarative style reduces the developer-introduced errors that tend to occur in imperative ("how to do it") code.

A Java stream pipeline has three main parts.

  1. A stream of elements, which is lazily evaluated (Stream, IntStream, etc.)
  2. A set of intermediate operations, which are also lazily evaluated (map, filter, flatMap, etc.)
  3. A terminal operation, which triggers evaluation of the stream (collect, findFirst, count, etc.)

An example looks like this:

List<Integer> evenNumbers = Stream.of(1, 2, 3, 4, 5) // (1)
    .filter(n -> n % 2 == 0) // (2)
    .collect(Collectors.toList()); // (3)
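To make the "lazily evaluated" part concrete, here is a small sketch that records which elements the filter inspects. The class name LazyStreamDemo and the visited list are made up for illustration. Nothing is inspected until the terminal operation runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original article.
class LazyStreamDemo {

    // Records every element the filter predicate has actually been asked about.
    static final List<Integer> visited = new ArrayList<>();

    // Builds source + intermediate operation only; no terminal operation yet.
    static Stream<Integer> buildPipeline() {
        visited.clear();
        return Stream.of(1, 2, 3, 4, 5)
                .filter(n -> {
                    visited.add(n);
                    return n % 2 == 0;
                });
    }

    public static void main(String[] args) {
        Stream<Integer> pipeline = buildPipeline();
        System.out.println("Visited before terminal op: " + visited); // []

        // The terminal operation is what pulls elements through the filter.
        List<Integer> evenNumbers = pipeline.collect(Collectors.toList());
        System.out.println("Visited after terminal op: " + visited);  // [1, 2, 3, 4, 5]
        System.out.println(evenNumbers);                              // [2, 4]
    }
}
```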

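However, the set of built-in intermediate operations is small and fixed, and there is no straightforward way to plug in custom intermediate logic that filters or transforms elements while preserving state between them. This is exactly the problem the Stream Gatherers extension point addresses. Gatherers were previewed in Java 22 (JEP 461) and again in Java 23 (JEP 473), so on those releases they must be enabled with --enable-preview.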

Let’s look at this with an example. Let’s say we have the following List.

List<String> values = Arrays.asList("1", "2", "3");

We want to concatenate all these values into a single value and have it in a List, like this:

["123"]

Before Stream Gatherers were introduced, Java developers were stuck with writing their own implementation using Collector.of, like the one below:

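List<String> concatList = values.stream().collect(Collector.of(
    // Supplier: creates the mutable result container
    () -> new ArrayList<String>(),
    // Accumulator: replaces the single stored value with value + nextValue
    (collection, nextValue) -> {
        String value = "";
        if (!collection.isEmpty()) {
            value = collection.remove(0);
        }
        value += nextValue;
        collection.add(value);
    },
    // Combiner: only needed for parallel streams, which this collector does not support
    (left, right) -> {
        throw new UnsupportedOperationException();
    }));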

This approach has some problems. First, hand-written code like this is easy to get wrong. Second, the stream can't be parallelised: the combiner simply throws an exception, because it is difficult to determine how to merge the partial lists built from different parts of the input while keeping the values in the right order.

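This is where Stream Gatherers come to the rescue. We can achieve the same result with the concise code below. Gatherers.fold itself is evaluated sequentially, in encounter order, but unlike the collector above it can still be used safely in a parallel stream.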

List<String> concatList = values.stream()
    .gather(Gatherers.fold(() -> "", (n1, n2) -> n1 + n2))
    .collect(Collectors.toList());
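For readers who want to try this, here is a self-contained sketch of the same fold. The class name FoldDemo and the parallel flag are made up for illustration, and it assumes a JDK where gatherers are available (JDK 24 or later, or JDK 22/23 with --enable-preview). Gatherers.fold is an ordered, sequential gatherer, so it gives the same result on a parallel stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original article.
class FoldDemo {

    // Concatenates all values into one string and returns it as a single-element list.
    static List<String> concat(List<String> values, boolean parallel) {
        Stream<String> stream = parallel ? values.parallelStream() : values.stream();
        return stream
                // Start from "" and append each element in encounter order.
                .gather(Gatherers.fold(() -> "", (acc, next) -> acc + next))
                .toList();
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("1", "2", "3");
        System.out.println(concat(values, false)); // [123]
        System.out.println(concat(values, true));  // [123]
    }
}
```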

The gather method acts as an extensible intermediate operation. It accepts any implementation of the Gatherer interface, whether that is our own operation or one of the built-in operations available in the Gatherers class. The Gatherer interface provides initializer, integrator, combiner and finisher extension points, which can be used to implement custom logic.
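As a rough illustration of those extension points, the sketch below re-implements the concatenation example as a custom sequential gatherer. The class ConcatGatherer and its concat() method are hypothetical names, not JDK APIs. It uses Gatherer.ofSequential, so no combiner is supplied, and it assumes a JDK where gatherers are available (JDK 24 or later, or JDK 22/23 with --enable-preview).

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Hypothetical example class, not part of the JDK or the original article.
class ConcatGatherer {

    // A gatherer that appends every element to a StringBuilder and emits one result at the end.
    static Gatherer<String, StringBuilder, String> concat() {
        return Gatherer.ofSequential(
                // Initializer: creates the private state for this pipeline run.
                StringBuilder::new,
                // Integrator: called once per element; returning true asks for more input.
                Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                    state.append(element);
                    return true;
                }),
                // Finisher: called once after the last element; pushes the final value downstream.
                (state, downstream) -> downstream.push(state.toString()));
    }

    public static void main(String[] args) {
        List<String> result = Stream.of("1", "2", "3")
                .gather(concat())
                .toList();
        System.out.println(result); // [123]
    }
}
```

Because the state lives in the gatherer rather than in a lambda that captures an outer variable, the same concat() gatherer can be reused across pipelines.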

Here are some more built-in Gatherers that address common tasks.

List<String> values = Arrays.asList("1", "2", "3");

// Process the stream and group it into sliding windows of 2 items.
System.out.println(values.stream().gather(Gatherers.windowSliding(2)).collect(Collectors.toList()));
//[[1, 2], [2, 3]]

// Process the stream and group it into fixed windows of 2 items.
System.out.println(values.stream().gather(Gatherers.windowFixed(2)).collect(Collectors.toList()));
//[[1, 2], [3]]

// Process the stream concurrently with a maximum of 2 virtual threads, preserving order.
System.out.println(values.stream().gather(Gatherers.mapConcurrent(2, (n1) -> n1 + "n")).collect(Collectors.toList()));
//[1n, 2n, 3n]
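One more built-in gatherer, not covered in the examples above, is Gatherers.scan. It works like fold but emits every intermediate result instead of only the last one. The sketch below uses a made-up class name, ScanDemo, and assumes a JDK where gatherers are available (JDK 24 or later, or JDK 22/23 with --enable-preview).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Gatherers;

// Hypothetical demo class, not part of the original article.
class ScanDemo {

    // Emits the running concatenation after each element.
    static List<String> runningConcat(List<String> values) {
        return values.stream()
                .gather(Gatherers.scan(() -> "", (acc, next) -> acc + next))
                .toList();
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("1", "2", "3");
        System.out.println(runningConcat(values)); // [1, 12, 123]
    }
}
```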

References

JEP 461: Stream Gatherers (Preview) — https://openjdk.org/jeps/461
