Streams is a C++ library that provides lazy evaluation and functional-style transformations on data, to ease the use of C++ standard library containers and algorithms. Streams support many common functional operations such as map, filter, and reduce, along with set operations (union, intersection, difference), partial sum, adjacent difference, and many others. Please visit the GitHub page to download and try it yourself.
Streams is developed by Jonah Scheinerman, who is always looking to improve it. Please get in touch if you find bugs or have ideas on how to make Streams even better.
To use Streams, download the library from GitHub and simply #include "Stream.h". Streams requires C++14 to compile. The library consists only of header files, so you shouldn't have to modify your build process to use it.
Introduction
Streams are an abstraction over a set of data that has a specific order (be it a strict ordering or just some encounter order). Various operations can be applied to streams such that data passes through the stream pipeline in a lazy manner, only getting passed to the next operation when it is requested. There are three main kinds of stream operations: generators, intermediate operators, and terminal operators.
- Generators are methods that create streams, and are thus considered stream sources. Stream generators will not be evaluated at all until some terminal operation on a stream has been called.
- Intermediate operators take in streams and return streams, applying some additional operation to the end of the stream pipeline. Intermediate operations will not be evaluated until some terminal operation on the stream has been called. There are two kinds of intermediate operators: stateless and stateful. Stateless operators require O(1) memory, whereas stateful operators may accumulate up to O(n) memory where n is the stream length.
- Terminal operators close a stream and finally cause all of the stream operations to evaluate up the stream pipeline, until some final non-stream value gets returned by the terminal operation.
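The three kinds of operations can be illustrated with a small pull-based sketch in plain C++14. This is not how Streams is implemented; the names Stage, counter_stage, filter_stage, map_stage, and take_to_vector are hypothetical and exist only for this illustration. The pulls counter shows that the generator does no work until a terminal operation asks for values.

```cpp
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// A pull-based stage (hypothetical): writes the next value into `out` and
// returns true, or returns false once the stage is exhausted.
using Stage = std::function<bool(int&)>;

// Generator: an infinite counter. `pulls` records how many values were requested.
Stage counter_stage(int start, int& pulls) {
    int next = start;
    return [next, &pulls](int& out) mutable {
        ++pulls;
        out = next++;
        return true;
    };
}

// Stateless intermediate operator: forwards only values accepted by `keep`.
Stage filter_stage(Stage upstream, std::function<bool(int)> keep) {
    return [upstream = std::move(upstream), keep = std::move(keep)](int& out) mutable {
        while (upstream(out)) {
            if (keep(out)) return true;
        }
        return false;
    };
}

// Stateless intermediate operator: transforms each value on its way through.
Stage map_stage(Stage upstream, std::function<int(int)> transform) {
    return [upstream = std::move(upstream),
            transform = std::move(transform)](int& out) mutable {
        if (!upstream(out)) return false;
        out = transform(out);
        return true;
    };
}

// Terminal operator: pulls at most `count` values and returns them.
std::vector<int> take_to_vector(Stage stage, std::size_t count) {
    std::vector<int> result;
    int value = 0;
    while (result.size() < count && stage(value)) {
        result.push_back(value);
    }
    return result;
}
```

Building the pipeline leaves pulls at 0. Taking the first three even squares from a counter starting at 1 requests exactly six values from the generator.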
A stream is a single source of data, and thus uniquely owns its data source. There is no way of copying streams without accruing a lot of state, and thus streams are not copyable. However, streams are movable. Moving a stream leaves the source stream "vacant." One can check the vacancy of a stream using the occupied() method. Attempting to apply any stream operation to a vacant stream will result in a VacantStreamException being thrown. Additionally, every intermediate operation creates a new stream which now owns the original data source of the calling stream. Thus calling an intermediate operation on a stream leaves the original stream vacant. See the example for occupied().
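The ownership rule can be sketched with a move-only class in plain C++14. OwningSource is a hypothetical stand-in, not the library's Stream class, and it throws std::logic_error where Streams would throw VacantStreamException.

```cpp
#include <cstddef>
#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical stand-in for a stream: it uniquely owns its data source.
class OwningSource {
public:
    explicit OwningSource(std::vector<int> data)
        : data_(std::make_unique<std::vector<int>>(std::move(data))) {}

    // Movable; declaring the move operations disables copying.
    OwningSource(OwningSource&&) = default;
    OwningSource& operator=(OwningSource&&) = default;

    // True while this object still owns a data source.
    bool occupied() const { return data_ != nullptr; }

    // Any operation on a vacant object throws.
    std::size_t size() const {
        if (!occupied()) throw std::logic_error("vacant source");
        return data_->size();
    }

private:
    std::unique_ptr<std::vector<int>> data_;
};

static_assert(!std::is_copy_constructible<OwningSource>::value,
              "the sketch is move-only");
```

A moved-from std::unique_ptr is guaranteed to be null, which is what makes the occupied() check reliable in this sketch.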
Streams can be iterated through in the standard C++ way using the begin() and end() methods. However, one should be careful when using these (as they don't work exactly like standard library iterators), and using these may not be as efficient. For example, even though it may be nicer to iterate through a stream via a range-for loop, it may be more efficient to use the for_each() method, as it does not have to incur the overhead cost of making stream iterators safe to use.
Operator Application and Composition
Stream operators are objects that can be applied to streams to alter their contents, usually in a lazy manner, or to compute a terminal result. Most of the library consists of functions which return stream operators that can be applied to a stream using the bitwise-or operator, |, as follows: stream | streamop. These operations can be chained: stream | op1 | op2 | ... | opN. Most operations can be applied to streams of many types.
Stream operators can also be composed with one another to create new stream operators. Thus, application of stream operators is associative. For example, stream | op1 | op2 | op3 is equivalent to stream | (op1 | op2 | op3) or stream | (op1 | op2) | op3 or stream | op1 | (op2 | op3). This means the creation of new stream operators is quite easy:
auto square_and_sum = map_([] (auto x) { return x * x; }) | sum();
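The associativity property can be demonstrated with a deliberately tiny model in which an "operator" is just a function from int to int. IntOp and both operator| overloads below are assumptions made for illustration; the library's real operators work on streams and are templates.

```cpp
#include <functional>

// Hypothetical miniature operator: wraps a function from int to int.
struct IntOp {
    std::function<int(int)> fn;
};

// Application: value | op runs the operator on the value.
int operator|(int value, const IntOp& op) {
    return op.fn(value);
}

// Composition: op1 | op2 yields a new operator that runs op1 and then op2.
IntOp operator|(const IntOp& first, const IntOp& second) {
    return IntOp{[first, second](int x) { return second.fn(first.fn(x)); }};
}
```

With add_one, twice, and square defined as IntOp values, the expressions 3 | add_one | twice | square, 3 | (add_one | twice | square), 3 | (add_one | twice) | square, and 3 | add_one | (twice | square) all evaluate to 64.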
Specializations
Streams are specialized based on the element type of the stream to allow extended functionality based on that type. Specializations for most types are listed in their relevant sections. However, for class types, there are further-reaching specializations, which we discuss here. For any operator that takes a function of the element type (e.g. map_(), filter(), take_while(), to name a few), there is a specialization that allows passing a pointer to a member function instead. Consider the following example:
struct Thing {
int x;
int value() const { return x; }
};
Stream<Thing> things = /* ... */;
In this case, all of the following are equivalent, with the last one being the one that is provided by the stream specialization for class types:
auto values = things | map_([](Thing&& thing) { return thing.value(); });
auto values = things | map_(std::mem_fn(&Thing::value));
auto values = things | map_(&Thing::value);
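For readers unfamiliar with member function pointers, the following standard-library sketch shows the underlying mechanism: std::mem_fn turns a pointer to a member function into an ordinary callable. The helper map_member is hypothetical and eager; it is not part of Streams.

```cpp
#include <algorithm>
#include <functional>
#include <iterator>
#include <vector>

struct Thing {
    int x;
    int value() const { return x; }
};

// Accepts a pointer to a const member function and adapts it with std::mem_fn,
// so the caller does not have to write a lambda.
std::vector<int> map_member(const std::vector<Thing>& things,
                            int (Thing::*getter)() const) {
    std::vector<int> result;
    result.reserve(things.size());
    std::transform(things.begin(), things.end(), std::back_inserter(result),
                   std::mem_fn(getter));
    return result;
}
```

Calling map_member(things, &Thing::value) on things holding x = 1, 5, 9 yields the vector {1, 5, 9}.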
The full set of operators with this property is as follows:
Methods
These are methods on the stream class that do not directly affect the data in the stream, and thus are not considered stream operators.
iterator Stream<T>::begin();
Returns an iterator to the beginning of the stream.
Iterators for streams have all of the standard iterator operator overloads,
and act for the most part just like normal forward iterators. However,
there are some caveats to their use. First of all, when getting a starting iterator from begin(), you have not yet consumed anything in the stream. But as soon as you perform some operation (dereference, increment, test equality or inequality) on this iterator, the first element of the stream (if there is one) will be consumed and the iterator will resolve itself to its actual value. Example:
auto stream = MakeStream::counter(1)
| peek([](int x) { std::cout << "Peek = " << x << "\n"; });
auto iter = stream.begin();
int value = *iter;
std::cout << "Iter = " << value << "\n";
Produces the following output:
Peek = 1
Iter = 1
Second, be very careful when using the return value of the postfix increment
operator. The return value of the postfix increment of an iterator is an iterator
that is considered to be "consumed," meaning you can dereference it, but you
can't do anything else with it: you can't increment it or check its equality.
Why is this? Well, the idea is that we don't want to split the stream or copy it. If you had two independent iterators that were both attempting to iterate through the same stream, very strange behavior could occur, because a stream isn't a true container; it's simply a wrapper around a next method. This isn't to say getting two iterators is impossible (simply call begin() twice). However, we want to safeguard against this type of behavior. Attempting to perform unauthorized actions on a consumed iterator results in a ConsumedIteratorException. Example:
auto stream = MakeStream::closed_range(1, 10);
auto iter = stream.begin();
cout << *iter << endl;
auto temp_iter = iter++;
cout << *temp_iter << endl;
try {
++temp_iter;
} catch(ConsumedIteratorException& e) {
cout << e.what() << endl;
}
Produces the following output:
1
2
Cannot perform prefix increment on consumed stream iterator.
That being said, using stream iterators should be fine with most if not
all standard library algorithms. However, be cognizant of the fact that
you are paying a slight overhead cost for using an iterator, so if
speed is your concern, use a reduction method instead of something
that uses iterators. For example, even though this is prettier:
for(auto element : stream) {
/* do something */
}
This is more efficient:
stream | for_each([](auto element) {
/* do something */
});
The choice is yours.
iterator Stream<T>::end();
Returns an iterator to something one past the end of the stream. Since
the end of the stream is unknown, this is simply a sentinel iterator.
For a discussion of the intricacies of using iterators see the
begin() method.
bool Stream<T>::occupied();
Returns true if the stream object owns stream data. Streams are only
movable (not copyable), and moving a stream results in a "vacant"
stream (calling this method will return false). Attempting to
perform any stream operation on a vacant stream will result in a
VacantStreamException. Similarly, every
stream operation returns a new stream that owns the data of the stream(s)
it was called on. Thus attempting to do two operations on the same
stream will result in an exception being thrown.
Example:
Stream<int> stream1 = MakeStream::counter(1);
std::cout << std::boolalpha;
std::cout << "Stream 1 occupied: " << stream1.occupied() << std::endl;
Stream<int> stream2 = stream1 | limit(10);
std::cout << "Stream 1 occupied: " << stream1.occupied() << std::endl;
std::cout << "Stream 2 occupied: " << stream2.occupied() << std::endl;
try {
stream1 | filter([] (int x) { return x % 2 == 0; });
} catch(VacantStreamException& e) {
std::cout << e.what() << std::endl;
}
Produces the following output:
Stream 1 occupied: true
Stream 1 occupied: false
Stream 2 occupied: true
Cannot call stream::op::filter on a vacant stream
void Stream<T>::close();
Closes a stream, causing all of its unevaluated data to be lost, and
puts the stream into a vacant state. This is called automatically
after any terminal operation.
Example:
auto s = MakeStream::counter(1);
s.close();
s.occupied(); // returns false
std::string Stream<T>::pipeline();
Returns a string representation of the stream pipeline, including all of the basic
transformations on the stream and all sources, the number
of pipelines, and the number of stream sources.
Example:
std::vector<std::string> v = /* ... */;
auto s = MakeStream::from(v)
| limit(100)
| filter([](std::string& s) { return !s.empty(); })
| map_([](std::string& s) { return s[0]; })
| zip_with(MakeStream::counter(1) * 5)
| zip_with(MakeStream::repeat(10));
std::cout << s.pipeline() << std::endl;
Produces the following output:
> Zip:
> Zip:
> Map:
> Filter:
> Slice[0, 100, 1]:
> [iterator stream]
> Map:
> [iterated stream]
> [repeated value stream]
Stream pipeline with 6 stages and 3 sources.
Stream Generators
The stream generator factory methods can all be found as static methods in the MakeStream class. They are kept outside of the Stream class template so that they can perform type deduction for you.
Be careful when using factory methods that draw from referenced data sources (for example, MakeStream::from(const Container&)). These are perfectly safe to use if the usage of the stream is entirely contained within the current scope. However, if the stream outlives the container it references, it is left reading from a destroyed object, which is undefined behavior. In this case, prefer the MakeStream::from_move() generator.
template<typename T>
Stream<T> MakeStream::empty();
Creates an empty stream of the given type. Calling
MakeStream::empty<T>() is equivalent
to default constructing a stream.
Example:
MakeStream::empty<int>() | count(); // 0
template<typename T>
Stream<T> MakeStream::singleton(T&& value);
Creates a stream with a single given value.
Example:
MakeStream::singleton(5) | to_vector(); // {5}
Stream<T> MakeStream::from(Iterator begin, Iterator end);
Stream<T> MakeStream::from(const Container& cont);
Stream<T> MakeStream::from(T* arr, size_t length);
Stream<T> MakeStream::from(std::initializer_list<T> init);
Creates a stream from some existing set of data: a pair of iterators, a container for which
std::begin() and std::end() are defined, a C-style array, or an initializer list. Warning! Beware of using most of these methods if your Stream is going to be used outside of the current scope. The exception to this is the initializer_list generator, which will capture the list, and therefore is safe to use outside of the current scope. To safely capture a container in a stream, use the from_move() generator method.
Example:
std::vector<int> x = {1, 3, 4, 2};
MakeStream::from(x);
MakeStream::from(x.begin(), x.end());
int arr[4] = {1, 3, 4, 2};
MakeStream::from(arr, 4);
MakeStream::from({1, 3, 4, 2});
template<typename Container>
Stream<T> MakeStream::from_move(Container&& cont);
Creates a stream from a container of data, moving that container into
the stream, so that the stream may be used safely outside of the current scope.
Example:
#include "Stream.h"
#include <vector>
#include <iostream>
std::vector<int> make_vector() {
return {1, 2, 3};
}
Stream<int> make_stream_safe1() {
return MakeStream::from_move(make_vector());
}
Stream<int> make_stream_safe2() {
std::vector<int> vec = {4, 5, 6};
return MakeStream::from_move(vec);
}
Stream<int> make_stream_unsafe() {
std::vector<int> vec = {7, 8, 9};
return MakeStream::from(vec); // BAD!
}
int main(int argc, char const *argv[]) {
(make_stream_safe1() | print_to(std::cout)) << std::endl;
(make_stream_safe2() | print_to(std::cout)) << std::endl;
(make_stream_unsafe() | print_to(std::cout)) << std::endl;
}
Produces the following output (on one run on my computer):
1 2 3
4 5 6
7 -536870912 -2026110050
Rule of thumb: If you're only going to be accessing data through the
stream, probably just use from_move().
template<typename T>
Stream<T> MakeStream::repeat(T&& value);
template<typename T>
Stream<T> MakeStream::repeat(T&& value, size_t times);
Creates a stream consisting of the same value repeated over and
over again. The first method creates an infinite stream of the repeated
value. The second method only repeats the value a fixed number of
times. Calling MakeStream::repeat(x, n) is
equivalent to calling MakeStream::repeat(x) | limit(n).
Example:
auto s = MakeStream::from({ /* ... */ })
| concat(MakeStream::repeat(0)); // Stream padded with 0's
Stream<T> MakeStream::cycle(Iterator begin, Iterator end);
Stream<T> MakeStream::cycle(Iterator begin, Iterator end, size_t times);
Stream<T> MakeStream::cycle(const Container& cont);
Stream<T> MakeStream::cycle(const Container& cont, size_t times);
Stream<T> MakeStream::cycle(std::initializer_list<T> init);
Stream<T> MakeStream::cycle(std::initializer_list<T> init, size_t times);
Creates a stream of a sequence of elements repeated over and over again.
The signatures without a times parameter
will loop over a range indefinitely, whereas the ones with the parameter
will only loop over the sequence that many times. Warning! Beware using most of these methods if your Stream is going to be used outside of the current scope. The exception to this is the initializer_list generator, which will capture the list, and therefore is safe to use outside of the current scope. To safely capture a container in a cycle, use the cycle_move() generator method.
Example:
std::vector<int> x{1, 3, 8};
MakeStream::cycle(x.begin(), x.end()); // Contains 1, 3, 8, 1, 3, 8, 1, ...
MakeStream::cycle(x, 2); // Contains 1, 3, 8, 1, 3, 8.
template<typename Container>
Stream<T> MakeStream::cycle_move(Container&& cont);
template<typename Container>
Stream<T> MakeStream::cycle_move(Container&& cont, size_t times);
Creates a stream of a sequence of elements repeated over and over again, moving the
container into the stream so that the stream may be used safely outside of the current scope.
The signature without a times parameter
will loop over the sequence indefinitely, whereas the one with the parameter
will only loop over the sequence that many times.
Example:
#include "Stream.h"
#include <vector>
#include <iostream>
std::vector<int> make_vector() {
return {1, 2, 3};
}
Stream<int> make_stream_safe1() {
return MakeStream::cycle_move(make_vector(), 2);
}
Stream<int> make_stream_safe2() {
std::vector<int> vec = {4, 5, 6};
return MakeStream::cycle_move(vec, 2);
}
Stream<int> make_stream_unsafe() {
std::vector<int> vec = {7, 8, 9};
return MakeStream::cycle(vec, 2); // BAD!
}
int main(int argc, char const *argv[]) {
(make_stream_safe1() | print_to(std::cout)) << std::endl;
(make_stream_safe2() | print_to(std::cout)) << std::endl;
(make_stream_unsafe() | print_to(std::cout)) << std::endl;
}
Produces the following output (on one run on my computer):
1 2 3 1 2 3
4 5 6 4 5 6
0 -1879048192 0 0 -1879048192 0
template<typename Generator>
Stream<T> MakeStream::generate(Generator&& generator);
Creates a stream whose values are the return values of repeated calls to
the generator function with no arguments.
Example:
MakeStream::generate(rand); // Stream of random integers
template<typename T, typename Function>
Stream<T> MakeStream::iterate(T&& value, Function&& function);
This is a special case of recurrence()
that creates a stream that, given a value x and
a function f returns the stream produced by
x, f(x),
f(f(x)) and so on. In the below example
we produce a stream which investigates the
Collatz conjecture.
Example:
auto stream = MakeStream::iterate(1245, [](int x) {
if(x % 2 == 0) {
return x / 2;
} else {
return 3 * x + 1;
}
});
Note that iterate(n, f) is a convenience for
recurrence(f, n).
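The sequence x, f(x), f(f(x)), ... can be checked by hand with an eager helper. iterate_prefix and collatz_step are hypothetical names used only here; unlike MakeStream::iterate, the helper materializes a fixed number of elements instead of producing a lazy infinite stream.

```cpp
#include <cstddef>
#include <vector>

// Returns the first `count` elements of value, f(value), f(f(value)), ...
template <typename T, typename Function>
std::vector<T> iterate_prefix(T value, Function function, std::size_t count) {
    std::vector<T> result;
    result.reserve(count);
    for (std::size_t i = 0; i < count; ++i) {
        result.push_back(value);
        value = function(value);
    }
    return result;
}

// One step of the Collatz map: halve even numbers, send odd x to 3x + 1.
int collatz_step(int x) {
    return x % 2 == 0 ? x / 2 : 3 * x + 1;
}
```

For the starting value used above, iterate_prefix(1245, collatz_step, 4) yields 1245, 3736, 1868, 934.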
template<typename... Args, typename Function>
Stream<T> MakeStream::recurrence(Function&& function, Args&&... initial);
A more general version of iterate(),
creates an infinite stream that is a recurrence relation starting with some initial
set of values. For example for a second order recurrence relation (two initial values),
the output of the stream would be: a1, a2,
a3 = f(a1, a2),
a4 = f(a2, a3),
a5 = f(a3, a4) ...
Example:
MakeStream::recurrence(std::plus<int>(), 0, 1); // 0 1 1 2 3 5 8 ...
Note that in the case that sizeof...(Args) == 1, this
is identical to iterate().
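A second order recurrence can be sketched eagerly as follows. recurrence_prefix is a hypothetical helper fixed at two initial values; the library's recurrence() accepts any number of them and is lazy.

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Returns the first `count` terms of a1, a2, f(a1, a2), f(a2, a3), ...
template <typename T, typename Function>
std::vector<T> recurrence_prefix(Function function, T a1, T a2, std::size_t count) {
    std::vector<T> result;
    result.reserve(count);
    for (std::size_t i = 0; i < count; ++i) {
        result.push_back(a1);
        T next = function(a1, a2);  // the term two positions ahead of a1
        a1 = a2;
        a2 = next;
    }
    return result;
}
```

With std::plus<int>() and the initial values 0 and 1, the first seven terms are 0, 1, 1, 2, 3, 5, 8.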
template<typename T>
Stream<T> MakeStream::counter(T&& start);
template<typename T, typename U>
Stream<T> MakeStream::counter(T&& start, U&& increment);
template<typename T, typename U>
Stream<T> MakeStream::counter(T&& start, const U& increment);
Creates a stream of elements produced by incrementing a given element
indefinitely. Incrementing in the case of the first method is done
via a prefix increment operator. The last two methods increment by a
fixed value each time, by adding the increment to the current value
(on the right side of the operator).
Example:
MakeStream::counter(1); // 1, 2, 3, 4, ...
MakeStream::counter('A', 2); // A, C, E, G, ...
template<typename T>
Stream<T> MakeStream::range(T&& lower, T&& upper);
template<typename T, typename U>
Stream<T> MakeStream::range(T&& lower, T&& upper, U&& increment);
template<typename T, typename U>
Stream<T> MakeStream::range(T&& lower, T&& upper, const U& increment);
Creates a stream of elements that iterate through a range starting at
lower, up to but not including
upper. Testing against the upper bound
is done via the built in != operator.
The rules of incrementation for this method are identical to those of
counter().
Example:
MakeStream::range(0, 5); // 0, 1, 2, 3, 4
MakeStream::range(0, 8, 2); // 0, 2, 4, 6
template<typename T>
Stream<T> MakeStream::closed_range(T&& lower, T&& upper);
template<typename T, typename U>
Stream<T> MakeStream::closed_range(T&& lower, T&& upper, U&& increment);
template<typename T, typename U>
Stream<T> MakeStream::closed_range(T&& lower, T&& upper, const U& increment);
Creates a stream of elements that iterate through a range starting at
lower, up to and including
upper. Testing against the upper bound
is done via the built in <= operator.
The rules of incrementation for this method are identical to those of
counter().
Example:
MakeStream::closed_range(1, 5); // 1, 2, 3, 4, 5
MakeStream::closed_range(0, 8, 2); // 0, 2, 4, 6, 8
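The difference between the two bound tests matters when an increment is involved. The sketch below assumes the tests are literally != for range() and <= for closed_range(), as described above; half_open_range and closed_range_values are hypothetical eager helpers. Under that assumption, an increment that steps over the upper bound would never satisfy the != test, so the half-open helper asserts that the increment lands exactly on the bound.

```cpp
#include <cassert>
#include <vector>

// Half-open range [lower, upper): stops when the current value equals `upper`.
std::vector<int> half_open_range(int lower, int upper, int step = 1) {
    // With a != test the step must land exactly on `upper`, or the loop never ends.
    assert(step != 0 && (upper - lower) % step == 0 && (upper - lower) / step >= 0);
    std::vector<int> result;
    for (int value = lower; value != upper; value += step) {
        result.push_back(value);
    }
    return result;
}

// Closed range [lower, upper]: stops once the current value exceeds `upper`.
std::vector<int> closed_range_values(int lower, int upper, int step = 1) {
    assert(step > 0);
    std::vector<int> result;
    for (int value = lower; value <= upper; value += step) {
        result.push_back(value);
    }
    return result;
}
```

Because the closed variant uses <=, closed_range_values(0, 7, 2) still terminates and yields 0, 2, 4, 6.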
template<typename T = bool,
typename Engine = std::default_random_engine>
Stream<T> MakeStream::coin_flips();
template<typename T = bool,
typename Engine = std::default_random_engine,
typename Seed>
Stream<T> MakeStream::coin_flips(Seed&& seed);
Creates an infinite stream of random values that are 0 or 1 with equal probability.
The user can specify the underlying random engine which defaults to
std::default_random_engine and the initial
seed of the random number generator. If not given, the seed will be
initialized to the current time. By default, this returns a stream of
bools, though the type can be specified
as any type which makes sense.
Example:
The following runs a test to see how many coin flips come up heads after 1000 flips.
int heads = MakeStream::coin_flips()
| limit(1000)
| filter()
| count();
template<typename T,
typename Engine = std::default_random_engine>
Stream<T> MakeStream::uniform_random_ints(T lower, T upper);
template<typename T,
typename Engine = std::default_random_engine,
typename Seed>
Stream<T> MakeStream::uniform_random_ints(T lower, T upper, Seed&& seed);
Creates an infinite stream of random integers whose values are distributed uniformly
between the upper and lower bounds. The user can specify the underlying
random engine which defaults to std::default_random_engine
and the initial seed of the random number generator. If not given, the seed will be
initialized to the current time.
Example:
MakeStream::uniform_random_ints(0, 10);
template<typename T = double,
typename Engine = std::default_random_engine>
Stream<T> MakeStream::uniform_random_reals(T lower = 0.0, T upper = 1.0);
template<typename T,
typename Engine = std::default_random_engine,
typename Seed>
Stream<T> MakeStream::uniform_random_reals(T lower, T upper, Seed&& seed);
Creates an infinite stream of random real numbers whose values are
distributed uniformly between the upper and lower bounds. The first
signature defaults the lower and upper bounds to 0 and 1 respectively.
The user can specify the underlying random engine which defaults to
std::default_random_engine and the initial
seed of the random number generator. If not given, the seed will be
initialized to the current time.
Example:
MakeStream::uniform_random_reals(); // Doubles between 0 and 1
MakeStream::uniform_random_reals<float>(.3, .7); // Floats between .3 and .7
template<typename T = double,
typename Engine = std::default_random_engine>
Stream<T> MakeStream::normal_randoms(T mean = 0.0, T stddev = 1.0);
template<typename T = double,
typename Engine = std::default_random_engine,
typename Seed>
Stream<T> MakeStream::normal_randoms(T mean, T stddev, Seed&& seed);
Creates an infinite stream of random real numbers whose values are normally
distributed with a given mean and standard deviation. The first signature
defaults the parameters to the standard normal distribution, μ = 0
and σ = 1.
The user can specify the underlying random engine which defaults to
std::default_random_engine and the initial
seed of the random number generator. If not given, the seed will be
initialized to the current time.
Example:
MakeStream::normal_randoms(); // Doubles distributed normally
MakeStream::normal_randoms<float>(5.3, 1.8); // Floats distributed normally.
template<typename T,
template<typename> class Distribution,
typename Engine = std::default_random_engine,
typename Seed,
typename... GenArgs>
Stream<T> MakeStream::randoms(Seed&& seed, GenArgs&&... args);
template<typename T,
template<typename> class Distribution,
typename Engine = std::default_random_engine,
typename... GenArgs>
Stream<T> MakeStream::randoms(GenArgs&&... args);
Creates an infinite stream of random values as generically as possible.
In this function, T is the type of the resulting
stream, Distribution is a number distribution template given without
its template arguments (e.g. std::uniform_int_distribution),
Engine is the underlying random number generator,
and the args... are the parameters passed into
the constructor of the distribution. If the seed is not provided,
the current time will be used.
If there is no factory method that supports your specific distribution, this
is the method for you.
Example:
MakeStream::randoms<double, std::poisson_distribution>(5.0);
// Stream of doubles coming from a poisson process with mean 5.
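The template template parameter is the unusual part of this signature. The following sketch shows the same parameter layout on an eager helper; sample is a hypothetical name, it always takes an explicit seed, and it returns a vector rather than a stream.

```cpp
#include <cstddef>
#include <random>
#include <utility>
#include <vector>

// T is the element type, Distribution is a distribution template named without
// its arguments, Engine is the random engine, and args are forwarded to the
// distribution's constructor.
template <typename T,
          template <typename> class Distribution,
          typename Engine = std::default_random_engine,
          typename... Args>
std::vector<T> sample(std::size_t count, unsigned seed, Args&&... args) {
    Engine engine(seed);
    Distribution<T> distribution(std::forward<Args>(args)...);
    std::vector<T> result;
    result.reserve(count);
    for (std::size_t i = 0; i < count; ++i) {
        result.push_back(distribution(engine));
    }
    return result;
}
```

For example, sample<int, std::uniform_int_distribution>(100, 7u, 1, 6) draws 100 simulated die rolls. Using a fixed seed makes the result repeatable for a given standard library implementation.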
Stateless Intermediate Stream Operators
These operators insert another stage of a stream pipeline or combine two streams in some way forming a new stream, which contains the data of the old streams. Intermediate operators do not get evaluated until some terminal operation is called on the stream. These operators are all functions that return stream operators, which are applied to a stream with the | operator.
template<typename Predicate>
auto filter(Predicate&& predicate);
Returns a stream operator that only allows elements of the stream that pass the given predicate.
Example:
MakeStream::counter(1)
| filter([](int x) { return x % 2 == 0; });
// Stream now contains only even positive integers
Specializations: Specializations of filter exist for types that have an implicit conversion to bool. In this case, filter() returns a stream operator which will filter for values whose bool value is true. See the example for coin_flips().
template<typename Transform>
auto map_(Transform&& transform);
Returns a stream operator that transforms each element of the stream using the given transformation
function. The resulting stream is produced by applying the
transformation to each element of the input stream.
Example:
MakeStream::counter(1)
| map_([](int x) { return x * x; });
// Stream of perfect squares
template<typename Transform>
auto flat_map(Transform&& transform);
Returns a stream operator that applies a transformation that is
expected to return a stream as a result to every element of the stream.
The resulting stream is the concatenation of these output streams.
Example:
MakeStream::counter(1)
| flat_map([](int x) {
return MakeStream::counter(1) | limit(x);
}); // Stream contains 1, 1, 2, 1, 2, 3, 1, 2, 3, 4, ...
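The concatenation order can be verified with an eager equivalent over a finite input. flat_map_eager and count_up_to are hypothetical helpers that use vectors where the library would use streams.

```cpp
#include <functional>
#include <numeric>
#include <vector>

// Applies `transform` to every element and concatenates the resulting vectors.
std::vector<int> flat_map_eager(const std::vector<int>& input,
                                const std::function<std::vector<int>(int)>& transform) {
    std::vector<int> result;
    for (int element : input) {
        std::vector<int> piece = transform(element);
        result.insert(result.end(), piece.begin(), piece.end());
    }
    return result;
}

// Returns 1, 2, ..., n for a non-negative n.
std::vector<int> count_up_to(int n) {
    std::vector<int> values(static_cast<std::vector<int>::size_type>(n));
    std::iota(values.begin(), values.end(), 1);
    return values;
}
```

Applied to the input 1, 2, 3, 4, flat_map_eager with count_up_to produces 1, 1, 2, 1, 2, 3, 1, 2, 3, 4.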
auto limit(size_t bound);
Returns a stream operator that ensures that the stream contains a maximum number of elements.
Example:
MakeStream::uniform_random_ints(1, 10)
| limit(5); // Stream contains 5 random integers
Note that stream | limit(n) is equivalent to
stream | slice(0, n, 1).
auto skip(size_t amount);
Returns a stream operator that skips the given number of elements at the beginning of a stream.
Example:
MakeStream::counter(1)
| skip(10); // Stream contains 11, 12, 13, ...
Note that stream | skip(n) is equivalent to
stream | slice_to_end(n, 1).
auto slice(size_t startIndex,
size_t endIndex,
size_t increment = 1);
Returns a stream operator that retrieves elements of the stream starting at startIndex,
up to, but not including, endIndex, iterating
by increment. By default, increment
is 1.
Example:
auto stream1 = MakeStream::counter(0) | slice(5, 10); // 5 6 7 8 9
auto stream2 = MakeStream::counter(0) | slice(1, 8, 2); // 1 3 5 7
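The index arithmetic of a slice is easiest to see on a finite container. slice_eager is a hypothetical helper that mirrors the parameters of slice() but operates on a vector.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Copies the elements at indices start, start + increment, ... that are
// smaller than `end` (and within the input).
std::vector<int> slice_eager(const std::vector<int>& input,
                             std::size_t start,
                             std::size_t end,
                             std::size_t increment = 1) {
    assert(increment > 0);
    std::vector<int> result;
    for (std::size_t i = start; i < end && i < input.size(); i += increment) {
        result.push_back(input[i]);
    }
    return result;
}
```

On an input holding 0, 1, 2, ..., 11, slice_eager(input, 5, 10) yields 5, 6, 7, 8, 9 and slice_eager(input, 1, 8, 2) yields 1, 3, 5, 7. Calling it with a start of 0 and an increment of 1 behaves like a limit.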
auto slice_to_end(size_t startIndex, size_t increment);
Returns a stream operator that is a version of the slice operation which allows an unbounded slice.
Slices are taken starting at startIndex
and go by increment.
Example:
auto stream1 = MakeStream::counter(0) | slice_to_end(0, 5); // 0 5 10 15 ...
auto stream2 = MakeStream::counter(0) | slice_to_end(3, 2); // 3 5 7 9 ...
template<typename Predicate>
auto take_while(Predicate&& predicate);
Returns a stream operator that takes elements from the stream until the given predicate becomes false.
Example:
MakeStream::counter(1)
| take_while([](int x) { return x < 5; });
// Stream contains 1, 2, 3, 4
Specializations: Specializations of take_while exist for types that have an implicit conversion to bool. In this case, take_while() returns a stream operator which will take values whose bool value is true.
template<typename Predicate>
auto drop_while(Predicate&& predicate);
Returns a stream operator that drops the initial elements of the stream until the given predicate becomes
false after which the stream is returned intact.
Example:
MakeStream::counter(1)
| drop_while([](int x) { return x < 5; });
// Stream contains 5, 6, 7, ...
Specializations: Specializations of drop_while exist for types that have an implicit conversion to bool. In this case, drop_while() returns a stream operator which will drop values whose bool value is true.
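Both take_while and drop_while only look at the leading run of elements, which distinguishes them from filter. The eager helpers below are hypothetical and built on std::find_if_not; they use an input where the predicate becomes true again later to make the difference visible.

```cpp
#include <algorithm>
#include <vector>

// Keeps the leading elements for which `predicate` holds.
template <typename Predicate>
std::vector<int> take_while_eager(const std::vector<int>& input, Predicate predicate) {
    auto stop = std::find_if_not(input.begin(), input.end(), predicate);
    return std::vector<int>(input.begin(), stop);
}

// Drops the leading elements for which `predicate` holds and keeps the rest intact.
template <typename Predicate>
std::vector<int> drop_while_eager(const std::vector<int>& input, Predicate predicate) {
    auto start = std::find_if_not(input.begin(), input.end(), predicate);
    return std::vector<int>(start, input.end());
}
```

For the input 1, 6, 2, 7 and the predicate x < 5, taking yields only 1, while dropping yields 6, 2, 7. The later 2 is kept even though it satisfies the predicate.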
template<typename Function>
auto peek(Function&& function);
Returns a stream operator that allows a lazy peek into the stream. When a value passes through the
pipeline and hits a peek, the function will be called on that value,
the function's return value is ignored, and the original value is passed
on to the next stage of the pipeline. Like all stateless operations, this will not
be executed until some stateful or terminal operation is called on
the stream.
Example:
std::vector<int> result = MakeStream::counter(1)
| filter([](int x) { return x % 3 == 0; })
| limit(3)
| peek([](int x) { std::cout << "Value = " << x << std::endl; })
| to_vector();
Output:
Value = 3
Value = 6
Value = 9
template<typename Equal = std::equal_to<void>>
auto adjacent_distinct(Equal&& equal = Equal());
Returns a stream operator that removes adjacent duplicates from the stream. By default, duplicates
are determined using the standard ==
operator. However, a different equality operation can be given.
Example:
MakeStream::from({1, 1, 3, 2, 2, 5, 5, 5, 5, 2, 3, 3})
| adjacent_distinct(); // Stream contains 1, 3, 2, 5, 2, 3;
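The standard library offers the same behavior eagerly through std::unique_copy, which can be used to check expected results. adjacent_distinct_eager is a hypothetical wrapper and is not part of Streams.

```cpp
#include <algorithm>
#include <iterator>
#include <vector>

// Copies the input while collapsing each run of equal adjacent elements to one.
std::vector<int> adjacent_distinct_eager(const std::vector<int>& input) {
    std::vector<int> result;
    std::unique_copy(input.begin(), input.end(), std::back_inserter(result));
    return result;
}
```

Only adjacent duplicates are removed: the input 1, 2, 1 is returned unchanged, because the two 1s are not neighbors.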
template<typename Subtract = std::minus<void>>
auto adjacent_difference(Subtract&& subtract = Subtract());
Returns a stream operator that returns a stream of the pairwise differences of the elements.
By default, subtraction is performed using the standard -
operator. However, a different subtraction operation can be given. The type
of the resulting stream is the difference type returned by the subtraction
function.
Example:
MakeStream::counter(1)
| map_([](int x) { return x * x; })
| adjacent_difference(); // Stream contains 3, 5, 7, 9, ...
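Note that the example output above starts at the first difference, whereas the standard algorithm std::adjacent_difference also writes the first input element unchanged. The hypothetical helper pairwise_differences below bridges the two by dropping that leading element.

```cpp
#include <numeric>
#include <vector>

// Returns input[1] - input[0], input[2] - input[1], ...
std::vector<int> pairwise_differences(const std::vector<int>& input) {
    if (input.size() < 2) return {};
    std::vector<int> result(input.size());
    std::adjacent_difference(input.begin(), input.end(), result.begin());
    // std::adjacent_difference copies the first element as is; remove it.
    result.erase(result.begin());
    return result;
}
```

For the perfect squares 1, 4, 9, 16, 25 this yields 3, 5, 7, 9.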
template<typename Adder = std::plus<T>>
auto partial_sum(Adder&& add = Adder());
Returns a stream operator that computes the partial sum of the elements in the stream. By default,
addition is performed using the standard +
operator. However, a different addition operation can be given. This
addition operation must take elements of type T
and return an element of type T.
Example:
MakeStream::from({1, 5, 3, 7, -2, 6})
| partial_sum(); // Stream contains 1, 6, 9, 16, 14, 20
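The eager standard-library counterpart is std::partial_sum, which also accepts a custom binary operation. partial_sum_eager is a hypothetical wrapper whose defaulted Adder parameter imitates the signature shown above.

```cpp
#include <functional>
#include <numeric>
#include <vector>

// Returns the running totals of `input` under the binary operation `add`.
template <typename Adder = std::plus<int>>
std::vector<int> partial_sum_eager(const std::vector<int>& input, Adder add = Adder()) {
    std::vector<int> result(input.size());
    std::partial_sum(input.begin(), input.end(), result.begin(), add);
    return result;
}
```

Passing std::multiplies<int>() instead of the default turns the running sum into a running product, so 1, 2, 3, 4 becomes 1, 2, 6, 24.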
template<typename Iterator>
auto concat(Iterator begin, Iterator end);
auto concat(Stream<T>&& tail);
Returns a stream operator that concatenates the given stream to the end of the
stream it is applied to, to be processed when the current stream reaches its end. The first
form of concat is a convenience for
concatenating with
MakeStream::from(begin, end).
Example:
std::vector<int> x = {4, 5, 6};
std::vector<int> y = {1, 2, 3};
MakeStream::from(x)
| concat(MakeStream::from(y)); // Stream contains 4, 5, 6, 1, 2, 3
template<typename Right, typename Function = Zipper>
auto zip_with(Stream<Right>&& other,
Function&& zipper = Function());
Returns a stream operator that zips two streams together using the provided function, stopping when
either of the streams finishes. By default, the zipping function takes
the two elements of the streams and returns a tuple of the two elements.
This function is specialized so that if either stream is a stream of
tuples, the result of the zipping function is the concatenation of
the tuples (rather than nested tuples).
Example:
std::vector<std::string> input = {"Hello", "this", "is", "a", "stream"};
MakeStream::from(input)
| zip_with(MakeStream::counter(1)) // Stream<std::tuple<std::string, int>>
| for_each([](std::tuple<std::string, int>&& tup) {
std::cout << tup << std::endl;
});
Produces the following output:
(Hello, 1)
(this, 2)
(is, 3)
(a, 4)
(stream, 5)
Note: As can be seen above, we've included an overload for
operator<< for inserting tuples
into I/O streams.
Zipping multiple times produces a concatenated tuple rather than nested tuples:
MakeStream::counter(0)
| zip_with(MakeStream::counter(10))
| zip_with(MakeStream::counter(20));
// Stream is now a stream of std::tuple<int, int, int> which contains:
// (0, 10, 20), (1, 11, 21), (2, 12, 22), ...
One particularly nice tuple operation we've included is a function
called splat which takes a function and
returns a function that takes a tuple and splats that tuple's elements
in as the arguments to the function. For example:
auto splatted = splat([](std::string x, int y) {
return x.length() + y;
});
splatted(std::make_tuple("Hello", 5)); // returns 10
This is useful, because it makes working with zipped streams particularly easy.
So instead of having to do the ugly and unreadable:
MakeStream::counter(1)
| zip_with(MakeStream::counter(11))
| map_([](const std::tuple<int, int>& tup) {
return std::get<0>(tup) * std::get<1>(tup);
});
You can now have the following:
MakeStream::counter(1)
| zip_with(MakeStream::counter(11))
| map_(splat([](int first, int second) {
return first * second;
}));
A different function can be used for zipping, which is an excellent way of
performing element-wise operations to combine streams. All of the
binary operators have stream overloads that are effectively zips.
For example, consider the multiplication operator, which can be
implemented on two streams in the following way (it's not quite this,
but close):
template<typename L, typename R>
auto operator* (Stream<L>&& left, Stream<R>&& right) {
return left | zip_with(right, [](const L& lvalue, const R& rvalue) {
return lvalue * rvalue;
});
}
Method signature:
template<typename Less = std::less<T>>
auto merge_with(Stream<T>&& other, Less&& less = Less());
Returns a stream operator that computes the merging of two streams (the merge step in mergesort).
By default, elements are compared using their built in
< operator. However, a different less
than operator can be provided.
Example:
Stream<int> left = MakeStream::from({1, 2, 4, 4, 12, 13, 17});
Stream<int> right = MakeStream::from({2, 5, 12, 17, 18});
left | merge_with(right); // 1, 2, 2, 4, 4, 5, 12, 12, 13, 17, 17, 18
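For comparison, the same result can be produced eagerly with the standard library. The sketch below uses std::merge on two vectors and assumes both inputs are already sorted, as the merge step of mergesort requires. The function name merge_sorted is hypothetical.

```cpp
#include <algorithm>
#include <functional>
#include <iterator>
#include <vector>

// Eager equivalent of the merge step on two already-sorted vectors.
template<typename T, typename Less = std::less<T>>
std::vector<T> merge_sorted(const std::vector<T>& left,
                            const std::vector<T>& right,
                            Less less = Less()) {
    std::vector<T> result;
    result.reserve(left.size() + right.size());
    std::merge(left.begin(), left.end(), right.begin(), right.end(),
               std::back_inserter(result), less);
    return result;
}
```

Unlike the stream operator, this version materializes the whole result up front, so it only works for finite inputs.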
Method signature:
template<typename Less = std::less<T>>
auto union_with(Stream<T>&& other, Less&& less = Less());
Returns a stream operator that computes the set union of two streams.
By default, elements are compared using their built in
< operator. However, a different less
than operator can be provided.
Example:
Stream<int> left = MakeStream::from({1, 2, 4, 12, 13, 17});
Stream<int> right = MakeStream::from({2, 5, 12, 17, 18});
left | union_with(right); // 1, 2, 4, 5, 12, 13, 17, 18
Method signature:
template<typename Less = std::less<T>>
auto intersection_with(Stream<T>&& other, Less&& less = Less());
Returns a stream operator that computes the set intersection of two streams.
By default, elements are compared using their built in
< operator. However, a different less
than operator can be provided.
Example:
Stream<int> left = MakeStream::from({1, 2, 4, 12, 13, 17});
Stream<int> right = MakeStream::from({2, 5, 12, 17, 18});
left | intersection_with(right); // 2, 12, 17
Method signature:
template<typename Less = std::less<T>>
auto difference_with(Stream<T>&& other, Less&& less = Less());
Returns a stream operator that computes the set difference of two streams, with the base stream as
the left argument, and the argument stream as the right argument to
the difference.
By default, elements are compared using their built in
< operator. However, a different less
than operator can be provided.
Example:
Stream<int> left = MakeStream::from({1, 2, 4, 12, 13, 17});
Stream<int> right = MakeStream::from({2, 5, 12, 17, 18});
left | difference_with(right); // 1, 4, 13
Method signature:
template<typename Less = std::less<T>>
auto symmetric_difference_with(Stream<T>&& other, Less&& less = Less());
Returns a stream operator that computes the set symmetric difference of two streams.
By default, elements are compared using their built in
< operator. However, a different less
than operator can be provided.
Example:
Stream<int> left = MakeStream::from({1, 2, 4, 12, 13, 17});
Stream<int> right = MakeStream::from({2, 5, 12, 17, 18});
left | symmetric_difference_with(right); // 1, 4, 5, 13, 18
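The four set operators correspond to the standard library's sorted-range set algorithms. As a rough eager sketch, assuming both inputs are sorted in ascending order, they can be reproduced as follows. SetResults and set_operations are hypothetical names used only for illustration.

```cpp
#include <algorithm>
#include <iterator>
#include <vector>

// Results of the four standard set algorithms on two sorted inputs.
struct SetResults {
    std::vector<int> unite, intersect, diff, sym_diff;
};

// Eager sketch: both inputs must already be sorted in ascending order.
inline SetResults set_operations(const std::vector<int>& left,
                                 const std::vector<int>& right) {
    SetResults out;
    std::set_union(left.begin(), left.end(), right.begin(), right.end(),
                   std::back_inserter(out.unite));
    std::set_intersection(left.begin(), left.end(), right.begin(), right.end(),
                          std::back_inserter(out.intersect));
    std::set_difference(left.begin(), left.end(), right.begin(), right.end(),
                        std::back_inserter(out.diff));
    std::set_symmetric_difference(left.begin(), left.end(),
                                  right.begin(), right.end(),
                                  std::back_inserter(out.sym_diff));
    return out;
}
```

Running it on the two example inputs used in this section gives the same element sequences as the stream operators.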
Stream<std::pair<T, T>> Stream<T>::pairwise();
Returns a stream operator that returns a stream with adjacent elements grouped into pairs.
Example:
MakeStream::counter(1)
| pairwise() // Stream contains (1, 2), (3, 4), (5, 6), ...
Note that this is a convenience for stream | group<2>().
template<size_t N>
auto group();
auto group(size_t N);
Returns a stream operator that returns a stream with adjacent elements
grouped into groups of size N. N
can either be a compile time constant or a runtime value. In the former case,
the resulting stream is a stream of N-tuples,
all of whose elements are of the original stream type, T.
In the latter case, the resulting stream is a stream of
std::vector<T>.
Example:
MakeStream::counter(1) | group<3>() // (1, 2, 3), (4, 5, 6), ...
MakeStream::counter(1) | group(3) // [1, 2, 3], [4, 5, 6], ...
This is specialized for N = 2, so that
the resulting stream is a stream of pairs rather than tuples. Calling
stream | group<2>() is the same as calling
stream | pairwise().
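As an illustration of the runtime form, the following eager sketch splits a vector into consecutive chunks of N elements. The name group_eager is hypothetical, and dropping a trailing partial chunk is an assumption: the text above does not say what the library does when the stream length is not a multiple of N.

```cpp
#include <cstddef>
#include <vector>

// Splits input into consecutive chunks of exactly n elements.
// Assumption: a trailing partial chunk is dropped; the library may differ.
template<typename T>
std::vector<std::vector<T>> group_eager(const std::vector<T>& input,
                                        std::size_t n) {
    std::vector<std::vector<T>> groups;
    if (n == 0) return groups;
    for (std::size_t start = 0; start + n <= input.size(); start += n) {
        auto first = input.begin() + static_cast<std::ptrdiff_t>(start);
        groups.emplace_back(first, first + static_cast<std::ptrdiff_t>(n));
    }
    return groups;
}
```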
template<size_t N>
auto overlap();
auto overlap(size_t N);
Returns a stream operator that returns a stream with overlapping adjacent elements
grouped into groups of size N.
N can either be a compile time constant or a runtime value.
In the former case, the resulting stream is a stream of N-tuples,
all of whose elements are of the original stream type, T.
In the latter case, the resulting stream is a stream of std::deque<T>.
Example:
MakeStream::counter(1) | overlap<2>() // (1, 2), (2, 3), (3, 4), (4, 5), ...
MakeStream::counter(1) | overlap<3>() // (1, 2, 3), (2, 3, 4), (3, 4, 5), ...
MakeStream::counter(1) | overlap(4) // [1, 2, 3, 4], [2, 3, 4, 5], ...
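The runtime form behaves like a sliding window that advances one element at a time. A minimal eager sketch of that idea, using a std::deque as the window, could look like this. The name overlap_eager is hypothetical and the code is not taken from the library.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Collects every window of n adjacent elements, sliding forward by one.
template<typename T>
std::vector<std::deque<T>> overlap_eager(const std::vector<T>& input,
                                         std::size_t n) {
    std::vector<std::deque<T>> windows;
    if (n == 0) return windows;
    std::deque<T> window;
    for (const T& value : input) {
        window.push_back(value);
        if (window.size() > n) window.pop_front();  // drop the oldest element
        if (window.size() == n) windows.push_back(window);
    }
    return windows;
}
```

A deque fits here because each step removes from the front and appends to the back, both in constant time.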
Method signatures (template definitions excluded for brevity):
Stream<X> operator$ (Stream<L>&& left, R&& right);
Stream<X> operator$ (L&& left, Stream<R>&& right);
Stream<X> operator$ (Stream<L>&& left, Stream<R>&& right);
The first two of these signatures provide mapping operations, applying the
$ operator to every element of the stream with
the given value on the left or the right. For example:
auto stream1 = MakeStream::counter(1.0) / 3.0; // 1/3, 2/3, 1, 4/3, ....
auto stream2 = 5 * MakeStream::counter(0); // 0, 5, 10, 15, ...
The third signature provides a zipping operation of the elements in
both streams, using the $ operator as
the zipping function. For example, one can compute the dot product
of two streams as follows:
Stream<double> v1 = /* ... */, v2 = /* ... */;
double dot_product = (v1 * v2) | sum();
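For readers more familiar with the standard algorithms, the same computation on two vectors can be sketched with std::inner_product. The function below is a hypothetical eager counterpart. It truncates to the shorter input to mirror the way zipping stops when either stream finishes.

```cpp
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <vector>

// Eager dot product; extra elements of the longer vector are ignored.
inline double dot_product(const std::vector<double>& v1,
                          const std::vector<double>& v2) {
    const std::size_t n = std::min(v1.size(), v2.size());
    return std::inner_product(v1.begin(),
                              v1.begin() + static_cast<std::ptrdiff_t>(n),
                              v2.begin(), 0.0);
}
```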
Unsupported overloadable binary operators:
- Assignment operators, =, +=, *=, etc.
- Subscript operator, [] (In consideration)
- Member pointer operator, ->*
- Comma operator, ,
Method signature:
Stream<X> operator$ (Stream<T>&& stream);
This provides a mapping operation, applying the $
operator to every element of the stream. For example, to dereference a stream
of pointers, we simply call *stream.
Unsupported overloadable unary operators:
- Postfix/prefix increment/decrement operators, ++, --
- Reference operator, &
- Structure dereference operator, ->
- Conversion operator, (type)
- Allocation operators, new, new[]
- Deallocation operators, delete, delete[]
- Function application, (args...) (Coming soon!)
Stateful Intermediate Stream Operators
These operators insert another stage of a stream pipeline or combine two streams in some way forming a new stream, which contains the data of the old streams. These operations accumulate a non-constant amount of space to store their necessary state.
auto state_point();
Returns a stream operator that forces the stream to collect its state before continuing with operations. This
is a good way to allow a complete peek into the data at a certain point in the
stream pipeline.
Example:
MakeStream::counter(1)
| peek([](int x) { std::cout << "x = " << x << std::endl; })
| limit(3)
| state_point()
| map_([](int x) { return 2 * x; })
| for_each([](int x) {
std::cout << "2x = " << x << std::endl;
});
Produces the following output:
x = 1
x = 2
x = 3
2x = 2
2x = 4
2x = 6
Without the call to state_point(),
the following would have been the output:
x = 1
2x = 2
x = 2
2x = 4
x = 3
2x = 6
Note that calling stream | state_point() is
functionally equivalent to calling
MakeStream::from(stream | to_deque()).
template<typename Less = std::less<void>>
auto sort(Less&& less = Less());
Returns a stream operator that sorts the elements of the stream, using the given less-than comparison. By default,
uses the built in < operator.
Example:
MakeStream::from({3, 1, 5, 3, 2, 8})
| sort(); // Stream contains 1, 2, 3, 3, 5, 8
template<typename Less = std::less<T>>
auto distinct(Less&& less = Less());
Returns a stream operator that removes duplicate elements from the stream and returns the results
in sorted order.
Example:
MakeStream::from({3, 1, 3, 5, 2, 5, 8, 8})
| distinct(); // Stream contains 1, 2, 3, 5, 8
Terminal Stream Operators
auto count();
Returns a stream terminator that returns the number of elements in the stream.
auto sum(const T& identity);
auto sum();
Returns a stream terminator that adds the elements of the stream, returning the total. If no additive
identity is provided and the stream is empty, throws an
EmptyStreamException. If an identity is
provided and the stream is empty, the identity is returned.
auto product(const T& identity);
auto product();
Returns a stream terminator that multiplies the elements of the stream, returning the product. If no multiplicative
identity is provided and the stream is empty, throws an
EmptyStreamException. If an identity is
provided and the stream is empty, the identity is returned.
template<typename Less = std::less<void>>
auto min(Less&& less = Less());
Returns a stream terminator that returns the smallest element of the stream as given by the provided
less than operator (by default, the built in <
operator). If the stream is empty, throws an
EmptyStreamException.
template<typename Function, typename Less = std::less<void>>
auto min_by(Function&& function, Less&& less = Less());
Returns a stream terminator that returns the element of the stream that results in the smallest
value of the given function as determined by the provided less than operator (by default, the
built in < operator). If the stream is empty, throws an
EmptyStreamException.
template<typename Less = std::less<void>>
auto max(Less&& less = Less());
Returns a stream terminator that returns the largest element of the stream as given by the provided
less than operator (by default, the built in <
operator). If the stream is empty, throws an
EmptyStreamException.
template<typename Function, typename Less = std::less<void>>
auto max_by(Function&& function, Less&& less = Less());
Returns a stream terminator that returns the element of the stream that results in the largest
value of the given function as determined by the provided less than operator (by default, the
built in < operator). If the stream is empty, throws an
EmptyStreamException.
template<typename Less = std::less<T>>
auto minmax(Less&& less = Less());
Returns a stream terminator that returns the smallest and largest elements of the stream as given by the provided
less than operator (by default, the built in <
operator). The first field of the resulting pair is the minimum,
the second field is the maximum element. If the stream is empty, throws an
EmptyStreamException.
template<typename Function, typename Less = std::less<void>>
auto minmax_by(Function&& function, Less&& less = Less());
Returns a stream terminator that returns the elements of the stream that result in the smallest
and largest values of the given function as determined by the provided less than operator (by
default, the built in < operator). The first field of the resulting
pair is the element that minimizes the function, and the second field is the element that maximizes
it. If the stream is empty, throws an EmptyStreamException.
auto first();
Returns a stream terminator that returns the first element of the stream. If the stream is empty, throws an
EmptyStreamException.
auto last();
Returns a stream terminator that returns the last element of the stream. If the stream is empty, throws an
EmptyStreamException.
auto nth(size_t index);
Returns a stream terminator that returns the nth element of the stream, indexed starting at 0. If the
stream does not contain that many elements, throws an
EmptyStreamException.
Calling stream | nth(n) is functionally
equivalent to calling stream | skip(n) | first().
auto reduce(Accumulator&& accum);
auto reduce(IdentityFn&& identityFn, Accumulator&& accum);
Returns a stream terminator that performs an identity-less reduction
(or a fold) of the elements in the stream. Since there is no identity value
for either form, if the stream is empty an EmptyStreamException
is thrown. There are two forms of the reduce operation, given by the signatures above.
The first form takes an accumulator that accepts two arguments of the stream type and returns a value of the same type. The functionality is equivalent to (in pseudocode):
T result = stream[0];
for(T element : stream[1 ...]) {
result = accum(result, element);
}
return result;
This reduce operation can be used to compute a sum without identity:
stream | reduce(std::plus<void>());
The second reduce operation takes an identity function which converts
elements of the stream into elements of the result type, and an accumulator
which takes a first argument of the result type, and a second argument of
the stream type. The functionality is equivalent to (in
pseudocode):
U result = identityFn(stream[0]);
for(T element : stream[1...]) {
result = accum(result, element);
}
return result;
This is the type of reduction that is used to compute
minmax(), which can be implemented as
follows:
auto to_pair = [](auto&& x) {
return std::make_pair(x, x);
};
auto next_minmax = [](auto&& prev_minmax, auto&& value) {
if(value < prev_minmax.first) {
return std::make_pair(value, prev_minmax.second);
} else if (value > prev_minmax.second) {
return std::make_pair(prev_minmax.first, value);
} else {
return prev_minmax;
}
};
auto minmax = reduce(to_pair, next_minmax);
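To show the second form end to end without the library, here is a small eager sketch over a std::vector. The names reduce_eager and minmax_eager are hypothetical, and std::runtime_error is only a stand-in for EmptyStreamException.

```cpp
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical eager version of the second form of reduce.
template<typename T, typename IdentityFn, typename Accumulator>
auto reduce_eager(const std::vector<T>& input, IdentityFn identityFn,
                  Accumulator accum) {
    if (input.empty()) {
        // Stand-in for EmptyStreamException.
        throw std::runtime_error("reduce on empty input");
    }
    // Seed the result from the first element, then fold in the rest.
    auto result = identityFn(input.front());
    for (auto it = input.begin() + 1; it != input.end(); ++it) {
        result = accum(result, *it);
    }
    return result;
}

// Minimum and maximum in one pass, in the spirit of the lambdas above.
inline std::pair<int, int> minmax_eager(const std::vector<int>& input) {
    auto to_pair = [](int x) { return std::make_pair(x, x); };
    auto next_minmax = [](const std::pair<int, int>& prev, int value) {
        if (value < prev.first) return std::make_pair(value, prev.second);
        if (value > prev.second) return std::make_pair(prev.first, value);
        return prev;
    };
    return reduce_eager(input, to_pair, next_minmax);
}
```

The identity function is what lets the result type (a pair) differ from the element type (an int) even though no identity value exists.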
template<typename U, typename Accumulator>
auto identity_reduce(const U& identity, Accumulator&& accumulator);
Returns a stream terminator that performs a reduction with an identity element.
If the stream is empty, then the identity value is returned. The accumulator
function works the same as it does for reduce,
the first argument is the result type, the second argument is the stream type.
The functionality is equivalent to:
U result = identity;
for(T element : stream) {
result = accum(result, element);
}
return result;
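This is the same shape as std::accumulate with an initial value. As a hedged illustration in which the result type differs from the element type, the hypothetical function below folds a vector of ints into a std::string, starting from the empty string as the identity.

```cpp
#include <numeric>
#include <string>
#include <vector>

// Concatenates the decimal form of each element; "" is the identity.
inline std::string join_digits(const std::vector<int>& input) {
    return std::accumulate(
        input.begin(), input.end(), std::string(),
        [](std::string result, int element) {
            return result + std::to_string(element);
        });
}
```

For an empty input the identity is returned unchanged, which matches the behavior described above.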
auto random_element();
Returns a stream terminator that returns a random element drawn uniformly from the elements of the stream.
If the stream is empty, throws an
EmptyStreamException.
If the stream is infinite, this will never terminate.
auto random_sample(size_t size);
Returns a stream terminator that returns a vector containing a random sample of the given size, drawn
from the stream. If the stream has fewer elements than the requested size,
the sample will consist of all of the elements of the stream. If the
stream is infinite, this will never terminate.
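The documentation does not say how the sample is drawn. One standard technique that matches the described behavior (a single pass over the data, and all elements returned when there are too few) is reservoir sampling. The sketch below implements Algorithm R over a vector. The name reservoir_sample is hypothetical and this is not necessarily the library's method.

```cpp
#include <cstddef>
#include <random>
#include <vector>

// Reservoir sampling (Algorithm R): uniform sample of up to `size` elements.
template<typename T, typename Rng>
std::vector<T> reservoir_sample(const std::vector<T>& input, std::size_t size,
                                Rng& rng) {
    std::vector<T> sample;
    if (size == 0) return sample;
    std::size_t seen = 0;
    for (const T& value : input) {
        ++seen;
        if (sample.size() < size) {
            sample.push_back(value);  // fill the reservoir first
        } else {
            // Keep the new element with probability size / seen.
            std::uniform_int_distribution<std::size_t> pick(0, seen - 1);
            const std::size_t slot = pick(rng);
            if (slot < size) sample[slot] = value;
        }
    }
    return sample;
}
```

Because every element must be seen before the sample is final, this approach also never terminates on an infinite input.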
template<typename Predicate>
auto any(Predicate&& predicate);
Returns a stream terminator that returns true if some element of the stream matches the given predicate.
If the stream is empty, the result is vacuously false. If the stream is
infinite, this operation will shortcut in the case that an element that
matches the predicate is found.
Specializations:
A specialization of any() exists for types that have an implicit conversion to bool. For these types, the following specialization exists:
auto any();
This returns true if any element of the stream has a conversion to a
bool with value true.
template<typename Predicate>
auto all(Predicate&& predicate);
Returns a stream terminator that returns true if all of the elements of the stream match the given predicate.
If the stream is empty, the result is vacuously true. If the stream is
infinite, this operation will shortcut in the case that an element that
doesn't match the predicate is found.
Specializations:
A specialization of all() exists for types that have an implicit conversion to bool. For these types, the following specialization exists:
auto all();
This returns true if all of the elements of the stream have a conversion to a
bool with value true.
template<typename Predicate>
auto none(Predicate&& predicate);
Returns a stream terminator that returns true if none of the elements of the stream match the given predicate.
If the stream is empty, the result is vacuously true. If the stream is
infinite, this operation will shortcut in the case that an element that
matches the predicate is found.
Specializations:
A specialization of none() exists for types that have an implicit conversion to bool. For these types, the following specialization exists:
auto none();
This returns true if none of the elements of the stream have a conversion to a
bool with value true.
template<typename Predicate>
auto not_all(Predicate&& predicate);
Returns a stream terminator that returns true if not all of the elements of the stream match the given predicate.
If the stream is empty, the result is false, since all() is vacuously true for an empty stream. If the stream is
infinite, this operation will shortcut in the case that an element that
doesn't match the predicate is found.
Specializations:
A specialization of not_all() exists for types that have an implicit conversion to bool. For these types, the following specialization exists:
auto not_all();
This returns true if not all of the elements of the stream have a conversion to a
bool with value true.
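The empty-stream results for the four quantifiers are easy to mix up. The sketch below computes them eagerly with std::any_of, std::all_of and std::none_of. It treats not_all as the negation of all, which is an assumption based on the names. Quantifiers and quantify are hypothetical names.

```cpp
#include <algorithm>
#include <vector>

// The four quantifier results for one input and one predicate.
struct Quantifiers {
    bool any, all, none, not_all;
};

template<typename Predicate>
Quantifiers quantify(const std::vector<int>& input, Predicate pred) {
    Quantifiers q;
    q.any = std::any_of(input.begin(), input.end(), pred);
    q.all = std::all_of(input.begin(), input.end(), pred);
    q.none = std::none_of(input.begin(), input.end(), pred);
    q.not_all = !q.all;  // assumption: not_all is the negation of all
    return q;
}
```

On an empty input this yields any = false, all = true, none = true and not_all = false.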
template<typename Function>
auto for_each(Function&& function);
Returns a stream terminator that calls the given function on each element of the stream.
auto to_vector();
auto to_list();
auto to_deque();
auto to_set();
auto to_multiset();
auto to_unordered_set();
auto to_unordered_multiset();
Returns a stream terminator that creates a container of the given type, inserts the elements of the
stream into that container, and then returns the container. In addition, the
Stream class contains conversion operators for each of the above container
types. For example, the following two lines are equivalent:
std::vector<int> result = stream;
auto result = stream | to_vector();
template<typename OutputIterator>
auto copy_to(OutputIterator iterator);
Returns a stream terminator that copies the elements of the stream into the given iterator, returning
the iterator one past the end of the sequence, much like
std::copy.
template<typename OutputIterator>
auto move_to(OutputIterator iterator);
Returns a stream terminator that moves the elements of the stream into the given iterator, returning
the iterator one past the end of the sequence, much like
std::move (the one in the <algorithm> header).
auto print_to(std::ostream& os, const char* delimiter = " ");
Returns a stream terminator that prints the elements of the stream to the given
ostream, with a delimiter after every element. The return value
is simply the given ostream, thus allowing chaining of stream insertions.
Calling stream | print_to(os, delimiter) is a convenience method for:
stream | copy_to(std::ostream_iterator<T>(os, delimiter));
return os;
Stream Reducers
Stream Reducers are classes that provide convenient common reductions of streams for a variety of types.
template<typename In, typename Out>
class Reducer {
virtual Out initial(In&& in) const = 0;
virtual Out accumulate(Out&& out, In&& in) const = 0;
auto reducer() const;
}
Reducers provide the two main functions provided to the
reduce() terminator as class methods
for a subclass to override. One constructs an instance of a reducer subclass and then
calls the reducer() method to turn it into a terminal operation
that reduces a stream using the desired functions.
Reducers are not included when you include "Stream.h"; you must additionally include "Reducers.h" to get them.
template<typename In, typename Less = std::less<In>>
class Histogram : public Reducer<In, std::map<In, size_t, Less>> {
public:
Histogram(Less&& less = Less());
}
A reducer that accumulates the contents of a stream by counting the occurrences of each
item that's considered distinct by the given comparison operator.
Example:
std::map<int, size_t> result = MakeStream::from({1, 3, 1, 2, 1, 2})
| reducers::Histogram<int>().reducer();
// result contains {1: 3, 2: 2, 3: 1}
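The counting itself is straightforward. As a library-free sketch, the hypothetical function histogram_eager below builds the same kind of map from a vector, using the comparison to decide which items count as the same key.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <vector>

// Counts occurrences of each item; keys are ordered by `less`.
template<typename In, typename Less = std::less<In>>
std::map<In, std::size_t, Less> histogram_eager(const std::vector<In>& input,
                                                Less less = Less()) {
    std::map<In, std::size_t, Less> counts(less);
    for (const In& item : input) {
        ++counts[item];  // operator[] value-initializes a missing count to 0
    }
    return counts;
}
```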
template<typename In, typename Result = double>
class SummaryStats : public Reducer<In, Stats<In, Result>> {
}
template<typename In, typename Out>
class Stats {
size_t number() const;
Out mean() const;
Out stddev() const;
In min() const;
In max() const;
};
A reducer that accumulates various summary statistics about the input data,
and returns a statistics object containing those.
Example:
auto stats = MakeStream::normal_randoms()
| limit(1000)
| reducers::SummaryStats<int>().reducer();
std::cout << stats << std::endl;
// Example output: N=1000, u=0.015, s=0.473248, min=-3, max=3
Stream Exceptions
class StreamException {
public:
explicit StreamException(const std::string& msg);
explicit StreamException(const char* msg);
std::string what() const;
}
The root class of all stream based exceptions.
class EmptyStreamException : public StreamException {
public:
explicit EmptyStreamException(const std::string& method);
}
The exception that gets thrown when attempting to call some terminal
stream operation that has no identity element for that operation on
a stream that is empty.
Example:
try {
MakeStream::empty() | min();
} catch(EmptyStreamException& e) {
std::cout << e.what() << std::endl;
}
Produces the following output:
No terminal result for operation stream::op::min.
class VacantStreamException : public StreamException {
public:
explicit VacantStreamException(const std::string& method);
}
The exception that gets thrown when attempting to call any stream operation
on some stream that no longer holds data. A stream is considered vacant when
it has been moved from some other stream, or some stream operation has been
called on the stream. To check vacancy, use the
occupied() method.
Example:
auto stream1 = MakeStream::range(0, 5);
auto stream2 = std::move(stream1); // Copying disallowed
try {
stream1 | skip(2);
} catch(VacantStreamException& e) {
std::cout << e.what() << std::endl;
}
stream2 | skip(2); // Didn't save it so stream2's data is lost!
try {
stream2 | first();
} catch(VacantStreamException& e) {
std::cout << e.what() << std::endl;
}
Produces the following output:
Cannot perform operation stream::op::skip on a vacant stream
Cannot perform operation stream::op::first on a vacant stream
class ConsumedIteratorException : public StreamException {
public:
explicit ConsumedIteratorException(const std::string& op);
}
The exception that gets thrown when attempting to call some non dereferencing
operation on a stream iterator after that iterator is declared to be
in a "consumed" state. For a discussion of this, see the comments for
the begin() method.
Example:
auto stream = MakeStream::closed_range(1,10);
auto iter = stream.begin();
std::cout << *iter << std::endl;
auto temp_iter = iter++;
std::cout << *temp_iter << std::endl;
try {
++temp_iter;
} catch(ConsumedIteratorException& e) {
std::cout << e.what() << std::endl;
}
Produces the following output:
1
2
Cannot perform prefix increment on consumed stream iterator.
class StopStream : public StreamException {
public:
explicit StopStream();
}
An exception to be thrown by the user in any intermediary or generating
(but not terminating) stream operation, if they want the stream to stop
iterating at that point. This is most often helpful with the
generate()
method.
Example:
std::ifstream fin = /* ... */;
auto stream = MakeStream::generate([&fin]() {
std::string line;
if(!std::getline(fin, line))
throw StopStream(); // Stop iterating when we reach EOF
return line;
});
// Silly version of take while
stream | peek([](auto& value) {
if(!pred(value))
throw StopStream();
});
Stream Recipes
These will go here eventually.
C++ Streams are developed by Jonah Scheinerman. Please contact me if you have questions or concerns.
C++ Streams are distributed under the MIT open source license.
Copyright © 2014 by Jonah Scheinerman