mirror of
https://github.com/rembo10/headphones.git
synced 2026-01-14 09:17:59 -05:00
Mostly just updating libraries, removing string encoding/decoding, fixing some edge cases. No new functionality was added in this commit.
699 lines
18 KiB
Python
699 lines
18 KiB
Python
"""Imported from the recipes section of the itertools documentation.
|
|
|
|
All functions taken from the recipes section of the itertools library docs
|
|
[1]_.
|
|
Some backward-compatible usability improvements have been made.
|
|
|
|
.. [1] http://docs.python.org/library/itertools.html#recipes
|
|
|
|
"""
|
|
import warnings
|
|
from collections import deque
|
|
from itertools import (
|
|
chain,
|
|
combinations,
|
|
count,
|
|
cycle,
|
|
groupby,
|
|
islice,
|
|
repeat,
|
|
starmap,
|
|
tee,
|
|
zip_longest,
|
|
)
|
|
import operator
|
|
from random import randrange, sample, choice
|
|
|
|
__all__ = [
|
|
'all_equal',
|
|
'before_and_after',
|
|
'consume',
|
|
'convolve',
|
|
'dotproduct',
|
|
'first_true',
|
|
'flatten',
|
|
'grouper',
|
|
'iter_except',
|
|
'ncycles',
|
|
'nth',
|
|
'nth_combination',
|
|
'padnone',
|
|
'pad_none',
|
|
'pairwise',
|
|
'partition',
|
|
'powerset',
|
|
'prepend',
|
|
'quantify',
|
|
'random_combination_with_replacement',
|
|
'random_combination',
|
|
'random_permutation',
|
|
'random_product',
|
|
'repeatfunc',
|
|
'roundrobin',
|
|
'sliding_window',
|
|
'tabulate',
|
|
'tail',
|
|
'take',
|
|
'triplewise',
|
|
'unique_everseen',
|
|
'unique_justseen',
|
|
]
|
|
|
|
|
|
def take(n, iterable):
|
|
"""Return first *n* items of the iterable as a list.
|
|
|
|
>>> take(3, range(10))
|
|
[0, 1, 2]
|
|
|
|
If there are fewer than *n* items in the iterable, all of them are
|
|
returned.
|
|
|
|
>>> take(10, range(3))
|
|
[0, 1, 2]
|
|
|
|
"""
|
|
return list(islice(iterable, n))
|
|
|
|
|
|
def tabulate(function, start=0):
|
|
"""Return an iterator over the results of ``func(start)``,
|
|
``func(start + 1)``, ``func(start + 2)``...
|
|
|
|
*func* should be a function that accepts one integer argument.
|
|
|
|
If *start* is not specified it defaults to 0. It will be incremented each
|
|
time the iterator is advanced.
|
|
|
|
>>> square = lambda x: x ** 2
|
|
>>> iterator = tabulate(square, -3)
|
|
>>> take(4, iterator)
|
|
[9, 4, 1, 0]
|
|
|
|
"""
|
|
return map(function, count(start))
|
|
|
|
|
|
def tail(n, iterable):
|
|
"""Return an iterator over the last *n* items of *iterable*.
|
|
|
|
>>> t = tail(3, 'ABCDEFG')
|
|
>>> list(t)
|
|
['E', 'F', 'G']
|
|
|
|
"""
|
|
return iter(deque(iterable, maxlen=n))
|
|
|
|
|
|
def consume(iterator, n=None):
|
|
"""Advance *iterable* by *n* steps. If *n* is ``None``, consume it
|
|
entirely.
|
|
|
|
Efficiently exhausts an iterator without returning values. Defaults to
|
|
consuming the whole iterator, but an optional second argument may be
|
|
provided to limit consumption.
|
|
|
|
>>> i = (x for x in range(10))
|
|
>>> next(i)
|
|
0
|
|
>>> consume(i, 3)
|
|
>>> next(i)
|
|
4
|
|
>>> consume(i)
|
|
>>> next(i)
|
|
Traceback (most recent call last):
|
|
File "<stdin>", line 1, in <module>
|
|
StopIteration
|
|
|
|
If the iterator has fewer items remaining than the provided limit, the
|
|
whole iterator will be consumed.
|
|
|
|
>>> i = (x for x in range(3))
|
|
>>> consume(i, 5)
|
|
>>> next(i)
|
|
Traceback (most recent call last):
|
|
File "<stdin>", line 1, in <module>
|
|
StopIteration
|
|
|
|
"""
|
|
# Use functions that consume iterators at C speed.
|
|
if n is None:
|
|
# feed the entire iterator into a zero-length deque
|
|
deque(iterator, maxlen=0)
|
|
else:
|
|
# advance to the empty slice starting at position n
|
|
next(islice(iterator, n, n), None)
|
|
|
|
|
|
def nth(iterable, n, default=None):
|
|
"""Returns the nth item or a default value.
|
|
|
|
>>> l = range(10)
|
|
>>> nth(l, 3)
|
|
3
|
|
>>> nth(l, 20, "zebra")
|
|
'zebra'
|
|
|
|
"""
|
|
return next(islice(iterable, n, None), default)
|
|
|
|
|
|
def all_equal(iterable):
|
|
"""
|
|
Returns ``True`` if all the elements are equal to each other.
|
|
|
|
>>> all_equal('aaaa')
|
|
True
|
|
>>> all_equal('aaab')
|
|
False
|
|
|
|
"""
|
|
g = groupby(iterable)
|
|
return next(g, True) and not next(g, False)
|
|
|
|
|
|
def quantify(iterable, pred=bool):
|
|
"""Return the how many times the predicate is true.
|
|
|
|
>>> quantify([True, False, True])
|
|
2
|
|
|
|
"""
|
|
return sum(map(pred, iterable))
|
|
|
|
|
|
def pad_none(iterable):
|
|
"""Returns the sequence of elements and then returns ``None`` indefinitely.
|
|
|
|
>>> take(5, pad_none(range(3)))
|
|
[0, 1, 2, None, None]
|
|
|
|
Useful for emulating the behavior of the built-in :func:`map` function.
|
|
|
|
See also :func:`padded`.
|
|
|
|
"""
|
|
return chain(iterable, repeat(None))
|
|
|
|
|
|
padnone = pad_none
|
|
|
|
|
|
def ncycles(iterable, n):
|
|
"""Returns the sequence elements *n* times
|
|
|
|
>>> list(ncycles(["a", "b"], 3))
|
|
['a', 'b', 'a', 'b', 'a', 'b']
|
|
|
|
"""
|
|
return chain.from_iterable(repeat(tuple(iterable), n))
|
|
|
|
|
|
def dotproduct(vec1, vec2):
|
|
"""Returns the dot product of the two iterables.
|
|
|
|
>>> dotproduct([10, 10], [20, 20])
|
|
400
|
|
|
|
"""
|
|
return sum(map(operator.mul, vec1, vec2))
|
|
|
|
|
|
def flatten(listOfLists):
|
|
"""Return an iterator flattening one level of nesting in a list of lists.
|
|
|
|
>>> list(flatten([[0, 1], [2, 3]]))
|
|
[0, 1, 2, 3]
|
|
|
|
See also :func:`collapse`, which can flatten multiple levels of nesting.
|
|
|
|
"""
|
|
return chain.from_iterable(listOfLists)
|
|
|
|
|
|
def repeatfunc(func, times=None, *args):
|
|
"""Call *func* with *args* repeatedly, returning an iterable over the
|
|
results.
|
|
|
|
If *times* is specified, the iterable will terminate after that many
|
|
repetitions:
|
|
|
|
>>> from operator import add
|
|
>>> times = 4
|
|
>>> args = 3, 5
|
|
>>> list(repeatfunc(add, times, *args))
|
|
[8, 8, 8, 8]
|
|
|
|
If *times* is ``None`` the iterable will not terminate:
|
|
|
|
>>> from random import randrange
|
|
>>> times = None
|
|
>>> args = 1, 11
|
|
>>> take(6, repeatfunc(randrange, times, *args)) # doctest:+SKIP
|
|
[2, 4, 8, 1, 8, 4]
|
|
|
|
"""
|
|
if times is None:
|
|
return starmap(func, repeat(args))
|
|
return starmap(func, repeat(args, times))
|
|
|
|
|
|
def _pairwise(iterable):
|
|
"""Returns an iterator of paired items, overlapping, from the original
|
|
|
|
>>> take(4, pairwise(count()))
|
|
[(0, 1), (1, 2), (2, 3), (3, 4)]
|
|
|
|
On Python 3.10 and above, this is an alias for :func:`itertools.pairwise`.
|
|
|
|
"""
|
|
a, b = tee(iterable)
|
|
next(b, None)
|
|
yield from zip(a, b)
|
|
|
|
|
|
try:
|
|
from itertools import pairwise as itertools_pairwise
|
|
except ImportError:
|
|
pairwise = _pairwise
|
|
else:
|
|
|
|
def pairwise(iterable):
|
|
yield from itertools_pairwise(iterable)
|
|
|
|
pairwise.__doc__ = _pairwise.__doc__
|
|
|
|
|
|
def grouper(iterable, n, fillvalue=None):
|
|
"""Collect data into fixed-length chunks or blocks.
|
|
|
|
>>> list(grouper('ABCDEFG', 3, 'x'))
|
|
[('A', 'B', 'C'), ('D', 'E', 'F'), ('G', 'x', 'x')]
|
|
|
|
"""
|
|
if isinstance(iterable, int):
|
|
warnings.warn(
|
|
"grouper expects iterable as first parameter", DeprecationWarning
|
|
)
|
|
n, iterable = iterable, n
|
|
args = [iter(iterable)] * n
|
|
return zip_longest(fillvalue=fillvalue, *args)
|
|
|
|
|
|
def roundrobin(*iterables):
|
|
"""Yields an item from each iterable, alternating between them.
|
|
|
|
>>> list(roundrobin('ABC', 'D', 'EF'))
|
|
['A', 'D', 'E', 'B', 'F', 'C']
|
|
|
|
This function produces the same output as :func:`interleave_longest`, but
|
|
may perform better for some inputs (in particular when the number of
|
|
iterables is small).
|
|
|
|
"""
|
|
# Recipe credited to George Sakkis
|
|
pending = len(iterables)
|
|
nexts = cycle(iter(it).__next__ for it in iterables)
|
|
while pending:
|
|
try:
|
|
for next in nexts:
|
|
yield next()
|
|
except StopIteration:
|
|
pending -= 1
|
|
nexts = cycle(islice(nexts, pending))
|
|
|
|
|
|
def partition(pred, iterable):
|
|
"""
|
|
Returns a 2-tuple of iterables derived from the input iterable.
|
|
The first yields the items that have ``pred(item) == False``.
|
|
The second yields the items that have ``pred(item) == True``.
|
|
|
|
>>> is_odd = lambda x: x % 2 != 0
|
|
>>> iterable = range(10)
|
|
>>> even_items, odd_items = partition(is_odd, iterable)
|
|
>>> list(even_items), list(odd_items)
|
|
([0, 2, 4, 6, 8], [1, 3, 5, 7, 9])
|
|
|
|
If *pred* is None, :func:`bool` is used.
|
|
|
|
>>> iterable = [0, 1, False, True, '', ' ']
|
|
>>> false_items, true_items = partition(None, iterable)
|
|
>>> list(false_items), list(true_items)
|
|
([0, False, ''], [1, True, ' '])
|
|
|
|
"""
|
|
if pred is None:
|
|
pred = bool
|
|
|
|
evaluations = ((pred(x), x) for x in iterable)
|
|
t1, t2 = tee(evaluations)
|
|
return (
|
|
(x for (cond, x) in t1 if not cond),
|
|
(x for (cond, x) in t2 if cond),
|
|
)
|
|
|
|
|
|
def powerset(iterable):
|
|
"""Yields all possible subsets of the iterable.
|
|
|
|
>>> list(powerset([1, 2, 3]))
|
|
[(), (1,), (2,), (3,), (1, 2), (1, 3), (2, 3), (1, 2, 3)]
|
|
|
|
:func:`powerset` will operate on iterables that aren't :class:`set`
|
|
instances, so repeated elements in the input will produce repeated elements
|
|
in the output. Use :func:`unique_everseen` on the input to avoid generating
|
|
duplicates:
|
|
|
|
>>> seq = [1, 1, 0]
|
|
>>> list(powerset(seq))
|
|
[(), (1,), (1,), (0,), (1, 1), (1, 0), (1, 0), (1, 1, 0)]
|
|
>>> from more_itertools import unique_everseen
|
|
>>> list(powerset(unique_everseen(seq)))
|
|
[(), (1,), (0,), (1, 0)]
|
|
|
|
"""
|
|
s = list(iterable)
|
|
return chain.from_iterable(combinations(s, r) for r in range(len(s) + 1))
|
|
|
|
|
|
def unique_everseen(iterable, key=None):
|
|
"""
|
|
Yield unique elements, preserving order.
|
|
|
|
>>> list(unique_everseen('AAAABBBCCDAABBB'))
|
|
['A', 'B', 'C', 'D']
|
|
>>> list(unique_everseen('ABBCcAD', str.lower))
|
|
['A', 'B', 'C', 'D']
|
|
|
|
Sequences with a mix of hashable and unhashable items can be used.
|
|
The function will be slower (i.e., `O(n^2)`) for unhashable items.
|
|
|
|
Remember that ``list`` objects are unhashable - you can use the *key*
|
|
parameter to transform the list to a tuple (which is hashable) to
|
|
avoid a slowdown.
|
|
|
|
>>> iterable = ([1, 2], [2, 3], [1, 2])
|
|
>>> list(unique_everseen(iterable)) # Slow
|
|
[[1, 2], [2, 3]]
|
|
>>> list(unique_everseen(iterable, key=tuple)) # Faster
|
|
[[1, 2], [2, 3]]
|
|
|
|
Similary, you may want to convert unhashable ``set`` objects with
|
|
``key=frozenset``. For ``dict`` objects,
|
|
``key=lambda x: frozenset(x.items())`` can be used.
|
|
|
|
"""
|
|
seenset = set()
|
|
seenset_add = seenset.add
|
|
seenlist = []
|
|
seenlist_add = seenlist.append
|
|
use_key = key is not None
|
|
|
|
for element in iterable:
|
|
k = key(element) if use_key else element
|
|
try:
|
|
if k not in seenset:
|
|
seenset_add(k)
|
|
yield element
|
|
except TypeError:
|
|
if k not in seenlist:
|
|
seenlist_add(k)
|
|
yield element
|
|
|
|
|
|
def unique_justseen(iterable, key=None):
|
|
"""Yields elements in order, ignoring serial duplicates
|
|
|
|
>>> list(unique_justseen('AAAABBBCCDAABBB'))
|
|
['A', 'B', 'C', 'D', 'A', 'B']
|
|
>>> list(unique_justseen('ABBCcAD', str.lower))
|
|
['A', 'B', 'C', 'A', 'D']
|
|
|
|
"""
|
|
return map(next, map(operator.itemgetter(1), groupby(iterable, key)))
|
|
|
|
|
|
def iter_except(func, exception, first=None):
|
|
"""Yields results from a function repeatedly until an exception is raised.
|
|
|
|
Converts a call-until-exception interface to an iterator interface.
|
|
Like ``iter(func, sentinel)``, but uses an exception instead of a sentinel
|
|
to end the loop.
|
|
|
|
>>> l = [0, 1, 2]
|
|
>>> list(iter_except(l.pop, IndexError))
|
|
[2, 1, 0]
|
|
|
|
Multiple exceptions can be specified as a stopping condition:
|
|
|
|
>>> l = [1, 2, 3, '...', 4, 5, 6]
|
|
>>> list(iter_except(lambda: 1 + l.pop(), (IndexError, TypeError)))
|
|
[7, 6, 5]
|
|
>>> list(iter_except(lambda: 1 + l.pop(), (IndexError, TypeError)))
|
|
[4, 3, 2]
|
|
>>> list(iter_except(lambda: 1 + l.pop(), (IndexError, TypeError)))
|
|
[]
|
|
|
|
"""
|
|
try:
|
|
if first is not None:
|
|
yield first()
|
|
while 1:
|
|
yield func()
|
|
except exception:
|
|
pass
|
|
|
|
|
|
def first_true(iterable, default=None, pred=None):
|
|
"""
|
|
Returns the first true value in the iterable.
|
|
|
|
If no true value is found, returns *default*
|
|
|
|
If *pred* is not None, returns the first item for which
|
|
``pred(item) == True`` .
|
|
|
|
>>> first_true(range(10))
|
|
1
|
|
>>> first_true(range(10), pred=lambda x: x > 5)
|
|
6
|
|
>>> first_true(range(10), default='missing', pred=lambda x: x > 9)
|
|
'missing'
|
|
|
|
"""
|
|
return next(filter(pred, iterable), default)
|
|
|
|
|
|
def random_product(*args, repeat=1):
|
|
"""Draw an item at random from each of the input iterables.
|
|
|
|
>>> random_product('abc', range(4), 'XYZ') # doctest:+SKIP
|
|
('c', 3, 'Z')
|
|
|
|
If *repeat* is provided as a keyword argument, that many items will be
|
|
drawn from each iterable.
|
|
|
|
>>> random_product('abcd', range(4), repeat=2) # doctest:+SKIP
|
|
('a', 2, 'd', 3)
|
|
|
|
This equivalent to taking a random selection from
|
|
``itertools.product(*args, **kwarg)``.
|
|
|
|
"""
|
|
pools = [tuple(pool) for pool in args] * repeat
|
|
return tuple(choice(pool) for pool in pools)
|
|
|
|
|
|
def random_permutation(iterable, r=None):
|
|
"""Return a random *r* length permutation of the elements in *iterable*.
|
|
|
|
If *r* is not specified or is ``None``, then *r* defaults to the length of
|
|
*iterable*.
|
|
|
|
>>> random_permutation(range(5)) # doctest:+SKIP
|
|
(3, 4, 0, 1, 2)
|
|
|
|
This equivalent to taking a random selection from
|
|
``itertools.permutations(iterable, r)``.
|
|
|
|
"""
|
|
pool = tuple(iterable)
|
|
r = len(pool) if r is None else r
|
|
return tuple(sample(pool, r))
|
|
|
|
|
|
def random_combination(iterable, r):
|
|
"""Return a random *r* length subsequence of the elements in *iterable*.
|
|
|
|
>>> random_combination(range(5), 3) # doctest:+SKIP
|
|
(2, 3, 4)
|
|
|
|
This equivalent to taking a random selection from
|
|
``itertools.combinations(iterable, r)``.
|
|
|
|
"""
|
|
pool = tuple(iterable)
|
|
n = len(pool)
|
|
indices = sorted(sample(range(n), r))
|
|
return tuple(pool[i] for i in indices)
|
|
|
|
|
|
def random_combination_with_replacement(iterable, r):
|
|
"""Return a random *r* length subsequence of elements in *iterable*,
|
|
allowing individual elements to be repeated.
|
|
|
|
>>> random_combination_with_replacement(range(3), 5) # doctest:+SKIP
|
|
(0, 0, 1, 2, 2)
|
|
|
|
This equivalent to taking a random selection from
|
|
``itertools.combinations_with_replacement(iterable, r)``.
|
|
|
|
"""
|
|
pool = tuple(iterable)
|
|
n = len(pool)
|
|
indices = sorted(randrange(n) for i in range(r))
|
|
return tuple(pool[i] for i in indices)
|
|
|
|
|
|
def nth_combination(iterable, r, index):
|
|
"""Equivalent to ``list(combinations(iterable, r))[index]``.
|
|
|
|
The subsequences of *iterable* that are of length *r* can be ordered
|
|
lexicographically. :func:`nth_combination` computes the subsequence at
|
|
sort position *index* directly, without computing the previous
|
|
subsequences.
|
|
|
|
>>> nth_combination(range(5), 3, 5)
|
|
(0, 3, 4)
|
|
|
|
``ValueError`` will be raised If *r* is negative or greater than the length
|
|
of *iterable*.
|
|
``IndexError`` will be raised if the given *index* is invalid.
|
|
"""
|
|
pool = tuple(iterable)
|
|
n = len(pool)
|
|
if (r < 0) or (r > n):
|
|
raise ValueError
|
|
|
|
c = 1
|
|
k = min(r, n - r)
|
|
for i in range(1, k + 1):
|
|
c = c * (n - k + i) // i
|
|
|
|
if index < 0:
|
|
index += c
|
|
|
|
if (index < 0) or (index >= c):
|
|
raise IndexError
|
|
|
|
result = []
|
|
while r:
|
|
c, n, r = c * r // n, n - 1, r - 1
|
|
while index >= c:
|
|
index -= c
|
|
c, n = c * (n - r) // n, n - 1
|
|
result.append(pool[-1 - n])
|
|
|
|
return tuple(result)
|
|
|
|
|
|
def prepend(value, iterator):
|
|
"""Yield *value*, followed by the elements in *iterator*.
|
|
|
|
>>> value = '0'
|
|
>>> iterator = ['1', '2', '3']
|
|
>>> list(prepend(value, iterator))
|
|
['0', '1', '2', '3']
|
|
|
|
To prepend multiple values, see :func:`itertools.chain`
|
|
or :func:`value_chain`.
|
|
|
|
"""
|
|
return chain([value], iterator)
|
|
|
|
|
|
def convolve(signal, kernel):
|
|
"""Convolve the iterable *signal* with the iterable *kernel*.
|
|
|
|
>>> signal = (1, 2, 3, 4, 5)
|
|
>>> kernel = [3, 2, 1]
|
|
>>> list(convolve(signal, kernel))
|
|
[3, 8, 14, 20, 26, 14, 5]
|
|
|
|
Note: the input arguments are not interchangeable, as the *kernel*
|
|
is immediately consumed and stored.
|
|
|
|
"""
|
|
kernel = tuple(kernel)[::-1]
|
|
n = len(kernel)
|
|
window = deque([0], maxlen=n) * n
|
|
for x in chain(signal, repeat(0, n - 1)):
|
|
window.append(x)
|
|
yield sum(map(operator.mul, kernel, window))
|
|
|
|
|
|
def before_and_after(predicate, it):
|
|
"""A variant of :func:`takewhile` that allows complete access to the
|
|
remainder of the iterator.
|
|
|
|
>>> it = iter('ABCdEfGhI')
|
|
>>> all_upper, remainder = before_and_after(str.isupper, it)
|
|
>>> ''.join(all_upper)
|
|
'ABC'
|
|
>>> ''.join(remainder) # takewhile() would lose the 'd'
|
|
'dEfGhI'
|
|
|
|
Note that the first iterator must be fully consumed before the second
|
|
iterator can generate valid results.
|
|
"""
|
|
it = iter(it)
|
|
transition = []
|
|
|
|
def true_iterator():
|
|
for elem in it:
|
|
if predicate(elem):
|
|
yield elem
|
|
else:
|
|
transition.append(elem)
|
|
return
|
|
|
|
def remainder_iterator():
|
|
yield from transition
|
|
yield from it
|
|
|
|
return true_iterator(), remainder_iterator()
|
|
|
|
|
|
def triplewise(iterable):
|
|
"""Return overlapping triplets from *iterable*.
|
|
|
|
>>> list(triplewise('ABCDE'))
|
|
[('A', 'B', 'C'), ('B', 'C', 'D'), ('C', 'D', 'E')]
|
|
|
|
"""
|
|
for (a, _), (b, c) in pairwise(pairwise(iterable)):
|
|
yield a, b, c
|
|
|
|
|
|
def sliding_window(iterable, n):
|
|
"""Return a sliding window of width *n* over *iterable*.
|
|
|
|
>>> list(sliding_window(range(6), 4))
|
|
[(0, 1, 2, 3), (1, 2, 3, 4), (2, 3, 4, 5)]
|
|
|
|
If *iterable* has fewer than *n* items, then nothing is yielded:
|
|
|
|
>>> list(sliding_window(range(3), 4))
|
|
[]
|
|
|
|
For a variant with more features, see :func:`windowed`.
|
|
"""
|
|
it = iter(iterable)
|
|
window = deque(islice(it, n), maxlen=n)
|
|
if len(window) == n:
|
|
yield tuple(window)
|
|
for x in it:
|
|
window.append(x)
|
|
yield tuple(window)
|