mlrun.feature_store

class mlrun.feature_store.Entity(name: Optional[str] = None, value_type: Optional[mlrun.data_types.data_types.ValueType] = None, description: Optional[str] = None, labels: Optional[Dict[str, str]] = None)[source]

Bases: mlrun.model.ModelObj

data entity (index key)

Parameters
  • name – entity name

  • value_type – type of the entity, e.g. ValueType.STRING, ValueType.INT

  • description – text description of the entity

  • labels – a set of key/value labels (tags)

class mlrun.feature_store.Feature(value_type: Optional[str] = None, dims: Optional[List[int]] = None, description: Optional[str] = None, aggregate: Optional[bool] = None, name: Optional[str] = None, validator=None, default: Optional[str] = None, labels: Optional[Dict[str, str]] = None)[source]

Bases: mlrun.model.ModelObj

data feature

Features can be specified manually or inferred automatically (during ingest/preview)

Parameters
  • value_type – type of the feature. Use the ValueType constants library e.g. ValueType.STRING, ValueType.INT

  • dims – list of dimensions for vectors/tensors, e.g. [2, 2]

  • description – text description of the feature

  • aggregate – is it an aggregated value

  • name – name of the feature

  • validator – feature validation policy

  • default – default value

  • labels – a set of key/value labels (tags)

property validator
class mlrun.feature_store.FeatureSet(name: Optional[str] = None, description: Optional[str] = None, entities: Optional[List[Union[mlrun.features.Entity, str]]] = None, timestamp_key: Optional[str] = None, engine: Optional[str] = None)[source]

Bases: mlrun.model.ModelObj

Feature set object, defines a set of features and their data pipeline

example:

import mlrun.feature_store as fstore
ticks = fstore.FeatureSet("ticks", entities=["stock"], timestamp_key="timestamp")
fstore.ingest(ticks, df)
Parameters
  • name – name of the feature set

  • description – text description

  • entities – list of entity (index key) names or Entity

  • timestamp_key – timestamp column name

  • engine – name of the processing engine (storey, pandas, or spark), defaults to storey

add_aggregation(column, operations, windows, period=None, name=None, step_name=None, after=None, before=None, state_name=None, emit_policy: Optional[storey.dtypes.EmitPolicy] = None)[source]

add feature aggregation rule

example:

myset.add_aggregation("ask", ["sum", "max"], "1h", "10m", name="asks")
Parameters
  • column – name of the column/field to aggregate. Do not use column names that start with _ or aggr_; these prefixes are reserved for internal use, and such data is not ingested correctly. When using the pandas engine, do not use spaces ( ) or periods (.) in column names; they cause ingestion errors.

  • operations – aggregation operations, e.g. [‘sum’, ‘std’]

  • windows

    time windows, can be a single window, e.g. ‘1h’, ‘1d’, or a list of windows with the same unit, e.g. [‘1h’, ‘6h’]. Windows are treated as sliding windows when the period parameter is provided, and as fixed windows otherwise.

    • Sliding windows are fixed-size, overlapping windows that slide with time. The window size determines the size of the sliding window and the period determines the step size of the slide. The period must be an integral divisor of the window size. If the period is not provided, fixed windows are used.

    • Fixed windows are fixed-size, non-overlapping, gap-less windows, also referred to as tumbling windows. In this case, each record on an in-application stream belongs to a specific window and is processed only once (when the query processes the window to which the record belongs).

  • period – optional, sliding window granularity, e.g. ’20s’ ‘10m’ ‘3h’ ‘7d’

  • name – optional, aggregation name/prefix. Must be unique per feature set. If not passed, the column will be used as name.

  • step_name – optional, graph step name

  • state_name – deprecated, use step_name instead

  • after – optional, after which graph step it runs

  • before – optional, comes before graph step

  • emit_policy – optional, which emit policy to use when performing the aggregations. Use the derived classes of storey.EmitPolicy. The default is to emit every period for Spark engine and emit every event for storey. Currently the only other supported option is to use emit_policy=storey.EmitEveryEvent() when using the Spark engine to emit every event
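
The window and period rules above can be sketched in plain Python. This is only an illustration of the described semantics, not mlrun or storey code: to_seconds, window_bounds, and aggregation_feature_names are hypothetical helpers, times are plain epoch seconds, and the <name>_<operation>_<window> naming is an assumption inferred from feature names such as asks_sum_5h used elsewhere in this reference.

```python
import re
from typing import List, Optional, Tuple

# Seconds per supported unit suffix (assumption: only s/m/h/d are handled here).
UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600, "d": 86400}


def to_seconds(window: str) -> int:
    """Convert a window string such as '10m' or '1h' to seconds."""
    match = re.fullmatch(r"(\d+)([smhd])", window)
    if match is None:
        raise ValueError(f"unsupported window string: {window!r}")
    return int(match.group(1)) * UNIT_SECONDS[match.group(2)]


def window_bounds(
    event_time: int, window: str, period: Optional[str] = None
) -> List[Tuple[int, int]]:
    """Return the [start, end) windows (epoch seconds) that contain event_time."""
    size = to_seconds(window)
    if period is None:
        # Fixed (tumbling) window: each event belongs to exactly one window.
        start = event_time - event_time % size
        return [(start, start + size)]
    step = to_seconds(period)
    if size % step != 0:
        raise ValueError("period must be an integral divisor of the window size")
    # Sliding windows start every `step` seconds, so size // step of them
    # overlap any given point in time.
    latest_start = event_time - event_time % step
    starts = [latest_start - i * step for i in range(size // step)]
    return [(start, start + size) for start in sorted(starts)]


def aggregation_feature_names(name: str, operations: List[str], windows: List[str]) -> List[str]:
    """Hypothetical naming helper: one feature per (operation, window) pair."""
    return [f"{name}_{op}_{w}" for op in operations for w in windows]


if __name__ == "__main__":
    print(window_bounds(3700, "1h"))              # one tumbling window
    print(len(window_bounds(3700, "1h", "10m")))  # number of overlapping windows
    print(aggregation_feature_names("asks", ["sum", "max"], ["1h", "5h"]))
```

With a 1h window and a 10m period, every event falls into six overlapping windows, which is why the period has to divide the window size evenly.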

add_entity(name: str, value_type: Optional[mlrun.data_types.data_types.ValueType] = None, description: Optional[str] = None, labels: Optional[Dict[str, str]] = None)[source]

add/set an entity (dataset index)

example:

import mlrun.feature_store as fstore

ticks = fstore.FeatureSet("ticks",
                entities=["stock"],
                timestamp_key="timestamp")
ticks.add_entity("country",
                mlrun.data_types.ValueType.STRING,
                description="stock country")
ticks.add_entity("year", mlrun.data_types.ValueType.INT16)
ticks.save()
Parameters
  • name – entity name

  • value_type – type of the entity (default to ValueType.STRING)

  • description – description of the entity

  • labels – label tags dict

add_feature(feature: mlrun.features.Feature, name=None)[source]

add/set a feature

example:

import mlrun.feature_store as fstore
from mlrun.features import Feature

ticks = fstore.FeatureSet("ticks",
                entities=["stock"],
                timestamp_key="timestamp")
ticks.add_feature(Feature(value_type=mlrun.data_types.ValueType.STRING,
                description="client consistency"),"ABC01")
ticks.add_feature(Feature(value_type=mlrun.data_types.ValueType.FLOAT,
                description="client volatility"),"SAB")
ticks.save()
Parameters
  • feature – setting of Feature

  • name – feature name

property fullname

full name in the form {project}/{name}[:{tag}]

get_stats_table()[source]

get feature statistics table (as dataframe)

get_target_path(name=None)[source]

get the url/path for an offline or specified data target

property graph

feature set transformation graph/DAG

has_valid_source()[source]

check if object’s spec has a valid (non empty) source definition

kind = 'FeatureSet'

add a linked file/artifact (chart, data, ..)

property metadata
plot(filename=None, format=None, with_targets=False, **kw)[source]

generate graphviz plot

purge_targets(target_names: Optional[List[str]] = None, silent: bool = False)[source]

Delete data of specific targets

Parameters
  • target_names – List of names of targets to delete (default: delete all ingested targets)

  • silent – Fail silently if target doesn’t exist in featureset status

reload(update_spec=True)[source]

reload/sync the feature set status and spec from the DB

save(tag='', versioned=False)[source]

save to mlrun db

set_targets(targets=None, with_defaults=True, default_final_step=None, default_final_state=None)[source]

set the desired target list or defaults

Parameters
  • targets – list of target type names (‘csv’, ‘nosql’, ..) or target objects CSVTarget(), ParquetTarget(), NoSqlTarget(), StreamTarget(), ..

  • with_defaults – add the default targets (as defined in the central config)

  • default_final_step – the final graph step after which we add the target writers, used when the graph branches and the end can’t be determined automatically

  • default_final_state – deprecated, use default_final_step instead

property spec
property status
to_dataframe(columns=None, df_module=None, target_name=None, start_time=None, end_time=None, time_column=None, **kwargs)[source]

return featureset (offline) data as dataframe

Parameters
  • columns – list of columns to select (if not all)

  • df_module – py module used to create the DataFrame (pd for Pandas, dd for Dask, ..)

  • target_name – select a specific target (material view)

  • start_time – filter by start time

  • end_time – filter by end time

  • time_column – specify the time column name in the file

  • kwargs – additional reader (csv, parquet, ..) args

Returns

DataFrame

update_targets_for_ingest(targets: List[mlrun.model.DataTargetBase], overwrite: Optional[bool] = None)[source]
property uri

fully qualified feature set uri

class mlrun.feature_store.FeatureVector(name=None, features=None, label_feature=None, description=None, with_indexes=None)[source]

Bases: mlrun.model.ModelObj

Feature vector, specify selected features, their metadata and material views

example:

import mlrun.feature_store as fstore
features = ["quotes.bid", "quotes.asks_sum_5h as asks_5h", "stocks.*"]
vector = fstore.FeatureVector("my-vec", features)

# get the vector as a dataframe
df = fstore.get_offline_features(vector).to_dataframe()

# return an online/real-time feature service
svc = fstore.get_online_feature_service(vector, impute_policy={"*": "$mean"})
resp = svc.get([{"stock": "GOOG"}])
Parameters
  • name – name of the feature vector

  • features – list of features to collect into this vector, in the format [<project>/]<feature_set>.<feature_name or *> [as <alias>]

  • label_feature – feature name to be used as label data

  • description – text description of the vector

  • with_indexes – whether to keep the entity and timestamp columns in the response
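
To make the feature reference format concrete, the following sketch splits a reference string into its parts. It is an illustration only: parse_feature_ref and FeatureRef are hypothetical names, and the regular expression is an assumption about the grammar [<project>/]<feature_set>.<feature_name or *> [as <alias>] rather than the parser mlrun uses.

```python
import re
from typing import NamedTuple, Optional


class FeatureRef(NamedTuple):
    """Parts of a feature reference string (hypothetical container)."""
    project: Optional[str]
    feature_set: str
    feature: str
    alias: Optional[str]


# Assumed grammar: [<project>/]<feature_set>.<feature_name or *> [as <alias>]
_FEATURE_REF = re.compile(
    r"^(?:(?P<project>[^/\s]+)/)?"      # optional project prefix
    r"(?P<feature_set>[^.\s]+)\."       # feature set name, up to the first dot
    r"(?P<feature>\*|\S+)"              # feature name, or * for all features
    r"(?:\s+as\s+(?P<alias>\S+))?$"     # optional alias
)


def parse_feature_ref(text: str) -> FeatureRef:
    """Split a feature reference into project, feature set, feature, and alias."""
    match = _FEATURE_REF.match(text.strip())
    if match is None:
        raise ValueError(f"invalid feature reference: {text!r}")
    return FeatureRef(
        match.group("project"),
        match.group("feature_set"),
        match.group("feature"),
        match.group("alias"),
    )


if __name__ == "__main__":
    for ref in ["quotes.bid", "quotes.asks_sum_5h as asks_5h", "stocks.*"]:
        print(parse_feature_ref(ref))
```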

get_feature_aliases()[source]
get_stats_table()[source]

get feature statistics table (as dataframe)

get_target_path(name=None)[source]
kind = 'FeatureVector'

add a linked file/artifact (chart, data, ..)

property metadata
parse_features(offline=True, update_stats=False)[source]

parse and validate feature list (from vector) and add metadata from feature sets

Returns

feature_set_objects: cache of used feature set objects

feature_set_fields: list of field (name, alias) per featureset

reload(update_spec=True)[source]

reload/sync the feature vector status and spec from the DB

save(tag='', versioned=False)[source]

save to mlrun db

property spec
property status
to_dataframe(df_module=None, target_name=None)[source]

return feature vector (offline) data as dataframe

property uri

fully qualified feature vector uri

class mlrun.feature_store.FixedWindowType(value)[source]

Bases: enum.Enum

An enumeration.

CurrentOpenWindow = 1
LastClosedWindow = 2
to_qbk_fixed_window_type()[source]
class mlrun.feature_store.OfflineVectorResponse(merger)[source]

Bases: object

get_offline_features response object

property status

vector prep job status (ready, running, error)

to_csv(target_path, **kw)[source]

return results as csv file

to_dataframe(to_pandas=True)[source]

return result as dataframe

to_parquet(target_path, **kw)[source]

return results as parquet file

class mlrun.feature_store.OnlineVectorService(vector, graph, index_columns, impute_policy: Optional[dict] = None)[source]

Bases: object

get_online_feature_service response object

close()[source]

terminate the async loop

get(entity_rows: List[Union[dict, list]], as_list=False)[source]

get feature vector given the provided entity inputs

Takes a list of input vectors/rows and returns a list of enriched feature vectors. Each input and/or output vector can be a list of values or a dictionary of field names and values. To return the vectors as lists of values, set as_list to True.

If the input is a list of lists (rather than a list of dicts), the values in each list correspond to the index/entity values, i.e. [[“GOOG”], [“MSFT”]] means “GOOG” and “MSFT” are the index/entity values.

example:

# accept list of dict, return list of dict
svc = fs.get_online_feature_service(vector)
resp = svc.get([{"name": "joe"}, {"name": "mike"}])

# accept list of list, return list of list
svc = fs.get_online_feature_service(vector)
resp = svc.get([["joe"], ["mike"]], as_list=True)
Parameters
  • entity_rows – list of list/dict with input entity data/rows

  • as_list – return a list of list (list input is required by many ML frameworks)
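
The relationship between the two input shapes and the as_list output can be sketched as follows. This is not the OnlineVectorService implementation: normalize_entity_rows and format_response are hypothetical helpers, and index_columns stands in for the entity column names of the vector.

```python
from typing import Any, Dict, List, Union


def normalize_entity_rows(
    entity_rows: List[Union[dict, list]], index_columns: List[str]
) -> List[Dict[str, Any]]:
    """Turn list-style rows into dicts keyed by the index/entity column names."""
    rows = []
    for row in entity_rows:
        if isinstance(row, dict):
            rows.append(dict(row))
            continue
        if len(row) != len(index_columns):
            raise ValueError("row length must match the number of index columns")
        rows.append(dict(zip(index_columns, row)))
    return rows


def format_response(vectors: List[Dict[str, Any]], as_list: bool = False):
    """Return the vectors as dicts, or as lists of values when as_list is True."""
    if as_list:
        return [list(vector.values()) for vector in vectors]
    return vectors


if __name__ == "__main__":
    rows = normalize_entity_rows([["GOOG"], ["MSFT"]], ["ticker"])
    print(rows)
    print(format_response([{"ticker": "GOOG", "bid": 1.0}], as_list=True))
```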

initialize()[source]

internal, init the feature service and prep the imputing logic

property status

vector merger function status (ready, running, error)

class mlrun.feature_store.RunConfig(function: Optional[Union[str, mlrun.runtimes.function_reference.FunctionReference, mlrun.runtimes.base.BaseRuntime]] = None, local: Optional[bool] = None, image: Optional[str] = None, kind: Optional[str] = None, handler: Optional[str] = None, parameters: Optional[dict] = None, watch: Optional[bool] = None, owner=None, credentials: Optional[mlrun.model.Credentials] = None, code: Optional[str] = None, requirements: Optional[Union[str, List[str]]] = None, extra_spec: Optional[dict] = None)[source]

Bases: object

class for holding function and run specs for jobs and serving functions

When running feature ingestion or merging tasks, the RunConfig class is used to pass the desired function and job configuration. The apply() method sets resources such as volumes, and the with_secret() method adds secrets.

Most attributes are optional; if not specified, a proper default value is set.

examples:

# config for local run emulation
config = RunConfig(local=True)

# config for using empty/default code
config = RunConfig()

# config for using .py/.ipynb file with image and extra package requirements
config = RunConfig("mycode.py", image="mlrun/mlrun", requirements=["spacy"])

# config for using function object
function = mlrun.import_function("hub://some_function")
config = RunConfig(function)
Parameters
  • function – a function uri, a function object, a path to function code (.py/.ipynb), or a FunctionReference. The function defines the code, dependencies, and resources

  • local – use True to simulate local job run or mock service

  • image – function container image

  • kind – function runtime kind (job, serving, spark, ..), required when function points to code

  • handler – the function handler to execute (for jobs or nuclio)

  • parameters – job parameters

  • watch – in batch jobs will wait for the job completion and print job logs to the console

  • owner – job owner

  • credentials – job credentials

  • code – function source code (as string)

  • requirements – python requirements file path or list of packages

  • extra_spec – additional dict with function spec fields/values to add to the function

apply(modifier)[source]

apply a modifier to add/set function resources like volumes

example:

run_config.apply(mlrun.platforms.auto_mount())
copy()[source]
property function
to_function(default_kind=None, default_image=None)[source]

internal, generate function object

with_secret(kind, source)[source]

register a secrets source (file, env or dict)

read secrets from a source provider to be used in jobs, example:

run_config.with_secret('file', 'file.txt')
run_config.with_secret('inline', {'key': 'val'})
run_config.with_secret('env', 'ENV1,ENV2')
run_config.with_secret('vault', ['secret1', 'secret2'...])
Parameters
  • kind – secret type (file, inline, env, vault)

  • source – secret data or link (see example)

Returns

This (self) object

mlrun.feature_store.delete_feature_set(name, project='', tag=None, uid=None, force=False)[source]

Delete a FeatureSet object from the DB.

If tag or uid are specified, then just the version referenced by them will be deleted. Using both is not allowed. If none are specified, then all instances of the object whose name is name will be deleted.

Parameters
  • name – Name of the object to delete

  • project – Name of the object’s project

  • tag – Specific object’s version tag

  • uid – Specific object’s uid

  • force – Delete feature set without purging its targets

mlrun.feature_store.delete_feature_vector(name, project='', tag=None, uid=None)[source]

Delete a FeatureVector object from the DB.

If tag or uid are specified, then just the version referenced by them will be deleted. Using both is not allowed. If none are specified, then all instances of the object whose name is name will be deleted.

Parameters
  • name – Name of the object to delete

  • project – Name of the object’s project

  • tag – Specific object’s version tag

  • uid – Specific object’s uid

mlrun.feature_store.deploy_ingestion_service(featureset: Union[mlrun.feature_store.feature_set.FeatureSet, str], source: Optional[mlrun.model.DataSource] = None, targets: Optional[List[mlrun.model.DataTargetBase]] = None, name: Optional[str] = None, run_config: Optional[mlrun.feature_store.common.RunConfig] = None, verbose=False)[source]

Start real-time ingestion service using nuclio function

Deploy a real-time function implementing the feature ingestion pipeline. The source maps to Nuclio event triggers (http, kafka, v3io stream, etc.).

The run_config parameter allows specifying the function and job configuration, see: RunConfig

example:

source = HTTPSource()
func = mlrun.code_to_function("ingest", kind="serving").apply(mount_v3io())
config = RunConfig(function=func)
fs.deploy_ingestion_service(my_set, source, run_config=config)
Parameters
  • featureset – feature set object or uri

  • source – data source object describing the online or offline source

  • targets – list of data target objects

  • name – name for the job/function

  • run_config – service runtime configuration (function object/uri, resources, etc..)

  • verbose – verbose log

mlrun.feature_store.get_feature_set(uri, project=None)[source]

get feature set object from the db

Parameters
  • uri – a feature set uri ({project}/{name}[:version])

  • project – project name if not specified in uri or not using the current/default

mlrun.feature_store.get_feature_vector(uri, project=None)[source]

get feature vector object from the db

Parameters
  • uri – a feature vector uri ({project}/{name}[:version])

  • project – project name if not specified in uri or not using the current/default

mlrun.feature_store.get_offline_features(feature_vector: Union[str, mlrun.feature_store.feature_vector.FeatureVector], entity_rows=None, entity_timestamp_column: Optional[str] = None, target: Optional[mlrun.model.DataTargetBase] = None, run_config: Optional[mlrun.feature_store.common.RunConfig] = None, drop_columns: Optional[List[str]] = None, start_time: Optional[Union[str, pandas._libs.tslibs.timestamps.Timestamp]] = None, end_time: Optional[Union[str, pandas._libs.tslibs.timestamps.Timestamp]] = None, with_indexes: bool = False, update_stats: bool = False, engine: Optional[str] = None, engine_args: Optional[dict] = None)mlrun.feature_store.feature_vector.OfflineVectorResponse[source]

retrieve offline feature vector results

Specify a feature vector object/uri and retrieve the desired features, their metadata and statistics. Returns an OfflineVectorResponse; results can be returned as a dataframe or written to a target.

The start_time and end_time attributes allow filtering the data to a given time range. They accept string values or pandas Timestamp objects. String values can also be relative, for example: “now”, “now - 1d2h”, “now+5m”, where a valid pandas Timedelta string follows the verb “now”. For time alignment you can use the verb “floor”, e.g. “now -1d floor 1H” aligns the time to the last hour (the floor string is passed to pandas.Timestamp.floor(); use D, H, T, S for day, hour, min, sec alignment).

example:

features = [
    "stock-quotes.bid",
    "stock-quotes.asks_sum_5h",
    "stock-quotes.ask as mycol",
    "stocks.*",
]
vector = FeatureVector(features=features)
resp = get_offline_features(
    vector, entity_rows=trades, entity_timestamp_column="time"
)
print(resp.to_dataframe())
print(vector.get_stats_table())
resp.to_parquet("./out.parquet")
Parameters
  • feature_vector – feature vector uri or FeatureVector object. passing feature vector obj requires update permissions

  • entity_rows – dataframe with entity rows to join with

  • target – where to write the results to

  • drop_columns – list of columns to drop from the final result

  • entity_timestamp_column – timestamp column name in the entity rows dataframe

  • run_config – function and/or run configuration see RunConfig

  • start_time – datetime, low limit of time needed to be filtered. Optional. entity_timestamp_column must be passed when using time filtering.

  • end_time – datetime, high limit of time needed to be filtered. Optional. entity_timestamp_column must be passed when using time filtering.

  • with_indexes – return vector with index columns and timestamp_key from the feature sets (default False)

  • update_stats – update features statistics from the requested feature sets on the vector. Default is False.

  • engine – processing engine kind (“local”, “dask”, or “spark”)

  • engine_args – kwargs for the processing engine
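
The relative time strings accepted by start_time and end_time can be approximated with the standard library as shown below. This is a simplified sketch, not the mlrun parser: resolve_time is a hypothetical helper, it takes the current time as an argument so the result is reproducible, and it only supports the lowercase d/h/m/s delta units and the single-unit floor values D, H, T, S, whereas mlrun hands these strings to pandas.

```python
import re
from datetime import datetime, timedelta

# Assumed subset of pandas Timedelta units supported by this sketch.
_DELTA_UNITS = {"d": "days", "h": "hours", "m": "minutes", "s": "seconds"}

_RELATIVE_TIME = re.compile(
    r"now\s*"
    r"(?:(?P<sign>[+-])\s*(?P<delta>(?:\d+\s*[dhms])+))?\s*"
    r"(?:floor\s+1?(?P<floor>[DHTS]))?"
)


def _floor(value: datetime, unit: str) -> datetime:
    """Align a datetime down to the start of the day, hour, minute, or second."""
    if unit == "D":
        return value.replace(hour=0, minute=0, second=0, microsecond=0)
    if unit == "H":
        return value.replace(minute=0, second=0, microsecond=0)
    if unit == "T":
        return value.replace(second=0, microsecond=0)
    return value.replace(microsecond=0)  # unit == "S"


def resolve_time(expr: str, now: datetime) -> datetime:
    """Resolve strings such as 'now - 1d2h' or 'now -1d floor 1H' against `now`."""
    match = _RELATIVE_TIME.fullmatch(expr.strip())
    if match is None:
        raise ValueError(f"unsupported relative time expression: {expr!r}")
    result = now
    if match.group("delta"):
        delta = timedelta()
        for amount, unit in re.findall(r"(\d+)\s*([dhms])", match.group("delta")):
            delta += timedelta(**{_DELTA_UNITS[unit]: int(amount)})
        result = result + delta if match.group("sign") == "+" else result - delta
    if match.group("floor"):
        result = _floor(result, match.group("floor"))
    return result


if __name__ == "__main__":
    reference = datetime(2022, 1, 10, 12, 34, 56)
    for text in ["now", "now - 1d2h", "now+5m", "now -1d floor 1H"]:
        print(text, "->", resolve_time(text, reference))
```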

mlrun.feature_store.get_online_feature_service(feature_vector: Union[str, mlrun.feature_store.feature_vector.FeatureVector], run_config: Optional[mlrun.feature_store.common.RunConfig] = None, fixed_window_type: mlrun.feature_store.feature_vector.FixedWindowType = <FixedWindowType.LastClosedWindow: 2>, impute_policy: Optional[dict] = None, update_stats: bool = False)mlrun.feature_store.feature_vector.OnlineVectorService[source]

initialize and return online feature vector service api, returns OnlineVectorService

There are two ways to use the function:

  1. As a context manager

    example:

    with get_online_feature_service(vector_uri) as svc:
        resp = svc.get([{"ticker": "GOOG"}, {"ticker": "MSFT"}])
        print(resp)
        resp = svc.get([{"ticker": "AAPL"}], as_list=True)
        print(resp)

    example with imputing:

    with get_online_feature_service(vector_uri, impute_policy={"*": "$mean", "amount": 0}) as svc:
        resp = svc.get([{"id": "C123487"}])

  2. As a simple function. Note that with this option you need to close the session.

    example:

    svc = get_online_feature_service(vector_uri)
    try:
        resp = svc.get([{"ticker": "GOOG"}, {"ticker": "MSFT"}])
        print(resp)
        resp = svc.get([{"ticker": "AAPL"}], as_list=True)
        print(resp)
    finally:
        svc.close()

    example with imputing:

    svc = get_online_feature_service(vector_uri, impute_policy={"*": "$mean", "amount": 0})
    try:
        resp = svc.get([{"id": "C123487"}])
    except Exception as e:
        # handle the exception here
        print(e)
    finally:
        svc.close()
Parameters
  • feature_vector – feature vector uri or FeatureVector object. passing feature vector obj requires update permissions

  • run_config – function and/or run configuration for remote jobs/services

  • impute_policy – a dict with impute_policy per feature, the dict key is the feature name and the dict value indicate which value will be used in case the feature is NaN/empty, the replaced value can be fixed number for constants or $mean, $max, $min, $std, $count for statistical values. “*” is used to specify the default for all features, example: {“*”: “$mean”}

  • fixed_window_type – determines how to query the fixed window values which were previously inserted by ingest

  • update_stats – update features statistics from the requested feature sets on the vector. Default is False.
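
The impute_policy semantics described above can be illustrated with a small stand-alone function. This is a sketch under stated assumptions, not the service's imputing logic: impute_vector is a hypothetical helper, and the stats argument is an assumed dict of per-feature statistics keyed by names such as mean or max.

```python
import math
from typing import Any, Dict

_NO_POLICY = object()  # marks features that have neither a specific nor a "*" policy


def _is_missing(value: Any) -> bool:
    """Treat None and float NaN as missing values."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def impute_vector(
    vector: Dict[str, Any],
    impute_policy: Dict[str, Any],
    stats: Dict[str, Dict[str, float]],
) -> Dict[str, Any]:
    """Replace missing values using a per-feature policy with "*" as the default."""
    default = impute_policy.get("*", _NO_POLICY)
    result = {}
    for name, value in vector.items():
        policy = impute_policy.get(name, default)
        if not _is_missing(value) or policy is _NO_POLICY:
            result[name] = value
        elif isinstance(policy, str) and policy.startswith("$"):
            # "$mean" looks up stats[name]["mean"], "$max" looks up "max", etc.
            result[name] = stats[name][policy[1:]]
        else:
            # Any other policy value is used as a constant replacement.
            result[name] = policy
    return result


if __name__ == "__main__":
    vector = {"bid": float("nan"), "amount": None, "ask": 5.0}
    policy = {"*": "$mean", "amount": 0}
    stats = {"bid": {"mean": 10.5}, "ask": {"mean": 4.0}}
    print(impute_vector(vector, policy, stats))
```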

mlrun.feature_store.ingest(featureset: Optional[Union[mlrun.feature_store.feature_set.FeatureSet, str]] = None, source=None, targets: Optional[List[mlrun.model.DataTargetBase]] = None, namespace=None, return_df: bool = True, infer_options: mlrun.data_types.data_types.InferOptions = 63, run_config: Optional[mlrun.feature_store.common.RunConfig] = None, mlrun_context=None, spark_context=None, overwrite=None)pandas.core.frame.DataFrame[source]

Read a local DataFrame, file, URL, or source into the feature store. Ingest reads from the source, runs the graph transformations, infers metadata and stats, and writes the results to the default or specified targets.

When targets are not specified, data is stored in the configured default targets (usually NoSQL for real-time and Parquet for offline).

The run_config parameter allows specifying the function and job configuration, see: RunConfig

example:

stocks_set = FeatureSet("stocks", entities=[Entity("ticker")])
stocks = pd.read_csv("stocks.csv")
df = ingest(stocks_set, stocks, infer_options=fstore.InferOptions.default())

# for running as remote job
config = RunConfig(image='mlrun/mlrun')
df = ingest(stocks_set, stocks, run_config=config)

# specify source and targets
source = CSVSource("mycsv", path="measurements.csv")
targets = [CSVTarget("mycsv", path="./mycsv.csv")]
ingest(measurements, source, targets)
Parameters
  • featureset – feature set object or featureset.uri. (uri must be of a feature set that is in the DB, call .save() if it’s not)

  • source – source dataframe or other sources (e.g. parquet source see: ParquetSource and other classes in mlrun.datastore with suffix Source)

  • targets – optional list of data target objects

  • namespace – namespace or module containing graph classes

  • return_df – indicate if to return a dataframe with the graph results

  • infer_options – schema and stats infer options

  • run_config – function and/or run configuration for remote jobs, see RunConfig

  • mlrun_context – mlrun context (when running as a job), for internal use

  • spark_context – local spark session for spark ingestion, example for creating the spark context: spark = SparkSession.builder.appName("Spark function").getOrCreate(). For remote spark ingestion, this should contain the remote spark service name

  • overwrite – delete the targets’ data prior to ingestion. Defaults to True for non-scheduled ingest (deletes the targets that are about to be ingested) and to False for scheduled ingest (does not delete the target)

mlrun.feature_store.preview(featureset: mlrun.feature_store.feature_set.FeatureSet, source, entity_columns: Optional[list] = None, timestamp_key: Optional[str] = None, namespace=None, options: Optional[mlrun.data_types.data_types.InferOptions] = None, verbose: bool = False, sample_size: Optional[int] = None)pandas.core.frame.DataFrame[source]

run the ingestion pipeline with local DataFrame/file data and infer features schema and stats

example:

quotes_set = FeatureSet("stock-quotes", entities=[Entity("ticker")])
quotes_set.add_aggregation("ask", ["sum", "max"], ["1h", "5h"], "10m")
quotes_set.add_aggregation("bid", ["min", "max"], ["1h"], "10m")
df = preview(
    quotes_set,
    quotes_df,
    entity_columns=["ticker"],
    timestamp_key="time",
)
Parameters
  • featureset – feature set object or uri

  • source – source dataframe or csv/parquet file path

  • entity_columns – list of entity (index) column names

  • timestamp_key – timestamp column name

  • namespace – namespace or module containing graph classes

  • options – schema and stats infer options (InferOptions)

  • verbose – verbose log

  • sample_size – num of rows to sample from the dataset (for large datasets)

class mlrun.feature_store.steps.DateExtractor(parts: Union[Dict[str, str], List[str]], timestamp_col: Optional[str] = None, **kwargs)[source]

Date Extractor extracts date-time components into new columns

The extracted date part will appear as <timestamp_col>_<date_part> feature.

Supports part values:

  • asm8: Return numpy datetime64 format in nanoseconds.

  • day_of_week: Return day of the week.

  • day_of_year: Return the day of the year.

  • dayofweek: Return day of the week.

  • dayofyear: Return the day of the year.

  • days_in_month: Return the number of days in the month.

  • daysinmonth: Return the number of days in the month.

  • freqstr: Return the frequency string of the timestamp.

  • is_leap_year: Return True if year is a leap year.

  • is_month_end: Return True if date is last day of month.

  • is_month_start: Return True if date is first day of month.

  • is_quarter_end: Return True if date is last day of the quarter.

  • is_quarter_start: Return True if date is first day of the quarter.

  • is_year_end: Return True if date is last day of the year.

  • is_year_start: Return True if date is first day of the year.

  • quarter: Return the quarter of the year.

  • tz: Alias for tzinfo.

  • week: Return the week number of the year.

  • weekofyear: Return the week number of the year.

example:

# (taken from the fraud-detection end-to-end feature store demo)
# Define the Transactions FeatureSet
transaction_set = fs.FeatureSet("transactions",
                                entities=[fs.Entity("source")],
                                timestamp_key='timestamp',
                                description="transactions feature set")

# Get FeatureSet computation graph
transaction_graph = transaction_set.graph

# Add the custom `DateExtractor` step
# to the computation graph
transaction_graph.to(
        class_name='DateExtractor',
        name='Extract Dates',
        parts = ['hour', 'day_of_week'],
        timestamp_col = 'timestamp',
    )
Parameters
  • parts – list of pandas style date-time parts you want to extract.

  • timestamp_col – The name of the column containing the timestamps to extract from, by default “timestamp”
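
The <timestamp_col>_<date_part> naming can be illustrated without mlrun or pandas. The sketch below is an approximation: extract_date_parts is a hypothetical helper that covers only a handful of parts through the standard datetime module, while the real step reads the parts from pandas timestamps.

```python
from datetime import datetime
from typing import Any, Callable, Dict, List

# Standard-library stand-ins for a few of the pandas-style parts.
_PART_GETTERS: Dict[str, Callable[[datetime], Any]] = {
    "hour": lambda ts: ts.hour,
    "day_of_week": lambda ts: ts.weekday(),  # Monday is 0, as in pandas
    "day_of_year": lambda ts: ts.timetuple().tm_yday,
    "quarter": lambda ts: (ts.month - 1) // 3 + 1,
    "is_month_start": lambda ts: ts.day == 1,
}


def extract_date_parts(
    event: Dict[str, Any], parts: List[str], timestamp_col: str = "timestamp"
) -> Dict[str, Any]:
    """Return a copy of the event with one <timestamp_col>_<part> field per part."""
    timestamp = event[timestamp_col]
    enriched = dict(event)
    for part in parts:
        enriched[f"{timestamp_col}_{part}"] = _PART_GETTERS[part](timestamp)
    return enriched


if __name__ == "__main__":
    event = {"source": "C123", "timestamp": datetime(2021, 1, 4, 13, 30)}
    print(extract_date_parts(event, ["hour", "day_of_week"]))
```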

__init__(parts: Union[Dict[str, str], List[str]], timestamp_col: Optional[str] = None, **kwargs)[source]


class mlrun.feature_store.steps.FeaturesetValidator(featureset=None, columns=None, name=None, **kwargs)[source]

Validate feature values according to the feature set validation policy

Parameters
  • featureset – feature set uri (or “.” for current feature set pipeline)

  • columns – names of the columns/fields to validate

  • name – step name

  • kwargs – optional kwargs (for storey)

__init__(featureset=None, columns=None, name=None, **kwargs)[source]


class mlrun.feature_store.steps.Imputer(method: str = 'avg', default_value=None, mapping: Optional[Dict[str, Dict[str, Any]]] = None, **kwargs)[source]

Replace None values with default values

Parameters
  • method – for future use

  • default_value – default value if not specified per column

  • mapping – a dict of per-column default values

  • kwargs – optional kwargs (for storey)
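
As a rough picture of how default_value and mapping interact, the following sketch fills None values in one event. The function impute is hypothetical, and it assumes mapping holds one default value per column name, with default_value used for columns that have no entry.

```python
def impute(event, default_value=None, mapping=None):
    """Return a copy of the event with None values replaced by defaults."""
    mapping = mapping or {}
    return {
        # A per-column default wins over the global default_value
        column: mapping.get(column, default_value) if value is None else value
        for column, value in event.items()
    }


filled = impute(
    {"age": None, "city": None, "score": 7},
    default_value=0,
    mapping={"city": "unknown"},
)
```

Values that are already set, such as score here, pass through unchanged.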

__init__(method: str = 'avg', default_value=None, mapping: Optional[Dict[str, Dict[str, Any]]] = None, **kwargs)[source]

Replace None values with default values

Parameters
  • method – for future use

  • default_value – default value if not specified per column

  • mapping – a dict of per-column default values

  • kwargs – optional kwargs (for storey)

class mlrun.feature_store.steps.MapValues(mapping: Dict[str, Dict[str, Any]], with_original_features: bool = False, suffix: str = 'mapped', **kwargs)[source]

Map column values to new values

example:

# replace the value "U" with '0' in the age column
graph.to(MapValues(mapping={'age': {'U': '0'}}, with_original_features=True))

# replace integers, example
graph.to(MapValues(mapping={'not': {0: 1, 1: 0}}))

# replace by range, use -inf and inf for extended range (requires: from math import inf)
graph.to(MapValues(mapping={'numbers': {'ranges': {'negative': [-inf, 0], 'positive': [0, inf]}}}))
Parameters
  • mapping – a dict with entry per column and the associated old/new values map

  • with_original_features – set to True to keep the original features

  • suffix – the suffix added to the column name <column>_<suffix> (default is “mapped”)

  • kwargs – optional kwargs (for storey)
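
The sketch below shows one way the value map, the ranges form, and the <column>_<suffix> naming could fit together. It is an approximation under stated assumptions, not the library's code: the helpers map_value and map_values are hypothetical, range bounds are assumed to be low-inclusive and high-exclusive, unmatched values are assumed to pass through unchanged, and the suffix is assumed to apply only when with_original_features is True.

```python
from math import inf


def map_value(rules, value):
    """Map one value using a plain old->new dict or a 'ranges' dict."""
    if "ranges" in rules:
        for label, (low, high) in rules["ranges"].items():
            # Assumed bounds: low inclusive, high exclusive
            if low <= value < high:
                return label
        return value
    return rules.get(value, value)


def map_values(event, mapping, with_original_features=False, suffix="mapped"):
    """Apply the per-column rules to one event."""
    mapped = {
        column: map_value(rules, event[column])
        for column, rules in mapping.items()
        if column in event
    }
    if not with_original_features:
        return mapped
    # Keep the originals and add the mapped values under <column>_<suffix>
    result = dict(event)
    result.update({f"{column}_{suffix}": value for column, value in mapped.items()})
    return result


mapping = {
    "age": {"U": "0"},
    "numbers": {"ranges": {"negative": [-inf, 0], "positive": [0, inf]}},
}
event = {"age": "U", "numbers": -3}
replaced = map_values(event, mapping)
kept = map_values(event, mapping, with_original_features=True)
```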

__init__(mapping: Dict[str, Dict[str, Any]], with_original_features: bool = False, suffix: str = 'mapped', **kwargs)[source]

Map column values to new values

example:

# replace the value "U" with '0' in the age column
graph.to(MapValues(mapping={'age': {'U': '0'}}, with_original_features=True))

# replace integers, example
graph.to(MapValues(mapping={'not': {0: 1, 1: 0}}))

# replace by range, use -inf and inf for extended range (requires: from math import inf)
graph.to(MapValues(mapping={'numbers': {'ranges': {'negative': [-inf, 0], 'positive': [0, inf]}}}))
Parameters
  • mapping – a dict with entry per column and the associated old/new values map

  • with_original_features – set to True to keep the original features

  • suffix – the suffix added to the column name <column>_<suffix> (default is “mapped”)

  • kwargs – optional kwargs (for storey)

class mlrun.feature_store.steps.OneHotEncoder(mapping: Dict[str, Dict[str, Any]], **kwargs)[source]

Create new binary fields, one per category (one hot encoded)

example:

one_hot_encoder_mapping = {'category': ['food', 'health', 'transportation'],
                           'gender': ['male', 'female']}
graph.to(OneHotEncoder(mapping=one_hot_encoder_mapping))
Parameters
  • mapping – a dict of per column categories (to map to binary fields)

  • kwargs – optional kwargs (for storey)
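
For readers unfamiliar with one hot encoding, this small sketch shows the transformation on a single event. The helper one_hot_encode is hypothetical, and the output field naming <column>_<category> is an assumption for illustration rather than a statement about the step's actual output.

```python
def one_hot_encode(event, mapping):
    """Replace each mapped column with one binary field per category."""
    encoded = {}
    for column, value in event.items():
        categories = mapping.get(column)
        if categories is None:
            encoded[column] = value  # column is not encoded, pass it through
            continue
        for category in categories:
            # Assumed naming scheme: <column>_<category>
            encoded[f"{column}_{category}"] = 1 if value == category else 0
    return encoded


one_hot_encoder_mapping = {
    "category": ["food", "health", "transportation"],
    "gender": ["male", "female"],
}
event = {"category": "health", "gender": "female", "amount": 12.5}
encoded = one_hot_encode(event, one_hot_encoder_mapping)
```

A value that is not listed among the categories yields all zeros for that column in this sketch.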

__init__(mapping: Dict[str, Dict[str, Any]], **kwargs)[source]

Create new binary fields, one per category (one hot encoded)

example:

one_hot_encoder_mapping = {'category': ['food', 'health', 'transportation'],
                           'gender': ['male', 'female']}
graph.to(OneHotEncoder(mapping=one_hot_encoder_mapping))
Parameters
  • mapping – a dict of per column categories (to map to binary fields)

  • kwargs – optional kwargs (for storey)

class mlrun.feature_store.steps.SetEventMetadata(id_path: Optional[str] = None, key_path: Optional[str] = None, time_path: Optional[str] = None, random_id: Optional[bool] = None, **kwargs)[source]

Set the event metadata (id, key, timestamp) from the event body

Set the event metadata fields (id, key, and time) from the event body data structure. The xx_path attribute defines the key or path to the value in the body dict. A “.” in the path string indicates that the value is in a nested dict, e.g. “x.y” means {“x”: {“y”: value}}

example:

flow = function.set_topology("flow")
# build a graph and use the SetEventMetadata step to extract the id, key and time from the event body
# ("myid", "mykey" and "mytime" fields), the metadata will be used for following data processing steps
# (e.g. feature store ops, time/key aggregations, write to databases/streams, etc.)
(
    flow.to(SetEventMetadata(id_path="myid", key_path="mykey", time_path="mytime"))
    .to(...)  # additional steps
)

server = function.to_mock_server()
event = {"myid": "34", "mykey": "123", "mytime": "2022-01-18 15:01"}
resp = server.test(body=event)
Parameters
  • id_path – path to the id value

  • key_path – path to the key value

  • time_path – path to the time value (value should be of type str or datetime)

  • random_id – if True, sets event.id to a random value
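
The dotted-path convention for the xx_path attributes can be illustrated with a short helper. This is a sketch of the lookup rule only: get_in is a hypothetical function, and the body layout with a nested "meta" dict is made up for the example.

```python
def get_in(body, path):
    """Follow a dot-separated path through nested dicts, e.g. "x.y"."""
    value = body
    for key in path.split("."):
        value = value[key]
    return value


# Hypothetical event body with the key and time nested one level down
body = {"myid": "34", "meta": {"key": "123", "time": "2022-01-18 15:01"}}
metadata = {
    "id": get_in(body, "myid"),
    "key": get_in(body, "meta.key"),
    "time": get_in(body, "meta.time"),
}
```

A path without a “.” is a plain top-level key, so "myid" and "meta.key" are handled by the same lookup.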

__init__(id_path: Optional[str] = None, key_path: Optional[str] = None, time_path: Optional[str] = None, random_id: Optional[bool] = None, **kwargs)[source]

Set the event metadata (id, key, timestamp) from the event body

Set the event metadata fields (id, key, and time) from the event body data structure. The xx_path attribute defines the key or path to the value in the body dict. A “.” in the path string indicates that the value is in a nested dict, e.g. “x.y” means {“x”: {“y”: value}}

example:

flow = function.set_topology("flow")
# build a graph and use the SetEventMetadata step to extract the id, key and time from the event body
# ("myid", "mykey" and "mytime" fields), the metadata will be used for following data processing steps
# (e.g. feature store ops, time/key aggregations, write to databases/streams, etc.)
(
    flow.to(SetEventMetadata(id_path="myid", key_path="mykey", time_path="mytime"))
    .to(...)  # additional steps
)

server = function.to_mock_server()
event = {"myid": "34", "mykey": "123", "mytime": "2022-01-18 15:01"}
resp = server.test(body=event)
Parameters
  • id_path – path to the id value

  • key_path – path to the key value

  • time_path – path to the time value (value should be of type str or datetime)

  • random_id – if True, sets event.id to a random value