storey.transformations - Graph transformations

Graph transformations are contained in the storey.transformations module. For convenience, they can also be imported directly from the storey package. Note that the transformation functions are encapsulated in classes, so that they can be referenced by class name from graph step definitions.

class storey.transformations.AggregateByKey(aggregates: List[FieldAggregator] | List[Dict[str, object]], table: Table | str, key_field: str | List[str] | Callable[[Event], object] | None = None, time_field: str | Callable[[Event], object] | None = None, emit_policy: EmitPolicy | Dict[str, object] = <storey.dtypes.EmitEveryEvent object>, augmentation_fn: Callable[[Event, Dict[str, object]], Event] | None = None, enrich_with: List[str] | None = None, aliases: Dict[str, str] | None = None, use_windows_from_schema: bool = False, time_format: str | None = None, **kwargs)

Aggregates the data into the table object provided for later persistence, and outputs an event enriched with the requested aggregation features. Persistence is done via the NoSqlTarget step and based on the Cache object persistence settings.

Parameters:
  • aggregates -- List of aggregates to apply to each event. Accepts either a list of FieldAggregator objects or a list of dictionaries describing FieldAggregators.

  • table -- A Table object or name for persistence of aggregations. If a table name is provided, it will be looked up in the context object passed in kwargs.

  • key_field -- Key field to aggregate by. Accepts a string naming the key field, a list of such strings, or a key-extracting function. Defaults to the key in the event's metadata. (Optional)

  • time_field -- Time field to aggregate by, accepts either a string representing the time field or a time extracting function. Defaults to the processing time in the event's metadata. (Optional)

  • emit_policy -- Policy indicating when the data will be emitted. Defaults to EmitEveryEvent.

  • augmentation_fn -- Function that augments the features into the event's body. Defaults to updating a dict. (Optional)

  • enrich_with -- List of attribute names from the associated storage object to be fetched and added to every event. (Optional)

  • aliases -- Dictionary specifying aliases for enriched or aggregate columns, of the format {'col_name': 'new_col_name'}. (Optional)

  • time_format -- If the value of the time field is of type string, this format will be used to parse it, as defined in datetime.strptime(). By default, parsing will follow ISO-8601.
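
The time_format rule above can be illustrated in plain Python. This is a minimal sketch that does not call storey: parse_time_field is a hypothetical helper, and using datetime.fromisoformat() to stand in for the ISO-8601 default is an assumption of the sketch.

```python
from datetime import datetime
from typing import Optional


def parse_time_field(value: str, time_format: Optional[str] = None) -> datetime:
    """Hypothetical helper mirroring the documented time_format rule."""
    if time_format is not None:
        # An explicit format uses the same directives as datetime.strptime().
        return datetime.strptime(value, time_format)
    # No format given: fall back to ISO-8601 parsing (assumed model of the default).
    return datetime.fromisoformat(value)


# Day-first string with an explicit format, and an ISO-8601 string without one.
custom = parse_time_field("02/01/2024 03:04:05", "%d/%m/%Y %H:%M:%S")
iso = parse_time_field("2024-01-02T03:04:05")
```

With AggregateByKey, the same format string would be passed as time_format="%d/%m/%Y %H:%M:%S".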

class storey.transformations.Assert(**kwargs)

Exposes an API for testing the flow between steps.

class storey.transformations.Batch(*args, **kwargs)

Batches events into lists of up to max_events events. Each emitted list contains max_events events, unless flush_after_seconds seconds have passed since the first event in the batch was received, at which point the batch is emitted with potentially fewer than max_events events.

Parameters:
  • max_events -- Maximum number of events per emitted batch. Set to None to emit all events in one batch on flow termination.

  • flush_after_seconds -- Maximum number of seconds to wait before a batch is emitted.

  • key -- The key by which events are grouped. By default (None), events are not grouped. Set to '$x' to group events by the x attribute of the event (e.g. "$key" or "$path"), set to any other string 'str' to group events by Event.body[str], or pass a Callable[Any, Any] to group events by a custom key extractor.
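
The count-based part of this behavior can be modeled in a few lines of plain Python. The sketch below does not use storey and ignores flush_after_seconds; batch_by_count is a hypothetical name, and flushing partially filled batches at the end of the input is an assumption made for the model.

```python
from collections import defaultdict
from typing import Any, Callable, Iterable, List, Optional


def batch_by_count(
    events: Iterable[Any],
    max_events: Optional[int],
    key: Optional[Callable[[Any], Any]] = None,
) -> List[List[Any]]:
    """Hypothetical model of count-based batching, optionally grouped by key."""
    pending = defaultdict(list)  # one open batch per group
    emitted = []
    for event in events:
        group = key(event) if key is not None else None
        pending[group].append(event)
        if max_events is not None and len(pending[group]) == max_events:
            emitted.append(pending[group])
            pending[group] = []  # start a fresh batch for this group
    # End of input: emit whatever is left (assumed termination behavior).
    for batch in pending.values():
        if batch:
            emitted.append(batch)
    return emitted


ungrouped = batch_by_count([1, 2, 3, 4, 5], max_events=2)
single = batch_by_count([1, 2, 3], max_events=None)
rows = [{"k": "a", "v": 1}, {"k": "b", "v": 2}, {"k": "a", "v": 3}]
grouped = batch_by_count(rows, max_events=2, key=lambda e: e["k"])
```

Here the callable key plays the role of the custom key extractor described above; the string forms ('$x' or a body field name) are not modeled.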

class storey.transformations.Choice(choice_array, default=None, **kwargs)

Redirects each input element into at most one of multiple downstreams.

Parameters:
  • choice_array (tuple of (Flow, Function (Event=>boolean))) -- A list of (downstream, condition) tuples, where downstream is a step and condition is a function. The first condition in the list that evaluates to true for an input element causes that element to be redirected to the corresponding downstream step.

  • default (Flow) -- A default step for events that did not match any condition in choice_array. If not set, elements that don't match any condition are discarded.

  • name (string) -- Name of this step, as it should appear in logs. Defaults to class name (Choice).

  • full_event (boolean) -- Whether user functions should receive and return Event objects (when True), or only the payload (when False). Defaults to False.
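
The first-match rule for choice_array can be sketched without storey. In this minimal model, route is a hypothetical function and plain strings stand in for downstream steps; it only shows which downstream an element would be sent to.

```python
from typing import Any, Callable, List, Optional, Tuple


def route(
    element: Any,
    choice_array: List[Tuple[Any, Callable[[Any], bool]]],
    default: Optional[Any] = None,
) -> Optional[Any]:
    """Hypothetical model: return the downstream chosen for an element."""
    for downstream, condition in choice_array:
        if condition(element):
            return downstream  # the first matching condition wins
    return default  # None here models "element is discarded"


# Strings stand in for downstream steps in this sketch.
choices = [("small", lambda x: x < 10), ("even", lambda x: x % 2 == 0)]

to_small = route(4, choices)            # matches both; the first one wins
to_even = route(12, choices)
dropped = route(15, choices)
to_default = route(15, choices, default="other")
```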

class storey.transformations.Extend(fn, long_running=None, pass_context=None, **kwargs)

Adds fields to each incoming event.

Parameters:
  • fn (Function (Event=>Dict)) -- Function to transform each event to a dictionary. The fields in the returned dictionary are then added to the original event.

  • long_running (boolean) -- Whether fn is a long-running function. Long-running functions are run in an executor to avoid blocking other concurrent processing. Default is False.

  • name (string) -- Name of this step, as it should appear in logs. Defaults to class name (Extend).

  • full_event (boolean) -- Whether user functions should receive and return Event objects (when True), or only the payload (when False). Defaults to False.
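
As a rough illustration of the shape of fn, the sketch below defines a function that returns only the new fields and merges them into the event body. The extend helper is a hypothetical stand-in for the step, not storey code, and it assumes the body is a dictionary.

```python
def add_total(body: dict) -> dict:
    """Example fn: returns only the fields to add, not the whole event."""
    return {"total": body["price"] * body["quantity"]}


def extend(body: dict, fn) -> dict:
    """Hypothetical model of the step: add fn's fields to the original body."""
    body.update(fn(body))
    return body


result = extend({"price": 2.5, "quantity": 4}, add_total)
```

With storey, the function itself would be passed to the step, as in Extend(add_total).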

class storey.transformations.Filter(fn, long_running=None, pass_context=None, **kwargs)

Filters events based on a user-provided function.

Parameters:
  • fn (Function (Event=>boolean)) -- Function to decide whether to keep each event.

  • long_running (boolean) -- Whether fn is a long-running function. Long-running functions are run in an executor to avoid blocking other concurrent processing. Default is False.

  • name (string) -- Name of this step, as it should appear in logs. Defaults to class name (Filter).

  • full_event (boolean) -- Whether user functions should receive and return Event objects (when True), or only the payload (when False). Defaults to False.

class storey.transformations.FlatMap(fn, long_running=None, pass_context=None, **kwargs)

Maps, or transforms, each incoming event into any number of events.

Parameters:
  • fn (Function (Event=>list of Event)) -- Function to transform each event to a list of events.

  • long_running (boolean) -- Whether fn is a long-running function. Long-running functions are run in an executor to avoid blocking other concurrent processing. Default is False.

  • name (string) -- Name of this step, as it should appear in logs. Defaults to class name (FlatMap).

  • full_event (boolean) -- Whether user functions should receive and return Event objects (when True), or only the payload (when False). Defaults to False.

storey.transformations.Flatten(**kwargs)

Flatten is equivalent to FlatMap(lambda x: x).
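
The equivalence can be seen in a small plain-Python model. The flat_map function below is a hypothetical stand-in for the FlatMap step, operating on a list of payloads rather than a live flow.

```python
from typing import Any, Callable, Iterable, List


def flat_map(events: Iterable[Any], fn: Callable[[Any], Iterable[Any]]) -> List[Any]:
    """Hypothetical model of FlatMap: each input yields zero or more outputs."""
    out = []
    for event in events:
        out.extend(fn(event))
    return out


# One event becomes several.
split_words = flat_map(["a b", "c"], lambda s: s.split())

# With the identity function, each list-valued event is unpacked (Flatten).
flattened = flat_map([[1, 2], [], [3]], lambda x: x)
```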

class storey.transformations.ForEach(fn, long_running=None, pass_context=None, **kwargs)

Applies the given function to each event in the stream and passes the original event downstream.

class storey.transformations.JoinWithTable(table: Table | str, key_extractor: str | Callable[[Event], str], attributes: List[str] | None = None, inner_join: bool = False, join_function: Callable[[Any, Dict[str, object]], Any] | None = None, **kwargs)

Joins each event with data from the given table.

Parameters:
  • table -- A Table object or name to join with. If a table name is provided, it will be looked up in the context.

  • key_extractor -- Key's column name or a function for extracting the key, for table access from an event.

  • attributes -- A list of attribute names to be queried for. Defaults to all attributes.

  • inner_join -- Whether to drop events when the table does not have a matching entry (join_function won't be called in such a case). Defaults to False.

  • join_function -- Joins the original event with the relevant data received from the storage. The event is dropped when this function returns None. By default, the event's body is assumed to be a dict-like object and is updated with the retrieved data.

  • name -- Name of this step, as it should appear in logs. Defaults to class name (JoinWithTable).

  • full_event -- Whether user functions should receive and return Event objects (when True), or only the payload (when False). Defaults to False.

  • context -- Context object that holds global configurations and secrets.
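
How inner_join and join_function interact can be modeled with a dictionary standing in for the table. This sketch does not use storey; join_with_table and default_join are hypothetical names, and passing an event through unchanged when there is no match and inner_join is False is an assumption of the model.

```python
from typing import Any, Callable, Dict, List, Optional


def default_join(body: dict, attrs: Dict[str, object]) -> dict:
    """Models the documented default: update a dict-like body."""
    body.update(attrs)
    return body


def join_with_table(
    bodies: List[dict],
    table: Dict[Any, Dict[str, object]],
    key_extractor: Callable[[dict], Any],
    attributes: Optional[List[str]] = None,
    inner_join: bool = False,
    join_function: Callable[[dict, Dict[str, object]], Any] = default_join,
) -> List[dict]:
    """Hypothetical model of the join, with a dict in place of a Table."""
    out = []
    for body in bodies:
        row = table.get(key_extractor(body))
        if row is None:
            if not inner_join:
                out.append(body)  # assumption: unmatched events pass through
            continue  # with inner_join, join_function is never called
        if attributes is not None:
            row = {name: row[name] for name in attributes if name in row}
        joined = join_function(body, dict(row))
        if joined is not None:  # returning None drops the event
            out.append(joined)
    return out


users = {"u1": {"name": "Ada", "age": 36}}
outer = join_with_table(
    [{"user": "u1"}, {"user": "u2"}], users, lambda b: b["user"], attributes=["name"]
)
inner = join_with_table(
    [{"user": "u1"}, {"user": "u2"}], users, lambda b: b["user"], inner_join=True
)
```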

class storey.transformations.Map(fn, long_running=None, pass_context=None, **kwargs)

Maps, or transforms, incoming events using a user-provided function.

Parameters:
  • fn (Function (Event=>Event)) -- Function to apply to each event

  • long_running (boolean) -- Whether fn is a long-running function. Long-running functions are run in an executor to avoid blocking other concurrent processing. Default is False.

  • name (string) -- Name of this step, as it should appear in logs. Defaults to class name (Map).

  • full_event (boolean) -- Whether user functions should receive and return Event objects (when True), or only the payload (when False). Defaults to False.

class storey.transformations.MapClass(long_running=None, **kwargs)

Similar to Map, but instead of a function argument, this class should be extended and its do() method overridden.

class storey.transformations.MapWithState(initial_state, fn, group_by_key=False, **kwargs)

Maps, or transforms, incoming events using a stateful user-provided function and an initial state, which may be a database table.

Parameters:
  • initial_state (dictionary or Table if group_by_key is True. Any object otherwise.) -- Initial state for the computation. If group_by_key is True, this must be a dictionary or a Table object.

  • fn (Function ((Event, state)=>(Event, state))) -- A function to run on each event and the current state. Must yield an event and an updated state.

  • group_by_key (boolean) -- Whether the state is computed by key. Optional. Defaults to False.

  • full_event (boolean) -- Whether fn will receive and return an Event object or only the body (payload). Optional. Defaults to False (body only).
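
The (event, state) => (event, state) contract of fn can be sketched as follows. The running_count function is an example fn; map_with_state is a hypothetical driver that models the step over (key, body) pairs, and starting each new key with an empty dictionary is an assumption of the sketch rather than documented behavior.

```python
from typing import Any, Callable, Iterable, List, Tuple


def running_count(body: dict, state: dict) -> Tuple[dict, dict]:
    """Example fn: returns the updated event and the updated state."""
    state["count"] = state.get("count", 0) + 1
    body["seen"] = state["count"]
    return body, state


def map_with_state(
    events: Iterable[Tuple[Any, dict]],
    initial_state: dict,
    fn: Callable[[dict, dict], Tuple[dict, dict]],
    group_by_key: bool = False,
) -> List[dict]:
    """Hypothetical driver modeling the step over (key, body) pairs."""
    state = initial_state
    out = []
    for key, body in events:
        if group_by_key:
            # Per-key state; new keys start empty (assumption of this sketch).
            body, state[key] = fn(body, state.get(key, {}))
        else:
            body, state = fn(body, state)
        out.append(body)
    return out


per_key = map_with_state([("a", {}), ("b", {}), ("a", {})], {}, running_count, group_by_key=True)
overall = map_with_state([("a", {}), ("b", {}), ("a", {})], {}, running_count)
```

With storey, the same function would be passed as MapWithState({}, running_count, group_by_key=True).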

class storey.transformations.Partition(predicate: Callable[[Any], bool], **kwargs)

Partitions events by calling a predicate function on each event. Each processed event results in a Partitioned namedtuple of (left=Optional[Event], right=Optional[Event]).

For a given event, if the predicate function results in True, the event is assigned to left. Otherwise, the event is assigned to right.

Parameters:
  • predicate -- A predicate function that returns a boolean.
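
The left/right assignment can be modeled with a namedtuple of the same shape. In this sketch, Partitioned is redefined locally as a stand-in and partition is a hypothetical function, so nothing here is imported from storey.

```python
from collections import namedtuple
from typing import Any, Callable

# Local stand-in with the documented field names.
Partitioned = namedtuple("Partitioned", ["left", "right"])


def partition(event: Any, predicate: Callable[[Any], bool]) -> Partitioned:
    """Hypothetical model: True goes to left, False goes to right."""
    if predicate(event):
        return Partitioned(left=event, right=None)
    return Partitioned(left=None, right=event)


def is_even(x: int) -> bool:
    return x % 2 == 0


even_result = partition(4, is_even)
odd_result = partition(5, is_even)
```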

class storey.transformations.ReifyMetadata(mapping: Iterable[str], **kwargs)

Inserts event metadata into the event body.

Parameters:
  • mapping -- Dictionary from event attribute name to entry key in the event body (which must be a dictionary). Alternatively, an iterable of names may be provided, and these will be used as both attribute name and entry key.

  • name (string) -- Name of this step, as it should appear in logs. Defaults to class name (ReifyMetadata).
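
The two accepted forms of mapping can be illustrated with a simple stand-in event. This is a minimal sketch: reify_metadata is a hypothetical function, and a SimpleNamespace with key, time and body attributes is used in place of a storey Event.

```python
from types import SimpleNamespace


def reify_metadata(event, mapping):
    """Hypothetical model: copy event attributes into the event body."""
    if not isinstance(mapping, dict):
        # An iterable of names uses each name as both attribute and entry key.
        mapping = {name: name for name in mapping}
    for attribute, entry_key in mapping.items():
        event.body[entry_key] = getattr(event, attribute)
    return event


# Stand-in for an Event; the body must be a dictionary.
event = SimpleNamespace(key="user-1", time="2024-01-02T03:04:05", body={"value": 7})

reify_metadata(event, {"key": "user_id"})  # dictionary form: attribute -> entry key
reify_metadata(event, ["time"])            # iterable form: same name on both sides
```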

class storey.transformations.SampleWindow(window_size: int, emit_period: EmitPeriod = EmitPeriod.FIRST, emit_before_termination: bool = False, key: str | Callable[[Event], str] | None = None, **kwargs)

Emits a single event in a window of window_size events, in accordance with emit_period and emit_before_termination.

Parameters:
  • window_size -- The size of the window we want to sample a single event from.

  • emit_period -- Which event this step should emit for each window of window_size events (default: EmitPeriod.FIRST). Available options: EmitPeriod.FIRST emits the first event in a window of window_size events; EmitPeriod.LAST emits the last event in a window of window_size events.

  • emit_before_termination -- Whether the step should emit the last event it has seen when a termination signal is received (default: False). If True, the last event seen is emitted downstream; if False, it is not.

  • key -- The key by which events are sampled. By default (None), events are not sampled by key. Set to '$key' to sample events by the Event.key property, set to any other string 'str' to sample events by Event.body[str], or pass a Callable[[Event], str] to sample events by a custom key extractor.
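
The difference between the two emit periods can be modeled with a counter. The sketch below does not use storey: EmitPeriod is redefined locally as a stand-in enum, sample_window is a hypothetical function, and both emit_before_termination and key are left out. Emitting the first event of a trailing, incomplete window under FIRST is an assumption of the model.

```python
from enum import Enum
from typing import Any, Iterable, List


class EmitPeriod(Enum):
    """Local stand-in for the EmitPeriod options named above."""
    FIRST = 1
    LAST = 2


def sample_window(
    events: Iterable[Any],
    window_size: int,
    emit_period: EmitPeriod = EmitPeriod.FIRST,
) -> List[Any]:
    """Hypothetical model: emit one event per window of window_size events."""
    emitted = []
    count = 0  # position of the current event within its window
    for event in events:
        count += 1
        if emit_period is EmitPeriod.FIRST and count == 1:
            emitted.append(event)
        elif emit_period is EmitPeriod.LAST and count == window_size:
            emitted.append(event)
        if count == window_size:
            count = 0  # the window is complete; start the next one
    return emitted


first = sample_window(range(1, 8), window_size=3)
last = sample_window(range(1, 8), window_size=3, emit_period=EmitPeriod.LAST)
```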

class storey.transformations.SendToHttp(request_builder, join_from_response, **kwargs)

Joins each event with data from any HTTP source. Used for event augmentation.

Parameters:
  • request_builder (Function (Event=>HttpRequest)) -- Creates an HTTP request from the event. This request is then sent to its destination.

  • join_from_response (Function ((Event, HttpResponse)=>Event)) -- Joins the original event with the HTTP response into a new event.

  • name (string) -- Name of this step, as it should appear in logs. Defaults to class name (SendToHttp).

  • full_event (boolean) -- Whether user functions should receive and return Event objects (when True), or only the payload (when False). Defaults to False.

class storey.transformations.ToDataFrame(index: str | None = None, columns: List[str] | None = None, **kwargs)

Creates a pandas DataFrame from events. Can appear in the middle of the flow, as opposed to ReduceToDataFrame.

Parameters:
  • index -- Name of the column to be used as index. Optional. If not set, DataFrame will be range indexed.

  • columns -- List of column names to be passed as-is to the DataFrame constructor. Optional.

For additional parameters, see the documentation of storey.flow.Flow.