# Copyright 2020 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import copy
import datetime
import inspect
import multiprocessing
import os
import pickle
import time
import traceback
import uuid
from asyncio import Task
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Any, Callable, Collection, Dict, Iterable, List, Optional, Set, Union
import aiohttp
from .dtypes import Event, FlowError, V3ioError, _termination_obj, known_driver_schemes
from .queue import AsyncQueue
from .table import Table
from .utils import _split_path, get_in, stringify_key, update_in
class Flow:
_legal_first_step = False
def __init__(
self,
recovery_step=None,
termination_result_fn=lambda x, y: x if x is not None else y,
context=None,
**kwargs,
):
self._outlets = []
self._inlets = []
self._recovery_step = recovery_step
if recovery_step:
if isinstance(recovery_step, dict):
for step in recovery_step.values():
step._inlets.append(self)
else:
recovery_step._inlets.append(self)
self._termination_result_fn = termination_result_fn
self.context = context
self.verbose = context and getattr(context, "verbose", False)
self.logger = getattr(self.context, "logger", None) if self.context else None
self._kwargs = kwargs
self._full_event = kwargs.get("full_event")
self._input_path = kwargs.get("input_path")
self._result_path = kwargs.get("result_path")
self._runnable = False
name = kwargs.get("name", None)
if name:
self.name = name
else:
self.name = type(self).__name__
self._closeables = []
def _init(self):
self._termination_received = 0
self._termination_result = None
def to_dict(self, fields=None, exclude=None):
"""convert the step object to a python dictionary"""
fields = fields or getattr(self, "_dict_fields", None)
if not fields:
fields = list(inspect.signature(self.__init__).parameters.keys())
if exclude:
fields = [field for field in fields if field not in exclude]
meta_keys = [
"context",
"name",
"input_path",
"result_path",
"full_event",
"kwargs",
]
args = {
key: getattr(self, key) for key in fields if getattr(self, key, None) is not None and key not in meta_keys
}
# add storey kwargs or extra kwargs
if "kwargs" in fields and (hasattr(self, "kwargs") or hasattr(self, "_kwargs")):
kwargs = getattr(self, "kwargs", {}) or getattr(self, "_kwargs", {})
for key, value in kwargs.items():
if key not in meta_keys:
args[key] = value
mod_name = self.__class__.__module__
class_path = self.__class__.__qualname__
if mod_name != "__main__":
class_path = f"{mod_name}.{class_path}"
struct = {
"class_name": class_path,
"name": self.name or self.__class__.__name__,
"class_args": args,
}
for parameter_name in [
("kind", "_STEP_KIND"),
"input_path",
"result_path",
"full_event",
]:
if isinstance(parameter_name, tuple):
parameter_name, field_name = parameter_name
else:
field_name = f"_{parameter_name}"
if hasattr(self, field_name):
field_value = getattr(self, field_name)
if field_value is not None:
struct[parameter_name] = field_value
return struct
def _to_code(self, taken: Set[str]):
class_name = type(self).__name__
base_var_name = ""
for c in class_name:
if c.isupper() and base_var_name:
base_var_name += "_"
base_var_name += c.lower()
i = 0
while True:
var_name = f"{base_var_name}{i}"
if var_name not in taken:
taken.add(var_name)
break
i += 1
taken.add(var_name)
param_list = []
for key, value in self._kwargs.items():
if isinstance(value, str):
value = f"'{value}'"
param_list.append(f"{key}={value}")
param_str = ", ".join(param_list)
step = f"{var_name} = {class_name}({param_str})"
steps = [step]
tos = []
for outlet in self._outlets:
outlet_var_name, outlet_steps, outlet_tos = outlet._to_code(taken)
steps.extend(outlet_steps)
tos.append(f"{var_name}.to({outlet_var_name})")
tos.extend(outlet_tos)
return var_name, steps, tos
def to_code(self):
_, steps, tos = self._to_code(set())
result = "\n".join(steps)
result += "\n\n"
result += "\n".join(tos)
result += "\n"
return result
def to(self, outlet):
if outlet._legal_first_step:
raise ValueError(f"{outlet.name} can only appear as the first step of a flow")
self._outlets.append(outlet)
outlet._inlets.append(self)
return outlet
def set_recovery_step(self, outlet):
self._recovery_step = outlet
return self
def _get_recovery_step(self, exception):
if isinstance(self._recovery_step, dict):
return self._recovery_step.get(type(exception), None)
else:
return self._recovery_step
def run(self):
if not self._legal_first_step and not self._runnable:
raise ValueError("Flow must start with a source")
self._init()
outlets = []
outlets.extend(self._outlets)
outlets.extend(self._get_recovery_steps())
for outlet in outlets:
outlet._runnable = True
self._closeables.extend(outlet.run())
return self._closeables
def _get_recovery_steps(self):
if self._recovery_step:
if isinstance(self._recovery_step, dict):
return list(self._recovery_step.values())
else:
return [self._recovery_step]
return []
async def run_async(self):
raise NotImplementedError
async def _do(self, event):
raise NotImplementedError
async def _do_and_recover(self, event):
try:
return await self._do(event)
except BaseException as ex:
if getattr(ex, "_raised_by_storey_step", None) is not None:
raise ex
ex._raised_by_storey_step = self
recovery_step = self._get_recovery_step(ex)
if recovery_step is None:
if self.context and hasattr(self.context, "push_error"):
message = traceback.format_exc()
if event._awaitable_result:
none_or_coroutine = event._awaitable_result._set_error(ex)
if none_or_coroutine:
await none_or_coroutine
if self.logger:
self.logger.error(f"Pushing error to error stream: {ex}\n{message}")
self.context.push_error(event, f"{ex}\n{message}", source=self.name)
return
else:
raise ex
event.origin_state = self.name
event.error = ex
return await recovery_step._do(event)
@staticmethod
def _event_string(event):
result = "Event("
if event.id:
result += f"id={event.id}, "
if getattr(event, "key", None):
result += f"key={event.key}, "
if getattr(event, "time", None):
result += f"processing_time={event.processing_time}, "
if getattr(event, "path", None):
result += f"path={event.path}, "
result += f"body={event.body})"
return result
def _should_terminate(self):
return self._termination_received == len(self._inlets)
async def _do_downstream(self, event, outlets=None):
outlets = self._outlets if outlets is None else outlets
if not outlets:
return
if event is _termination_obj:
# Only propagate the termination object once we received one per inlet
outlets[0]._termination_received += 1
if outlets[0]._should_terminate():
self._termination_result = await outlets[0]._do(_termination_obj)
for outlet in outlets[1:] + self._get_recovery_steps():
outlet._termination_received += 1
if outlet._should_terminate():
self._termination_result = self._termination_result_fn(
self._termination_result, await outlet._do(_termination_obj)
)
return self._termination_result
# If there is more than one outlet, allow concurrent execution.
tasks = []
if len(outlets) > 1:
awaitable_result = event._awaitable_result
event._awaitable_result = None
original_events = getattr(event, "_original_events", None)
# Temporarily delete self-reference to avoid deepcopy getting stuck in an infinite loop
event._original_events = None
for i in range(1, len(outlets)):
event_copy = copy.deepcopy(event)
event_copy._awaitable_result = awaitable_result
event_copy._original_events = original_events
tasks.append(asyncio.get_running_loop().create_task(outlets[i]._do_and_recover(event_copy)))
# Set self-reference back after deepcopy
event._original_events = original_events
event._awaitable_result = awaitable_result
if self.verbose and self.logger:
step_name = self.name
event_string = self._event_string(event)
self.logger.debug(f"{step_name} -> {outlets[0].name} | {event_string}")
await outlets[0]._do_and_recover(event) # Optimization - avoids creating a task for the first outlet.
for i, task in enumerate(tasks, start=1):
if self.verbose and self.logger:
self.logger.debug(f"{step_name} -> {outlets[i].name} | {event_string}")
await task
def _get_event_or_body(self, event):
if self._full_event:
return event
elif self._input_path:
if not hasattr(event.body, "__getitem__"):
raise TypeError("input_path parameter supports only dict-like event bodies")
return get_in(event.body, self._input_path)
else:
return event.body
def _user_fn_output_to_event(self, event, fn_result):
if self._full_event:
return fn_result
else:
mapped_event = copy.copy(event)
if self._result_path:
if not hasattr(event.body, "__getitem__"):
raise TypeError("result_path parameter supports only dict-like event bodies")
update_in(mapped_event.body, self._result_path, fn_result)
else:
mapped_event.body = fn_result
return mapped_event
def _check_step_in_flow(self, type_to_check):
if isinstance(self, type_to_check):
return True
for outlet in self._outlets:
if outlet._check_step_in_flow(type_to_check):
return True
if isinstance(self._recovery_step, Flow):
if self._recovery_step._check_step_in_flow(type_to_check):
return True
elif isinstance(self._recovery_step, dict):
for step in self._recovery_step.values():
if step._check_step_in_flow(type_to_check):
return True
return False
class WithUUID:
def __init__(self):
self._current_uuid_base = None
self._current_uuid_count = 0
def _get_uuid(self):
if not self._current_uuid_base or self._current_uuid_count == 1024:
self._current_uuid_base = uuid.uuid4().hex
self._current_uuid_count = 0
result = f"{self._current_uuid_base}-{self._current_uuid_count:04}"
self._current_uuid_count += 1
return result
class Choice(Flow):
"""
Routes each incoming event to one or more of its downstream steps. Override select_outlets()
to control which downstream steps receive each event.
"""
def _init(self):
super()._init()
self._name_to_outlet = {}
for outlet in self._outlets:
if outlet.name in self._name_to_outlet:
raise ValueError(f"Ambiguous outlet name '{outlet.name}' in Choice step")
self._name_to_outlet[outlet.name] = outlet
# TODO: hacky way of supporting mlrun preview, which replaces targets with a DFTarget
self._passthrough_for_preview = list(self._name_to_outlet) == ["dataframe"]
def select_outlets(self, event) -> Collection[str]:
"""
Override this method to route events based on custom logic. The default implementation routes all
events to all outlets.
"""
return self._name_to_outlet.keys()
async def _do(self, event):
if event is _termination_obj:
return await self._do_downstream(_termination_obj)
else:
event_body = event if self._full_event else event.body
outlet_names = self.select_outlets(event_body)
outlets = []
if self._passthrough_for_preview:
outlet = self._name_to_outlet["dataframe"]
outlets.append(outlet)
else:
if len(set(outlet_names)) != len(outlet_names):
raise ValueError(
"select_outlets() returned duplicate outlets among the defined outlets: "
+ ", ".join(outlet_names)
)
for outlet_name in outlet_names:
if outlet_name not in self._name_to_outlet:
raise ValueError(
f"select_outlets() returned outlet name '{outlet_name}', which is not one of the "
f"defined outlets: " + ", ".join(self._name_to_outlet)
)
outlet = self._name_to_outlet[outlet_name]
outlets.append(outlet)
return await self._do_downstream(event, outlets=outlets)
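# Illustrative sketch (not part of the library): routing events by overriding
# Choice.select_outlets(). Assumes the SyncEmitSource step exported by storey; the
# outlet names "even" and "odd" come from naming the downstream steps.
def _example_choice_routing():
    from storey import SyncEmitSource  # assumed import; sources live outside this module

    class ParityChoice(Choice):
        def select_outlets(self, body):
            # Route each number to the outlet whose name matches its parity.
            return ["even"] if body % 2 == 0 else ["odd"]

    evens, odds = [], []
    controller = build_flow([
        SyncEmitSource(),
        ParityChoice(),
        [Map(evens.append, name="even")],
        [Map(odds.append, name="odd")],
    ]).run()
    for i in range(6):
        controller.emit(i)
    controller.terminate()
    controller.await_termination()
    return evens, odds  # ([0, 2, 4], [1, 3, 5])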
class Recover(Flow):
def __init__(self, exception_to_downstream, **kwargs):
Flow.__init__(self, **kwargs)
self._exception_to_downstream = exception_to_downstream
async def _do(self, event):
if not self._outlets or event is _termination_obj:
return await super()._do_downstream(event)
else:
try:
await super()._do_downstream(event)
except BaseException as ex:
typ = type(ex)
if typ in self._exception_to_downstream:
await self._exception_to_downstream[typ]._do(event)
else:
raise ex
class _UnaryFunctionFlow(Flow):
def __init__(self, fn, long_running=None, pass_context=None, **kwargs):
super().__init__(**kwargs)
if not callable(fn):
raise TypeError(f"Expected a callable, got {type(fn)}")
self._is_async = asyncio.iscoroutinefunction(fn)
if self._is_async and long_running:
raise ValueError("long_running=True cannot be used in conjunction with a coroutine")
self._long_running = long_running
self._fn = fn
self._pass_context = pass_context
async def _call(self, element):
if self._long_running:
res = await asyncio.get_running_loop().run_in_executor(None, self._fn, element)
else:
kwargs = {}
if self._pass_context:
kwargs = {"context": self.context}
res = self._fn(element, **kwargs)
if self._is_async:
res = await res
return res
async def _do_internal(self, element, fn_result):
raise NotImplementedError()
async def _do(self, event):
if event is _termination_obj:
return await self._do_downstream(_termination_obj)
else:
element = self._get_event_or_body(event)
fn_result = await self._call(element)
await self._do_internal(event, fn_result)
class DropColumns(Flow):
def __init__(self, columns, **kwargs):
super().__init__(**kwargs)
if not isinstance(columns, list):
columns = [columns]
self._columns = columns
async def _do(self, event):
if event is not _termination_obj:
new_body = copy.copy(event.body)
for column in self._columns:
new_body.pop(column, None)
event.body = new_body
return await self._do_downstream(event)
class Map(_UnaryFunctionFlow):
"""Maps, or transforms, incoming events using a user-provided function.
:param fn: Function to apply to each event
:type fn: Function (Event=>Event)
:param long_running: Whether fn is a long-running function. Long-running functions are run in an executor to
avoid blocking other concurrent processing. Default is False.
:type long_running: boolean
:param name: Name of this step, as it should appear in logs. Defaults to class name (Map).
:type name: string
:param full_event: Whether user functions should receive and return Event objects (when True),
or only the payload (when False). Defaults to False.
:type full_event: boolean
"""
async def _do_internal(self, event, fn_result):
mapped_event = self._user_fn_output_to_event(event, fn_result)
await self._do_downstream(mapped_event)
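# Illustrative sketch (not part of the library): a minimal Map pipeline, assuming the
# SyncEmitSource step exported by storey. The flow doubles each number and sums the
# results with a terminal Reduce step.
def _example_map_usage():
    from storey import SyncEmitSource  # assumed import

    controller = build_flow([
        SyncEmitSource(),
        Map(lambda x: x * 2),
        Reduce(0, lambda acc, x: acc + x),
    ]).run()
    for i in range(5):
        controller.emit(i)
    controller.terminate()
    return controller.await_termination()  # 0 + 2 + 4 + 6 + 8 == 20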
class Filter(_UnaryFunctionFlow):
"""Filters events based on a user-provided function.
:param fn: Function to decide whether to keep each event.
:type fn: Function (Event=>boolean)
:param long_running: Whether fn is a long-running function. Long-running functions are run in an executor to
avoid blocking other concurrent processing. Default is False.
:type long_running: boolean
:param name: Name of this step, as it should appear in logs. Defaults to class name (Filter).
:type name: string
:param full_event: Whether user functions should receive and return Event objects (when True), or only the
payload (when False). Defaults to False.
:type full_event: boolean
"""
async def _do_internal(self, event, keep):
if keep:
await self._do_downstream(event)
class FlatMap(_UnaryFunctionFlow):
"""Maps, or transforms, each incoming event into any number of events.
:param fn: Function to transform each event to a list of events.
:type fn: Function (Event=>list of Event)
:param long_running: Whether fn is a long-running function. Long-running functions are run in an executor
to avoid blocking other concurrent processing. Default is False.
:type long_running: boolean
:param name: Name of this step, as it should appear in logs. Defaults to class name (FlatMap).
:type name: string
:param full_event: Whether user functions should receive and return Event objects (when True), or only
the payload (when False). Defaults to False.
:type full_event: boolean
"""
async def _do_internal(self, event, fn_result):
for fn_result_element in fn_result:
mapped_event = self._user_fn_output_to_event(event, fn_result_element)
await self._do_downstream(mapped_event)
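# Illustrative sketch (not part of the library): FlatMap fans each incoming event out
# into multiple events, assuming the SyncEmitSource step exported by storey.
def _example_flat_map_usage():
    from storey import SyncEmitSource  # assumed import

    controller = build_flow([
        SyncEmitSource(),
        FlatMap(lambda sentence: sentence.split()),
        Reduce([], lambda acc, word: acc + [word]),
    ]).run()
    controller.emit("a stream of words")
    controller.terminate()
    return controller.await_termination()  # ['a', 'stream', 'of', 'words']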
class Extend(_UnaryFunctionFlow):
"""Adds fields to each incoming event.
:param fn: Function to transform each event to a dictionary. The fields in the returned dictionary are then added
to the original event.
:type fn: Function (Event=>Dict)
:param long_running: Whether fn is a long-running function. Long-running functions are run in an executor to avoid
blocking other concurrent processing. Default is False.
:type long_running: boolean
:param name: Name of this step, as it should appear in logs. Defaults to class name (Extend).
:type name: string
:param full_event: Whether user functions should receive and return Event objects (when True), or only the
payload (when False). Defaults to False.
:type full_event: boolean
"""
async def _do_internal(self, event, fn_result):
for key, value in fn_result.items():
event.body[key] = value
await self._do_downstream(event)
class _FunctionWithStateFlow(Flow):
def __init__(self, initial_state, fn, group_by_key=False, **kwargs):
super().__init__(**kwargs)
if not callable(fn):
raise TypeError(f"Expected a callable, got {type(fn)}")
self._is_async = asyncio.iscoroutinefunction(fn)
self._state = initial_state
if isinstance(self._state, str):
should_get_from_context = False
for known_scheme in known_driver_schemes:
if self._state.startswith(f"{known_scheme}://"):
should_get_from_context = True
break
if should_get_from_context:
if not self.context:
raise TypeError("Table can not be string if no context was provided to the step")
self._state = self.context.get_table(self._state)
self._fn = fn
self._group_by_key = group_by_key
if hasattr(self._state, "close"):
self._closeables = [self._state]
async def _call(self, event):
element = self._get_event_or_body(event)
if self._group_by_key:
safe_key = stringify_key(event.key)
if isinstance(self._state, Table):
key_data = await self._state._get_or_load_static_attributes_by_key(safe_key)
else:
key_data = self._state[event.key]
res, new_state = self._fn(element, key_data)
async with self._state._get_lock(safe_key):
self._state._update_static_attrs(safe_key, new_state)
self._state._pending_events.append(event)
self._state._init_flush_task()
else:
res, self._state = self._fn(element, self._state)
if self._is_async:
res = await res
return res
async def _do_internal(self, element, fn_result):
raise NotImplementedError()
async def _do(self, event):
if event is _termination_obj:
return await self._do_downstream(_termination_obj)
else:
fn_result = await self._call(event)
await self._do_internal(event, fn_result)
class MapWithState(_FunctionWithStateFlow):
"""Maps, or transforms, incoming events using a stateful user-provided function, and an initial state,
which may be a database table.
:param initial_state: Initial state for the computation. If group_by_key is True, this must be a dictionary or
a Table object.
:type initial_state: dictionary or Table if group_by_key is True. Any object otherwise.
:param fn: A function to run on each event and the current state. Must yield an event and an updated state.
:type fn: Function ((Event, state)=>(Event, state))
:param group_by_key: Whether the state is computed by key. Optional. Defaults to False.
:type group_by_key: boolean
:param full_event: Whether fn will receive and return an Event object or only the body (payload).
Optional. Defaults to False (body only).
:type full_event: boolean
"""
async def _do_internal(self, event, mapped_element):
mapped_event = self._user_fn_output_to_event(event, mapped_element)
await self._do_downstream(mapped_event)
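# Illustrative sketch (not part of the library): MapWithState threading a running
# total through the flow, assuming the SyncEmitSource step exported by storey.
def _example_map_with_state():
    from storey import SyncEmitSource  # assumed import

    def running_total(x, state):
        state = state + x
        return state, state  # emit the running total and keep it as the new state

    controller = build_flow([
        SyncEmitSource(),
        MapWithState(0, running_total),
        Reduce([], lambda acc, x: acc + [x]),
    ]).run()
    for i in (1, 2, 3):
        controller.emit(i)
    controller.terminate()
    return controller.await_termination()  # [1, 3, 6]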
class MapClass(Flow):
"""Similar to Map, but instead of a function argument, this class should be extended and its do()
method overridden."""
def __init__(self, long_running=None, **kwargs):
super().__init__(**kwargs)
self._is_async = asyncio.iscoroutinefunction(self.do)
if self._is_async and long_running:
raise ValueError("long_running=True cannot be used in conjunction with a coroutine do()")
self._long_running = long_running
self._filter = False
def filter(self):
# used in the .do() code to signal filtering
self._filter = True
def do(self, event):
raise NotImplementedError()
async def _call(self, event):
if self._long_running:
res = await asyncio.get_running_loop().run_in_executor(None, self.do, event)
else:
res = self.do(event)
if self._is_async:
res = await res
return res
async def _do(self, event):
if event is _termination_obj:
return await self._do_downstream(_termination_obj)
else:
element = self._get_event_or_body(event)
fn_result = await self._call(element)
if not self._filter:
mapped_event = self._user_fn_output_to_event(event, fn_result)
await self._do_downstream(mapped_event)
else:
self._filter = False # clear the flag for future runs
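# Illustrative sketch (not part of the library): extending MapClass and calling
# self.filter() from do() to drop events, assuming the SyncEmitSource step exported
# by storey.
def _example_map_class():
    from storey import SyncEmitSource  # assumed import

    class DropNegatives(MapClass):
        def do(self, event):
            if event < 0:
                self.filter()  # signal that this event should not be passed downstream
                return None
            return event * 10

    controller = build_flow([
        SyncEmitSource(),
        DropNegatives(),
        Reduce([], lambda acc, x: acc + [x]),
    ]).run()
    for i in (-1, 2, -3, 4):
        controller.emit(i)
    controller.terminate()
    return controller.await_termination()  # [20, 40]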
class Rename(Flow):
"""
Rename fields in event body.
:param mapping: Dictionary from old name to new name.
:param name: Name of this step, as it should appear in logs. Defaults to class name (Rename).
:type name: string
"""
def __init__(self, mapping: Dict[str, str], **kwargs):
super().__init__(**kwargs)
self.mapping = mapping
async def _do(self, event):
if event is not _termination_obj:
for old_name, new_name in self.mapping.items():
if old_name in event.body:
event.body[new_name] = event.body.get(old_name)
del event.body[old_name]
return await self._do_downstream(event)
class Complete(Flow):
"""
Completes the AwaitableResult associated with incoming events.
:param name: Name of this step, as it should appear in logs. Defaults to class name (Complete).
:type name: string
:param full_event: Whether to complete with an Event object (when True) or only the payload
(when False). Defaults to False.
:type full_event: boolean
"""
async def _do(self, event):
termination_result = await self._do_downstream(event)
if event is not _termination_obj:
if event._awaitable_result:
result = self._get_event_or_body(event)
res = event._awaitable_result._set_result(result)
if res:
await res
return termination_result
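# Illustrative sketch (not part of the library): Complete resolves the AwaitableResult
# attached to each emitted event, so the caller can block on the processed value.
# Assumes the SyncEmitSource step exported by storey, and that emit() returns an
# AwaitableResult when the flow contains a Complete step (this may vary by storey version).
def _example_complete():
    from storey import SyncEmitSource  # assumed import

    controller = build_flow([
        SyncEmitSource(),
        Map(lambda x: x + 1),
        Complete(),
    ]).run()
    awaitable_result = controller.emit(41)
    result = awaitable_result.await_result()  # 42, once the event has passed through the flow
    controller.terminate()
    controller.await_termination()
    return result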
class Reduce(Flow):
"""
Reduces incoming events into a single value which is returned upon the successful termination of the flow.
:param initial_value: Starting value. When the first event is received, fn will be applied to the
initial_value and that event.
:type initial_value: object
:param fn: Function to apply to the current value and each event.
:type fn: Function ((object, Event) => object)
:param name: Name of this step, as it should appear in logs. Defaults to class name (Reduce).
:type name: string
:param full_event: Whether user functions should receive and return Event objects (when True),
or only the payload (when False). Defaults to False.
:type full_event: boolean
"""
def __init__(self, initial_value, fn, **kwargs):
kwargs["initial_value"] = initial_value
super().__init__(**kwargs)
if not callable(fn):
raise TypeError(f"Expected a callable, got {type(fn)}")
self._is_async = asyncio.iscoroutinefunction(fn)
self._fn = fn
self._initial_value = initial_value
def _init(self):
super()._init()
self._result = self._initial_value
def to(self, outlet):
raise ValueError("Reduce is a terminal step. It cannot be piped further.")
async def _do(self, event):
if event is _termination_obj:
return self._result
else:
if self._full_event:
elem = event
else:
elem = event.body
res = self._fn(self._result, elem)
if self._is_async:
res = await res
self._result = res
class HttpRequest:
"""A class representing an HTTP request, with method, url, body, and headers.
:param method: HTTP method (e.g. GET).
:type method: string
:param url: Target URL (http and https schemes supported).
:type url: string
:param body: Request body.
:type body: bytes or string
:param headers: Request headers, in the form of a dictionary. Optional. Defaults to no headers.
:type headers: dictionary, or None.
"""
def __init__(self, method, url, body, headers: Optional[dict] = None):
self.method = method
self.url = url
self.body = body
if headers is None:
headers = {}
self.headers = headers
class HttpResponse:
"""A class representing an HTTP response, with a status code and body.
:param body: Response body.
:type body: bytes
:param status: HTTP status code.
:type status: int
"""
def __init__(self, status, body):
self.status = status
self.body = body
class _ConcurrentJobExecution(Flow):
_BACKOFF_MAX = 120
_DEFAULT_MAX_IN_FLIGHT = 8
def __init__(self, max_in_flight=None, retries=None, backoff_factor=None, **kwargs):
Flow.__init__(self, **kwargs)
if max_in_flight is not None and max_in_flight < 1:
raise ValueError(f"max_in_flight may not be less than 1 (got {max_in_flight})")
self.max_in_flight = max_in_flight
self.retries = retries
self.backoff_factor = backoff_factor
self._queue_size = (max_in_flight or self._DEFAULT_MAX_IN_FLIGHT) - 1
def _init(self):
super()._init()
self._q = None
self._lazy_init_complete = False
async def _worker(self):
try:
while True:
# Allow event to be garbage collected
job = None # noqa
event = None
completed = None # noqa
try:
# If we don't handle the event before we remove it from the queue, the effective max_in_flight will
# be 1 higher than requested. Hence, we peek.
job = await self._q.peek()
if job is _termination_obj:
await self._q.get()
break
event = job[0]
completed = await job[1]
await self._handle_completed(event, completed)
await self._q.get()
except BaseException as ex:
await self._q.get()
ex._raised_by_storey_step = self
recovery_step = self._get_recovery_step(ex)
try:
if recovery_step is not None:
event.origin_state = self.name
event.error = ex
await recovery_step._do(event)
else:
if event._awaitable_result:
none_or_coroutine = event._awaitable_result._set_error(ex)
if none_or_coroutine:
await none_or_coroutine
if self.context and hasattr(self.context, "push_error"):
message = traceback.format_exc()
if self.logger:
self.logger.error(f"Pushing error to error stream: {ex}\n{message}")
self.context.push_error(event, f"{ex}\n{message}", source=self.name)
else:
raise ex
except BaseException:
if not self._q.empty():
await self._q.get()
raise
finally:
await self._cleanup()
async def _process_event(self, event):
raise NotImplementedError()
async def _handle_completed(self, event, response):
raise NotImplementedError()
async def _cleanup(self):
pass
async def _lazy_init(self):
pass
async def _process_event_with_retries(self, event):
times_attempted = 0
max_attempts = (self.retries or 0) + 1
while True:
try:
return await self._process_event(event)
except Exception as ex:
times_attempted += 1
attempts_left = max_attempts - times_attempted
if self.logger:
self.logger.warn(f"{self.name} failed to process event ({attempts_left} retries left): {ex}")
if attempts_left <= 0:
raise ex
backoff_value = (self.backoff_factor or 1) * (2 ** (times_attempted - 1))
backoff_value = min(self._BACKOFF_MAX, backoff_value)
if backoff_value >= 0:
await asyncio.sleep(backoff_value)
async def _do(self, event):
if not self._lazy_init_complete:
await self._lazy_init()
self._lazy_init_complete = True
if not self._q and self._queue_size > 0:
self._q = AsyncQueue(self._queue_size)
self._worker_awaitable = asyncio.get_running_loop().create_task(self._worker())
if self._queue_size > 0 and self._worker_awaitable.done():
await self._worker_awaitable
raise FlowError("ConcurrentJobExecution worker has already terminated")
if event is _termination_obj:
if self._queue_size > 0:
await self._q.put(_termination_obj)
await self._worker_awaitable
else:
await self._cleanup()
return await self._do_downstream(_termination_obj)
else:
coroutine = self._process_event_with_retries(event)
if self._queue_size == 0:
completed = await coroutine
await self._handle_completed(event, completed)
else:
task = asyncio.get_running_loop().create_task(coroutine)
await self._q.put((event, task))
if self._worker_awaitable.done():
await self._worker_awaitable
class ConcurrentExecution(_ConcurrentJobExecution):
"""
Runs a user-provided event_processor function on events concurrently.
:param event_processor: Function that will be run on each event
:param concurrency_mechanism: One of:
* "asyncio" (default) – for I/O implemented using asyncio
* "threading" – for blocking I/O
* "multiprocessing" – for processing-intensive tasks
:param max_in_flight: Maximum number of events to be processed at a time (default 8)
:param retries: Maximum number of retries per event (default 0)
:param backoff_factor: Wait time in seconds before the first retry (default 1). Each subsequent retry waits twice
as long as the previous one, up to a maximum of two minutes.
:param pass_context: If False, the event_processor function will be called with just one parameter (event). If True,
the event_processor function will be called with two parameters (event, context). Defaults to False.
:param full_event: Whether event processor should receive and return Event objects (when True),
or only the payload (when False). Defaults to False.
"""
_supported_concurrency_mechanisms = ["asyncio", "threading", "multiprocessing"]
def __init__(
self,
event_processor: Union[Callable[[Event], Any], Callable[[Event, Any], Any]],
concurrency_mechanism=None,
pass_context=None,
**kwargs,
):
super().__init__(**kwargs)
if concurrency_mechanism == "multiprocessing" and kwargs.get("full_event"):
raise ValueError(
'concurrency_mechanism="multiprocessing" may not be used in conjunction with full_event=True'
)
self._event_processor = event_processor
if concurrency_mechanism and concurrency_mechanism not in self._supported_concurrency_mechanisms:
raise ValueError(f"Concurrency mechanism '{concurrency_mechanism}' is not supported")
if concurrency_mechanism == "multiprocessing" and pass_context:
try:
pickle.dumps(self.context)
except Exception as ex:
raise ValueError(
'When concurrency_mechanism="multiprocessing" is used in conjunction with '
"pass_context=True, context must be serializable"
) from ex
self._executor = None
if concurrency_mechanism == "threading":
self._executor = ThreadPoolExecutor(max_workers=self.max_in_flight or self._DEFAULT_MAX_IN_FLIGHT)
elif concurrency_mechanism == "multiprocessing":
self._executor = ProcessPoolExecutor(max_workers=self.max_in_flight or self._DEFAULT_MAX_IN_FLIGHT)
self._pass_context = pass_context
async def _process_event(self, event):
args = [event if self._full_event else event.body]
if self._pass_context:
args.append(self.context)
if self._executor:
result = await asyncio.get_running_loop().run_in_executor(self._executor, self._event_processor, *args)
else:
result = self._event_processor(*args)
if asyncio.iscoroutine(result):
result = await result
if self._full_event:
return result
else:
event.body = result
return event
async def _handle_completed(self, event, response):
await self._do_downstream(response)
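# Illustrative sketch (not part of the library): running a blocking event processor in
# a thread pool via ConcurrentExecution. Assumes the SyncEmitSource step exported by
# storey; time.sleep() stands in for blocking I/O.
def _example_concurrent_execution():
    from storey import SyncEmitSource  # assumed import

    def blocking_processor(body):
        time.sleep(0.01)  # simulate a blocking call
        return body * 2

    controller = build_flow([
        SyncEmitSource(),
        ConcurrentExecution(blocking_processor, concurrency_mechanism="threading", max_in_flight=4),
        Reduce(0, lambda acc, x: acc + x),
    ]).run()
    for i in range(10):
        controller.emit(i)
    controller.terminate()
    return controller.await_termination()  # 2 * (0 + 1 + ... + 9) == 90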
class SendToHttp(_ConcurrentJobExecution):
"""Joins each event with data from any HTTP source. Used for event augmentation.
:param request_builder: Creates an HTTP request from the event. This request is then sent to its destination.
:type request_builder: Function (Event=>HttpRequest)
:param join_from_response: Joins the original event with the HTTP response into a new event.
:type join_from_response: Function ((Event, HttpResponse)=>Event)
:param name: Name of this step, as it should appear in logs. Defaults to class name (SendToHttp).
:type name: string
:param full_event: Whether user functions should receive and return Event objects (when True),
or only the payload (when False). Defaults to False.
:type full_event: boolean
"""
def __init__(self, request_builder, join_from_response, **kwargs):
super().__init__(**kwargs)
self._request_builder = request_builder
self._join_from_response = join_from_response
def _init(self):
super()._init()
self._client_session = None
async def _lazy_init(self):
connector = aiohttp.TCPConnector()
self._client_session = aiohttp.ClientSession(connector=connector)
async def _cleanup(self):
await self._client_session.close()
async def _process_event(self, event):
req = self._request_builder(event)
return await self._client_session.request(req.method, req.url, headers=req.headers, data=req.body, ssl=False)
async def _handle_completed(self, event, response):
response_body = await response.text()
joined_element = self._join_from_response(event.body, HttpResponse(response.status, response_body))
if joined_element is not None:
new_event = self._user_fn_output_to_event(event, joined_element)
await self._do_downstream(new_event)
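# Illustrative sketch (not part of the library): enriching events with an HTTP lookup.
# Assumes the SyncEmitSource step exported by storey; the URL is a placeholder. Note
# that the request builder receives the full Event, while the join function receives
# the event body.
def _example_send_to_http():
    from storey import SyncEmitSource  # assumed import

    def build_request(event):
        return HttpRequest("GET", f"http://example.com/users/{event.body['user_id']}", None)

    def join_response(body, http_response):
        body["lookup_status"] = http_response.status
        return body

    controller = build_flow([
        SyncEmitSource(),
        SendToHttp(build_request, join_response),
        Reduce([], lambda acc, x: acc + [x]),
    ]).run()
    controller.emit({"user_id": 1})
    controller.terminate()
    return controller.await_termination()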
class _Batching(Flow):
_do_downstream_per_event = True
def __init__(
self,
max_events: Optional[int] = None,
flush_after_seconds: Union[int, float, None] = None,
key_field: Optional[Union[str, Callable[[Event], str]]] = None,
drop_key_field=False,
**kwargs,
):
if max_events:
kwargs["max_events"] = max_events
if flush_after_seconds is not None:
kwargs["flush_after_seconds"] = flush_after_seconds
if isinstance(key_field, str):
kwargs["key_field"] = key_field
super().__init__(**kwargs)
self._max_events = max_events
self._flush_after_seconds = flush_after_seconds
if self._flush_after_seconds is not None and self._flush_after_seconds < 0:
raise ValueError("flush_after_seconds cannot be negative")
self._extract_key: Optional[Callable[[Event], str]] = self._create_key_extractor(key_field, drop_key_field)
def _init(self):
super()._init()
self._batch: Dict[Optional[str], List[Any]] = defaultdict(list)
# Keep the original events that make up each batch
self._batch_events: Dict[Optional[str], List[Any]] = defaultdict(list)
self._batch_first_event_time: Dict[Optional[str], datetime.datetime] = {}
self._batch_last_event_time: Dict[Optional[str], datetime.datetime] = {}
self._batch_start_time: Dict[Optional[str], float] = {}
self._timeout_task: Optional[Task] = None
@staticmethod
def _create_key_extractor(key_field, drop_key_field) -> Callable:
if key_field is None:
return lambda event: None
elif callable(key_field):
return key_field
elif isinstance(key_field, str):
if key_field.startswith("$"):
attribute = key_field[1:]
return lambda event: getattr(event, attribute)
elif drop_key_field:
return lambda event: event.body.pop(key_field)
else:
return lambda event: event.body[key_field]
else:
raise ValueError(f"Unsupported key_field type {type(key_field)}")
async def _emit(self, batch, batch_key, batch_time, batch_events, last_event_time=None):
raise NotImplementedError
async def _terminate(self):
pass
async def _do(self, event):
if event is _termination_obj:
await self._emit_all()
await self._terminate()
return await self._do_downstream(_termination_obj)
key = self._extract_key(event)
if hasattr(self, "_get_event_time"):
event_time = self._get_event_time(event)
else:
event_time = event.processing_time
if len(self._batch[key]) == 0:
self._batch_first_event_time[key] = event_time
self._batch_start_time[key] = time.monotonic()
self._batch_last_event_time[key] = event_time
elif self._batch_last_event_time[key] < event_time:
self._batch_last_event_time[key] = event_time
if self._flush_after_seconds is not None and self._timeout_task is None:
self._timeout_task = asyncio.get_running_loop().create_task(self._sleep_and_emit())
self._batch[key].append(self._event_to_batch_entry(event))
self._batch_events[key].append(event)
if len(self._batch[key]) == self._max_events:
await self._emit_batch(key)
if self._do_downstream_per_event:
await self._do_downstream(event)
async def _sleep_and_emit(self):
try:
while self._batch:
key = next(iter(self._batch.keys()))
delta_seconds = time.monotonic() - self._batch_start_time[key]
if delta_seconds < self._flush_after_seconds:
await asyncio.sleep(self._flush_after_seconds - delta_seconds)
await self._emit_batch(key)
except Exception:
message = traceback.format_exc()
if self.logger:
self.logger.error(f"Failed to flush batch in step '{self.name}':\n{message}")
self._timeout_task = None
def _event_to_batch_entry(self, event):
return self._get_event_or_body(event)
async def _emit_batch(self, batch_key: Optional[str] = None):
batch_to_emit = self._batch.pop(batch_key, None)
if batch_to_emit is None:
return
batch_time = self._batch_first_event_time.pop(batch_key)
last_event_time = self._batch_last_event_time.pop(batch_key)
del self._batch_start_time[batch_key]
await self._emit(batch_to_emit, batch_key, batch_time, self._batch_events[batch_key], last_event_time)
del self._batch_events[batch_key]
async def _emit_all(self):
for key in list(self._batch.keys()):
await self._emit_batch(key)
class Batch(_Batching, WithUUID):
"""Batches events into lists of up to max_events events. Each emitted list contained max_events events, unless
flush_after_seconds seconds have passed since the first event in the batch was received, at which the batch is
emitted with potentially fewer than max_events event.
:param max_events: Maximum number of events per emitted batch. Set to None to emit all events in one batch on flow
termination.
:param flush_after_seconds: Maximum number of seconds to wait before a batch is emitted.
:param key_field: The key by which events are grouped. By default (None), events are not grouped.
Other options may be:
Set to '$x' to group events by the x attribute of the event, e.g. "$key" or "$path".
Set to any other string 'str' to group events by Event.body[str].
Set to a Callable[[Event], str] to group events by a custom key extractor.
"""
_do_downstream_per_event = False
def __init__(self, *args, **kwargs):
_Batching.__init__(self, *args, **kwargs)
WithUUID.__init__(self)
async def _emit(self, batch, batch_key, batch_time, batch_events, last_event_time=None):
event = Event(batch, id=self._get_uuid())
if not self._full_event:
# Preserve reference to the original events to avoid early commit of offsets
event._original_events = batch_events
return await self._do_downstream(event)
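# Illustrative sketch (not part of the library): batching events into lists of up to
# four, flushing a partial batch after one second or on termination. Assumes the
# SyncEmitSource step exported by storey.
def _example_batch():
    from storey import SyncEmitSource  # assumed import

    controller = build_flow([
        SyncEmitSource(),
        Batch(max_events=4, flush_after_seconds=1),
        Reduce([], lambda acc, batch: acc + [batch]),
    ]).run()
    for i in range(10):
        controller.emit(i)
    controller.terminate()
    return controller.await_termination()  # e.g. [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]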
class JoinWithV3IOTable(_ConcurrentJobExecution):
"""Joins each event with a V3IO table. Used for event augmentation.
:param storage: Database driver.
:type storage: Driver
:param key_extractor: Function for extracting the key for table access from an event.
:type key_extractor: Function (Event=>string)
:param join_function: Joins the original event with relevant data received from V3IO.
:type join_function: Function ((Event, dict)=>Event)
:param table_path: Path to the table in V3IO.
:type table_path: string
:param attributes: A comma-separated list of attributes to be requested from V3IO.
Defaults to '*' (all user attributes).
:type attributes: string
:param name: Name of this step, as it should appear in logs. Defaults to class name (JoinWithV3IOTable).
:type name: string
:param full_event: Whether user functions should receive and return Event objects (when True), or only
the payload (when False). Defaults to False.
:type full_event: boolean
"""
def __init__(
self,
storage,
key_extractor,
join_function,
table_path,
attributes="*",
**kwargs,
):
kwargs["table_path"] = table_path
kwargs["attributes"] = attributes
super().__init__(**kwargs)
self._storage = storage
self._key_extractor = key_extractor
self._join_function = join_function
self._container, self._table_path = _split_path(table_path)
self._attributes = attributes
async def _process_event(self, event):
key = str(self._key_extractor(self._get_event_or_body(event)))
return await self._storage._get_item(self._container, self._table_path, key, self._attributes)
async def _handle_completed(self, event, response):
if response.status_code == 200:
response_object = response.output.item
joined = self._join_function(self._get_event_or_body(event), response_object)
if joined is not None:
new_event = self._user_fn_output_to_event(event, joined)
await self._do_downstream(new_event)
elif response.status_code == 404:
return None
else:
raise V3ioError(f"Failed to get item. Response status code was {response.status_code}: {response.body}")
async def _cleanup(self):
await self._storage.close()
class JoinWithTable(_ConcurrentJobExecution):
"""Joins each event with data from the given table.
:param table: A Table object or name to join with. If a table name is provided, it will be looked up in the context.
:param key_extractor: Name of the key column, or a function that extracts the table-access key from an event.
:param attributes: A list of attribute names to be queried for. Defaults to all attributes.
:param inner_join: Whether to drop events when the table does not have a matching entry (join_function won't be
called in such a case). Defaults to False.
:param join_function: Joins the original event with relevant data received from the storage. Event is dropped when
this function returns None. By default, the event's body is assumed to be a dict-like object and is updated with
the retrieved data.
:param name: Name of this step, as it should appear in logs. Defaults to class name (JoinWithTable).
:param full_event: Whether user functions should receive and return Event objects (when True), or only the
payload (when False). Defaults to False.
:param context: Context object that holds global configurations and secrets.
"""
def __init__(
self,
table: Union[Table, str],
key_extractor: Union[str, Callable[[Event], str]],
attributes: Optional[List[str]] = None,
inner_join: bool = False,
join_function: Optional[Callable[[Any, Dict[str, object]], Any]] = None,
**kwargs,
):
if isinstance(table, str):
kwargs["table"] = table
if isinstance(key_extractor, str):
kwargs["key_extractor"] = key_extractor
if attributes:
kwargs["attributes"] = attributes
kwargs["inner_join"] = inner_join
super().__init__(**kwargs)
self._table = table
if isinstance(table, str):
if not self.context:
raise TypeError("Table can not be string if no context was provided to the step")
self._table = self.context.get_table(table)
if key_extractor:
if callable(key_extractor):
self._key_extractor = key_extractor
elif isinstance(key_extractor, str):
if self._full_event:
self._key_extractor = lambda event: event.body[key_extractor]
else:
self._key_extractor = lambda element: element[key_extractor]
else:
raise TypeError(f"key is expected to be either a callable or string but got {type(key_extractor)}")
def default_join_fn(event, join_res):
event.update(join_res)
return event
def default_join_fn_full_event(event, join_res):
event.body.update(join_res)
return event
self._inner_join = inner_join
self._join_function = join_function or (default_join_fn_full_event if self._full_event else default_join_fn)
self._attributes = attributes or "*"
def _init(self):
super()._init()
self._closeables = [self._table]
async def _process_event(self, event):
key = self._key_extractor(self._get_event_or_body(event))
safe_key = stringify_key(key)
return await self._table._get_or_load_static_attributes_by_key(safe_key, self._attributes)
async def _handle_completed(self, event, response):
if self._inner_join and not response:
return
joined = self._join_function(self._get_event_or_body(event), response)
if joined is not None:
new_event = self._user_fn_output_to_event(event, joined)
await self._do_downstream(new_event)
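# Illustrative sketch (not part of the library): joining events with an in-memory
# table. Assumes the SyncEmitSource step and NoopDriver exported by storey, and that
# the table can be pre-populated by assignment as done here purely for the example.
def _example_join_with_table():
    from storey import NoopDriver, SyncEmitSource  # assumed imports

    table = Table("users", NoopDriver())
    table["john"] = {"city": "London"}

    controller = build_flow([
        SyncEmitSource(),
        JoinWithTable(table, "user", attributes=["city"]),
        Reduce([], lambda acc, x: acc + [x]),
    ]).run()
    controller.emit({"user": "john", "action": "login"})
    controller.terminate()
    return controller.await_termination()  # the emitted body is enriched with the 'city' attribute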
def build_flow(steps):
"""Builds a flow from a list of steps, by chaining the steps according to their order in the list.
Nested lists are used to represent branches in the flow.
Examples:
build_flow([step1, step2, step3])
is equivalent to
step1.to(step2).to(step3)
build_flow([step1, [step2a, step2b], step3])
is equivalent to
step1.to(step2a)
step1.to(step3)
step2a.to(step2b)
:param steps: a potentially nested list of steps
:returns: the first step
:rtype: Flow
"""
if len(steps) == 0:
raise ValueError("Cannot build an empty flow")
first_step = steps[0]
if isinstance(first_step, list):
first_step = build_flow(first_step)
cur_step = first_step
for next_step in steps[1:]:
if isinstance(next_step, list):
cur_step.to(build_flow(next_step))
else:
cur_step.to(next_step)
cur_step = next_step
return first_step
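# Illustrative sketch (not part of the library): a branching flow built with
# build_flow(), assuming the SyncEmitSource step exported by storey. Each nested list
# becomes a branch off the preceding step.
def _example_build_flow_branches():
    from storey import SyncEmitSource  # assumed import

    evens, odds = [], []
    controller = build_flow([
        SyncEmitSource(),
        [Filter(lambda x: x % 2 == 0), Map(evens.append)],
        [Filter(lambda x: x % 2 == 1), Map(odds.append)],
    ]).run()
    for i in range(6):
        controller.emit(i)
    controller.terminate()
    controller.await_termination()
    return evens, odds  # ([0, 2, 4], [1, 3, 5])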
class Context:
"""
Context object that holds global secrets and configurations to be passed to relevant steps.
:param initial_secrets: Initial dict of secrets.
:param initial_parameters: Initial dict of parameters.
:param initial_tables: Initial dict of tables.
"""
def __init__(
self,
initial_secrets: Optional[Dict[str, str]] = None,
initial_parameters: Optional[Dict[str, object]] = None,
initial_tables: Optional[Dict[str, Table]] = None,
):
self._secrets = initial_secrets or {}
self._parameters = initial_parameters or {}
self._tables = initial_tables or {}
def get_param(self, key, default):
return self._parameters.get(key, default)
def set_param(self, key, value):
self._parameters[key] = value
def get_secret(self, key):
return self._secrets.get(key, None)
def set_secret(self, key, secret):
self._secrets[key] = secret
def get_table(self, key):
return self._tables[key]
def set_table(self, key, table):
self._tables[key] = table
class _ParallelExecutionRunnableResult:
def __init__(self, runnable_name: str, data: Any, runtime: float):
self.runnable_name = runnable_name
self.data = data
self.runtime = runtime
parallel_execution_mechanisms = ("multiprocessing", "threading", "asyncio", "naive")
class ParallelExecutionRunnable:
"""
Runnable to be run by a ParallelExecution step. Subclasses must set the execution_mechanism attribute to one of:
* "multiprocessing" – To run in a separate process. This is appropriate for CPU or GPU intensive tasks as they
would otherwise block the main process by holding Python's Global Interpreter Lock (GIL).
* "threading" – To run in a separate thread. This is appropriate for blocking I/O tasks, as they would otherwise
block the main event loop thread.
* "asyncio" – To run in an asyncio task. This is appropriate for I/O tasks that use asyncio, allowing the event
loop to continue running while waiting for a response.
* "naive" – To run in the main event loop. This is appropriate only for trivial computation and/or file I/O. It
means that the runnable will not actually be run in parallel to anything else.
Subclasses must also override the run() method, or run_async() when execution_mechanism="asyncio", with user code
that handles the event and returns a result.
Subclasses may optionally override the init() method if the user's implementation of run() requires prior
initialization.
:param name: Runnable name
"""
execution_mechanism: Optional[str] = None
# ignore unused keyword arguments such as context which may be passed in by mlrun
def __init__(self, name: str, **kwargs):
if self.execution_mechanism not in parallel_execution_mechanisms:
raise ValueError(
"ParallelExecutionRunnable's execution_mechanism attribute must be overridden with one of: "
'"multiprocessing", "threading", "asyncio", "naive"'
)
self.name = name
def init(self) -> None:
"""Override this method to add initialization logic."""
pass
def run(self, body: Any, path: str) -> Any:
"""
Override this method with the code this runnable should run. If execution_mechanism is "asyncio", override
run_async() instead.
:param body: Event body
:param path: Event path
"""
return body
async def run_async(self, body: Any, path: str) -> Any:
"""
If execution_mechanism is "asyncio", override this method with the code this runnable should run. Otherwise,
override run() instead.
:param body: Event body
:param path: Event path
"""
return body
def _run(self, body: Any, path: str) -> Any:
start = time.monotonic()
body = self.run(body, path)
end = time.monotonic()
return _ParallelExecutionRunnableResult(self.name, body, end - start)
async def _async_run(self, body: Any, path: str) -> Any:
start = time.monotonic()
body = await self.run_async(body, path)
end = time.monotonic()
return _ParallelExecutionRunnableResult(self.name, body, end - start)
class ParallelExecution(Flow):
"""
Runs multiple jobs in parallel for each event.
:param runnables: A list of ParallelExecutionRunnable instances.
:param max_processes: Maximum number of processes to spawn. Defaults to the number of available CPUs, or 16 if
the number of CPUs cannot be determined.
:param max_threads: Maximum number of threads to start. Defaults to 32.
"""
def __init__(
self,
runnables: list[ParallelExecutionRunnable],
max_processes: Optional[int] = None,
max_threads: Optional[int] = None,
**kwargs,
):
super().__init__(**kwargs)
if not runnables:
raise ValueError("ParallelExecution cannot be instantiated without at least one runnable")
self.runnables = runnables
self._runnable_by_name = {}
self.max_processes = max_processes or os.cpu_count() or 16
self.max_threads = max_threads or 32
def select_runnables(self, event) -> Optional[Union[list[str], list[ParallelExecutionRunnable]]]:
"""
Given an event, returns a list of runnables (or a list of runnable names) to execute on it. It may also return
None, in which case all runnables are executed on the event. The default implementation returns None.
:param event: Event object
"""
pass
def _init(self):
super()._init()
num_processes = 0
num_threads = 0
for runnable in self.runnables:
if runnable.name in self._runnable_by_name:
raise ValueError(f"ParallelExecutionRunnable name '{runnable.name}' is not unique")
self._runnable_by_name[runnable.name] = runnable
runnable.init()
if runnable.execution_mechanism == "multiprocessing":
num_processes += 1
elif runnable.execution_mechanism == "threading":
num_threads += 1
elif runnable.execution_mechanism not in ("asyncio", "naive"):
raise ValueError(f"Unsupported execution mechanism: {runnable.execution_mechanism}")
# enforce max
num_processes = min(num_processes, self.max_processes)
num_threads = min(num_threads, self.max_threads)
self._executors = {}
if num_processes:
mp_context = multiprocessing.get_context("spawn")
self._executors["multiprocessing"] = ProcessPoolExecutor(max_workers=num_processes, mp_context=mp_context)
if num_threads:
self._executors["threading"] = ThreadPoolExecutor(max_workers=num_threads)
async def _do(self, event):
if event is _termination_obj:
return await self._do_downstream(_termination_obj)
else:
runnables = self.select_runnables(event)
if runnables is None:
runnables = self.runnables
futures = []
runnables_encountered = set()
for runnable in runnables:
if isinstance(runnable, str):
runnable = self._runnable_by_name[runnable]
if id(runnable) in runnables_encountered:
raise ValueError(f"select_runnables() returned more than one outlet named '{runnable.name}'")
input = event.body if runnable.execution_mechanism == "multiprocessing" else copy.deepcopy(event.body)
runnables_encountered.add(id(runnable))
if runnable.execution_mechanism == "asyncio":
future = asyncio.get_running_loop().create_task(runnable._async_run(input, event.path))
elif runnable.execution_mechanism == "naive":
future = asyncio.get_running_loop().create_future()
future.set_result(runnable._run(input, event.path))
else:
executor = self._executors[runnable.execution_mechanism]
future = asyncio.get_running_loop().run_in_executor(
executor,
runnable._run,
input,
event.path,
)
futures.append(future)
results: list[_ParallelExecutionRunnableResult] = await asyncio.gather(*futures)
if len(self.runnables) == 1:
event.body = results[0].data if results else None
else:
event.body = {result.runnable_name: result.data for result in results}
return await self._do_downstream(event)
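# Illustrative sketch (not part of the library): running two runnables per event, one
# in a worker thread and one on the event loop. Assumes the SyncEmitSource step
# exported by storey; the computations are placeholders.
def _example_parallel_execution():
    from storey import SyncEmitSource  # assumed import

    class Doubler(ParallelExecutionRunnable):
        execution_mechanism = "threading"

        def run(self, body, path):
            return body * 2

    class Negator(ParallelExecutionRunnable):
        execution_mechanism = "naive"

        def run(self, body, path):
            return -body

    controller = build_flow([
        SyncEmitSource(),
        ParallelExecution([Doubler("double"), Negator("negate")]),
        Reduce([], lambda acc, x: acc + [x]),
    ]).run()
    controller.emit(3)
    controller.terminate()
    return controller.await_termination()  # [{'double': 6, 'negate': -3}]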