Source code for mlrun.feature_store.steps

# Copyright 2018 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import re
import uuid
from typing import Any, Dict, List, Union

import numpy as np
import pandas as pd
from storey import MapClass

import mlrun.errors
from mlrun.serving.server import get_event_time
from mlrun.serving.utils import StepToDict
from mlrun.utils import get_in


def get_engine(first_event):
    if hasattr(first_event, "body"):
        first_event = first_event.body
    if isinstance(first_event, pd.DataFrame):
        return "pandas"
    return "storey"


class MLRunStep(MapClass):
    def __init__(self, **kwargs):
        """Abstract class for an mlrun step; can be used in pandas/storey feature set ingestion"""
        super().__init__(**kwargs)

    def do(self, event):
        """
        Replace this do method with the engine-specific (pandas/storey)
        implementation, based on the type of the first event.
        """
        engine = get_engine(event)
        if engine == "pandas":
            self.do = self._do_pandas
        else:
            self.do = self._do_storey
        return self.do(event)

    def _do_pandas(self, event):
        raise NotImplementedError

    def _do_storey(self, event):
        raise NotImplementedError
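
# A minimal sketch (not part of the original module) of how a custom step is
# built on MLRunStep: only _do_pandas/_do_storey are overridden, and the do()
# dispatch above picks the right one based on the first event's type. The
# names "MultiplyFeature", "feature" and "factor" are hypothetical.
#
#   class MultiplyFeature(StepToDict, MLRunStep):
#       def __init__(self, feature: str, factor: int = 2, **kwargs):
#           super().__init__(**kwargs)
#           self.feature = feature
#           self.factor = factor
#
#       def _do_storey(self, event):
#           # the storey engine passes the event body (a dict)
#           event[self.feature] = event[self.feature] * self.factor
#           return event
#
#       def _do_pandas(self, event):
#           # the pandas engine passes a DataFrame
#           event[self.feature] = event[self.feature] * self.factor
#           return event
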
class FeaturesetValidator(StepToDict, MLRunStep):
    """Validate feature values according to the feature set validation policy"""

    def __init__(self, featureset=None, columns=None, name=None, **kwargs):
        """Validate feature values according to the feature set validation policy

        :param featureset: feature set uri (or "." for the current feature set pipeline)
        :param columns:    names of the columns/fields to validate
        :param name:       step name
        :param kwargs:     optional kwargs (for storey)
        """
        kwargs["full_event"] = True
        super().__init__(**kwargs)
        self._validators = {}
        self.featureset = featureset or "."
        self.columns = columns
        self.name = name
        if not self.context:
            return
        self._featureset = self.context.get_store_resource(featureset)
        for key, feature in self._featureset.spec.features.items():
            if feature.validator and (not columns or key in columns):
                feature.validator.set_feature(feature)
                self._validators[key] = feature.validator

    def _do_storey(self, event):
        body = event.body
        for name, validator in self._validators.items():
            if name in body:
                ok, args = validator.check(body[name])
                if not ok:
                    message = args.pop("message")
                    key_text = f" key={event.key}" if event.key else ""
                    if event.time:
                        key_text += f" time={event.time}"
                    print(
                        f"{validator.severity}! {name} {message},{key_text} args={args}"
                    )
        return event

    def _do_pandas(self, event):
        body = event.body
        for column in body:
            validator = self._validators.get(column, None)
            if validator:
                violations = 0
                all_args = []
                for i in body.index:
                    # check each entry in the column against its validator
                    ok, args = validator.check(body.at[i, column])
                    if not ok:
                        violations += 1
                        all_args.append(args)
                        message = args.pop("message")
                if violations != 0:
                    text = f" column={column}, has {violations} violations"
                    if event.time:
                        text += f" time={event.time}"
                    print(
                        f"{validator.severity}! {column} {message},{text} args={all_args}"
                    )
        return event
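
# Usage sketch (not part of the original module), assuming a feature set whose
# features carry validators (e.g. a MinMaxValidator from mlrun.features); the
# feature set name and column below are hypothetical:
#
#   from mlrun.features import MinMaxValidator
#
#   quotes_set["bid"].validator = MinMaxValidator(min=52, severity="info")
#   quotes_set.graph.to(FeaturesetValidator())
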
class MapValues(StepToDict, MLRunStep):
    """Map column values to new values"""

    def __init__(
        self,
        mapping: Dict[str, Dict[str, Any]],
        with_original_features: bool = False,
        suffix: str = "mapped",
        **kwargs,
    ):
        """Map column values to new values

        example::

            # replace the value "U" with '0' in the age column
            graph.to(MapValues(mapping={'age': {'U': '0'}}, with_original_features=True))

            # replace integer values, e.g. swap 0 and 1 in the "not" column
            graph.to(MapValues(mapping={'not': {0: 1, 1: 0}}))

            # replace by range; use the strings "-inf" and "inf" for an open-ended range
            graph.to(MapValues(mapping={'numbers': {'ranges': {'negative': ["-inf", 0], 'positive': [0, "inf"]}}}))

        :param mapping: a dict with an entry per column and the associated old/new values map
        :param with_original_features: set to True to keep the original features
        :param suffix: the suffix added to the column name <column>_<suffix> (default is "mapped")
        :param kwargs: optional kwargs (for storey)
        """
        super().__init__(**kwargs)
        self.mapping = mapping
        self.with_original_features = with_original_features
        self.suffix = suffix

    def _map_value(self, feature: str, value):
        feature_map = self.mapping.get(feature, {})

        # Is this a range replacement?
        if "ranges" in feature_map:
            for val, val_range in feature_map.get("ranges", {}).items():
                min_val = val_range[0] if val_range[0] != "-inf" else -np.inf
                max_val = val_range[1] if val_range[1] != "inf" else np.inf
                if value >= min_val and value < max_val:
                    return val

        # Is it a regular replacement?
        return feature_map.get(value, value)

    def _get_feature_name(self, feature) -> str:
        return f"{feature}_{self.suffix}" if self.with_original_features else feature

    def _do_storey(self, event):
        mapped_values = {
            self._get_feature_name(feature): self._map_value(feature, val)
            for feature, val in event.items()
            if feature in self.mapping
        }
        if self.with_original_features:
            mapped_values.update(event)
        return mapped_values

    def _do_pandas(self, event):
        df = pd.DataFrame(index=event.index)
        for feature in event.columns:
            feature_map = self.mapping.get(feature, {})
            if "ranges" in feature_map:
                # create and apply a range map
                for val, val_range in feature_map.get("ranges", {}).items():
                    min_val = val_range[0] if val_range[0] != "-inf" else -np.inf
                    max_val = val_range[1] if val_range[1] != "inf" else np.inf
                    feature_map["ranges"][val] = [min_val, max_val]
                matchdf = pd.DataFrame.from_dict(
                    feature_map["ranges"], "index"
                ).reset_index()
                matchdf.index = pd.IntervalIndex.from_arrays(
                    left=matchdf[0], right=matchdf[1], closed="both"
                )
                df[self._get_feature_name(feature)] = matchdf.loc[event[feature]][
                    "index"
                ].values
            elif feature_map:
                # create and apply a simple value map
                df[self._get_feature_name(feature)] = event[feature].map(
                    lambda x: feature_map[x]
                )
        if self.with_original_features:
            df = pd.concat([event, df], axis=1)
        return df
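
# Behavior sketch (not part of the original module): how _map_value resolves a
# range mapping. Note the storey path above matches min <= value < max, while
# the pandas path builds an IntervalIndex closed on both ends.
#
#   step = MapValues(mapping={"num": {"ranges": {"negative": ["-inf", 0],
#                                                "positive": [0, "inf"]}}})
#   step._map_value("num", 5)    # -> "positive"
#   step._map_value("num", -3)   # -> "negative"
#   step._map_value("other", 7)  # -> 7 (no mapping for this feature)
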
class Imputer(StepToDict, MLRunStep):
    def __init__(
        self,
        method: str = "avg",
        default_value=None,
        mapping: Dict[str, Any] = None,
        **kwargs,
    ):
        """Replace None values with default values

        :param method:        for future use
        :param default_value: default value if not specified per column
        :param mapping:       a dict of per-column default values
        :param kwargs:        optional kwargs (for storey)
        """
        super().__init__(**kwargs)
        # default to an empty dict so the lookups below work when no mapping is given
        self.mapping = mapping or {}
        self.method = method
        self.default_value = default_value

    def _impute(self, feature: str, value):
        if value is None:
            return self.mapping.get(feature, self.default_value)
        return value

    def _do_storey(self, event):
        imputed_values = {
            feature: self._impute(feature, val) for feature, val in event.items()
        }
        return imputed_values

    def _do_pandas(self, event):
        for feature in event.columns:
            val = self.mapping.get(feature, self.default_value)
            if val is not None:
                event[feature].fillna(val, inplace=True)
        return event
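
# Behavior sketch (not part of the original module): on the storey path, None
# values are replaced per the mapping, falling back to default_value.
#
#   step = Imputer(default_value=0, mapping={"age": -1})
#   step._do_storey({"age": None, "height": None, "weight": 80})
#   # -> {"age": -1, "height": 0, "weight": 80}
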
class OneHotEncoder(StepToDict, MLRunStep):
    def __init__(self, mapping: Dict[str, List[Union[int, str]]], **kwargs):
        """Create new binary fields, one per category (one hot encoded)

        example::

            mapping = {'category': ['food', 'health', 'transportation'],
                       'gender': ['male', 'female']}
            graph.to(OneHotEncoder(mapping=mapping))

        :param mapping: a dict of per-column categories (to map to binary fields)
        :param kwargs:  optional kwargs (for storey)
        """
        super().__init__(**kwargs)
        self.mapping = mapping
        for key, values in mapping.items():
            for val in values:
                if not isinstance(val, (str, int, np.integer)):
                    raise mlrun.errors.MLRunInvalidArgumentError(
                        "For OneHotEncoder you must provide int or string mapping list"
                    )
            mapping[key] = list(set(values))

    def _encode(self, feature: str, value):
        encoding = self.mapping.get(feature, [])
        if encoding:
            one_hot_encoding = {
                f"{feature}_{OneHotEncoder._sanitized_category(category)}": 0
                for category in encoding
            }
            if value in encoding:
                one_hot_encoding[
                    f"{feature}_{OneHotEncoder._sanitized_category(value)}"
                ] = 1
            else:
                print(f"Warning, {value} is not a known value for the encoding")
            return one_hot_encoding
        return {feature: value}

    def _do_storey(self, event):
        encoded_values = {}
        for feature, val in event.items():
            encoded_values.update(self._encode(feature, val))
        return encoded_values

    def _do_pandas(self, event):
        for key, values in self.mapping.items():
            event[key] = pd.Categorical(event[key], categories=list(values))
            encoded = pd.get_dummies(event[key], prefix=key, dtype=np.int64)
            event = pd.concat([event.loc[:, :key], encoded, event.loc[:, key:]], axis=1)
        event.drop(columns=list(self.mapping.keys()), inplace=True)
        return event

    @staticmethod
    def _sanitized_category(category):
        # replace spaces and hyphens with "_"
        if isinstance(category, str):
            return re.sub("[ -]", "_", category)
        return category
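
# Behavior sketch (not part of the original module): encoding a single value on
# the storey path. Features without a mapping pass through unchanged; unknown
# values yield an all-zero encoding plus a printed warning.
#
#   step = OneHotEncoder(mapping={"gender": ["male", "female"]})
#   step._encode("gender", "male")  # -> {"gender_male": 1, "gender_female": 0}
#   step._encode("city", "NY")      # -> {"city": "NY"} (no mapping, unchanged)
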
class DateExtractor(StepToDict, MLRunStep):
    """Date Extractor allows you to extract a date-time component"""

    def __init__(
        self,
        parts: Union[Dict[str, str], List[str]],
        timestamp_col: str = None,
        **kwargs,
    ):
        """Date Extractor extracts a date-time component into new columns

        The extracted date part will appear as a `<timestamp_col>_<date_part>` feature.

        Supported part values:

        * asm8:             Return numpy datetime64 format in nanoseconds.
        * day_of_week:      Return day of the week.
        * day_of_year:      Return the day of the year.
        * dayofweek:        Return day of the week.
        * dayofyear:        Return the day of the year.
        * days_in_month:    Return the number of days in the month.
        * daysinmonth:      Return the number of days in the month.
        * freqstr:          Return the frequency string (if set).
        * is_leap_year:     Return True if year is a leap year.
        * is_month_end:     Return True if date is last day of month.
        * is_month_start:   Return True if date is first day of month.
        * is_quarter_end:   Return True if date is last day of the quarter.
        * is_quarter_start: Return True if date is first day of the quarter.
        * is_year_end:      Return True if date is last day of the year.
        * is_year_start:    Return True if date is first day of the year.
        * quarter:          Return the quarter of the year.
        * tz:               Alias for tzinfo.
        * week:             Return the week number of the year.
        * weekofyear:       Return the week number of the year.

        example::

            # (taken from the fraud-detection end-to-end feature store demo)
            # Define the Transactions FeatureSet
            transaction_set = fs.FeatureSet("transactions",
                                            entities=[fs.Entity("source")],
                                            timestamp_key='timestamp',
                                            description="transactions feature set")

            # Get FeatureSet computation graph
            transaction_graph = transaction_set.graph

            # Add the custom `DateExtractor` step to the computation graph
            transaction_graph.to(
                class_name='DateExtractor',
                name='Extract Dates',
                parts=['hour', 'day_of_week'],
                timestamp_col='timestamp',
            )

        :param parts: list of pandas-style date-time parts you want to extract
        :param timestamp_col: the name of the column containing the timestamps
                              to extract from, by default "timestamp"
        """
        super().__init__(**kwargs)
        self.timestamp_col = timestamp_col
        self.parts = parts

    def _get_key_name(self, part: str, timestamp_col: str):
        timestamp_col = timestamp_col if timestamp_col else "timestamp"
        return f"{timestamp_col}_{part}"

    def _extract_timestamp(self, event):
        # Extract the timestamp from the event
        if self.timestamp_col is None:
            timestamp = event["timestamp"]
        else:
            try:
                timestamp = event[self.timestamp_col]
            except KeyError:
                raise mlrun.errors.MLRunInvalidArgumentError(
                    f"{self.timestamp_col} does not exist in the event"
                )
        return timestamp

    def _do_storey(self, event):
        timestamp = self._extract_timestamp(event)
        # Extract the specified parts
        timestamp = pd.Timestamp(timestamp)
        for part in self.parts:
            # Extract the part and add it to the event
            extracted_part = getattr(timestamp, part)
            event[self._get_key_name(part, self.timestamp_col)] = extracted_part
        return event

    def _do_pandas(self, event):
        timestamp = self._extract_timestamp(event)
        # Extract the specified parts
        for part in self.parts:
            # Extract the part and add it as a new column
            event[self._get_key_name(part, self.timestamp_col)] = timestamp.map(
                lambda x: getattr(pd.Timestamp(x), part)
            )
        return event
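
# Behavior sketch (not part of the original module): extracting parts from a
# single storey event (a dict). The key names follow _get_key_name above.
#
#   step = DateExtractor(parts=["hour", "day_of_week"], timestamp_col="timestamp")
#   step._do_storey({"timestamp": "2022-01-18 15:01"})
#   # -> {"timestamp": "2022-01-18 15:01",
#   #     "timestamp_hour": 15, "timestamp_day_of_week": 1}
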
class SetEventMetadata(MapClass):
    """Set the event metadata (id, key, timestamp) from the event body"""

    def __init__(
        self,
        id_path: str = None,
        key_path: str = None,
        time_path: str = None,
        random_id: bool = None,
        **kwargs,
    ):
        """Set the event metadata (id, key, timestamp) from the event body

        Set the event metadata fields (id, key, and time) from the event body
        data structure. Each xx_path attribute defines the key or path to the
        value in the body dict; "." in the path string indicates the value is
        in a nested dict, e.g. `"x.y"` means `{"x": {"y": value}}`.

        example::

            flow = function.set_topology("flow")
            # build a graph and use the SetEventMetadata step to extract the id, key and time from the event body
            # ("myid", "mykey" and "mytime" fields), the metadata will be used for following data processing steps
            # (e.g. feature store ops, time/key aggregations, write to databases/streams, etc.)
            flow.to(SetEventMetadata(id_path="myid", key_path="mykey", time_path="mytime"))
                .to(...)  # additional steps

            server = function.to_mock_server()
            event = {"myid": "34", "mykey": "123", "mytime": "2022-01-18 15:01"}
            resp = server.test(body=event)

        :param id_path:   path to the id value
        :param key_path:  path to the key value
        :param time_path: path to the time value (value should be of type str or datetime)
        :param random_id: if True, set the event.id to a random value
        """
        kwargs["full_event"] = True
        super().__init__(**kwargs)
        self.id_path = id_path
        self.key_path = key_path
        self.time_path = time_path
        self.random_id = random_id

        self._tagging_funcs = []

    def post_init(self, mode="sync"):
        def add_metadata(name, path, operator=str):
            def _add_meta(event):
                value = get_in(event.body, path)
                setattr(event, name, operator(value))

            return _add_meta

        def set_random_id(event):
            event.id = uuid.uuid4().hex

        self._tagging_funcs = []
        if self.id_path:
            self._tagging_funcs.append(add_metadata("id", self.id_path))
        if self.key_path:
            self._tagging_funcs.append(add_metadata("key", self.key_path))
        if self.time_path:
            self._tagging_funcs.append(
                add_metadata("time", self.time_path, get_event_time)
            )
        if self.random_id:
            self._tagging_funcs.append(set_random_id)

    def do(self, event):
        for func in self._tagging_funcs:
            func(event)
        return event
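
# Behavior sketch (not part of the original module): dotted paths resolve into
# nested dicts via mlrun.utils.get_in, so key_path="meta.key" reads
# event.body["meta"]["key"]. post_init() must run (the serving framework calls
# it) before do() applies the tagging functions.
#
#   step = SetEventMetadata(key_path="meta.key")
#   step.post_init()
#   # for a storey event with body {"meta": {"key": "k1"}},
#   # step.do(event) sets event.key = "k1"
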
class DropFeatures(StepToDict, MLRunStep):
    def __init__(self, features: List[str], **kwargs):
        """Drop the specified features from the event

        :param features: string list of the feature names to drop
        :param kwargs:   optional kwargs (for storey)

        example::

            feature_set = fs.FeatureSet("fs-new",
                                        entities=[fs.Entity("id")],
                                        description="feature set",
                                        engine="pandas",
                                        )
            # Pre-processing graph steps
            feature_set.graph.to(DropFeatures(features=["age"]))
            df_pandas = fs.ingest(feature_set, data)
        """
        super().__init__(**kwargs)
        self.features = features

    def _do_storey(self, event):
        for feature in self.features:
            try:
                del event[feature]
            except KeyError:
                raise mlrun.errors.MLRunInvalidArgumentError(
                    f"The ingested data doesn't contain a feature named '{feature}'"
                )
        return event

    def _do_pandas(self, event):
        return event.drop(columns=self.features)
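
# Behavior sketch (not part of the original module): the pandas path delegates
# to DataFrame.drop, so a missing column raises pandas' own KeyError rather
# than the MLRunInvalidArgumentError used on the storey path.
#
#   step = DropFeatures(features=["age"])
#   step._do_pandas(pd.DataFrame({"age": [30], "city": ["NY"]}))
#   # -> DataFrame with only the "city" column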