Source code for storey.dataframe
# Copyright 2020 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import List, Optional
import pandas as pd
from .flow import Flow, _termination_obj
class ReduceToDataFrame(Flow):
"""Builds a pandas DataFrame from events and returns that DataFrame on flow termination.
:param index: Name of the column to be used as index. Optional. If not set, DataFrame will be range indexed.
:param columns: List of column names to be passed as-is to the DataFrame constructor. Optional.
:param insert_key_column_as: Name of the column to be inserted for event keys. Optional.
If not set, event keys will not be inserted into the DataFrame.
:param insert_id_column_as: Name of the column to be inserted for event IDs. Optional.
If not set, event IDs will not be inserted into the DataFrame.
for additional params, see documentation of :class:`storey.flow.Flow`
"""
def __init__(
self,
index: Optional[str] = None,
columns: Optional[List[str]] = None,
insert_key_column_as: Optional[str] = None,
insert_processing_time_column_as: Optional[str] = None,
insert_id_column_as: Optional[str] = None,
**kwargs,
):
super().__init__(**kwargs)
self._index = index
self._columns = columns
self._insert_key_column_as = insert_key_column_as
self._insert_processing_time_column_as = insert_processing_time_column_as
self._insert_id_column_as = insert_id_column_as
def _init(self):
super()._init()
self._key_column = []
self._processing_time_column = []
self._id_column = []
self._data = []
def to(self, outlet):
"""Pipe this step to next one. Throws exception since illegal"""
raise ValueError("ToDataFrame is a terminal step. It cannot be piped further.")
async def _do(self, event):
if event is _termination_obj:
df = pd.DataFrame(self._data, columns=self._columns)
if not df.empty:
if self._insert_key_column_as:
df[self._insert_key_column_as] = pd.DataFrame(self._key_column)
if self._insert_processing_time_column_as:
df[self._insert_processing_time_column_as] = self._processing_time_column
if self._insert_id_column_as:
df[self._insert_id_column_as] = self._id_column
if self._index:
df.set_index(self._index, inplace=True)
return df
else:
body = event.body
if isinstance(body, dict) or isinstance(body, list):
self._data.append(body)
if self._insert_key_column_as:
self._key_column.append(event.key)
if self._insert_processing_time_column_as:
self._processing_time_column.append(event.processing_time)
if self._insert_id_column_as:
self._id_column.append(event.id)
else:
raise ValueError(f"ToDataFrame step only supports input of type dictionary or list, not {type(body)}")
[docs]class ToDataFrame(Flow):
"""Create pandas data frame from events. Can appear in the middle of the flow, as opposed to ReduceToDataFrame
:param index: Name of the column to be used as index. Optional. If not set, DataFrame will be range indexed.
:param columns: List of column names to be passed as-is to the DataFrame constructor. Optional.
for additional params, see documentation of :class:`storey.flow.Flow`
"""
def __init__(self, index: Optional[str] = None, columns: Optional[List[str]] = None, **kwargs):
super().__init__(**kwargs)
self._index = index
self._columns = columns
async def _do(self, event):
if event is _termination_obj:
return await self._do_downstream(_termination_obj)
else:
df = pd.DataFrame(event.body, columns=self._columns)
if self._index:
df.set_index(self._index, inplace=True)
event.body = df
return await self._do_downstream(event)