# Source code for storey.steps.sample

# Copyright 2020 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from enum import Enum
from typing import Callable, Optional, Union

from storey import Flow
from storey.dtypes import Event, _termination_obj


class EmitPeriod(Enum):
    """Selects which event of each sampling window `SampleWindow` emits downstream."""

    # Emit the event that opens a window (the one counted as 1).
    FIRST = 1
    # Emit the event that completes a window (the one counted as `window_size`).
    LAST = 2


class SampleWindow(Flow):
    """
    Emits a single event in a window of `window_size` events, in accordance with `emit_period` and
    `emit_before_termination`.

    :param window_size: The size of the window we want to sample a single event from. Must be greater than 1.
    :param emit_period: What event should this step emit for each `window_size` (default: EmitPeriod.FIRST).
        Available options:
            1.1) EmitPeriod.FIRST - will emit the first event in a window of `window_size` events.
            1.2) EmitPeriod.LAST - will emit the last event in a window of `window_size` events.
    :param emit_before_termination: On termination signal, should the step emit the last event it has seen
        (default: False).
        Available options:
            2.1) True - The last event seen will be emitted downstream, unless it was already emitted as the
                 sample of its window.
            2.2) False - The last event seen will NOT be emitted downstream.
    :param key: The key by which events are sampled. By default (None), events are not sampled by key.
        Other options may be:
            Set to '$key' to sample events by the Event.key property.
            Set to a 'str' key to sample events by Event.body[str].
            Set a Callable[[Event], str] to sample events by a custom key extractor.

    :raises ValueError: If `window_size` is not greater than 1, if `emit_period` is not an `EmitPeriod`,
        or if `key` is neither None, a string nor a callable.
    """

    def __init__(
        self,
        window_size: int,
        emit_period: EmitPeriod = EmitPeriod.FIRST,
        emit_before_termination: bool = False,
        key: Optional[Union[str, Callable[[Event], str]]] = None,
        **kwargs,
    ):
        # Always overrides any caller-supplied value; the key extractors below read
        # `event.key` / `event.body`, so this step works on event objects.
        kwargs["full_event"] = True
        super().__init__(**kwargs)

        if window_size <= 1:
            raise ValueError(f"Expected window_size > 1, found {window_size}")

        if not isinstance(emit_period, EmitPeriod):
            raise ValueError(f"Expected emit_period of type `EmitPeriod`, got {type(emit_period)}")

        self._window_size = window_size
        self._emit_period = emit_period
        self._emit_before_termination = emit_before_termination
        # Position inside the current window, per key. Only keys with a window in
        # progress have an entry (see `_do`).
        self._per_key_count = {}
        # Position inside the current window for events whose extracted key is None.
        self._count = 0
        # Most recent event that has not been emitted yet; only tracked when
        # `emit_before_termination` is set.
        self._last_event = None
        self._extract_key: Callable[[Event], str] = self._create_key_extractor(key)

    @staticmethod
    def _create_key_extractor(key) -> Callable:
        """
        Build the function that maps an event to its sampling key.

        :param key: None, '$key', a body field name, or a callable taking the event.

        :return: A callable taking an event and returning its sampling key (None when `key` is None).

        :raises ValueError: If `key` is neither None, a callable nor a string.
        """
        if key is None:
            return lambda event: None
        if callable(key):
            return key
        if isinstance(key, str):
            if key == "$key":
                return lambda event: event.key
            return lambda event: event.body[key]
        raise ValueError(f"Unsupported key type {type(key)}")

    async def _do(self, event):
        """
        Count `event` into its window and forward it downstream if it is the window's sample.

        On the termination object, the pending last event (if any) is forwarded first, followed by the
        termination object itself, whose downstream result is returned.
        """
        if event is _termination_obj:
            if self._last_event is not None:
                await self._do_downstream(self._last_event)
            return await self._do_downstream(_termination_obj)

        key = self._extract_key(event)
        if key is not None:
            count = self._per_key_count.get(key, 0) + 1
            if count == self._window_size:
                # Window completed: drop the entry instead of keeping a zero counter, so
                # finished keys do not accumulate in the dict. A missing key restarts
                # at 1 on its next event, exactly as a zeroed one would.
                self._per_key_count.pop(key, None)
            else:
                self._per_key_count[key] = count
        else:
            # An extracted key of None falls back to the single shared counter.
            count = self._count + 1
            self._count = 0 if count == self._window_size else count

        if self._emit_before_termination:
            self._last_event = event

        if self._should_emit(count):
            # The event goes downstream now, so it must not be emitted again on termination.
            self._last_event = None
            await self._do_downstream(event)

    def _should_emit(self, count):
        """
        Tell whether the event at position `count` (1-based) of its window is the one to emit.

        :param count: Position of the current event inside its window.

        :return: True for position 1 under EmitPeriod.FIRST, or position `window_size` under
            EmitPeriod.LAST; False otherwise.
        """
        if self._emit_period == EmitPeriod.FIRST:
            return count == 1
        if self._emit_period == EmitPeriod.LAST:
            return count == self._window_size
        return False