# Source code for storey.steps.partition
# Copyright 2020 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from collections import namedtuple
from typing import Any, Callable
from storey import Flow
from storey.dtypes import _termination_obj
# Value type used by Partition as the new event body: one of the two fields holds the
# original body and the other is None. Both fields default to None.
Partitioned = namedtuple("Partitioned", "left right", defaults=(None, None))
class Partition(Flow):
    """
    Partitions events by calling a predicate function on each event. Each processed event has its body replaced by a
    `Partitioned` namedtuple of (left=Optional[body], right=Optional[body]), where body is the event's original body;
    the event object itself is mutated in place and then forwarded downstream.

    For a given event, if the predicate function returns a truthy value, the original body is assigned to `left` (and
    `right` is None). Otherwise, the original body is assigned to `right` (and `left` is None).

    The termination object is forwarded downstream unchanged.

    :param predicate: A predicate function that results in a boolean. It is called with the whole event object, not
        just its body (e.g. ``lambda event: event.body % 2 == 0``).
    """

    def __init__(self, predicate: Callable[[Any], bool], **kwargs):
        # Any remaining keyword arguments are passed through to the Flow base class.
        super().__init__(**kwargs)
        self.predicate = predicate

    async def _do(self, event):
        if event is _termination_obj:
            # Termination is propagated as-is; it is never wrapped in Partitioned.
            return await self._do_downstream(_termination_obj)

        # The predicate sees the full event, but only the body is wrapped.
        if self.predicate(event):
            event.body = Partitioned(left=event.body, right=None)
        else:
            event.body = Partitioned(left=None, right=event.body)
        await self._do_downstream(event)