C++ Streams API

Streams is a C++ library that provides lazy evaluation and functional-style transformations on data, to ease the use of C++ standard library containers and algorithms. Streams support common functional operations such as map, filter, and reduce, as well as set operations (union, intersection, difference), partial sum, adjacent difference, and many others. Please visit the GitHub page to download and try it yourself.

Streams is developed by Jonah Scheinerman, who is always looking to improve it. Please get in touch if you find bugs or have ideas on how to make Streams even better.

To use Streams, download the library from GitHub and #include "Stream.h". Streams require C++14 to compile. Streams consist only of header files, so you shouldn't have to modify your build process to use them.

Introduction

Note: Everything in the Streams library is enclosed within the stream namespace. Additionally, all stream operators (terminal and intermediate) are in the stream::op namespace, which has not been hidden. For the sake of readability and brevity, these namespace qualifiers have been omitted here (assume an implicit using namespace stream; and using namespace stream::op; in all of the examples).

Streams are an abstraction on a set of data that has a specific order (be it a strict ordering or just some encounter order). Various operations can be applied to streams such that data passes through the stream pipeline in a lazy manner, only getting passed to the next operation when it is requested. There are 3 main kinds of stream operations: generators, intermediate operators, and terminal operators.
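The pull-based laziness described above can be illustrated without the library. The following is a minimal standard-library sketch, not the Streams implementation; IntSource, counter_from, mapped, and take are hypothetical names introduced only for this illustration. Each stage computes a value only when the stage after it asks for one.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical pull-based source: each call yields the next value.
using IntSource = std::function<int()>;

// Generator: counts upward from `start`; nothing is computed until a value is pulled.
inline IntSource counter_from(int start) {
    return [start]() mutable { return start++; };
}

// Intermediate stage: transforms each value only when it is requested.
inline IntSource mapped(IntSource source, std::function<int(int)> f) {
    assert(source && f);  // both callables must be non-empty
    return [source, f]() mutable { return f(source()); };
}

// Terminal stage: pulls exactly `n` values, which drives the whole pipeline.
inline std::vector<int> take(IntSource source, std::size_t n) {
    std::vector<int> out;
    out.reserve(n);
    for (std::size_t i = 0; i < n; ++i) {
        out.push_back(source());
    }
    return out;
}
```

In this sketch, building mapped(counter_from(1), f) calls f zero times; f runs only once per element that take() pulls, which is the behavior the paragraph above attributes to a stream pipeline.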

A stream is a single source of data, and thus will uniquely own its data source. There is no way of copying streams without accruing a lot of state and thus streams are not copyable. However, streams are movable. Moving a stream will result in the source stream being "vacant." One can check the vacancy of a stream using the occupied() method. Attempting to apply any stream operation to a vacant stream will result in a VacantStreamException being thrown. Additionally, all intermediate operations will create new streams which now own the original data source of the calling stream. Thus calling an intermediate operation on a stream will result in the original stream being vacant. See the example for occupied().

Streams can be iterated through in the standard C++ way using the begin() and end() methods. However, one should be careful when using these (as they don't work exactly like standard library iterators), and using these may not be as efficient. For example, even though it may be nicer to iterate through a stream via a range-for loop, it may be more efficient to use the for_each() method, as it does not have to incur the overhead cost of making stream iterators safe to use.

Operator Application and Composition

Stream operators are objects that can be applied to streams to alter the contents (usually) in a lazy way, or to compute a terminal result. Most of the library consists of functions which return stream operators that can be applied to a stream using the bitwise or operator, | as follows: stream | streamop. These operations can be chained: stream | op1 | op2 | ... | opN. Most operations can be applied to streams of many types.

Stream operators can also be composed with one another, to create new stream operators. Thus, application of stream operators is associative. For example, stream | op1 | op2 | op3 is equivalent to stream | (op1 | op2 | op3) or stream | (op1 | op2) | op3 or stream | op1 | (op2 | op3). This means the creation of new stream operators is quite easy:

            auto square_and_sum = map_([] (auto x) { return x * x; }) | sum();
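One way such composition can be made to work is sketched below. This is a hedged illustration using only the standard library and eager vectors, not the library's actual mechanism; Op and make_op are hypothetical names. One overload of | applies an operator to data, and a second overload combines two operators into a new one.

```cpp
#include <cassert>
#include <numeric>
#include <utility>
#include <vector>

// Hypothetical operator wrapper: holds a callable that consumes a vector<int>.
template <typename F>
struct Op {
    F f;
};

template <typename F>
Op<F> make_op(F f) {
    return Op<F>{std::move(f)};
}

// Application: data | op runs the operator on the data.
template <typename F>
auto operator|(std::vector<int> data, const Op<F>& op)
    -> decltype(op.f(std::move(data))) {
    return op.f(std::move(data));
}

// Composition: first | second is a new operator that runs first, then second.
template <typename F, typename G>
auto operator|(Op<F> first, Op<G> second) {
    return make_op([first, second](std::vector<int> data) {
        return std::move(data) | first | second;
    });
}
```

Because composition just defers to application, every grouping of data | op1 | op2 | op3 ends up running the same steps in the same order, which is why the groupings are interchangeable.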
        

Specializations

Streams are specialized based on the element type of the stream to allow extended functionality based on that type. Specializations for most types are listed in their relevant sections. However, for class types, there are specializations that are further reaching, which we discuss here. For any method that takes a function of the element type (e.g. map_(), filter(), take_while(), to name a few), there is a specialization that allows for passing member function pointers to the function. For example, consider the following:

            struct Thing {
    int x;
    int value() const { return x; }
};
Stream<Thing> things = /* ... */;
        

In this case, all of the following are equivalent, with the last one being the one that is provided by the stream specialization for class types:

        auto values = things | map_([](Thing&& thing) { return thing.value(); });
auto values = things | map_(std::mem_fn(&Thing::value));
auto values = things | map_(&Thing::value);
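The equivalence of the three forms can be reproduced with the standard library alone. The sketch below is an assumption about how such a specialization might be arranged, not the library's code; transform_all is a hypothetical helper. A dedicated overload accepts a member function pointer, wraps it in std::mem_fn, and forwards to the general case.

```cpp
#include <cassert>
#include <functional>
#include <vector>

struct Thing {
    int x;
    int value() const { return x; }
};

// General case: call any function object on each element.
// Member function pointers drop out here because f(thing) is not a valid call for them.
template <typename F>
auto transform_all(const std::vector<Thing>& things, F f)
    -> std::vector<decltype(f(things.front()))> {
    std::vector<decltype(f(things.front()))> out;
    for (const Thing& thing : things) {
        out.push_back(f(thing));
    }
    return out;
}

// Member function pointer case: wrap in std::mem_fn and reuse the general case.
template <typename R>
std::vector<R> transform_all(const std::vector<Thing>& things,
                             R (Thing::*member)() const) {
    return transform_all(things, std::mem_fn(member));
}
```

With this arrangement a lambda, std::mem_fn(&Thing::value), and a bare &Thing::value all produce the same result.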
    
The full set of operators with this property is as follows:

Methods

These are methods on the stream class that do not directly affect the data in the stream, and thus are not considered stream operators.

::begin() Stream method
Method signatures:
                iterator Stream<T>::begin();
            
Returns an iterator to the beginning of the stream. Iterators for streams have all of the standard iterator operator overloads, and act for the most part just like normal forward iterators. However, there are some caveats to their use.

First of all, when getting a starting iterator from begin(), you have not yet consumed anything in the stream. But, as soon as you perform some operation (dereference, increment, test equality or inequality) on this iterator, the first element of the stream (if there is one) will be consumed and this iterator will resolve itself to its actual value. Example:
                auto stream = MakeStream::counter(1)
    | peek([](int x) { std::cout << "Peek = " << x << "\n"; });
auto iter = stream.begin();
int value = *iter;
std::cout << "Iter = " << value << "\n";
            
Produces the following output:
                Peek = 1
Iter = 1
            
Second, be very careful when using the return value of the postfix increment operator. The return value of the postfix increment of an iterator is an iterator that is considered to be "consumed," meaning you can dereference it, but you can't do anything else with it: you can't increment it or check its equality.

Why is this? The idea is that we don't want to split the stream or copy it. If you had two independent iterators that were both attempting to iterate through the same stream, very strange behavior could occur, because a stream isn't a true container; it's simply a wrapper around a next method. This isn't to say getting two iterators is impossible (simply call begin() twice). However, we want to safeguard against this type of behavior. Attempting to perform unauthorized actions on a consumed iterator results in a ConsumedIteratorException. Example:
                auto stream = MakeStream::closed_range(1, 10);
auto iter = stream.begin();
cout << *iter << endl;
auto temp_iter = iter++;
cout << *temp_iter << endl;
try {
    ++temp_iter;
} catch(ConsumedIteratorException& e) {
    cout << e.what() << endl;
}
            
Produces the following output:
                1
2
Cannot perform prefix increment on consumed stream iterator.
            
That being said, using stream iterators should be fine with most if not all standard library algorithms. However, be cognizant of the fact that you are paying a slight overhead cost for using an iterator, so if speed is your concern, use a reduction method instead of something that uses iterators. For example, even though this is prettier:
                for(auto element : stream) {
    /* do something */
}
            
This is more efficient:
                stream | for_each([](auto element) {
    /* do something */
});
            
The choice is yours.
::end() Stream method
Method signatures:
                iterator Stream<T>::end();
            
Returns an iterator to something one past the end of the stream. Since the end of the stream is unknown, this is simply a sentinel iterator. For a discussion of the intricacies of using iterators see the begin() method.
::occupied() Stream method
Method signatures:
                bool Stream<T>::occupied();
            
Returns true if the stream object owns stream data. Streams are only movable (not copiable), and moving a stream results in a "vacant" stream (calling this method will return false). Attempting to perform any stream operation on a vacant stream will result in a VacantStreamException. Similarly, every stream operation returns a new stream that owns the data of the stream(s) it was called on. Thus attempting to do two operations on the same stream will result in an exception being thrown.

Example:
                Stream<int> stream1 = MakeStream::counter(1);
std::cout << std::boolalpha;
std::cout << "Stream 1 occupied: " << stream1.occupied() << std::endl;
Stream<int> stream2 = stream1 | limit(10);
std::cout << "Stream 1 occupied: " << stream1.occupied() << std::endl;
std::cout << "Stream 2 occupied: " << stream2.occupied() << std::endl;
try {
    stream1 | filter([] (int x) { return x % 2 == 0; });
} catch(VacantStreamException& e) {
    std::cout << e.what() << std::endl;
}
            
Produces the following output:
                Stream 1 occupied: true
Stream 1 occupied: false
Stream 2 occupied: true
Cannot call stream::op::filter on a vacant stream
            
::close() Stream method
Method signature:
                void Stream<T>::close();
            
Closes a stream, causing all of its unevaluated data to be lost, and the stream will be put into a vacant state. This is called automatically after any terminal operation.

Example:
                auto s = MakeStream::counter(1);
s.close();
s.occupied(); // returns false
            
::pipeline() Stream method
Method signature:
                std::string Stream<T>::pipeline();
            
Returns a string representation of the stream pipeline, including all of the basic transformations on the stream and all sources, the number of pipeline stages, and the number of stream sources.

Example:
                std::vector<std::string> v = /* ... */;
auto s = MakeStream::from(v)
    | limit(100)
    | filter([](std::string& s) { return !s.empty(); })
    | map_([](std::string& s) { return s[0]; })
    | zip_with(MakeStream::counter(1) * 5)
    | zip_with(MakeStream::repeat(10));
std::cout << s.pipeline() << std::endl;
            
Produces the following output:
                > Zip:
  > Zip:
    > Map:
      > Filter:
        > Slice[0, 100, 1]:
          > [iterator stream]
    > Map:
      > [iterated stream]
  > [repeated value stream]
Stream pipeline with 6 stages and 3 sources.
            

Stream Generators

The stream generator factory methods can all be found as static methods in the MakeStream class. These are not wrapped in the stream, so that they can perform type deduction for you.

Be careful when using factory methods that draw from referenced data sources (for example, MakeStream::from(const Container&)). These are perfectly safe to use if the usage of the stream is entirely contained within the current scope. However, if the stream outlives a container that it references, the stream is left reading from destroyed data, which is undefined behavior. In this case, prefer the MakeStream::from_move() generator.

::empty() Stream generator
Method signature:
                template<typename T>
Stream<T> MakeStream::empty();
            
Creates an empty stream of the given type. Calling MakeStream::empty<T>() is equivalent to default constructing a stream.

Example:
                MakeStream::empty<int>() | count(); // 0
            
::singleton() Stream generator
Method signature:
                template<typename T>
Stream<T> MakeStream::singleton(T&& value);
            
Creates a stream with a single given value.

Example:
                MakeStream::singleton(5) | to_vector(); // {5}
            
::from() Stream generator
Method signatures (template definitions excluded for brevity):
                Stream<T> MakeStream::from(Iterator begin, Iterator end);
Stream<T> MakeStream::from(const Container& cont);
Stream<T> MakeStream::from(T* arr, size_t length);
Stream<T> MakeStream::from(std::initializer_list<T> init);
            
Creates a stream from some existing set of data: an iterator range, a container for which std::begin() and std::end() are defined, a C-style array, or an initializer list.

Warning! Beware using most of these methods if your Stream is going to be used outside of the current scope. The exception to this is the initializer_list generator, which will capture the list, and therefore is safe to use outside of the current scope. To safely capture a container, use the from_move() generator method.

Example:
                std::vector<int> x = {1, 3, 4, 2};
MakeStream::from(x);
MakeStream::from(x.begin(), x.end());
int arr[4] = {1, 3, 4, 2};
MakeStream::from(arr, 4);
MakeStream::from({1, 3, 4, 2});
            
::from_move() Stream generator
Method signature:
                template<typename Container>
Stream<T> MakeStream::from_move(Container&& cont);
            
Creates a stream from a container of data, moving that container into itself, so that the stream may be used safely outside of the current scope.

Example:
                #include "Stream.h"
#include <vector>
#include <iostream>

using namespace stream;
using namespace stream::op;

std::vector<int> make_vector() {
    return {1, 2, 3};
}

Stream<int> make_stream_safe1() {
    return MakeStream::from_move(make_vector());
}

Stream<int> make_stream_safe2() {
    std::vector<int> vec = {4, 5, 6};
    return MakeStream::from_move(vec);
}

Stream<int> make_stream_unsafe() {
    std::vector<int> vec = {7, 8, 9};
    return MakeStream::from(vec); // BAD!
}

int main(int argc, char const *argv[]) {
    (make_stream_safe1()  | print_to(std::cout)) << std::endl;
    (make_stream_safe2()  | print_to(std::cout)) << std::endl;
    (make_stream_unsafe() | print_to(std::cout)) << std::endl;
}
            
Produces the following output (on one run on my computer):
                1 2 3
4 5 6
7 -536870912 -2026110050
            
Rule of thumb: If you're only going to be accessing data through the stream, probably just use from_move().
::repeat() Stream generator
Method signatures:
                template<typename T>
Stream<T> MakeStream::repeat(T&& value);
template<typename T>
Stream<T> MakeStream::repeat(T&& value, size_t times);
            
Creates a stream consisting of the same value repeated over and over again. The first method creates an infinite stream of the repeated value. The second method only repeats the value a fixed number of times. Calling MakeStream::repeat(x, n) is equivalent to calling MakeStream::repeat(x) | limit(n).

Example:
                auto s = MakeStream::from({ /* ... */ })
    | concat(MakeStream::repeat(0)); // Stream padded with 0's
            
::cycle() Stream generator
Method signatures (template definitions excluded for brevity):
                Stream<T> MakeStream::cycle(Iterator begin, Iterator end);
Stream<T> MakeStream::cycle(Iterator begin, Iterator end, size_t times);
Stream<T> MakeStream::cycle(const Container& cont);
Stream<T> MakeStream::cycle(const Container& cont, size_t times);
Stream<T> MakeStream::cycle(std::initializer_list<T> init);
Stream<T> MakeStream::cycle(std::initializer_list<T> init, size_t times); 
            
Creates a stream of a sequence of elements repeated over and over again. The signatures without a times parameter will loop over a range indefinitely, whereas the ones with the parameter will only loop over the sequence that many times.

Warning! Beware using most of these methods if your Stream is going to be used outside of the current scope. The exception to this is the initializer_list generator, which will capture the list, and therefore is safe to use outside of the current scope. To safely capture a container in a cycle, use the cycle_move() generator method.

Example:
                std::vector<int> x{1, 3, 8};
MakeStream::cycle(x.begin(), x.end()); // Contains 1, 3, 8, 1, 3, 8, 1, ...
MakeStream::cycle(x, 2); // Contains 1, 3, 8, 1, 3, 8.
            
::cycle_move() Stream generator
Method signatures:
                template<typename Container>
Stream<T> MakeStream::cycle_move(Container&& cont);
template<typename Container>
Stream<T> MakeStream::cycle_move(Container&& cont, size_t times);
            
Creates a stream of a sequence of elements repeated over and over again, moving the container into the stream so that the stream may be used safely outside of the current scope. The signature without a times parameter will loop over the container indefinitely, whereas the one with the parameter will only loop over the sequence that many times.



Example:
                #include "Stream.h"
#include <vector>
#include <iostream>

using namespace stream;
using namespace stream::op;

std::vector<int> make_vector() {
    return {1, 2, 3};
}

Stream<int> make_stream_safe1() {
    return MakeStream::cycle_move(make_vector(), 2);
}

Stream<int> make_stream_safe2() {
    std::vector<int> vec = {4, 5, 6};
    return MakeStream::cycle_move(vec, 2);
}

Stream<int> make_stream_unsafe() {
    std::vector<int> vec = {7, 8, 9};
    return MakeStream::cycle(vec, 2); // BAD!
}

int main(int argc, char const *argv[]) {
    (make_stream_safe1()  | print_to(std::cout)) << std::endl;
    (make_stream_safe2()  | print_to(std::cout)) << std::endl;
    (make_stream_unsafe() | print_to(std::cout)) << std::endl;
}
            
Produces the following output (on one run on my computer):
                1 2 3 1 2 3
4 5 6 4 5 6
0 -1879048192 0 0 -1879048192 0
            
::generate() Stream generator
Method signature:
                template<typename Generator>
Stream<T> MakeStream::generate(Generator&& generator);
            
Creates a stream whose values are the return values of repeated calls to the generator function with no arguments.

Example:
                MakeStream::generate(rand); // Stream of random integers
            
::iterate() Stream generator
Method signature:
                template<typename T, typename Function>
Stream<T> MakeStream::iterate(T&& value, Function&& function);
            
This is a special case of recurrence() that, given a value x and a function f, creates the stream x, f(x), f(f(x)), and so on. In the example below we produce a stream which investigates the Collatz conjecture.

Example:
                auto stream = MakeStream::iterate(1245, [](int x) {
    if(x % 2 == 0) {
        return x / 2;
    } else {
        return 3 * x + 1;
    }
});
            
Note that iterate(n, f) is a convenience for recurrence(f, n).
::recurrence() Stream generator
Method signature:
                template<typename... Args, typename Function>
Stream<T> MakeStream::recurrence(Function&& function, Args&&... initial);
            
A more general version of iterate(), creates an infinite stream that is a recurrence relation starting with some initial set of values. For example for a second order recurrence relation (two initial values), the output of the stream would be: a1, a2, a3 = f(a1, a2), a4 = f(a2, a3), a5 = f(a3, a4) ...

Example:
                MakeStream::recurrence(std::plus<int>(), 0, 1); // 0 1 1 2 3 5 8 ...
            
Note that in the case that sizeof...(Args) == 1, this is identical to iterate().
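The second-order case described above can be sketched with a small stateful object. This is only an illustration in plain C++, fixed to int and to exactly two initial values; SecondOrderRecurrence is a hypothetical name, and the library's variadic version is not reproduced here.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical lazy generator for a(n) = f(a(n-2), a(n-1)), seeded with a1 and a2.
class SecondOrderRecurrence {
public:
    SecondOrderRecurrence(std::function<int(int, int)> f, int a1, int a2)
        : f_(std::move(f)), prev_(a1), curr_(a2) {
        assert(f_);  // the recurrence function must be callable
    }

    // Returns the next term and advances the two-value window by one position.
    int next() {
        int result = prev_;
        int following = f_(prev_, curr_);
        prev_ = curr_;
        curr_ = following;
        return result;
    }

private:
    std::function<int(int, int)> f_;
    int prev_;
    int curr_;
};
```

Constructed with std::plus<int>(), 0, and 1, repeated calls to next() yield the same Fibonacci prefix shown in the example above: 0 1 1 2 3 5 8.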
::counter() Stream generator
Method signatures:
                template<typename T>
Stream<T> MakeStream::counter(T&& start);
template<typename T, typename U>
Stream<T> MakeStream::counter(T&& start, U&& increment);
template<typename T, typename U>
Stream<T> MakeStream::counter(T&& start, const U& increment);
            
Creates a stream of elements produced by incrementing a given element indefinitely. Incrementing in the case of the first method is done via a prefix increment operator. The last two methods increment by a fixed value each time, by adding the increment to the current value (on the right side of the operator).

Example:
                MakeStream::counter(1); // 1, 2, 3, 4, ...
MakeStream::counter('A', 2); // A, C, E, G, ...
            
::range() Stream generator
Method signatures:
                template<typename T>
Stream<T> MakeStream::range(T&& lower, T&& upper);
template<typename T, typename U>
Stream<T> MakeStream::range(T&& lower, T&& upper, U&& increment);
template<typename T, typename U>
Stream<T> MakeStream::range(T&& lower, T&& upper, const U& increment);
            
Creates a stream of elements that iterate through a range starting at lower, up to but not including upper. Testing against the upper bound is done via the built in != operator. The rules of incrementation for this method are identical to those of counter().

Example:
                MakeStream::range(0, 5); // 0, 1, 2, 3, 4
MakeStream::range(0, 8, 2); // 0, 2, 4, 6 
            
::closed_range() Stream generator
Method signatures:
                template<typename T>
Stream<T> MakeStream::closed_range(T&& lower, T&& upper);
template<typename T, typename U>
Stream<T> MakeStream::closed_range(T&& lower, T&& upper, U&& increment);
template<typename T, typename U>
Stream<T> MakeStream::closed_range(T&& lower, T&& upper, const U& increment);
            
Creates a stream of elements that iterate through a range starting at lower, up to and including upper. Testing against the upper bound is done via the built-in <= operator. The rules of incrementation for this method are identical to those of counter().

Example:
                MakeStream::closed_range(1, 5); // 1, 2, 3, 4, 5
MakeStream::closed_range(0, 8, 2); // 0, 2, 4, 6, 8
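The two bound tests behave differently when the increment does not land exactly on upper. The sketch below imitates the described tests with plain loops; it is not the library's code, the function names are hypothetical, and the cap parameter exists only so that the demonstration terminates. It assumes the bound checks work exactly as stated in the descriptions of range() and closed_range().

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Half-open range tested with != ; `cap` is a safety limit for this sketch only.
inline std::vector<int> range_not_equal(int lower, int upper, int step,
                                        std::size_t cap) {
    std::vector<int> out;
    for (int value = lower; value != upper && out.size() < cap; value += step) {
        out.push_back(value);
    }
    return out;
}

// Closed range tested with <= ; stops at the last value not exceeding upper.
inline std::vector<int> closed_range_less_equal(int lower, int upper, int step) {
    assert(step > 0);  // a non-positive step would never pass upper
    std::vector<int> out;
    for (int value = lower; value <= upper; value += step) {
        out.push_back(value);
    }
    return out;
}
```

With a != test, a step of 2 from 0 toward 7 steps over the bound and keeps going until the cap is hit, whereas the <= test stops at 6. If the library behaves as its description suggests, it is worth choosing an increment that lands exactly on upper when using range().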
            
::coin_flips() Stream generator
Method signatures:
                template<typename T = bool,
         typename Engine = std::default_random_engine>
Stream<T> MakeStream::coin_flips();
template<typename T = bool,
         typename Engine = std::default_random_engine,
         typename Seed>
Stream<T> MakeStream::coin_flips(Seed&& seed);
            
Creates an infinite stream of random values, each of which is 0 or 1 with equal probability. The user can specify the underlying random engine, which defaults to std::default_random_engine, and the initial seed of the random number generator. If not given, the seed will be initialized to the current time. By default, this returns a stream of bools, though the type can be specified as any type which makes sense.

Example:

The following runs a test to see how many coin flips come up heads after 1000 flips.
                int heads = MakeStream::coin_flips()
    | limit(1000)
    | filter()
    | count();
            
::uniform_random_ints() Stream generator
Method signatures:
                template<typename T,
         typename Engine = std::default_random_engine>
Stream<T> MakeStream::uniform_random_ints(T lower, T upper);
template<typename T,
         typename Engine = std::default_random_engine,
         typename Seed>
Stream<T> MakeStream::uniform_random_ints(T lower, T upper, Seed&& seed);
            
Creates an infinite stream of random integers whose values are distributed uniformly between the upper and lower bounds. The user can specify the underlying random engine which defaults to std::default_random_engine and the initial seed of the random number generator. If not given, the seed will be initialized to the current time.

Example:
                MakeStream::uniform_random_ints(0, 10);
            
::uniform_random_reals() Stream generator
Method signatures:
                template<typename T = double,
         typename Engine = std::default_random_engine>
Stream<T> MakeStream::uniform_random_reals(T lower = 0.0, T upper = 1.0);
template<typename T,
         typename Engine = std::default_random_engine,
         typename Seed>
Stream<T> MakeStream::uniform_random_reals(T lower, T upper, Seed&& seed);
            
Creates an infinite stream of random real numbers whose values are distributed uniformly between the upper and lower bounds. The first signature defaults the lower and upper bounds to 0 and 1 respectively. The user can specify the underlying random engine which defaults to std::default_random_engine and the initial seed of the random number generator. If not given, the seed will be initialized to the current time.

Example:
                MakeStream::uniform_random_reals(); // Doubles between 0 and 1
MakeStream::uniform_random_reals<float>(.3, .7); // Floats between .3 and .7
            
::normal_randoms() Stream generator
Method signatures:
                template<typename T = double,
         typename Engine = std::default_random_engine>
Stream<T> MakeStream::normal_randoms(T mean = 0.0, T stddev = 1.0);
template<typename T = double,
         typename Engine = std::default_random_engine,
         typename Seed>
Stream<T> MakeStream::normal_randoms(T mean, T stddev, Seed&& seed);
            
Creates an infinite stream of random real numbers whose values are normally distributed with a given mean and standard deviation. The first signature defaults the parameters to the standard normal distribution, μ = 0 and σ = 1. The user can specify the underlying random engine, which defaults to std::default_random_engine, and the initial seed of the random number generator. If not given, the seed will be initialized to the current time.

Example:
                MakeStream::normal_randoms(); // Doubles distributed normally
MakeStream::normal_randoms<float>(5.3, 1.8); // Floats distributed normally.
            
::randoms() Stream generator
Method signatures:
                template<typename T,
         template<typename> class Distribution,
         typename Engine = std::default_random_engine,
         typename Seed,
         typename... GenArgs>
Stream<T> MakeStream::randoms(Seed&& seed, GenArgs&&... args);
template<typename T,
         template<typename> class Distribution,
         typename Engine = std::default_random_engine,
         typename... GenArgs>
Stream<T> MakeStream::randoms(GenArgs&&... args);
            
Creates an infinite stream of random values as generically as possible. In this function, T is the type of the resulting stream, Distribution is a number distribution template named without its template argument (e.g. std::uniform_int_distribution), Engine is the underlying random number generator, and args... are the parameters passed into the constructor of the distribution. If the seed is not provided, the current time will be used. If there is no factory method that supports your specific distribution, this is the method for you.

Example:
                MakeStream::randoms<double, std::poisson_distribution>(5.0);
// Stream of doubles coming from a poisson process with mean 5.
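The template-template parameter in these signatures can be seen at work in a small standard-library sketch. The helper below is hypothetical (draw_samples is not part of Streams), returns a finite vector rather than an infinite stream, and always takes an explicit seed instead of defaulting to the current time.

```cpp
#include <cassert>
#include <cstddef>
#include <random>
#include <utility>
#include <vector>

// T is the element type, Distribution is named without its template argument,
// and the trailing arguments are forwarded to the distribution's constructor.
template <typename T,
          template <typename> class Distribution,
          typename Engine = std::default_random_engine,
          typename... DistArgs>
std::vector<T> draw_samples(unsigned seed, std::size_t count, DistArgs&&... args) {
    Engine engine(seed);
    Distribution<T> distribution(std::forward<DistArgs>(args)...);
    std::vector<T> out;
    out.reserve(count);
    for (std::size_t i = 0; i < count; ++i) {
        out.push_back(distribution(engine));
    }
    return out;
}
```

For instance, draw_samples<int, std::uniform_int_distribution>(7u, 50, 1, 6) simulates 50 die rolls, and repeating the call with the same seed reproduces the same sequence.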
            

Stateless Intermediate Stream Operators

These operators insert another stage of a stream pipeline or combine two streams in some way forming a new stream, which contains the data of the old streams. Intermediate operators do not get evaluated until some terminal operation is called on the stream. These operators are all methods on the Stream class.

filter() Stateless intermediate stream operator
Method signature:
                template<typename Predicate>
auto filter(Predicate&& predicate);
            
Returns a stream operator that only allows elements of the stream that pass the given predicate.

Example:
                MakeStream::counter(1)
    | filter([](int x) { return x % 2 == 0; });
// Stream now contains only even positive integers 
            
Specializations:

Specializations of filter exist for types that have an implicit conversion to bool. In this case, filter() returns a stream operator which will filter for values whose bool value is true. See the example for coin_flips().
map_() Stateless intermediate stream operator
Method signature:
                template<typename Transform>
auto map_(Transform&& transform);
            
Returns a stream operator that transforms each element of the stream using the given transformation function. The resulting stream is produced by applying the transformation to each element of the input stream.

Example:
                MakeStream::counter(1)
    | map_([](int x) { return x * x; });
// Stream of perfect squares
            
flat_map() Stateless intermediate stream operator
Method signature:
                template<typename Transform>
auto flat_map(Transform&& transform);
            
Returns a stream operator that applies a transformation that is expected to return a stream as a result to every element of the stream. The resulting stream is the concatenation of these output streams.

Example:
                MakeStream::counter(1)
    | flat_map([](int x) {
        return MakeStream::counter(1) | limit(x);
    }); // Stream contains 1, 1, 2, 1, 2, 3, 1, 2, 3, 4, ...
            
limit() Stateless intermediate stream operator
Method signature:
                auto limit(size_t bound);
            
Returns a stream operator that ensures that the stream contains a maximum number of elements.

Example:
                MakeStream::uniform_random_ints(1, 10)
    | limit(5); // Stream contains 5 random integers
            
Note that stream | limit(n) is equivalent to stream | slice(0, n, 1).
slice() Stateless intermediate stream operator
Method signature:
                auto slice(size_t startIndex,
           size_t endIndex,
           size_t increment = 1);
            
Returns a stream operator that retrieves elements of the stream starting at startIndex, up to, but not including, endIndex, iterating by increment. By default, increment is 1.

Example:
                auto stream1 = MakeStream::counter(0) | slice(5, 10); // 5 6 7 8 9
auto stream2 = MakeStream::counter(0) | slice(1, 8, 2); // 1 3 5 7
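The index arithmetic behind slice() can be checked against an ordinary vector. This is an eager sketch of the semantics as described, not the lazy operator itself; slice_of is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Copies data[start], data[start + step], ... for indices below `end`,
// stopping early if the data runs out first.
inline std::vector<int> slice_of(const std::vector<int>& data,
                                 std::size_t start,
                                 std::size_t end,
                                 std::size_t step = 1) {
    assert(step > 0);  // a zero step would never advance
    std::vector<int> out;
    for (std::size_t i = start; i < end && i < data.size(); i += step) {
        out.push_back(data[i]);
    }
    return out;
}
```

Applied to the values 0 through 11, slice_of(data, 5, 10) gives 5 6 7 8 9 and slice_of(data, 1, 8, 2) gives 1 3 5 7, matching the two examples above, while slice_of(data, 0, 3) behaves like limit(3).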
            
slice_to_end() Stateless intermediate stream operator
Method signature:
                auto slice_to_end(size_t startIndex, size_t increment);
            
Returns a stream operator that is a version of the slice operation which allows an unbounded slice. Slices are taken starting at startIndex and go by increment.

Example:
                auto stream1 = MakeStream::counter(0) | slice_to_end(0, 5); // 0 5 10 15 ...
auto stream2 = MakeStream::counter(0) | slice_to_end(3, 2); // 3 5 7 9 ...
            
take_while() Stateless intermediate stream operator
Method signature:
                template<typename Predicate>
auto take_while(Predicate&& predicate);
            
Returns a stream operator that takes elements from the stream until the given predicate becomes false.

Example:
                MakeStream::counter(1)
    | take_while([](int x) { return x < 5; });
// Stream contains 1, 2, 3, 4
            
Specializations:

Specializations of take_while exist for types that have an implicit conversion to bool. In this case, take_while() returns a stream operator which will take values whose bool value is true.
drop_while() Stateless intermediate stream operator
Method signature:
                template<typename Predicate>
auto drop_while(Predicate&& predicate);
            
Returns a stream operator that drops the initial elements of the stream until the given predicate becomes false, after which the rest of the stream is passed through intact.

Example:
                MakeStream::counter(1)
    | drop_while([](int x) { return x < 5; });
// Stream contains 5, 6, 7, ...
            
Specializations:

Specializations of drop_while exist for types that have an implicit conversion to bool. In this case, drop_while() returns a stream operator which will drop values whose bool value is true.
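take_while() and drop_while() split a sequence at the same point: the first element for which the predicate is false. The following eager sketch shows that split with std::find_if_not; the helper names are hypothetical and the code is independent of the Streams library.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <vector>

// Keeps the leading run of elements that satisfy the predicate.
inline std::vector<int> take_while_copy(const std::vector<int>& values,
                                        const std::function<bool(int)>& pred) {
    assert(pred);  // the predicate must be callable
    auto stop = std::find_if_not(values.begin(), values.end(), pred);
    return std::vector<int>(values.begin(), stop);
}

// Discards that same leading run and keeps everything after it.
inline std::vector<int> drop_while_copy(const std::vector<int>& values,
                                        const std::function<bool(int)>& pred) {
    assert(pred);  // the predicate must be callable
    auto stop = std::find_if_not(values.begin(), values.end(), pred);
    return std::vector<int>(stop, values.end());
}
```

Note that only the leading run matters: for the input 1 2 3 4 5 6 1 and the predicate x < 5, the trailing 1 is not taken by take_while_copy and is kept by drop_while_copy, unlike what filter() would do.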
peek() Stateless intermediate stream operator
Method signature:
                template<typename Function>
auto peek(Function&& function);
            
Returns a stream operator that allows a lazy peek into the stream. When a value passes through the pipeline and hits a peek, the function will be called on that value, the function's return value is ignored, and the original value is passed on to the next stage of the pipeline. Like all stateless operations, this will not be executed until some stateful or terminal operation is called on the stream.

Example:
                std::vector<int> result = MakeStream::counter(1)
    | filter([](int x) { return x % 3 == 0; })
    | limit(3)
    | peek([](int x) { std::cout << "Value = " << x << std::endl; })
    | to_vector();
            
Output:
                Value = 3
Value = 6
Value = 9
            
adjacent_distinct() Stateless intermediate stream operator
Method signature:
                template<typename Equal = std::equal_to<void>>
auto adjacent_distinct(Equal&& equal = Equal());
            
Returns a stream operator that removes adjacent duplicates from the stream. By default, duplicates are determined using the standard == operator. However, a different equality operation can be given.

Example:
                MakeStream::from({1, 1, 3, 2, 2, 5, 5, 5, 5, 2, 3, 3})
    | adjacent_distinct(); // Stream contains 1, 3, 2, 5, 2, 3;
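For a finite container, the same result can be obtained with std::unique. The sketch below is only an eager standard-library analogue (adjacent_distinct_sketch is a hypothetical name), not the way Streams implements the operator.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical helper: collapses each run of equal neighbours to one element.
template<typename T>
std::vector<T> adjacent_distinct_sketch(std::vector<T> values) {
    values.erase(std::unique(values.begin(), values.end()), values.end());
    return values;
}
```

Note that only neighbouring duplicates are removed, so values may still repeat later in the output.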
            
adjacent_difference() Stateless intermediate stream operator
Method signature:
                template<typename Subtract = std::minus<void>>
auto adjacent_difference(Subtract&& subtract = Subtract());
            
Returns a stream operator that returns a stream of the pairwise differences of the elements. By default, subtraction is performed using the standard - operator. However, a different subtraction operation can be given. The type of the resulting stream is the difference type returned by the subtraction function.

Example:
                MakeStream::counter(1)
    | map_([](int x) { return x * x; })
    | adjacent_difference(); // Stream contains 3, 5, 7, 9, ...
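The example output above starts at the first difference (4 - 1 = 3), whereas std::adjacent_difference copies the first element through unchanged. A minimal eager sketch of the behaviour shown in the example, using a hypothetical helper name and assuming int elements, could look like this:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: out[i] = in[i + 1] - in[i], so the result has one
// element fewer than the input (and is empty for inputs of size 0 or 1).
std::vector<int> adjacent_difference_sketch(const std::vector<int>& in) {
    std::vector<int> out;
    for (std::size_t i = 1; i < in.size(); ++i) {
        out.push_back(in[i] - in[i - 1]);
    }
    return out;
}
```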
            
partial_sum() Stateless intermediate stream operator
Method signature:
                template<typename Adder = std::plus<T>>
auto partial_sum(Adder&& add = Adder());
            
Returns a stream operator that computes the partial sum of the elements in the stream. By default, addition is performed using the standard + operator. However, a different addition operation can be given. This addition operation must take elements of type T and return an element of type T.

Example:
                MakeStream::from({1, 5, 3, 7, -2, 6})
    | partial_sum(); // Stream contains 1, 6, 9, 16, 14, 20
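The effect of supplying a different addition operation can be seen with std::partial_sum on a plain vector. This is a standard-library sketch under the assumption that the operator behaves like a running fold; partial_sum_sketch is a hypothetical name.

```cpp
#include <cassert>
#include <functional>
#include <numeric>
#include <vector>

// Hypothetical helper: running fold of the input with the given operation.
template<typename T, typename Adder = std::plus<T>>
std::vector<T> partial_sum_sketch(const std::vector<T>& in, Adder add = Adder()) {
    std::vector<T> out(in.size());
    std::partial_sum(in.begin(), in.end(), out.begin(), add);
    return out;
}
```

Passing std::multiplies<int>() instead of the default turns the running sum into a running product.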
            
concat() Stateless intermediate stream operator
Method signatures:
                template<typename Iterator>
auto concat(Iterator begin, Iterator end);
auto concat(Stream<T>&& tail);
            
Returns a stream operator that concatenates the given stream to the end of the stream it is applied to, to be processed when the current stream reaches its end. The first form of concat is a convenience for concatenating with MakeStream::from(begin, end).

Example:
                std::vector<int> x = {4, 5, 6};
std::vector<int> y = {1, 2, 3};
MakeStream::from(x)
    | concat(MakeStream::from(y)); // Stream contains 4, 5, 6, 1, 2, 3
            
zip_with() Stateless intermediate stream operator
Method signature:
                template<typename Right, typename Function = Zipper>
auto zip_with(Stream<Right>&& other,
              Function&& zipper = Function());
            
Returns a stream operator that zips two streams together using the provided function, stopping when either of the streams finishes. By default, the zipping function takes the two elements of the streams and returns a tuple of the two elements. This function is specialized so that if either stream is a stream of tuples, the result of the zipping function is the concatenation of the tuples (rather than nested tuples).

Example:
                std::vector<std::string> input = {"Hello", "this", "is", "a", "stream"};
MakeStream::from(input)
    | zip_with(MakeStream::counter(1))// Stream<std::tuple<std::string, int>>
    | for_each([](std::tuple<std::string, int>&& tup) {
        std::cout << tup << std::endl;
    });
            
Produces the following output:
                (Hello, 1)
(this, 2)
(is, 3)
(a, 4)
(stream, 5)
            
Note: As can be seen above, we've included an overload for operator<< for inserting tuples into I/O streams.

Zipping multiple times produces a concatenated tuple rather than nested tuples:
                MakeStream::counter(0)
    | zip_with(MakeStream::counter(10))
    | zip_with(MakeStream::counter(20))
// Stream is now a stream of std::tuple<int, int, int> which contains:
// (0, 10, 20), (1, 11, 21), (2, 12, 22), ...
            
One particularly nice tuple operation we've included is a function called splat, which takes a function and returns a function that takes a tuple and splats that tuple's elements in as the arguments to the function. For example:
                auto splatted = splat([](std::string x, int y) {
    return x.length() + y;
});
splatted(std::make_tuple("Hello", 5)); // returns 10
            
This is useful, because it makes working with zipped streams particularly easy. So instead of having to do the ugly and unreadable:
                MakeStream::counter(1)
    | zip_with(MakeStream::counter(11))
    | map_([](const std::tuple<int, int>& tup) {
        return std::get<0>(tup) * std::get<1>(tup);
    });
            
You can now have the following:
                MakeStream::counter(1)
    | zip_with(MakeStream::counter(11))
    | map_(splat([](int first, int second) {
        return first * second;
    }));
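For readers curious how a function like splat can be written in C++14, here is a minimal sketch built on std::index_sequence. The names splat_sketch and apply_sketch are hypothetical, and this is not necessarily how the library implements splat.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <tuple>
#include <type_traits>
#include <utility>

// Expands the tuple's elements into a call to f, one argument per index.
template<typename Function, typename Tuple, std::size_t... I>
auto apply_sketch(Function& f, Tuple&& t, std::index_sequence<I...>) {
    return f(std::get<I>(std::forward<Tuple>(t))...);
}

// Hypothetical stand-in for splat: wraps f so that it accepts a single tuple.
template<typename Function>
auto splat_sketch(Function f) {
    return [f](auto&& tup) mutable {
        using Tuple = std::decay_t<decltype(tup)>;
        return apply_sketch(
            f, std::forward<decltype(tup)>(tup),
            std::make_index_sequence<std::tuple_size<Tuple>::value>());
    };
}
```

The wrapped function can then be handed to anything that passes tuples, such as a map over a zipped sequence.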
            
A different function can be used for zipping, which is an excellent way of performing element-wise operations to combine streams. All of the binary operators have stream overloads that are effectively zips. For example, consider the multiplication operator, which can be implemented on two streams in the following way (it's not quite this, but close):
                template<typename L, typename R>
auto operator* (Stream<L>&& left, Stream<R>&& right) {
    return left | zip_with(right, [](const L& lvalue, const R& rvalue) {
        return lvalue * rvalue;
    });
}
            
merge_with() Stateless intermediate stream operator
Note: This operator only applies to streams that are guaranteed to be sorted. Use on any other streams produces undefined results.

Method signature:
                template<typename Less = std::less<T>>
auto merge_with(Stream<T>&& other, Less&& less = Less());
            
Returns a stream operator that computes the merging of two streams (the merge step in mergesort). By default, elements are compared using their built in < operator. However, a different less than operator can be provided.

Example:
                auto left = MakeStream::from({1, 2, 4, 4, 12, 13, 17});
auto right = MakeStream::from({2, 5, 12, 17, 18});
left | merge_with(std::move(right)); // 1, 2, 2, 4, 4, 5, 12, 12, 13, 17, 17, 18
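The same merge step is available for containers as std::merge. The sketch below (merge_sketch is a hypothetical name) reproduces the example eagerly and makes the sortedness precondition explicit with assertions.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <vector>

// Hypothetical helper: merges two sorted vectors, keeping duplicates.
std::vector<int> merge_sketch(const std::vector<int>& left,
                              const std::vector<int>& right) {
    // Both inputs must already be sorted, as the note above requires.
    assert(std::is_sorted(left.begin(), left.end()));
    assert(std::is_sorted(right.begin(), right.end()));
    std::vector<int> out;
    std::merge(left.begin(), left.end(), right.begin(), right.end(),
               std::back_inserter(out));
    return out;
}
```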
            
union_with() Stateless intermediate stream operator
Note: This operator only applies to streams that are guaranteed to be sorted and distinct. Use on any other streams produces undefined results.

Method signature:
                template<typename Less = std::less<T>>
auto union_with(Stream<T>&& other, Less&& less = Less());
            
Returns a stream operator that computes the set union of two streams. By default, elements are compared using their built in < operator. However, a different less than operator can be provided.

Example:
                auto left = MakeStream::from({1, 2, 4, 12, 13, 17});
auto right = MakeStream::from({2, 5, 12, 17, 18});
left | union_with(std::move(right)); // 1, 2, 4, 5, 12, 13, 17, 18
            
intersection_with() Stateless intermediate stream operator
Note: This operator only applies to streams that are guaranteed to be sorted and distinct. Use on any other streams produces undefined results.

Method signature:
                template<typename Less = std::less<T>>
auto intersection_with(Stream<T>&& other, Less&& less = Less());
            
Returns a stream operator that computes the set intersection of two streams. By default, elements are compared using their built in < operator. However, a different less than operator can be provided.

Example:
                auto left = MakeStream::from({1, 2, 4, 12, 13, 17});
auto right = MakeStream::from({2, 5, 12, 17, 18});
left | intersection_with(std::move(right)); // 2, 12, 17
            
difference_with() Stateless intermediate stream operator
Note: This operator only applies to streams that are guaranteed to be sorted and distinct. Use on any other streams produces undefined results.

Method signature:
                template<typename Less = std::less<T>>
auto difference_with(Stream<T>&& other, Less&& less = Less());
            
Returns a stream operator that computes the set difference of two streams, with the base stream as the left argument, and the argument stream as the right argument to the difference. By default, elements are compared using their built in < operator. However, a different less than operator can be provided.

Example:
                auto left = MakeStream::from({1, 2, 4, 12, 13, 17});
auto right = MakeStream::from({2, 5, 12, 17, 18});
left | difference_with(std::move(right)); // 1, 4, 13
            
symmetric_difference_with() Stateless intermediate stream operator
Note: This operator only applies to streams that are guaranteed to be sorted and distinct. Use on any other streams produces undefined results.

Method signature:
                template<typename Less = std::less<T>>
auto symmetric_difference_with(Stream<T>&& other, Less&& less = Less());
            
Returns a stream operator that computes the set symmetric difference of two streams. By default, elements are compared using their built in < operator. However, a different less than operator can be provided.

Example:
                auto left = MakeStream::from({1, 2, 4, 12, 13, 17});
auto right = MakeStream::from({2, 5, 12, 17, 18});
left | symmetric_difference_with(std::move(right)); // 1, 4, 5, 13, 18
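The four set operators mirror the standard std::set_* algorithms, which carry the same sortedness requirement. As a cross-check of the example data used in these sections, here is an eager sketch on vectors; the helper names are hypothetical and this is not the Streams implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <vector>

using Ints = std::vector<int>;

// Each hypothetical helper expects sorted, duplicate-free inputs.
Ints union_sketch(const Ints& a, const Ints& b) {
    Ints out;
    std::set_union(a.begin(), a.end(), b.begin(), b.end(), std::back_inserter(out));
    return out;
}

Ints intersection_sketch(const Ints& a, const Ints& b) {
    Ints out;
    std::set_intersection(a.begin(), a.end(), b.begin(), b.end(), std::back_inserter(out));
    return out;
}

Ints difference_sketch(const Ints& a, const Ints& b) {
    Ints out;
    std::set_difference(a.begin(), a.end(), b.begin(), b.end(), std::back_inserter(out));
    return out;
}

Ints symmetric_difference_sketch(const Ints& a, const Ints& b) {
    Ints out;
    std::set_symmetric_difference(a.begin(), a.end(), b.begin(), b.end(),
                                  std::back_inserter(out));
    return out;
}
```

Because every algorithm walks both inputs in a single forward pass, the same approach carries over naturally to lazily evaluated sorted streams.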
            
pairwise() Stateless intermediate stream operator
Method signature:
                auto pairwise();
            
Returns a stream operator that returns a stream with adjacent elements grouped into pairs.

Example:
                MakeStream::counter(1)
    | pairwise() // Stream contains (1, 2), (3, 4), (5, 6), ...
            
Note that this is a convenience for stream | group<2>().
group() Stateless intermediate stream operator
Method signature:
                template<size_t N>
auto group();
auto group(size_t N);
            
Returns a stream operator that returns a stream with adjacent elements grouped into groups of size N. N can either be a compile time constant or a runtime value. In the former case, the resulting stream is a stream of N-tuples, all of whose elements are of the original stream type, T. In the latter case, the resulting stream is a stream of std::vector<T>.

Example:
                MakeStream::counter(1) | group<3>() // (1, 2, 3), (4, 5, 6), ...
MakeStream::counter(1) | group(3) // [1, 2, 3], [4, 5, 6], ...
            
This is specialized for N = 2, so that the resulting stream is a stream of pairs rather than tuples. Calling stream | group<2>() is the same as calling stream | pairwise().
overlap() Stateless intermediate stream operator
Method signature:
                template<size_t N>
auto overlap();
auto overlap(size_t N);
            
Returns a stream operator that returns a stream with overlapping adjacent elements grouped into groups of size N. N can either be a compile time constant or a runtime value. In the former case, the resulting stream is a stream of N-tuples, all of whose elements are of the original stream type, T. In the latter case, the resulting stream is a stream of std::deque<T>.

Example:
                MakeStream::counter(1) | overlap<2>() // (1, 2), (2, 3), (3, 4), (4, 5), ...
MakeStream::counter(1) | overlap<3>() // (1, 2, 3), (2, 3, 4), (3, 4, 5), ...
MakeStream::counter(1) | overlap(4)   // [1, 2, 3, 4], [2, 3, 4, 5], ...
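The runtime form can be pictured as a sliding window over the input. The following sketch (overlap_sketch is a hypothetical name) builds the windows eagerly with a std::deque; it assumes that inputs shorter than N yield no windows, which may differ from what the library does.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical helper: every run of n consecutive elements, in order.
std::vector<std::deque<int>> overlap_sketch(const std::vector<int>& in,
                                            std::size_t n) {
    assert(n > 0);
    std::vector<std::deque<int>> windows;
    std::deque<int> window;
    for (int value : in) {
        window.push_back(value);
        if (window.size() > n) {
            window.pop_front();  // slide the window forward by one element
        }
        if (window.size() == n) {
            windows.push_back(window);
        }
    }
    return windows;
}
```

Each window shares n - 1 elements with its predecessor, which is why a deque (cheap removal at the front) is a natural fit for the window.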
            
operator?() Stateless intermediate stream operator
Almost all C++ overloadable operators are overloaded to provide an algebra for streams, allowing easy element-wise operations on streams with both other streams and a given value. For some binary operator which we will denote as $, the following overloads are available:

Method signatures (template definitions excluded for brevity):
                Stream<X> operator$ (Stream<L>&& left, R&& right);
Stream<X> operator$ (L&& left, Stream<R>&& right);
Stream<X> operator$ (Stream<L>&& left, Stream<R>&& right);
            
The first two of these signatures provide mapping operations, applying the $ operator to every element of the stream with the given value on the left or the right. For example:
                auto stream1 = MakeStream::counter(1.0) / 3.0; // 1/3, 2/3, 1, 4/3, ....
auto stream2 = 5 * MakeStream::counter(0); // 0, 5, 10, 15, ...
            
The third signature provides a zipping operation of the elements in both streams, using the $ operator as the zipping function. For example, one can compute the dot product of two streams as follows:
                Stream<double> v1 = /* ... */, v2 = /* ... */;
double dot_product = (v1 * v2) | sum();
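For comparison, the same zip-then-sum computation on plain vectors is what std::inner_product does. This is a standard-library sketch with a hypothetical helper name; unlike a stream zip, which stops when either stream ends, it requires both inputs to have the same length.

```cpp
#include <cassert>
#include <numeric>
#include <vector>

// Hypothetical helper: multiplies element-wise, then sums the products.
double dot_product_sketch(const std::vector<double>& v1,
                          const std::vector<double>& v2) {
    assert(v1.size() == v2.size());
    return std::inner_product(v1.begin(), v1.end(), v2.begin(), 0.0);
}
```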
            
Unsupported overloadable binary operators:
  • Assignment operators, =, +=, *=, etc.
  • Subscript operator, [] (In consideration)
  • Member pointer operator, ->*
  • Comma operator, ,
Various unary operators are also supported on streams, providing mapping operations. For some unary operator $, the signature for the operator is as follows:

Method signature:
                Stream<X> operator$ (Stream<T>&& stream);
            
This provides a mapping operation, applying the $ operator to every element of the stream. For example, to dereference a stream of pointers, we simply call *stream.

Unsupported overloadable unary operators:
  • Postfix/prefix increment/decrement operators, ++, --
  • Reference operator, &
  • Structure dereference operator, ->
  • Conversion operator, (type)
Other overloadable but unsupported operators:
  • Allocation operators, new, new[]
  • Deallocation operators, delete, delete[]
  • Function application, (args...) (Coming soon!)

Stateful Intermediate Stream Operators

These operators insert another stage of a stream pipeline or combine two streams in some way, forming a new stream which contains the data of the old streams. These operations accumulate a non-constant amount of space to store their necessary state.

state_point() Stateful intermediate stream operator
Method signature:
                auto state_point();
            
Returns a stream operator that forces the stream to collect its state before continuing with operations. This is a good way to allow a complete peek into the data at a certain point in the stream pipeline.

Example:
                MakeStream::counter(1)
    | peek([](int x) { std::cout << "x = " << x << std::endl; })
    | limit(3)
    | state_point()
    | map_([](int x) { return 2 * x; })
    | for_each([](int x) {
        std::cout << "2x = " << x << std::endl;
    });
            
Produces the following output:
                x = 1
x = 2
x = 3
2x = 2
2x = 4
2x = 6
            
Without the call to state_point(), the following would have been the output:
                x = 1
2x = 2
x = 2
2x = 4
x = 3
2x = 6
            
Note that calling stream | state_point() is functionally equivalent to calling MakeStream::from(stream | to_deque()).
sort() Stateful intermediate stream operator
Method signature:
                template<typename Less = std::less<void>>
auto sort(Less&& less = Less());
            
Returns a stream operator that sorts the elements of the stream using the given less-than comparison. By default, uses the built in < operator.

Example:
                MakeStream::from({3, 1, 5, 3, 2, 8})
    | sort(); // Stream contains 1, 2, 3, 3, 5, 8
            
distinct() Stateful intermediate stream operator
Method signature:
                template<typename Less = std::less<T>>
auto distinct(Less&& less = Less());
            
Returns a stream operator that removes duplicate elements from the stream and returns the results in sorted order.

Example:
                MakeStream::from({3, 1, 3, 5, 2, 5, 8, 8})
    | distinct(); // Stream contains 1 2 3 5 8
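One way to see why distinct() is stateful and why its output comes back sorted is to imagine collecting the elements into an ordered set. The sketch below does exactly that with std::set; it is an illustration with a hypothetical helper name, and the library's internal container is an assumption here.

```cpp
#include <cassert>
#include <set>
#include <vector>

// Hypothetical helper: an ordered set drops duplicates and sorts in one step.
std::vector<int> distinct_sketch(const std::vector<int>& in) {
    std::set<int> seen(in.begin(), in.end());
    return std::vector<int>(seen.begin(), seen.end());
}
```

The set has to hold every distinct element before the first result can be emitted, which is the non-constant state this category of operators refers to.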
            

Terminal Stream Operators

count() Stateless terminal stream operator
Method signature:
                auto count();
            
Returns a stream terminator that returns the number of elements in the stream.
sum() Stateless terminal stream operator
Method signatures:
                auto sum(const T& identity);
auto sum();
            
Returns a stream terminator that adds the elements of the stream, returning the total. If no additive identity is provided and the stream is empty, throws an EmptyStreamException. If an identity is provided and the stream is empty, the identity is returned.
product() Stateless terminal stream operator
Method signatures:
                auto product(const T& identity);
auto product();
            
Returns a stream terminator that multiplies the elements of the stream, returning the product. If no multiplicative identity is provided and the stream is empty, throws an EmptyStreamException. If an identity is provided and the stream is empty, the identity is returned.
min() Stateless terminal stream operator
Method signature:
                template<typename Less = std::less<void>>
auto min(Less&& less = Less());
            
Returns a stream terminator that returns the smallest element of the stream as given by the provided less than operator (by default, the built in < operator). If the stream is empty, throws an EmptyStreamException.
min_by() Stateless terminal stream operator
Method signature:
                template<typename Function, typename Less = std::less<void>>
auto min_by(Function&& function, Less&& less = Less());
            
Returns a stream terminator that returns the element of the stream that results in the smallest value of the given function as determined by the provided less than operator (by default, the built in < operator). If the stream is empty, throws an EmptyStreamException.
max() Stateless terminal stream operator
Method signature:
                template<typename Less = std::less<void>>
auto max(Less&& less = Less());
            
Returns a stream terminator that returns the largest element of the stream as given by the provided less than operator (by default, the built in < operator). If the stream is empty, throws an EmptyStreamException.
max_by() Stateless terminal stream operator
Method signature:
                template<typename Function, typename Less = std::less<void>>
auto max_by(Function&& function, Less&& less = Less());
            
Returns a stream terminator that returns the element of the stream that results in the largest value of the given function as determined by the provided less than operator (by default, the built in < operator). If the stream is empty, throws an EmptyStreamException.
minmax() Stateless terminal stream operator
Method signature:
                template<typename Less = std::less<T>>
auto minmax(Less&& less = Less());
            
Returns a stream terminator that returns the smallest and largest elements of the stream as given by the provided less than operator (by default, the built in < operator). The first field of the resulting pair is the minimum, the second field is the maximum element. If the stream is empty, throws an EmptyStreamException.
minmax_by() Stateless terminal stream operator
Method signature:
                template<typename Function, typename Less = std::less<void>>
auto minmax_by(Function&& function, Less&& less = Less());
            
Returns a stream terminator that returns the elements of the stream that result in the smallest and largest values of the given function as determined by the provided less than operator (by default, the built in < operator). The first field of the resulting pair is the element that minimizes the function, and the second field is the element that maximizes it. If the stream is empty, throws an EmptyStreamException.
first() Stateless terminal stream operator
Method signature:
                auto first();
            
Returns a stream terminator that returns the first element of the stream. If the stream is empty, throws an EmptyStreamException.
last() Stateless terminal stream operator
Method signature:
                auto last();
            
Returns a stream terminator that returns the last element of the stream. If the stream is empty, throws an EmptyStreamException.
nth() Stateless terminal stream operator
Method signature:
                auto nth(size_t index);
            
Returns a stream terminator that returns the nth element of the stream, indexed starting at 0. If the stream does not contain that many elements, throws an EmptyStreamException. Calling stream | nth(n) is functionally equivalent to calling stream | skip(n) | first().
reduce() Stateless terminal stream operator
Method signatures (template definitions excluded for brevity):
                auto reduce(Accumulator&& accum);
auto reduce(IdentityFn&& identityFn, Accumulator&& accum);

            
Returns a stream terminator that performs an identity-less reduction (or a fold) of the elements in the stream. Since there is no identity value for each of these, if the stream is empty an EmptyStreamException is thrown. There are two forms of the reduce operation, given by the signatures above.

The first reduce operation has an accumulator that takes two arguments of the stream type and returns a value of the same type. The functionality is equivalent to (in pseudocode):
                T result = stream[0];
for(T element : stream[1 ...]) {
    result = accum(result, element);
}
return result;
            
This reduce operation can be used to compute a sum without identity:
                stream | reduce(std::plus<void>());
            
The second reduce operation takes an identity function which converts elements of the stream into elements of the result type, and an accumulator which takes a first argument of the result type and a second argument of the stream type. The functionality is equivalent to (in pseudocode):
                U result = identity(stream[0]);
for(T element : stream[1...]) {
    result = accum(result, element);
}
return result;
            
This is the type of reduction that is used to compute minmax(), which can be implemented as follows:
                auto to_pair = [](auto&& x) {
    return std::make_pair(x, x);
};

auto next_minmax = [](auto&& prev_minmax, auto&& value) {
    if(value < prev_minmax.first) {
        return std::make_pair(value, prev_minmax.second);
    } else if (value > prev_minmax.second) {
        return std::make_pair(prev_minmax.first, value);
    } else {
        return prev_minmax;
    }
};

auto minmax = reduce(to_pair, next_minmax);
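The two pseudocode loops above can be turned into runnable code on a std::vector. This is a sketch under stated assumptions: reduce_sketch is a hypothetical name, and std::runtime_error stands in for EmptyStreamException so the example needs no library headers.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <utility>
#include <vector>

// First form: the first element seeds the result, so T is also the result type.
template<typename T, typename Accumulator>
T reduce_sketch(const std::vector<T>& in, Accumulator accum) {
    if (in.empty()) throw std::runtime_error("empty input");
    T result = in[0];
    for (std::size_t i = 1; i < in.size(); ++i) {
        result = accum(result, in[i]);
    }
    return result;
}

// Second form: to_result converts the first element into the result type,
// and accum folds each remaining element into that result.
template<typename T, typename IdentityFn, typename Accumulator>
auto reduce_sketch(const std::vector<T>& in, IdentityFn to_result,
                   Accumulator accum) {
    if (in.empty()) throw std::runtime_error("empty input");
    auto result = to_result(in[0]);
    for (std::size_t i = 1; i < in.size(); ++i) {
        result = accum(result, in[i]);
    }
    return result;
}
```

In both forms an empty input has no first element to seed the result, which is why the operation has to fail rather than return a value.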
            
identity_reduce() Stateless terminal stream operator
Method signature:
                template<typename U, typename Accumulator>
auto identity_reduce(const U& identity, Accumulator&& accumulator);
            
Returns a stream terminator that performs a reduction with an identity element. If the stream is empty, then the identity value is returned. The accumulator function works the same as it does for reduce, the first argument is the result type, the second argument is the stream type. The functionality is equivalent to:
                U result = identity;
for(T element : stream) {
    result = accum(result, element);
}
return result;
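This loop is the same fold that std::accumulate performs, so on a container the behaviour can be sketched as follows (identity_reduce_sketch is a hypothetical name, not a library function):

```cpp
#include <cassert>
#include <cstddef>
#include <numeric>
#include <string>
#include <vector>

// Hypothetical helper: folds the input into a value of type U, starting from
// identity. The result type U may differ from the element type T.
template<typename T, typename U, typename Accumulator>
U identity_reduce_sketch(const std::vector<T>& in, const U& identity,
                         Accumulator accum) {
    return std::accumulate(in.begin(), in.end(), identity, accum);
}
```

With an empty input the loop body never runs, so the identity comes back unchanged.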
            
random_element() Stateless terminal stream operator
Method signature:
                auto random_element();
            
Returns a stream terminator that returns a random element drawn uniformly from the elements of the stream. If the stream is empty, throws an EmptyStreamException. If the stream is infinite, this will never terminate.
random_sample() Stateless terminal stream operator
Method signature:
                auto random_sample(size_t size);
            
Returns a stream terminator that returns a vector containing a random sample of elements of the given size drawn randomly from the stream. If the stream has fewer elements than the requested size, the sample will consist of all of the elements of the stream. If the stream is infinite, this will never terminate.
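A standard single-pass technique for drawing a fixed-size sample from a sequence of unknown length is reservoir sampling. Whether Streams uses it internally is an assumption on our part; the sketch below (with the hypothetical name reservoir_sample_sketch) only illustrates the documented behaviour, including the case where the input is shorter than the requested size.

```cpp
#include <cassert>
#include <cstddef>
#include <random>
#include <vector>

// Hypothetical helper: uniform sample of up to `size` elements in one pass.
template<typename T, typename Rng>
std::vector<T> reservoir_sample_sketch(const std::vector<T>& in,
                                       std::size_t size, Rng& rng) {
    std::vector<T> sample;
    sample.reserve(size);
    for (std::size_t seen = 0; seen < in.size(); ++seen) {
        if (sample.size() < size) {
            // Fill the reservoir with the first `size` elements.
            sample.push_back(in[seen]);
        } else if (size > 0) {
            // Keep the new element with probability size / (seen + 1) by
            // letting it overwrite a uniformly chosen slot.
            std::uniform_int_distribution<std::size_t> pick(0, seen);
            std::size_t slot = pick(rng);
            if (slot < size) sample[slot] = in[seen];
        }
    }
    return sample;
}
```

Every element must be visited before the sample is final, which matches the note that the operation never terminates on an infinite stream.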
any() Stateless terminal stream operator
Method signature:
                template<typename Predicate>
auto any(Predicate&& predicate);
            
Returns a stream terminator that returns true if some element of the stream matches the given predicate. If the stream is empty, the result is vacuously false. If the stream is infinite, this operation will shortcut in the case that an element that matches the predicate is found.

Specializations:

A specialization of any() exists for types that have an implicit conversion to bool. For these types, the following specialization exists:
                auto any();
            
This returns true if any element of the stream has a conversion to a bool with value true.
all() Stateless terminal stream operator
Method signature:
                template<typename Predicate>
auto all(Predicate&& predicate);
            
Returns a stream terminator that returns true if all of the elements of the stream match the given predicate. If the stream is empty, the result is vacuously true. If the stream is infinite, this operation will shortcut in the case that an element that doesn't match the predicate is found.

Specializations:

A specialization of all() exists for types that have an implicit conversion to bool. For these types, the following specialization exists:
                auto all();
            
This returns true if all of the elements of the stream have a conversion to a bool with value true.
none() Stateless terminal stream operator
Method signature:
                template<typename Predicate>
auto none(Predicate&& predicate);
            
Returns a stream terminator that returns true if none of the elements of the stream match the given predicate. If the stream is empty, the result is vacuously true. If the stream is infinite, this operation will shortcut in the case that an element that matches the predicate is found.

Specializations:

A specialization of none() exists for types that have an implicit conversion to bool. For these types, the following specialization exists:
                auto none();
            
This returns true if none of the elements of the stream have a conversion to a bool with value true.
not_all() Stateless terminal stream operator
Method signature:
                template<typename Predicate>
auto not_all(Predicate&& predicate);
            
Returns a stream terminator that returns true if not all of the elements of the stream match the given predicate. If the stream is empty, the result is false, since all() is vacuously true on an empty stream. If the stream is infinite, this operation will shortcut in the case that an element that doesn't match the predicate is found.

Specializations:

A specialization of not_all() exists for types that have an implicit conversion to bool. For these types, the following specialization exists:
                auto not_all();
            
This returns true if not all of the elements of the stream have a conversion to a bool with value true.
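The four predicate terminators line up with std::any_of, std::all_of, and std::none_of, with not_all taken here as the negation of all. The sketch below evaluates all four on a vector so the empty-input results can be compared side by side; MatchSummary and match_summary are hypothetical names introduced for the illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical record of the four results for one input and predicate.
struct MatchSummary {
    bool any;
    bool all;
    bool none;
    bool not_all;
};

template<typename Predicate>
MatchSummary match_summary(const std::vector<int>& in, Predicate pred) {
    MatchSummary s;
    s.any = std::any_of(in.begin(), in.end(), pred);
    s.all = std::all_of(in.begin(), in.end(), pred);
    s.none = std::none_of(in.begin(), in.end(), pred);
    s.not_all = !s.all;  // assumed definition: logical negation of all
    return s;
}
```

With the standard algorithms, an empty range gives any = false, all = true, none = true, and therefore not_all = false.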
for_each() Stateless terminal stream operator
Method signature:
                template<typename Function>
auto for_each(Function&& function);
            
Returns a stream terminator that calls the given function on each element of the stream.
to_container() Stateful terminal stream operator
Method signatures:
                auto to_vector();
auto to_list();
auto to_deque();
auto to_set();
auto to_multiset();
auto to_unordered_set();
auto to_unordered_multiset();
            
Returns a stream terminator that creates a container of the given type and inserts the elements of the stream into that container, thereafter returning the container. In addition, the Stream class contains conversion operators for each of the above container types. For example, the following two lines are equivalent:
                std::vector<int> result = stream;
auto result = stream | to_vector();
            
copy_to() Stateless terminal stream operator
Method signature:
                template<typename OutputIterator>
auto copy_to(OutputIterator iterator);
            
Returns a stream terminator that copies the elements of the stream into the given iterator, returning the iterator one past the end of the sequence, much like std::copy.
move_to() Stateless terminal stream operator
Method signature:
                template<typename OutputIterator>
auto move_to(OutputIterator iterator);
            
Returns a stream terminator that moves the elements of the stream into the given iterator, returning the iterator one past the end of the sequence, much like std::move (the one in the <algorithm> header).

Stream Reducers

Stream Reducers are classes that provide convenient common reductions of streams for a variety of types.
Reducer Stream reducer
Class signature:
                template<typename In, typename Out>
class Reducer {
    virtual Out initial(In&& in) const = 0;
    virtual Out accumulate(Out&& out, In&& in) const = 0;
    auto reducer() const;
};
            
Reducers expose the two functions that are passed to the reduce() terminator (the initial conversion and the accumulator) as class methods for a subclass to override. One constructs an instance of a reducer subclass and then calls the reducer() method to turn it into a terminal operation that reduces a stream using the desired functions.

Reducers are not included when you include "Stream.h"; you must additionally include "Reducers.h" to get them.
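To show how the two virtual methods fit together, here is a self-contained imitation of the pattern that does not depend on "Reducers.h". ReducerSketch, its run() method, and TotalLength are all hypothetical; in particular, run() is only a stand-in for what the terminal operation returned by reducer() would do to a stream, and std::runtime_error stands in for EmptyStreamException.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

template<typename In, typename Out>
class ReducerSketch {
public:
    virtual ~ReducerSketch() = default;
    virtual Out initial(In&& in) const = 0;
    virtual Out accumulate(Out&& out, In&& in) const = 0;

    // Seeds the result with initial(), then folds in the remaining elements.
    Out run(std::vector<In> values) const {
        if (values.empty()) throw std::runtime_error("empty input");
        Out result = initial(std::move(values[0]));
        for (std::size_t i = 1; i < values.size(); ++i) {
            result = accumulate(std::move(result), std::move(values[i]));
        }
        return result;
    }
};

// Example subclass: total number of characters across all strings.
class TotalLength : public ReducerSketch<std::string, std::size_t> {
public:
    std::size_t initial(std::string&& in) const override {
        return in.size();
    }
    std::size_t accumulate(std::size_t&& out, std::string&& in) const override {
        return out + in.size();
    }
};
```

A subclass only has to say how to start and how to combine; the driving loop is written once in the base class.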
Histogram Stream reducer
Class signature:
                template<typename In, typename Less = std::less<In>>
class Histogram : public Reducer<In, std::map<In, size_t, Less>> {
public:
    Histogram(Less&& less = Less());
};
            
A reducer that accumulates the contents of a stream by counting the occurrences of each item that's considered distinct by the given comparison operator.

Example:
                std::map<int, size_t> result = MakeStream::from({1, 3, 1, 2, 1, 2})
    | reducers::Histogram<int>().reducer();
// result contains {1: 3, 2: 2, 3: 1}
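The counting itself amounts to incrementing an entry in an ordered map for every element. A minimal standard-library sketch of that idea follows; histogram_sketch is a hypothetical name and does not use the Reducer machinery.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <vector>

// Hypothetical helper: counts how often each value occurs. Two values land in
// the same bucket when neither compares less than the other under Less.
template<typename In, typename Less = std::less<In>>
std::map<In, std::size_t, Less> histogram_sketch(const std::vector<In>& in) {
    std::map<In, std::size_t, Less> counts;
    for (const In& item : in) {
        ++counts[item];  // operator[] value-initializes new counts to 0
    }
    return counts;
}
```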
            
SummaryStats Stream reducer
Class signature:
                template<typename In, typename Result = double>
class SummaryStats : public Reducer<In, Stats<In, Result>> {
};

template<typename In, typename Out>
class Stats {
    size_t number() const;
    Out mean() const;
    Out stddev() const;
    In min() const;
    In max() const;
};
            
A reducer that accumulates various summary statistics about the input data, and returns a statistics object containing those.

Example:
                auto stats = MakeStream::normal_randoms()
    | limit(1000)
    | reducers::SummaryStats<int>().reducer();
std::cout << stats << std::endl;
// Example output: N=1000, u=0.015, s=0.473248, min=-3, max=3
            

Stream Exceptions

StreamException Stream exception
Class signature:
                class StreamException {
public:
    explicit StreamException(const std::string& msg);
    explicit StreamException(const char* msg);

    std::string what() const;
};
            
The root class of all stream based exceptions.
EmptyStreamException Stream exception
Class signature:
                class EmptyStreamException : public StreamException {
public:
    explicit EmptyStreamException(const std::string& method);
};
            
The exception that gets thrown when attempting to call some terminal stream operation that has no identity element for that operation on a stream that is empty.

Example:
                try {
    MakeStream::empty() | min();
} catch(EmptyStreamException& e) {
    std::cout << e.what() << std::endl;
}
            
Produces the following output:
                No terminal result for operation stream::op::min.
            
VacantStreamException Stream exception
Class signature:
                class VacantStreamException : public StreamException {
public:
    explicit VacantStreamException(const std::string& method);
};
            
The exception that gets thrown when attempting to call any stream operation on a stream that no longer holds data. A stream becomes vacant when it has been moved into another stream, or when some stream operation has been called on it. To check vacancy, use the occupied() method.

Example:
                auto stream1 = MakeStream::range(0, 5);
auto stream2 = std::move(stream1); // Copying disallowed
try {
    stream1 | skip(2);
} catch(VacantStreamException& e) {
    std::cout << e.what() << std::endl;
}
stream2 | skip(2); // Didn't save it so stream2's data is lost!
try {
    stream2 | first();
} catch(VacantStreamException& e) {
    std::cout << e.what() << std::endl;
}
            
Produces the following output:
                Cannot perform operation stream::op::skip on a vacant stream
Cannot perform operation stream::op::first on a vacant stream
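
The ownership rule behind vacancy can be sketched in a few lines of standard C++. The class below is a hypothetical stand-in (OwningSource is not a Streams type, and it throws std::logic_error rather than VacantStreamException). It only shows how unique ownership, moving, and an operation that hands its data to a new object all leave the original vacant.

```cpp
#include <algorithm>
#include <cstddef>
#include <memory>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical stand-in for a stream that uniquely owns its data source.
class OwningSource {
public:
    explicit OwningSource(std::vector<int> data)
        : data_(std::make_unique<std::vector<int>>(std::move(data))) {}

    // Move-only: the unique_ptr member disables copying, and a moved-from
    // unique_ptr is guaranteed to be null.
    OwningSource(OwningSource&&) = default;
    OwningSource& operator=(OwningSource&&) = default;

    bool occupied() const { return data_ != nullptr; }

    std::size_t size() const { return occupied() ? data_->size() : 0; }

    // An "intermediate operation": transfers the data to a new object,
    // leaving *this vacant, just as an explicit move would.
    OwningSource skip(std::size_t n) {
        if (!occupied())
            throw std::logic_error("skip called on a vacant source");
        std::unique_ptr<std::vector<int>> taken = std::move(data_);
        const std::size_t drop = std::min(n, taken->size());
        taken->erase(taken->begin(),
                     taken->begin() + static_cast<std::ptrdiff_t>(drop));
        return OwningSource(std::move(taken));
    }

private:
    explicit OwningSource(std::unique_ptr<std::vector<int>> data)
        : data_(std::move(data)) {}

    std::unique_ptr<std::vector<int>> data_;
};
```

As in the example above, discarding the value returned by skip() would lose the data, because the original object has already given it up.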
            
ConsumedIteratorException Stream exception
Class signature:
                class ConsumedIteratorException : public StreamException {
public:
    explicit ConsumedIteratorException(const std::string& op);
};
            
The exception thrown when any operation other than dereferencing is called on a stream iterator after that iterator has entered a "consumed" state. For a discussion of this, see the comments for the begin() method.

Example:
                auto stream = MakeStream::closed_range(1,10);
auto iter = stream.begin();
std::cout << *iter << std::endl;
auto temp_iter = iter++;
std::cout << *temp_iter << std::endl;
try {
    ++temp_iter;
} catch(ConsumedIteratorException& e) {
    std::cout << e.what() << std::endl;
}
            
Produces the following output:
                1
2
Cannot perform prefix increment on consumed stream iterator.
            
StopStream Stream exception
Class signature:
                class StopStream : public StreamException {
public:
    explicit StopStream();
};
            
An exception that the user can throw from inside any generator or intermediate (but not terminal) stream operation to make the stream stop iterating at that point. This is most often helpful with the generate() method.

Example:
                std::ifstream fin = /* ... */
auto stream = MakeStream::generate([&fin]() {
    std::string line;
    if(!std::getline(fin, line))
        throw StopStream(); // Stop iterating when we reach EOF
    return line;
});
            
                // Silly version of take while
stream | peek([](auto& value) {
    if(!pred(value))
        throw StopStream();
});
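
To show the control flow this pattern relies on, here is a self-contained sketch in which a consumer loop treats a thrown signal as normal end-of-stream. It does not use the Streams library: StopSignal, drain, and count_to_three are hypothetical names, and how the library itself catches StopStream is not specified here.

```cpp
#include <functional>
#include <vector>

// Stand-in for StopStream; not the library's class.
struct StopSignal {};

// Hypothetical helper: pulls values from a generator until it throws
// StopSignal, then returns everything produced so far.
template<typename T>
std::vector<T> drain(const std::function<T()>& generator) {
    std::vector<T> out;
    try {
        for (;;)
            out.push_back(generator());
    } catch (const StopSignal&) {
        // Normal termination: the generator asked to stop.
    }
    return out;
}

// A generator that yields 1, 2, 3 and then requests a stop.
inline std::function<int()> count_to_three() {
    int next = 1;
    return [next]() mutable {
        if (next > 3)
            throw StopSignal();
        return next++;
    };
}
```

Calling drain<int>(count_to_three()) collects the three values and returns normally. Any other exception type would still propagate to the caller.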
            

Stream Recipes

These will go here eventually.

C++ Streams are developed by Jonah Scheinerman. Please contact me if you have questions or concerns.

C++ Streams are distributed under the MIT open source license.

Copyright © 2014 by Jonah Scheinerman