- What streams are
- Terminal and non-terminal operations
- Their “lazy” nature
- Their read-once nature
- Why they were introduced i.e. how they enable easy parallel operations
- Filter
- Map
- Flatmap
- Collect
Introduction to streams
To obtain a stream, you call the new stream() method that has been added to the Collection interface. Stream operations can be divided into two types:
Intermediate operations, that return a stream:
- filter
- skip
- limit
- map
- flatMap
- distinct
- sorted
Terminal operations, that return some kind of result:
- anyMatch – boolean
- noneMatch – boolean
- allMatch – boolean
- findAny – Optional
- findFirst – Optional
- forEach – void, e.g. print
- collect
- reduce
The idea behind streams is that you can build up a pipeline of operations by calling multiple intermediate operations, and then finally a terminal operation to obtain a result.
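As a minimal sketch of such a pipeline, the following self-contained example chains a few intermediate operations and finishes with a terminal operation. The class name PipelineSketch, the helper firstTwoEvenSquares and the sample numbers are my own illustrative assumptions, not part of the code examples that accompany this post.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PipelineSketch {

    // Hypothetical helper: keeps the even numbers, squares them,
    // and returns at most the first two results.
    static List<Integer> firstTwoEvenSquares(List<Integer> numbers) {
        return numbers.stream()
                .filter(n -> n % 2 == 0)       // intermediate: returns a stream
                .map(n -> n * n)               // intermediate: returns a stream
                .limit(2)                      // intermediate: returns a stream
                .collect(Collectors.toList()); // terminal: produces the result
    }

    public static void main(String[] args) {
        // prints [4, 16]
        System.out.println(firstTwoEvenSquares(Arrays.asList(1, 2, 3, 4, 5, 6)));
    }
}
```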
Streams differ from collections in two important ways.
Firstly, unlike a collection, which is essentially a set of data in memory, stream elements are only produced one at a time, as you iterate over the stream. This is referred to as the “lazy” nature of streams. Imagine you have a large dataset of a million elements in memory and you create a stream backed by this dataset. If the entire dataset were iterated every time you called an intermediate operation, this would be hugely inefficient. Rather, you can think of the intermediate operations as recording that an operation needs to be performed, but deferring the actual execution of that operation until you call a terminal operation. At that point, the stream is iterated, and each intermediate operation is evaluated.
Secondly, you can only read from a stream once. This differs from e.g. Scala, in which you can read a stream as many times as you like. There is a great Stack Overflow answer from one of the stream API designers that explains why this design choice was made, but it is a bit of a monster, so I’ll summarise it:
- You can use streams for things other than collections, that genuinely are read once, e.g. reading a file with BufferedReader, which has a lines() method returning Stream<String>.
- Allowing two types of stream, one lazy and the other not, creates its own problems, e.g.
- In Scala you can have bugs if your code attempts to read a stream twice when in fact it has been passed a once-off stream implementation.
- Collection classes optimise some operations by storing / caching data. e.g. calling size() on a collection returns a cached size value. Calling size() on a filtered collection would take O(n) time, as it would have to apply the filter to the collection.
- If you pass round a lazy stream and use it multiple times, each time you operate on it, the entire set of operations needs to be evaluated.
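Both properties, laziness and read-once, are easy to see for yourself. The sketch below is only an illustration under my own assumptions (the class name LazyAndReadOnceSketch and both helper methods are made up for this post): the first method logs when the filter actually runs, and the second tries to read the same stream twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyAndReadOnceSketch {

    // Hypothetical helper: records the order in which things happen.
    static List<String> traceLaziness() {
        List<String> log = new ArrayList<>();
        Stream<Integer> pipeline = Arrays.asList(1, 2, 3).stream()
                .filter(n -> {
                    log.add("filter " + n); // runs only once a terminal operation is called
                    return n > 1;
                });
        log.add("pipeline built");          // nothing has been filtered yet
        List<Integer> result = pipeline.collect(Collectors.toList());
        log.add("result " + result);
        return log;
    }

    // Hypothetical helper: returns true if reading the stream a second time fails.
    static boolean secondReadFails() {
        Stream<String> names = Stream.of("a", "b");
        names.forEach(s -> { });            // first read consumes the stream
        try {
            names.forEach(s -> { });        // second read is not allowed
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // prints [pipeline built, filter 1, filter 2, filter 3, result [2, 3]]
        System.out.println(traceLaziness());
        // prints true
        System.out.println(secondReadFails());
    }
}
```

Note that "pipeline built" is logged before any "filter" entry, even though filter was called first in the code.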
Why were streams introduced?
To me, the advantages of streams can be summed up as three points:
- Using functional style methods is clearer, i.e. if you use a filter method, someone can see at a glance what you are doing.
- Because streams are lazy, you can pass streams around between methods, classes or code modules, and apply operations to the stream, without anything being evaluated until you need it to be.
- Stream processing can be done in parallel, by calling the parallelStream method. Because you aren’t manually iterating over a collection and performing arbitrary actions, but instead calling well defined methods on the stream such as map and filter, the stream classes know how to split themselves up into separate streams to be processed on different processors, and then recombine the results.
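To make the last point concrete, here is a small sketch that runs the same pipeline sequentially and in parallel. The class ParallelSketch and the helper sumOfSquares are hypothetical names I have chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ParallelSketch {

    // Hypothetical helper: sums the squares of the numbers,
    // either sequentially or in parallel.
    static long sumOfSquares(List<Integer> numbers, boolean parallel) {
        Stream<Integer> stream = parallel ? numbers.parallelStream() : numbers.stream();
        return stream
                .mapToLong(Integer::longValue) // widen to long before squaring
                .map(n -> n * n)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(sumOfSquares(numbers, false)); // prints 55
        System.out.println(sumOfSquares(numbers, true));  // prints 55
    }
}
```

The pipeline is identical in both cases; only the way the stream is obtained changes. Whether the parallel version is actually faster depends on the size of the data and the work done per element, so it is worth measuring rather than assuming.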
Stream operations
To give my code examples, I’m going to use examples from two domains:
- Insurance – this is the domain I work in. Here we have insurance claims, which could be of different types (e.g. motor, household), have jobs attached to them (e.g. motor repair, solicitor, loss adjuster) and have payments made.
- Restaurant menu – this is what Java 8 in Action uses for its examples.
Filter
I think filter is a great operation to start with: it’s a very common thing to do and a nice intro to stream syntax. In my code examples, if you open the StreamExamples class and find the filter method, you can see the syntax for filtering a collection of claims to motor claims only:
Stream<Claim> motorClaims = claims.stream().filter(claim -> claim.getProductType().equals(Claim.PRODUCT_TYPE.MOTOR));
// claims with total payments over 1000
Stream<Claim> paymentsOver1000 = claims.stream().filter(claim -> claim.getTotalPayments() > 1000);
// claims with two or more jobs attached
Stream<Claim> twoOrMore = claims.stream().filter(claim -> claim.getJobs().size() >= 2);
Map
The map operation means “map” in the mathematical sense – that of mapping one value to another. Suppose we have a stream of Claim objects, but what we need is claim ids. Just map the stream like this:
Stream<Long> claimIds = claims.stream().map(claim -> claim.getId());
// the same mapping, written as a method reference
Stream<Long> claimIds2 = claims.stream().map(Claim::getId);
// filter and map chained together: ids of motor claims only
Stream<Long> motorClaimIds = claims.stream()
    .filter(claim -> claim.getProductType().equals(Claim.PRODUCT_TYPE.MOTOR))
    .map(Claim::getId);
Note that you don’t just have to “extract” values during a map operation, you can also create new objects. For example, you might convert from domain objects to DTOs, like this:
Stream<ClaimDTO> claimDTOs = claims.stream().map(claim -> new ClaimDTO(claim.getId(), claim.getTotalPayments()));
FlatMap
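Suppose you want to get a stream or collection of all of the jobs attached to a list of claims. You might start with a map operation, like this:
claims.stream().map(Claim::getJobs)
However, getJobs returns a Set for each claim, so this gives you a stream of sets (a Stream<Set<Job>>) rather than a stream of jobs. Calling flatMap with Set::stream flattens those sets into a single stream:
Stream<Job> jobs = claims.stream().map(Claim::getJobs).flatMap(Set::stream);
// restaurant menu example: side orders of dishes with more than 750 calories
Stream<SideOrder> sideOrdersOver750 = menu.stream().filter(dish -> dish.getCalories() > 750).map(Dish::getSideOrders).flatMap(Set::stream);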
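If you want to see the difference between map and flatMap in something you can run, here is a self-contained sketch. The nested Claim class is a stripped-down, hypothetical stand-in for the real one (jobs are just strings here), and the names FlatMapSketch, nestedJobs and allJobs are my own.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class FlatMapSketch {

    // Hypothetical, minimal stand-in for the Claim class: jobs are plain strings.
    static class Claim {
        private final Set<String> jobs;

        Claim(String... jobs) {
            // LinkedHashSet keeps insertion order, so the output is predictable
            this.jobs = new LinkedHashSet<>(Arrays.asList(jobs));
        }

        Set<String> getJobs() {
            return jobs;
        }
    }

    // map alone: one Set of jobs per claim
    static List<Set<String>> nestedJobs(List<Claim> claims) {
        return claims.stream()
                .map(Claim::getJobs)
                .collect(Collectors.toList());
    }

    // map then flatMap: a single flat list of jobs
    static List<String> allJobs(List<Claim> claims) {
        return claims.stream()
                .map(Claim::getJobs)
                .flatMap(Set::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Claim> claims = Arrays.asList(
                new Claim("motor repair", "solicitor"),
                new Claim("loss adjuster"));
        System.out.println(nestedJobs(claims)); // prints [[motor repair, solicitor], [loss adjuster]]
        System.out.println(allJobs(claims));    // prints [motor repair, solicitor, loss adjuster]
    }
}
```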
Collect
The three operations we have covered so far are all intermediate operations. They operate on a stream and return a stream. When you want to convert your stream back into a collection, you will want to call the collect method. There are a large number of variations as to how you collect, and this choice can be a bit bewildering at first, so I want to show a good number of examples here to help you get familiar with what is available to you.
Firstly, let’s start with the simplest possible collect operations, to a set, list or map. Here is what you could do if you want a stream of motor claims collected to one of these types:
// to a set
Set<Claim> motorClaimSet = claims.stream().
    filter(claim -> claim.getProductType().equals(Claim.PRODUCT_TYPE.MOTOR)).
    collect(Collectors.toSet());
// to a list
List<Claim> motorClaimList = claims.stream().
    filter(claim -> claim.getProductType().equals(Claim.PRODUCT_TYPE.MOTOR)).
    collect(Collectors.toList());
// to a map (grouping by unique key)
Map<Long,Claim> motorClaimMap = claims.stream().
    filter(claim -> claim.getProductType().equals(Claim.PRODUCT_TYPE.MOTOR)).
    collect(Collectors.toMap(Claim::getId, Function.<Claim>identity()));
// the remaining examples assume: import static java.util.stream.Collectors.*;
// grouping claims by product type
Map<Claim.PRODUCT_TYPE,List<Claim>> claimsByType = claims.stream().collect(groupingBy(Claim::getProductType));
// two-level grouping: by product type, then by payment band
Map<Claim.PRODUCT_TYPE,Map<String,List<Claim>>> claimsByTypeAndPayment =
    claims.stream()
        .collect(
            groupingBy(Claim::getProductType,
                groupingBy(claim -> {
                    if (claim.getTotalPayments() > 1000) {
                        return "HIGH";
                    }
                    else {
                        return "LOW";
                    }
                })
            ));
// grouping side orders by type, keeping only their calorie values
Map<SideOrder.Type,List<Integer>> sideOrderCalories =
    menu.stream()
        .map(Dish::getSideOrders)
        .flatMap(Set::stream)
        .collect(groupingBy(SideOrder::getType, mapping(SideOrder::getCalories, toList())));
// partitioning into vegetarian (true) and non-vegetarian (false) dishes
Map<Boolean,List<Dish>> veggieAndNonVeggie = menu.stream().collect(partitioningBy(Dish::isVegetarian));
// summing and averaging
int totalCalories = menu.stream().collect(summingInt(Dish::getCalories));
double totalPayments = claims.stream().collect(summingDouble(Claim::getTotalPayments));
double averagePayment = claims.stream().collect(averagingDouble(Claim::getTotalPayments));
// several statistics gathered in a single collect
DoubleSummaryStatistics paymentStats =
    claims.stream().collect(summarizingDouble(Claim::getTotalPayments));
totalPayments = paymentStats.getSum();
averagePayment = paymentStats.getAverage();
// joining claim ids into a comma-separated string
String claimIdListAsCommaSeparatedString = claims.stream().map(claim -> claim.getId().toString()).collect(joining(","));
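The snippets above rely on the Claim, Dish and SideOrder classes from my code examples. If you just want something to paste and run, here is a self-contained sketch of a few of the same collectors. The nested Dish class is a hypothetical, cut-down version with only a name, a vegetarian flag and a calorie count, and the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CollectSketch {

    // Hypothetical, minimal stand-in for the Dish class.
    static class Dish {
        private final String name;
        private final boolean vegetarian;
        private final int calories;

        Dish(String name, boolean vegetarian, int calories) {
            this.name = name;
            this.vegetarian = vegetarian;
            this.calories = calories;
        }

        String getName() { return name; }
        boolean isVegetarian() { return vegetarian; }
        int getCalories() { return calories; }
    }

    // partitioningBy with a downstream collector: dish names split by the vegetarian flag
    static Map<Boolean, List<String>> namesByVegetarian(List<Dish> menu) {
        return menu.stream().collect(
                Collectors.partitioningBy(Dish::isVegetarian,
                        Collectors.mapping(Dish::getName, Collectors.toList())));
    }

    // summingInt: total calories across the menu
    static int totalCalories(List<Dish> menu) {
        return menu.stream().collect(Collectors.summingInt(Dish::getCalories));
    }

    // joining: dish names as a comma-separated string
    static String names(List<Dish> menu) {
        return menu.stream().map(Dish::getName).collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        List<Dish> menu = Arrays.asList(
                new Dish("salad", true, 200),
                new Dish("burger", false, 800),
                new Dish("fries", true, 500));
        System.out.println(namesByVegetarian(menu).get(true));  // prints [salad, fries]
        System.out.println(namesByVegetarian(menu).get(false)); // prints [burger]
        System.out.println(totalCalories(menu));                // prints 1500
        System.out.println(names(menu));                        // prints salad,burger,fries
    }
}
```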