Source code for storey.steps.partition

from collections import namedtuple
from typing import Callable, Any

from storey import Flow
from storey.dtypes import _termination_obj

Partitioned = namedtuple("Partitioned", ["left", "right"], defaults=[None, None])


[docs]class Partition(Flow): """ Partitions events by calling a predicate function on each event. Each processed event results in a `Partitioned` namedtuple of (left=Optional[Event], right=Optional[Event]). For a given event, if the predicate function results in `True`, the event is assigned to `left`. Otherwise, the event is assigned to `right`. :param predicate: A predicate function that results in a boolean. """ def __init__(self, predicate: Callable[[Any], bool], **kwargs): super().__init__(**kwargs) self.predicate = predicate async def _do(self, event): if event is _termination_obj: return await self._do_downstream(_termination_obj) else: if self.predicate(event): event.body = Partitioned(left=event.body, right=None) else: event.body = Partitioned(left=None, right=event.body) await self._do_downstream(event)