import pandas as pd
from typing import Optional, List
from .flow import _termination_obj, Flow
class ReduceToDataFrame(Flow):
"""Builds a pandas DataFrame from events and returns that DataFrame on flow termination.
:param index: Name of the column to be used as index. Optional. If not set, DataFrame will be range indexed.
:param columns: List of column names to be passed as-is to the DataFrame constructor. Optional.
:param insert_key_column_as: Name of the column to be inserted for event keys. Optional.
If not set, event keys will not be inserted into the DataFrame.
:param insert_time_column_as: Name of the column to be inserted for event times. Optional.
If not set, event times will not be inserted into the DataFrame.
:param insert_id_column_as: Name of the column to be inserted for event IDs. Optional.
If not set, event IDs will not be inserted into the DataFrame.
for additional params, see documentation of :class:`storey.flow.Flow`
"""
def __init__(self, index: Optional[str] = None, columns: Optional[List[str]] = None, insert_key_column_as: Optional[str] = None,
insert_time_column_as: Optional[str] = None, insert_id_column_as: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
self._index = index
self._columns = columns
self._insert_key_column_as = insert_key_column_as
self._insert_time_column_as = insert_time_column_as
self._insert_id_column_as = insert_id_column_as
def _init(self):
super()._init()
self._key_column = []
self._time_column = []
self._id_column = []
self._data = []
def to(self, outlet):
"""Pipe this step to next one. Throws exception since illegal"""
raise ValueError("ToDataFrame is a terminal step. It cannot be piped further.")
async def _do(self, event):
if event is _termination_obj:
df = pd.DataFrame(self._data, columns=self._columns)
if not df.empty:
if self._insert_key_column_as:
df[self._insert_key_column_as] = pd.DataFrame(self._key_column)
if self._insert_time_column_as:
df[self._insert_time_column_as] = self._time_column
if self._insert_id_column_as:
df[self._insert_id_column_as] = self._id_column
if self._index:
df.set_index(self._index, inplace=True)
return df
else:
body = event.body
if isinstance(body, dict) or isinstance(body, list):
self._data.append(body)
if self._insert_key_column_as:
self._key_column.append(event.key)
if self._insert_time_column_as:
self._time_column.append(event.time)
if self._insert_id_column_as:
self._id_column.append(event.id)
else:
raise ValueError(f'ToDataFrame step only supports input of type dictionary or list, not {type(body)}')
[docs]class ToDataFrame(Flow):
"""Create pandas data frame from events. Can appear in the middle of the flow, as opposed to ReduceToDataFrame
:param index: Name of the column to be used as index. Optional. If not set, DataFrame will be range indexed.
:param columns: List of column names to be passed as-is to the DataFrame constructor. Optional.
for additional params, see documentation of :class:`storey.flow.Flow`
"""
def __init__(self, index: Optional[str] = None, columns: Optional[List[str]] = None, **kwargs):
super().__init__(**kwargs)
self._index = index
self._columns = columns
async def _do(self, event):
if event is _termination_obj:
return await self._do_downstream(_termination_obj)
else:
df = pd.DataFrame(event.body, columns=self._columns)
if self._index:
df.set_index(self._index, inplace=True)
event.body = df
return await self._do_downstream(event)