mlrun.feature_store
- class mlrun.feature_store.Entity(name: str | None = None, value_type: ValueType | str | None = None, description: str | None = None, labels: dict[str, str] | None = None)
Bases: ModelObj
Data entity (index key).
- Parameters:
name -- entity name
value_type -- type of the entity, e.g. ValueType.STRING, ValueType.INT (default ValueType.STRING)
description -- text description of the entity
labels -- a set of key/value labels (tags)
- kind = 'entity'
- class mlrun.feature_store.Feature(value_type: ValueType | str | None = None, dims: list[int] | None = None, description: str | None = None, aggregate: bool | None = None, name: str | None = None, validator=None, default: str | None = None, labels: dict[str, str] | None = None)
Bases: ModelObj
Data feature.
Features can be specified manually or inferred automatically (during ingest/preview).
- Parameters:
value_type -- type of the feature. Use the ValueType constants library e.g. ValueType.STRING, ValueType.INT (default ValueType.STRING)
dims -- list of dimensions for vectors/tensors, e.g. [2, 2]
description -- text description of the feature
aggregate -- whether the value is aggregated
name -- name of the feature
validator -- feature validation policy
default -- default value
labels -- a set of key/value labels (tags)
- property validator
- class mlrun.feature_store.FeatureSet(name: str | None = None, description: str | None = None, entities: list[Union[mlrun.features.Entity, str]] | None = None, timestamp_key: str | None = None, engine: str | None = None, label_column: str | None = None, relations: dict[str, Union[mlrun.features.Entity, str]] | None = None, passthrough: bool | None = None)
Bases: ModelObj
Feature set object; defines a set of features and their data pipeline.
example:
```python
import mlrun.feature_store as fstore

ticks = fstore.FeatureSet("ticks", entities=["stock"], timestamp_key="timestamp")
# df is a pandas DataFrame holding the raw ticks data
ticks.ingest(df)
```
- Parameters:
name -- name of the feature set
description -- text description
entities -- list of entity (index key) names or Entity objects
timestamp_key -- timestamp column name
engine -- name of the processing engine (storey, pandas, or spark), defaults to storey
label_column -- name of the label column (the one holding the target (y) values)
relations -- dictionary that indicates all the relations this feature set has with other feature sets. The format of this dictionary is {"my_column": Entity, ...}
passthrough -- if True, ingest skips the offline targets, and get_offline_features reads directly from the source
- add_aggregation(column, operations, windows, period=None, name=None, step_name=None, after=None, before=None, emit_policy: EmitPolicy | None = None)
Add a feature aggregation rule.
example:
```python
# sum and max of the "ask" column over a 1h window that slides every 10m
myset.add_aggregation("ask", ["sum", "max"], "1h", "10m", name="asks")
```
- Parameters:
column -- name of the column/field to aggregate. Do not name columns starting with either _ or aggr_. These prefixes are reserved for internal use, and such data does not ingest correctly. When using the pandas engine, do not use spaces (` `) or periods (`.`) in the column names; they cause errors during ingestion.
operations -- aggregation operations. Supported operations: count, sum, sqr, max, min, first, last, avg, stdvar, stddev
windows --
time windows. This can be a single window, e.g. '1h' or '1d', or a list of windows in the same unit, e.g. ['1h', '6h']. Windows are transformed to fixed windows or sliding windows, depending on whether the period parameter is provided.
A sliding window is a fixed-size, overlapping window that slides with time. The window size determines the size of the sliding window, and the period determines the step by which it slides. The period must be an integral divisor of the window size. If the period is not provided, fixed windows are used.
A fixed window is a fixed-size, non-overlapping, gap-less window, also referred to as a tumbling window. In this case, each record on an in-application stream belongs to exactly one window. It is processed only once (when the query processes the window to which the record belongs).
period -- optional, sliding window granularity, e.g. '20s', '10m', '3h', '7d'
name -- optional, aggregation name/prefix. Must be unique per feature set. If not passed, the column name is used as the name.
step_name -- optional, graph step name
after -- optional, the graph step after which this step runs
before -- optional, the graph step before which this step runs
emit_policy -- optional, the emit policy to use when performing the aggregations. Use the derived classes of storey.EmitPolicy. The default is to emit every period for the Spark engine and to emit every event for storey. Currently, the only other supported option is emit_policy=storey.EmitEveryEvent(), which makes the Spark engine emit every event.
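The window rules above can be made concrete with a small, self-contained sketch. It is plain Python, not MLRun or storey code, and it rests on several assumptions:

- The helpers `parse_duration`, `window_starts`, and `aggregate` are hypothetical.
- Durations are limited to the `s`/`m`/`h`/`d` suffixes.
- Windows are aligned to the Unix epoch.
- Only a subset of the operations is shown, and `sqr` is taken to mean the sum of squares.

```python
import re
from datetime import datetime, timedelta
from typing import Optional

_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_duration(text: str) -> timedelta:
    """Parse a simple duration such as '10m' or '1h' (hypothetical helper)."""
    match = re.fullmatch(r"(\d+)([smhd])", text)
    if not match:
        raise ValueError(f"unsupported duration: {text!r}")
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(value)})


def window_starts(ts: datetime, window: str, period: Optional[str] = None) -> list[datetime]:
    """Return the start time of every window that contains ts.

    Without a period the windows are fixed (tumbling), so exactly one matches.
    With a period the windows slide by `period`, so several windows overlap.
    """
    size = parse_duration(window)
    step = parse_duration(period) if period else size
    if size % step:
        raise ValueError("period must be an integral divisor of the window size")
    epoch = datetime(1970, 1, 1)
    # Most recent step boundary at or before ts (epoch alignment is an assumption).
    last_start = epoch + ((ts - epoch) // step) * step
    # Each window spans size // step steps, so that many windows cover ts.
    return [last_start - i * step for i in range(size // step)]


def aggregate(values: list[float], operations: list[str]) -> dict[str, float]:
    """Apply a subset of the documented operations to one window's values."""
    ops = {
        "count": len,
        "sum": sum,
        "sqr": lambda vs: sum(v * v for v in vs),  # assumed: sum of squares
        "max": max,
        "min": min,
        "first": lambda vs: vs[0],
        "last": lambda vs: vs[-1],
        "avg": lambda vs: sum(vs) / len(vs),
    }
    return {op: ops[op](values) for op in operations}


# Usage
ts = datetime(2024, 1, 1, 10, 25)
fixed = window_starts(ts, "1h")            # one tumbling window: 10:00-11:00
sliding = window_starts(ts, "1h", "10m")   # six overlapping windows, starting 10:20 back to 09:30
stats = aggregate([3.0, 1.0, 2.0], ["sum", "max"])  # {'sum': 6.0, 'max': 3.0}
```

With a period, each event contributes to window_size / period overlapping windows. Without a period, it contributes to exactly one.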
- add_entity(name: str, value_type: ValueType | None = None, description: str | None = None, labels: dict[str, str] | None = None)
Add/set an entity (dataset index).
example:
```python
import mlrun
import mlrun.feature_store as fstore

ticks = fstore.FeatureSet("ticks", entities=["stock"], timestamp_key="timestamp")
ticks.add_entity("country", mlrun.data_types.ValueType.STRING, description="stock country")
ticks.add_entity("year", mlrun.data_types.ValueType.INT16)
ticks.save()
```
- Parameters:
name -- entity name
value_type -- type of the entity (defaults to ValueType.STRING)
description -- description of the entity
labels -- dict of label tags
- add_feature(feature: Feature, name=None)
Add/set a feature.
example:
```python
import mlrun
import mlrun.feature_store as fstore
from mlrun.features import Feature

ticks = fstore.FeatureSet("ticks", entities=["stock"], timestamp_key="timestamp")
ticks.add_feature(
    Feature(
        value_type=mlrun.data_types.ValueType.STRING,
        description="client consistency",
    ),
    "ABC01",
)
ticks.add_feature(
    Feature(
        value_type=mlrun.data_types.ValueType.FLOAT,
        description="client volatility",
    ),
    "SAB",
)
ticks.save()
```
- Parameters:
feature -- the Feature object to add
name -- feature name
- deploy_ingestion_service(source: DataSource | None = None, targets: list[mlrun.model.DataTargetBase] | None = None, name: str | None = None, run_config: RunConfig | None = None, verbose=False) → tuple[str, mlrun.runtimes.base.BaseRuntime]
Start a real-time ingestion service using a Nuclio function.
Deploy a real-time function that implements the feature ingestion pipeline. The source maps to Nuclio event triggers (http, kafka, v3io stream, etc.).
The run_config parameter allows specifying the function and job configuration; see RunConfig.
example:
```python
import mlrun
from mlrun.datastore.sources import HTTPSource
from mlrun.feature_store import RunConfig
from mlrun.platforms import mount_v3io

source = HTTPSource()
func = mlrun.code_to_function("ingest", kind="serving").apply(mount_v3io())
config = RunConfig(function=func)
# my_set is an existing FeatureSet object
my_set.deploy_ingestion_service(source, run_config=config)
```
- Parameters:
source -- data source object describing the online or offline source
targets -- list of data target objects
name -- name for the job/function
run_config -- service runtime configuration (function object/uri, resources, etc.)
verbose -- verbose logging
- Returns:
URL to access the deployed ingestion service, and the function that was deployed (which will differ from the function passed in via the run_config parameter).
- extract_relation_keys(other_feature_set, relations: dict[str, Union[mlrun.features.Entity, str]] | None = None) → list[str]
Checks whether a feature set can be merged to the right of this feature set.
- Parameters:
other_feature_set -- The feature set to be merged to the right of this feature set.
relations -- The relations that were defined on this feature set.
- Returns:
If the two feature sets can be merged, a list of the left join keys is returned. Otherwise, an empty list is returned. (The right join keys are always the entities of the other feature set)
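The merge check can be illustrated with a simplified model. Feature sets are reduced to lists of entity names, and relations map a left-hand column to a right-hand entity name. This is a hypothetical sketch of the rule described above, not MLRun's implementation. In particular, the fallback to same-named entities is an assumption.

```python
from typing import Optional


def extract_relation_keys(
    left_entities: list[str],
    right_entities: list[str],
    relations: Optional[dict[str, str]] = None,
) -> list[str]:
    """Return the left join keys, or [] if the right set cannot be merged.

    Simplified model (assumption): entities and relation targets are plain names.
    """
    relations = relations or {}
    # 1. Left-hand columns that a relation maps onto one of the right-hand entities.
    related = {col: ent for col, ent in relations.items() if ent in right_entities}
    if sorted(related.values()) == sorted(right_entities):
        # Order the left keys to line up with the right set's entities (the right keys).
        by_entity = {ent: col for col, ent in related.items()}
        return [by_entity[ent] for ent in right_entities]
    # 2. Assumed fallback: the left set already has entities with the same names.
    if all(ent in left_entities for ent in right_entities):
        return list(right_entities)
    return []


# Usage
print(extract_relation_keys(["order_id"], ["customer_id"], {"cust": "customer_id"}))  # ['cust']
print(extract_relation_keys(["customer_id", "order_id"], ["customer_id"]))  # ['customer_id']
print(extract_relation_keys(["order_id"], ["customer_id"]))  # []
```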
- property fullname: str
Full name in the form {project}/{name}[:{tag}].
- property graph: RootFlowStep
feature set transformation graph/DAG
- ingest(source=None, targets: list[mlrun.model.DataTargetBase] | None = None, namespace=None, return_df: bool = True, infer_options: InferOptions = 63, run_config: RunConfig | None = None, mlrun_context=None, spark_context=None, overwrite=None) → DataFrame | None
Read a local DataFrame, file, URL, or source into the feature store. Ingest reads from the source, runs the graph transformations, infers metadata and stats, and writes the results to the default or specified targets.
When targets are not specified, data is stored in the configured default targets (usually NoSQL for real-time and Parquet for offline).
The run_config parameter allows specifying the function and job configuration; see RunConfig.
example:
```python
import pandas as pd
import mlrun.feature_store as fstore
from mlrun.datastore.sources import CSVSource
from mlrun.datastore.targets import CSVTarget
from mlrun.feature_store import Entity, FeatureSet, RunConfig

stocks_set = FeatureSet("stocks", entities=[Entity("ticker")])
stocks = pd.read_csv("stocks.csv")
df = stocks_set.ingest(stocks, infer_options=fstore.InferOptions.default())

# for running as a remote job
config = RunConfig(image="mlrun/mlrun")
df = stocks_set.ingest(stocks, run_config=config)

# specify source and targets (measurements is an existing FeatureSet)
source = CSVSource("mycsv", path="measurements.csv")
targets = [CSVTarget("mycsv", path="./mycsv.csv")]
measurements.ingest(source, targets)
```
- Parameters:
source -- source dataframe or other source (e.g. a parquet source; see ParquetSource and the other classes in mlrun.datastore with the Source suffix)
targets -- optional list of data target objects
namespace -- namespace or module containing graph classes
return_df -- whether to return a dataframe with the graph results
infer_options -- schema (for discovery of entities and features in the feature set), index, stats, histogram, and preview infer options (see InferOptions)
run_config -- function and/or run configuration for remote jobs; see RunConfig
mlrun_context -- mlrun context (when running as a job), for internal use only
spark_context -- local spark session for spark ingestion, e.g. spark = SparkSession.builder.appName("Spark function").getOrCreate(). For remote spark ingestion, this should contain the remote spark service name.
overwrite -- delete the targets' data prior to ingestion (default: True for non-scheduled ingest, which deletes the targets that are about to be ingested; False for scheduled ingest, which does not delete the targets)
- Returns:
If return_df is True, a dataframe based on the graph results is returned.
- is_connectable_to_df(df_columns: list[str]) → bool
Checks whether a dataframe can be left-joined with this feature set.
- Parameters:
df_columns -- The columns of the data frame you want to merge to the left of this feature set
- Returns:
True if it can be left-joined and False otherwise
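The sketch below rests on an assumption about what "can be left-joined" means here: a dataframe counts as connectable when every entity (index key) of the feature set appears among its columns. The helper name and this rule are illustrative and are not taken from MLRun's source.

```python
def is_connectable_to_df(entity_names: list[str], df_columns: list[str]) -> bool:
    """Assumed rule: each entity (index key) of the feature set must be a dataframe column."""
    columns = set(df_columns)
    return all(name in columns for name in entity_names)


# Usage
print(is_connectable_to_df(["ticker"], ["ticker", "time", "qty"]))  # True
print(is_connectable_to_df(["ticker", "exchange"], ["ticker", "time"]))  # False
```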
- kind = 'FeatureSet'
- property metadata: VersionedObjMetadata
- plot(filename=None, format=None, with_targets=False, **kw)
Plot/save the graph using graphviz.
example:
```python
import mlrun.feature_store as fstore
...
ticks = fstore.FeatureSet("ticks", entities=["stock"], timestamp_key="timestamp")
ticks.add_aggregation(
    name="priceN",
    column="price",
    operations=["avg"],
    windows=["1d"],
    period="1h",
)
ticks.plot(rankdir="LR", with_targets=True)
```
- Parameters:
filename -- target filepath for the graph image (None for the notebook)
format -- the output format used for rendering ('pdf', 'png', etc.)
with_targets -- show targets in the graph image
kw -- kwargs passed to graphviz, e.g. rankdir="LR" (see https://graphviz.org/doc/info/attrs.html)
- Returns:
graphviz graph object
- preview(source, entity_columns: list | None = None, namespace=None, options: InferOptions | None = None, verbose: bool = False, sample_size: int | None = None) → DataFrame
Run the ingestion pipeline with local DataFrame/file data and infer the features schema and stats.
example:
```python
from mlrun.feature_store import Entity, FeatureSet

quotes_set = FeatureSet("stock-quotes", entities=[Entity("ticker")])
quotes_set.add_aggregation("ask", ["sum", "max"], ["1h", "5h"], "10m")
quotes_set.add_aggregation("bid", ["min", "max"], ["1h"], "10m")
# quotes_df is a pandas DataFrame with the raw quotes
df = quotes_set.preview(
    quotes_df,
    entity_columns=["ticker"],
)
```
- Parameters:
source -- source dataframe or csv/parquet file path
entity_columns -- list of entity (index) column names
namespace -- namespace or module containing graph classes
options -- schema (for discovery of entities and features in the feature set), index, stats, histogram, and preview infer options (see InferOptions)
verbose -- verbose logging
sample_size -- number of rows to sample from the dataset (for large datasets)
- purge_targets(target_names: list[str] | None = None, silent: bool = False)
Delete the data of specific targets.
- Parameters:
target_names -- list of names of targets to delete (default: delete all ingested targets)
silent -- fail silently if a target doesn't exist in the feature set status
- set_targets(targets=None, with_defaults=True, default_final_step=None, default_final_state=None)
Set the desired target list or defaults.
- Parameters:
targets -- list of target type names ('csv', 'nosql', ...) or target objects: CSVTarget(), ParquetTarget(), NoSqlTarget(), StreamTarget(), ...
with_defaults -- add the default targets (as defined in the central config)
default_final_step -- the final graph step after which the target writers are added; used when the graph branches and the end cannot be determined automatically
default_final_state -- Deprecated - use default_final_step instead
- property spec: FeatureSetSpec
- property status: FeatureSetStatus
- to_dataframe(columns=None, df_module=None, target_name=None, start_time=None, end_time=None, time_column=None, additional_filters=None, **kwargs)
Return the feature set (offline) data as a dataframe.
- Parameters:
columns -- list of columns to select (if not all)
df_module -- Python module used to create the DataFrame (pd for pandas, dd for Dask, ...)
target_name -- select a specific target (material view)
start_time -- filter by start time
end_time -- filter by end time
time_column -- specify the time column name in the file
kwargs -- additional reader (csv, parquet, ..) args
additional_filters -- list of additional filter conditions as tuples. Each tuple should be in the format (column_name, operator, value). Supported operators: "=", ">=", "<=", ">", "<". Example: [("Product", "=", "Computer")]. For all supported filters, see https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
- Returns:
DataFrame
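The following pure-Python sketch shows how the (column_name, operator, value) tuples combine. It applies them to a list of row dicts instead of a parquet dataset and assumes the conditions are ANDed together. In the pyarrow filter format linked above, a flat list of tuples is also a conjunction. The helper `apply_additional_filters` is hypothetical; the real readers push the filters down to the storage layer.

```python
import operator
from typing import Any

# Map the documented operator strings to Python comparison functions.
_OPS = {
    "=": operator.eq,
    ">=": operator.ge,
    "<=": operator.le,
    ">": operator.gt,
    "<": operator.lt,
}


def apply_additional_filters(
    rows: list[dict[str, Any]], filters: list[tuple[str, str, Any]]
) -> list[dict[str, Any]]:
    """Keep only the rows that satisfy every (column_name, operator, value) tuple."""
    for column, op, _ in filters:
        if op not in _OPS:
            raise ValueError(f"unsupported operator {op!r} for column {column!r}")
    return [
        row
        for row in rows
        if all(_OPS[op](row[column], value) for column, op, value in filters)
    ]


# Usage
rows = [
    {"Product": "Computer", "Price": 1200},
    {"Product": "Phone", "Price": 800},
    {"Product": "Computer", "Price": 600},
]
print(apply_additional_filters(rows, [("Product", "=", "Computer"), ("Price", ">", 1000)]))
# [{'Product': 'Computer', 'Price': 1200}]
```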
- update_targets_for_ingest(targets: list[mlrun.model.DataTargetBase], overwrite: bool | None = None)
- property uri
Fully qualified feature set URI.
- class mlrun.feature_store.FeatureVector(name=None, features=None, label_feature=None, description=None, with_indexes=None, join_graph: JoinGraph | None = None, relations: dict[str, dict[str, Union[mlrun.features.Entity, str]]] | None = None)
Bases: ModelObj
Feature vector; specifies the selected features, their metadata, and material views.
example:
```python
import mlrun.feature_store as fstore

features = ["quotes.bid", "quotes.asks_sum_5h as asks_5h", "stocks.*"]
vector = fstore.FeatureVector("my-vec", features)

# get the vector as a dataframe
df = vector.get_offline_features().to_dataframe()

# return an online/real-time feature service
svc = vector.get_online_feature_service(impute_policy={"*": "$mean"})
resp = svc.get([{"stock": "GOOG"}])
svc.close()
```
- Parameters:
name -- name of the feature vector
features -- list of features to collect into this vector. Format: [<project>/]<feature_set>.<feature_name or *> [as <alias>]
label_feature -- feature name to be used as label data
description -- text description of the vector
with_indexes -- whether to keep the entity and timestamp columns in the response
join_graph -- an optional JoinGraph object representing the graph of data joins between feature sets for this feature vector, specifying the order and the join types.
relations -- {<feature_set name>: {<column_name>: <other entity object/name>, ...}...} An optional dictionary specifying the relations between feature sets in the feature vector. The keys of the dictionary are feature set names, and the values are dictionaries where the keys represent column names (of the feature set) and the values represent the target entities to join with. The relations provided here take precedence over the relations that were specified on the feature sets themselves. If a specific feature set is not mentioned as a key here, the function falls back to the default relations defined in that feature set.
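The feature reference format `[<project>/]<feature_set>.<feature_name or *> [as <alias>]` can be illustrated with a small regular-expression parser. This is a hypothetical helper written for illustration; MLRun parses these strings internally and may accept forms this sketch rejects (or vice versa).

```python
import re
from typing import NamedTuple, Optional

# [<project>/]<feature_set>.<feature_name or *> [as <alias>]
_FEATURE_RE = re.compile(
    r"^(?:(?P<project>[^/\s]+)/)?"   # optional project prefix
    r"(?P<feature_set>[^.\s/]+)\."   # feature set name followed by a dot
    r"(?P<feature>\*|\S+)"           # feature name or the * wildcard
    r"(?:\s+as\s+(?P<alias>\S+))?$"  # optional alias
)


class FeatureRef(NamedTuple):
    project: Optional[str]
    feature_set: str
    feature: str
    alias: Optional[str]


def parse_feature_ref(text: str) -> FeatureRef:
    """Split a feature reference string into its parts (hypothetical helper)."""
    match = _FEATURE_RE.match(text.strip())
    if not match:
        raise ValueError(f"invalid feature reference: {text!r}")
    return FeatureRef(**match.groupdict())


# Usage
print(parse_feature_ref("quotes.asks_sum_5h as asks_5h"))
# FeatureRef(project=None, feature_set='quotes', feature='asks_sum_5h', alias='asks_5h')
print(parse_feature_ref("myproj/stocks.*"))
# FeatureRef(project='myproj', feature_set='stocks', feature='*', alias=None)
```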
- get_feature_set_relations(feature_set: str | FeatureSet)
- get_offline_features(entity_rows=None, entity_timestamp_column: str | None = None, target: DataTargetBase | None = None, run_config: RunConfig | None = None, drop_columns: list[str] | None = None, start_time: str | datetime | None = None, end_time: str | datetime | None = None, with_indexes: bool = False, update_stats: bool = False, engine: str | None = None, engine_args: dict | None = None, query: str | None = None, order_by: str | list[str] | None = None, spark_service: str | None = None, timestamp_for_filtering: str | dict[str, str] | None = None, additional_filters: list | None = None)
Retrieve offline feature vector results.
Retrieve the desired features of this vector, together with their metadata and statistics. Returns an OfflineVectorResponse; results can be returned as a dataframe or written to a target.
The start_time and end_time attributes allow filtering the data to a given time range. They accept string values or pandas Timestamp objects. String values can also be relative, for example "now", "now - 1d2h", or "now+5m", where a valid pandas Timedelta string follows the verb "now". For time alignment, use the verb "floor"; e.g. "now -1d floor 1H" aligns the time to the last hour (the floor string is passed to pandas.Timestamp.floor(); use D, H, T, S for day, hour, minute, and second alignment). Another way to filter the data is the query argument, as shown in the example.
example:
```python
from mlrun.feature_store import FeatureVector

features = [
    "stock-quotes.bid",
    "stock-quotes.asks_sum_5h",
    "stock-quotes.ask as mycol",
    "stocks.*",
]
vector = FeatureVector(features=features)
# trades is a pandas DataFrame with the entity rows to join with
resp = vector.get_offline_features(
    entity_rows=trades,
    entity_timestamp_column="time",
    query="ticker in ['GOOG'] and bid>100",
)
print(resp.to_dataframe())
print(vector.get_stats_table())
resp.to_parquet("./out.parquet")
```
- Parameters:
entity_rows -- dataframe with entity rows to join with
target -- where to write the results to
drop_columns -- list of columns to drop from the final result
entity_timestamp_column -- timestamp column name in the entity rows dataframe. can be specified only if param entity_rows was specified.
run_config -- function and/or run configuration; see RunConfig
start_time -- datetime, the lower time limit to filter by. Optional.
end_time -- datetime, the upper time limit to filter by. Optional.
with_indexes -- return the vector with/without the entities and the timestamp_key of the feature sets, and with/without the entity_timestamp_column and timestamp_for_filtering columns. This property can also be specified in the feature vector spec (feature_vector.spec.with_indexes) (default False)
update_stats -- update the feature statistics from the requested feature sets on the vector (default False)
engine -- processing engine kind ("local", "dask", or "spark")
engine_args -- kwargs for the processing engine
query -- the query string used to filter rows in the output
spark_service -- name of the spark service to be used (when using a remote-spark runtime)
order_by -- name or list of names to order by. Each name can be a feature name or the alias of a feature you pass in the feature list.
timestamp_for_filtering -- name of the column to filter by. This can be a str for all the feature sets, or a dictionary ({<feature set name>: <timestamp column name>, ...}) that indicates the timestamp column name for each feature set. Optional. By default, the filter executes on the timestamp_key of each feature set. Note: the time filtering is performed on each feature set, using the start_time and end_time params, before the merge process.
additional_filters -- list of additional filter conditions as tuples. Each tuple should be in the format (column_name, operator, value). Supported operators: "=", ">=", "<=", ">", "<". Example: [("Product", "=", "Computer")]. For all supported filters, see https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
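A stdlib-only sketch can show how the relative time strings described above compose. MLRun itself delegates to pandas (Timedelta strings and Timestamp.floor()), which accepts a much richer grammar. The hypothetical `resolve_relative_time` helper below has the following limits:

- It handles only `d`, `h`, `m`, `s` offsets.
- It supports a single-unit `floor` (`D`, `H`, `T`, `S`) only.
- It aligns floors to the Unix epoch, which is an assumption of this sketch.

```python
import re
from datetime import datetime, timedelta
from typing import Optional

_DELTA_UNITS = {"d": "days", "h": "hours", "m": "minutes", "s": "seconds"}
_FLOOR_UNITS = {
    "D": timedelta(days=1),
    "H": timedelta(hours=1),
    "T": timedelta(minutes=1),
    "S": timedelta(seconds=1),
}
# "now" [+|- <offset>] [floor [1]<unit>]
_RELATIVE_RE = re.compile(
    r"^now\s*(?:(?P<sign>[+-])\s*(?P<delta>(?:\d+[dhms])+))?"
    r"\s*(?:floor\s+1?(?P<floor>[DHTS]))?\s*$"
)


def resolve_relative_time(text: str, now: Optional[datetime] = None) -> datetime:
    """Resolve strings such as 'now - 1d2h' or 'now -1d floor 1H' (subset only)."""
    match = _RELATIVE_RE.match(text.strip())
    if not match:
        raise ValueError(f"unsupported relative time: {text!r}")
    result = now if now is not None else datetime.now()
    if match["delta"]:
        # Sum compound offsets such as "1d2h" into one timedelta.
        delta = timedelta()
        for amount, unit in re.findall(r"(\d+)([dhms])", match["delta"]):
            delta += timedelta(**{_DELTA_UNITS[unit]: int(amount)})
        result = result + delta if match["sign"] == "+" else result - delta
    if match["floor"]:
        # Round down to the start of the requested unit.
        step = _FLOOR_UNITS[match["floor"]]
        epoch = datetime(1970, 1, 1)
        result = epoch + ((result - epoch) // step) * step
    return result


# Usage
now = datetime(2024, 3, 10, 14, 37, 12)
start = resolve_relative_time("now -1d floor 1H", now)  # 2024-03-09 14:00:00
end = resolve_relative_time("now", now)                 # 2024-03-10 14:37:12
```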
- get_online_feature_service(run_config: RunConfig | None = None, fixed_window_type: FixedWindowType = FixedWindowType.LastClosedWindow, impute_policy: dict | None = None, update_stats: bool = False, entity_keys: list[str] | None = None)
Initialize and return the online feature vector service API; returns an OnlineVectorService.
- Usage:
There are two ways to use the function:
As a context manager.
Example:
```python
# vector is a FeatureVector object
with vector.get_online_feature_service() as svc:
    resp = svc.get([{"ticker": "GOOG"}, {"ticker": "MSFT"}])
    print(resp)
    resp = svc.get([{"ticker": "AAPL"}], as_list=True)
    print(resp)
```
Example with imputing:
```python
with vector.get_online_feature_service(
    entity_keys=["id"], impute_policy={"*": "$mean", "amount": 0}
) as svc:
    resp = svc.get([{"id": "C123487"}])
```
As a simple function; in this case, you must close the session yourself.
Example:
```python
svc = vector.get_online_feature_service(entity_keys=["ticker"])
try:
    resp = svc.get([{"ticker": "GOOG"}, {"ticker": "MSFT"}])
    print(resp)
    resp = svc.get([{"ticker": "AAPL"}], as_list=True)
    print(resp)
finally:
    svc.close()
```
Example with imputing:
```python
svc = vector.get_online_feature_service(
    entity_keys=["id"], impute_policy={"*": "$mean", "amount": 0}
)
try:
    resp = svc.get([{"id": "C123487"}])
except Exception as e:
    # handle the exception here, then re-raise or recover
    print(f"online feature lookup failed: {e}")
    raise
finally:
    svc.close()
```
- Parameters:
run_config -- function and/or run configuration for remote jobs/services
impute_policy -- a dict with an impute policy per feature. The dict key is the feature name, and the dict value indicates which value to use when the feature is NaN/empty. The replacement value can be a fixed number for constants, or $mean, $max, $min, $std, $count for statistical values. "*" specifies the default for all features, e.g. {"*": "$mean"}
fixed_window_type -- determines how to query the fixed window values that were previously inserted by ingest
update_stats -- update the feature statistics from the requested feature sets on the vector. Default: False.
entity_keys -- entity list of the first feature set in the vector; the indexes that are used to query the online service.
- Returns:
An initialized OnlineVectorService. Will be used in subclasses where support_online=True.
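The impute_policy resolution order can be illustrated in a few lines of code. A per-feature entry wins, "*" is the fallback, "$"-prefixed values look up a statistic, and anything else is used as a constant. This is a hypothetical sketch. It assumes the statistics are supplied as a per-feature dict, whereas MLRun takes them from the feature statistics collected at ingest.

```python
import math
from typing import Any, Optional


def impute(
    row: dict[str, Optional[float]],
    policy: dict[str, Any],
    stats: dict[str, dict[str, float]],
) -> dict[str, Optional[float]]:
    """Replace None/NaN values according to an impute_policy-style dict."""
    result = {}
    for feature, value in row.items():
        missing = value is None or (isinstance(value, float) and math.isnan(value))
        if not missing:
            result[feature] = value
            continue
        # A per-feature rule takes precedence over the "*" default.
        rule = policy.get(feature, policy.get("*"))
        if rule is None:
            result[feature] = value  # no policy: leave the value missing
        elif isinstance(rule, str) and rule.startswith("$"):
            result[feature] = stats[feature][rule[1:]]  # e.g. "$mean" -> stats[feature]["mean"]
        else:
            result[feature] = rule  # fixed constant
    return result


# Usage
stats = {"amount": {"mean": 42.0}, "age": {"mean": 31.5, "max": 90.0}}
row = {"amount": None, "age": float("nan"), "score": 0.7}
print(impute(row, {"*": "$mean", "amount": 0}, stats))
# {'amount': 0, 'age': 31.5, 'score': 0.7}
```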
- kind = 'FeatureVector'
- property metadata: VersionedObjMetadata
- parse_features(offline=True, update_stats=False)
Parse and validate the feature list (from the vector) and add metadata from the feature sets.
- Returns:
feature_set_objects: cache of used feature set objects
feature_set_fields: list of fields (name, alias) per feature set
- property spec: FeatureVectorSpec
- property status: FeatureVectorStatus
- to_dataframe(df_module=None, target_name=None)
Return the feature vector (offline) data as a dataframe.
- property uri
Fully qualified feature vector URI.
- class mlrun.feature_store.FixedWindowType(value)
Bases: Enum
An enumeration.
- CurrentOpenWindow = 1
- LastClosedWindow = 2
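The two members differ in which fixed (tumbling) window an online query reads. The sketch below reflects our reading of the member names, not code taken from MLRun: CurrentOpenWindow reads the window that contains the query time, and LastClosedWindow reads the one just before it. The enum is a local stand-in mirroring the documented values, `window_to_query` is hypothetical, and windows are assumed to be aligned to the Unix epoch.

```python
from datetime import datetime, timedelta
from enum import Enum


class FixedWindowType(Enum):
    # Local stand-in mirroring mlrun.feature_store.FixedWindowType.
    CurrentOpenWindow = 1
    LastClosedWindow = 2


def window_to_query(
    now: datetime, size: timedelta, window_type: FixedWindowType
) -> tuple[datetime, datetime]:
    """Return the [start, end) fixed window whose aggregate would be read at `now`."""
    epoch = datetime(1970, 1, 1)
    open_start = epoch + ((now - epoch) // size) * size  # window containing `now`
    if window_type is FixedWindowType.CurrentOpenWindow:
        start = open_start
    else:
        start = open_start - size  # the most recent fully closed window
    return start, start + size


# Usage: at 10:25 with 1h windows
now = datetime(2024, 1, 1, 10, 25)
current = window_to_query(now, timedelta(hours=1), FixedWindowType.CurrentOpenWindow)  # 10:00-11:00
closed = window_to_query(now, timedelta(hours=1), FixedWindowType.LastClosedWindow)    # 09:00-10:00
```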
- class mlrun.feature_store.OfflineVectorResponse(merger)
Bases: object
get_offline_features response object.
- property status
Vector prep job status (ready, running, error).
- class mlrun.feature_store.OnlineVectorService(vector, graph, index_columns, impute_policy: dict | None = None, requested_columns: list[str] | None = None)
Bases: object
get_online_feature_service response object.
- get(entity_rows: list[Union[dict, list]], as_list=False)
Get the feature vector given the provided entity inputs.
Take a list of input vectors/rows and return a list of enriched feature vectors. Each input and/or output vector can be a list of values or a dictionary of field names and values. To return the vectors as lists of values, set as_list to True.
If the input is a list of lists (vs. a list of dicts), the values in each list correspond to the index/entity values, i.e. [["GOOG"], ["MSFT"]] means "GOOG" and "MSFT" are the values of the index/entity fields.
example:
```python
# vector is a FeatureVector object
svc = vector.get_online_feature_service()

# accept a list of dicts, return a list of dicts
resp = svc.get([{"name": "joe"}, {"name": "mike"}])

# accept a list of lists, return a list of lists
resp = svc.get([["joe"], ["mike"]], as_list=True)

svc.close()
```
- Parameters:
entity_rows -- list of list/dict with input entity data/rows
as_list -- return a list of lists (list input is required by many ML frameworks)
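The list-versus-dict input rule and the as_list output switch can be pictured with two small hypothetical helpers. Positional values in a list row are mapped onto the index/entity columns in order. Output dicts are flattened to value lists when as_list is set. Whether the entity columns themselves appear in the real output depends on settings such as with_indexes, which this sketch does not model.

```python
from typing import Union


def normalize_entity_rows(
    entity_rows: list[Union[dict, list]], index_columns: list[str]
) -> list[dict]:
    """Turn list-style rows into dicts keyed by the index/entity columns."""
    normalized = []
    for row in entity_rows:
        if isinstance(row, dict):
            normalized.append(dict(row))
            continue
        if len(row) != len(index_columns):
            raise ValueError(f"expected {len(index_columns)} entity values, got {len(row)}")
        # Positional values map onto the index columns in order.
        normalized.append(dict(zip(index_columns, row)))
    return normalized


def to_output(vectors: list[dict], as_list: bool) -> list:
    """Return enriched vectors as dicts, or as plain value lists when as_list=True."""
    return [list(v.values()) for v in vectors] if as_list else vectors


# Usage
print(normalize_entity_rows([["GOOG"], ["MSFT"]], ["ticker"]))
# [{'ticker': 'GOOG'}, {'ticker': 'MSFT'}]
print(to_output([{"ticker": "GOOG", "bid": 101.5}], as_list=True))
# [['GOOG', 101.5]]
```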
- property status
Vector merger function status (ready, running, error).
- class mlrun.feature_store.RunConfig(function: str | FunctionReference | BaseRuntime | None = None, local: bool | None = None, image: str | None = None, kind: str | None = None, handler: str | None = None, parameters: dict | None = None, watch: bool | None = None, owner=None, credentials: Credentials | None = None, code: str | None = None, requirements: str | list[str] | None = None, extra_spec: dict | None = None, auth_info=None)
Bases: object
Class for holding function and run specs for jobs and serving functions.
When running feature ingestion or merging tasks, use the RunConfig class to pass the desired function and job configuration. The apply() method sets resources like volumes, and the with_secret() method adds secrets.
Most attributes are optional; if not specified, a proper default value is set.
examples:
```python
import mlrun
from mlrun.feature_store import RunConfig

# config for local run emulation
config = RunConfig(local=True)

# config for using empty/default code
config = RunConfig()

# config for using a .py/.ipynb file with an image and extra package requirements
config = RunConfig("mycode.py", image="mlrun/mlrun", requirements=["spacy"])

# config for using a function object
function = mlrun.import_function("hub://some-function")
config = RunConfig(function)
```
- Parameters:
function -- a function URI, function object, path to function code (.py/.ipynb), or a FunctionReference; the function defines the code, dependencies, and resources
local -- use True to simulate a local job run or mock service
image -- function container image
kind -- function runtime kind (job, serving, spark, ...); required when function points to code
handler -- the function handler to execute (for jobs or nuclio)
parameters -- job parameters
watch -- in batch jobs will wait for the job completion and print job logs to the console. Default (None) is True.
owner -- job owner
credentials -- job credentials
code -- function source code (as string)
requirements -- python requirements file path or list of packages
extra_spec -- additional dict with function spec fields/values to add to the function
auth_info -- authentication info. For internal use when running on server
- apply(modifier)
Apply a modifier to add/set function resources like volumes.
example:
```python
import mlrun

# run_config is an existing RunConfig object
run_config.apply(mlrun.platforms.auto_mount())
```
- property function
- with_secret(kind, source)
Register a secrets source (file, env, or dict).
Read secrets from a source provider to be used in jobs, for example:
```python
run_config.with_secret("file", "file.txt")
run_config.with_secret("inline", {"key": "val"})
run_config.with_secret("env", "ENV1,ENV2")
run_config.with_secret("vault", ["secret1", "secret2"])
```
- Parameters:
kind -- secret type (file, inline, env, vault)
source -- secret data or link (see example)
- Returns:
This (self) object
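The 'inline' and 'env' source formats can be illustrated with a hypothetical resolver. It shows how a comma-separated env source such as "ENV1,ENV2" maps to individual variables. This is not how MLRun resolves secrets internally. Silently skipping unset variables is an assumption of this sketch, and the 'file' and 'vault' kinds are deliberately left out.

```python
import os
from typing import Union


def resolve_secrets(kind: str, source: Union[str, dict]) -> dict[str, str]:
    """Hypothetical resolver for the 'inline' and 'env' secret kinds."""
    if kind == "inline":
        return dict(source)
    if kind == "env":
        names = [name.strip() for name in source.split(",") if name.strip()]
        # Assumption: variables that are not set are skipped rather than raising.
        return {name: os.environ[name] for name in names if name in os.environ}
    raise NotImplementedError(f"kind {kind!r} is not covered by this sketch")


# Usage (ENV1 set, ENV2 unset in this illustration)
os.environ["ENV1"] = "s3cr3t"
os.environ.pop("ENV2", None)
print(resolve_secrets("env", "ENV1,ENV2"))  # {'ENV1': 's3cr3t'}
print(resolve_secrets("inline", {"key": "val"}))  # {'key': 'val'}
```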
- mlrun.feature_store.delete_feature_set(name, project='', tag=None, uid=None, force=False)
Delete a FeatureSet object from the DB.
- Parameters:
name -- Name of the object to delete
project -- Name of the object's project
tag -- Specific object's version tag
uid -- Specific object's uid
force -- Delete feature set without purging its targets
- If tag or uid is specified, only the version referenced by it is deleted. Using both is not allowed. If neither is specified, all instances of the object with the given name are deleted.
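The tag/uid rule above amounts to a small argument check. The sketch below is a hypothetical illustration of that rule, not MLRun's code. The `select_versions` helper and the selector dict it returns are invented here; they show which objects a delete call targets for each combination of arguments.

```python
from typing import Optional


def select_versions(name: str, tag: Optional[str] = None, uid: Optional[str] = None) -> dict[str, str]:
    """Build a delete selector, enforcing 'tag or uid, not both' (hypothetical)."""
    if tag and uid:
        raise ValueError("specify either tag or uid, not both")
    if tag:
        return {"name": name, "tag": tag}  # a single version, by tag
    if uid:
        return {"name": name, "uid": uid}  # a single version, by uid
    return {"name": name}  # every version of the named object


# Usage
print(select_versions("stocks"))            # {'name': 'stocks'}
print(select_versions("stocks", tag="v2"))  # {'name': 'stocks', 'tag': 'v2'}
```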
- mlrun.feature_store.delete_feature_vector(name, project='', tag=None, uid=None)
Delete a FeatureVector object from the DB.
- Parameters:
name -- Name of the object to delete
project -- Name of the object's project
tag -- Specific object's version tag
uid -- Specific object's uid
- If tag or uid is specified, only the version referenced by it is deleted. Using both is not allowed. If neither is specified, all instances of the object with the given name are deleted.
- mlrun.feature_store.deploy_ingestion_service_v2(featureset: FeatureSet | str, source: DataSource | None = None, targets: list[mlrun.model.DataTargetBase] | None = None, name: str | None = None, run_config: RunConfig | None = None, verbose=False) → tuple[str, mlrun.runtimes.base.BaseRuntime]
Start a real-time ingestion service using a Nuclio function.
Deploy a real-time function that implements the feature ingestion pipeline. The source maps to Nuclio event triggers (http, kafka, v3io stream, etc.).
The run_config parameter allows specifying the function and job configuration; see RunConfig.
example:
```python
import mlrun
from mlrun.datastore.sources import HTTPSource
from mlrun.feature_store import RunConfig, deploy_ingestion_service_v2
from mlrun.platforms import mount_v3io

source = HTTPSource()
func = mlrun.code_to_function("ingest", kind="serving").apply(mount_v3io())
config = RunConfig(function=func)
# my_set is an existing FeatureSet object (or its URI)
deploy_ingestion_service_v2(my_set, source, run_config=config)
```
- Parameters:
featureset -- feature set object or uri
source -- data source object describing the online or offline source
targets -- list of data target objects
name -- name for the job/function
run_config -- service runtime configuration (function object/uri, resources, etc.)
verbose -- verbose logging
- Returns:
URL to access the deployed ingestion service, and the function that was deployed (which will differ from the function passed in via the run_config parameter).
- mlrun.feature_store.get_feature_set(uri, project=None)
Get a feature set object from the DB.
- Parameters:
uri -- a feature set URI ({project}/{name}[:version])
project -- project name, if not specified in the URI or not using the current/default project
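The `{project}/{name}[:version]` URI form can be illustrated with a small parser. The `parse_store_uri` helper and `ParsedUri` type are hypothetical and cover only this plain form. Real MLRun URIs may carry other prefixes or a uid, which this sketch ignores. The `default_project` argument stands in for the `project` parameter.

```python
from typing import NamedTuple, Optional


class ParsedUri(NamedTuple):
    project: Optional[str]
    name: str
    tag: Optional[str]


def parse_store_uri(uri: str, default_project: Optional[str] = None) -> ParsedUri:
    """Split '{project}/{name}[:version]'; fall back to default_project when absent."""
    project, sep, rest = uri.partition("/")
    if not sep:  # no '/': the URI is just '{name}[:version]'
        project, rest = default_project, uri
    name, sep, tag = rest.partition(":")
    return ParsedUri(project, name, tag if sep else None)


# Usage
print(parse_store_uri("stocks-proj/ticks:latest"))
# ParsedUri(project='stocks-proj', name='ticks', tag='latest')
print(parse_store_uri("ticks", default_project="default"))
# ParsedUri(project='default', name='ticks', tag=None)
```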
- mlrun.feature_store.get_feature_vector(uri, project=None)
Get a feature vector object from the DB.
- Parameters:
uri -- a feature vector URI ({project}/{name}[:version])
project -- project name, if not specified in the URI or not using the current/default project
- mlrun.feature_store.get_offline_features(feature_vector: str | FeatureVector, entity_rows=None, entity_timestamp_column: str | None = None, target: DataTargetBase | None = None, run_config: RunConfig | None = None, drop_columns: list[str] | None = None, start_time: str | datetime | None = None, end_time: str | datetime | None = None, with_indexes: bool = False, update_stats: bool = False, engine: str | None = None, engine_args: dict | None = None, query: str | None = None, order_by: str | list[str] | None = None, spark_service: str | None = None, timestamp_for_filtering: str | dict[str, str] | None = None, additional_filters: list | None = None)
Retrieve offline feature vector results.
Specify a feature vector object/URI and retrieve the desired features, their metadata, and statistics. Returns an OfflineVectorResponse; results can be returned as a dataframe or written to a target.
The start_time and end_time attributes allow filtering the data to a given time range. They accept string values or pandas Timestamp objects. String values can also be relative, for example "now", "now - 1d2h", or "now+5m", where a valid pandas Timedelta string follows the verb "now". For time alignment, use the verb "floor"; e.g. "now -1d floor 1H" aligns the time to the last hour (the floor string is passed to pandas.Timestamp.floor(); use D, H, T, S for day, hour, minute, and second alignment). Another way to filter the data is the query argument, as shown in the example.
example:
```python
from mlrun.feature_store import FeatureVector, get_offline_features

features = [
    "stock-quotes.bid",
    "stock-quotes.asks_sum_5h",
    "stock-quotes.ask as mycol",
    "stocks.*",
]
vector = FeatureVector(features=features)
# trades is a pandas DataFrame with the entity rows to join with
resp = get_offline_features(
    vector,
    entity_rows=trades,
    entity_timestamp_column="time",
    query="ticker in ['GOOG'] and bid>100",
)
print(resp.to_dataframe())
print(vector.get_stats_table())
resp.to_parquet("./out.parquet")
```
- Parameters:
feature_vector -- feature vector uri or FeatureVector object. passing feature vector obj requires update permissions
entity_rows -- dataframe with entity rows to join with
target -- where to write the results to
drop_columns -- list of columns to drop from the final result
entity_timestamp_column -- timestamp column name in the entity rows dataframe. can be specified only if param entity_rows was specified.
run_config -- function and/or run configuration see
RunConfig
start_time -- lower bound of the time range to filter by (datetime/pandas Timestamp or string, including relative strings such as "now - 1d"). Optional.
end_time -- upper bound of the time range to filter by (same accepted values as start_time). Optional.
with_indexes -- Return the vector with/without the entities and the timestamp_key of the feature sets, and with/without the entity_timestamp_column and timestamp_for_filtering columns. This property can also be specified in the feature vector spec (feature_vector.spec.with_indexes) (default False)
update_stats -- update features statistics from the requested feature sets on the vector. (default False).
engine -- processing engine kind ("local", "dask", or "spark")
engine_args -- kwargs for the processing engine
query -- The query string used to filter rows on the output
spark_service -- Name of the spark service to be used (when using a remote-spark runtime)
order_by -- Name or list of names to order by. The name or the names in the list can be the feature name or the alias of the feature you pass in the feature list.
timestamp_for_filtering -- name of the column to filter by, can be str for all the feature sets or a dictionary ({<feature set name>: <timestamp column name>, ...}) that indicates the timestamp column name for each feature set. Optional. By default, the filter executes on the timestamp_key of each feature set. Note: the time filtering is performed on each feature set before the merge process using start_time and end_time params.
additional_filters -- List of additional filter conditions as tuples. Each tuple should be in the format (column_name, operator, value). Supported operators: "=", ">=", "<=", ">", "<". Example: [("Product", "=", "Computer")]. For all supported filters, see: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
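As an illustration of the (column_name, operator, value) tuple format, the hypothetical helper below applies such tuples to an in-memory pandas DataFrame, assuming the tuples are combined with AND, as in the pyarrow filter convention linked above. It is a sketch for intuition only, not how mlrun applies the filters.

```python
import operator

import pandas as pd

# Operators listed as supported for additional_filters
_OPS = {
    "=": operator.eq,
    ">=": operator.ge,
    "<=": operator.le,
    ">": operator.gt,
    "<": operator.lt,
}


def apply_additional_filters(df: pd.DataFrame, filters: list) -> pd.DataFrame:
    """Hypothetical helper: keep rows matching ALL (column, op, value) tuples."""
    mask = pd.Series(True, index=df.index)
    for column, op, value in filters:
        if op not in _OPS:
            raise ValueError(f"unsupported operator: {op!r}")
        mask &= _OPS[op](df[column], value)
    return df[mask]


sales = pd.DataFrame(
    {"Product": ["Computer", "Phone", "Computer"], "price": [1000, 500, 300]}
)
print(apply_additional_filters(sales, [("Product", "=", "Computer"), ("price", ">=", 500)]))
```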
- mlrun.feature_store.get_online_feature_service(feature_vector: str | FeatureVector, run_config: RunConfig | None = None, fixed_window_type: FixedWindowType = FixedWindowType.LastClosedWindow, impute_policy: dict | None = None, update_stats: bool = False, entity_keys: list[str] | None = None)
Initialize and return the online feature vector service API, an OnlineVectorService object.
- Usage:
There are two ways to use the function:
As a context manager
Example:
from mlrun.feature_store import get_online_feature_service

with get_online_feature_service(vector_uri) as svc:
    resp = svc.get([{"ticker": "GOOG"}, {"ticker": "MSFT"}])
    print(resp)
    resp = svc.get([{"ticker": "AAPL"}], as_list=True)
    print(resp)
Example with imputing:
with get_online_feature_service(
    vector_uri, entity_keys=["id"], impute_policy={"*": "$mean", "amount": 0}
) as svc:
    resp = svc.get([{"id": "C123487"}])
As a simple function; note that in this case you need to close the session yourself.
Example:
svc = get_online_feature_service(vector_uri, entity_keys=["ticker"])
try:
    resp = svc.get([{"ticker": "GOOG"}, {"ticker": "MSFT"}])
    print(resp)
    resp = svc.get([{"ticker": "AAPL"}], as_list=True)
    print(resp)
finally:
    svc.close()
Example with imputing:
svc = get_online_feature_service(
    vector_uri, entity_keys=["id"], impute_policy={"*": "$mean", "amount": 0}
)
try:
    resp = svc.get([{"id": "C123487"}])
except Exception as e:
    # replace with your own error handling
    print(f"failed to get online features: {e}")
finally:
    svc.close()
- Parameters:
feature_vector -- feature vector uri or FeatureVector object. passing feature vector obj requires update permissions.
run_config -- function and/or run configuration for remote jobs/services
impute_policy -- a dict with the impute policy per feature. The dict key is the feature name and the dict value indicates which value is used when the feature is NaN/empty; the replacement value can be a fixed number for constants, or $mean, $max, $min, $std, $count for statistical values. "*" is used to specify the default for all features, example: {"*": "$mean"}
fixed_window_type -- determines how to query the fixed window values which were previously inserted by ingest
update_stats -- update features statistics from the requested feature sets on the vector. Default: False.
entity_keys -- Entity list of the first feature_set in the vector. The indexes that are used to query the online service.
- Returns:
An initialized OnlineVectorService object.
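A minimal pandas sketch of the impute_policy semantics described above: an explicit per-feature entry takes precedence over the "*" default, and "$"-prefixed values are looked up in per-feature statistics. The helper apply_impute_policy is hypothetical, and computing the statistics with DataFrame.describe() is an assumption made so the example runs on its own (it covers numeric columns only).

```python
import numpy as np
import pandas as pd


def apply_impute_policy(df: pd.DataFrame, policy: dict, stats: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: fill NaN values according to an impute_policy-style dict."""
    out = df.copy()
    for col in out.columns:
        # A feature-specific rule takes precedence over the "*" default
        rule = policy.get(col, policy.get("*"))
        if rule is None:
            continue
        if isinstance(rule, str) and rule.startswith("$"):
            # "$mean" -> stats[col]["mean"]; the same lookup works for $max, $min, $std, $count
            value = stats[col][rule[1:]]
        else:
            value = rule
        out[col] = out[col].fillna(value)
    return out


features = pd.DataFrame({"amount": [10.0, np.nan, 30.0], "score": [1.0, np.nan, 3.0]})
stats = features.describe()  # rows include count, mean, std, min, max
print(apply_impute_policy(features, {"*": "$mean", "amount": 0}, stats))
```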
- mlrun.feature_store.ingest(featureset: FeatureSet | str | None = None, source=None, targets: list[mlrun.model.DataTargetBase] | None = None, namespace=None, return_df: bool = True, infer_options: InferOptions = 63, run_config: RunConfig | None = None, mlrun_context=None, spark_context=None, overwrite=None) DataFrame | None
Read a local DataFrame, file, URL, or source into the feature store. Ingest reads from the source, runs the graph transformations, infers metadata and stats, and writes the results to the default or specified targets.
When targets are not specified, data is stored in the configured default targets (usually NoSQL for real-time and Parquet for offline).
The run_config parameter allows specifying the function and job configuration, see:
RunConfig
example:
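import pandas as pd
import mlrun.feature_store as fstore
from mlrun.datastore.sources import CSVSource
from mlrun.datastore.targets import CSVTarget
from mlrun.feature_store import Entity, FeatureSet, RunConfig, ingest

stocks_set = FeatureSet("stocks", entities=[Entity("ticker")])
stocks = pd.read_csv("stocks.csv")
df = ingest(stocks_set, stocks, infer_options=fstore.InferOptions.default())

# for running as remote job
config = RunConfig(image="mlrun/mlrun")
df = ingest(stocks_set, stocks, run_config=config)

# specify source and targets (measurements: another FeatureSet, defined elsewhere)
source = CSVSource("mycsv", path="measurements.csv")
targets = [CSVTarget("mycsv", path="./mycsv.csv")]
ingest(measurements, source, targets)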
- Parameters:
featureset -- feature set object or featureset.uri. (uri must be of a feature set that is in the DB, call .save() if it's not)
source -- source dataframe or other sources (e.g. parquet source, see:
ParquetSource
and other classes in mlrun.datastore with suffix Source)
targets -- optional list of data target objects
namespace -- namespace or module containing graph classes
return_df -- indicate if to return a dataframe with the graph results
infer_options -- schema (for discovery of entities, features in featureset), index, stats, histogram and preview infer options (
InferOptions
)
run_config -- function and/or run configuration for remote jobs, see
RunConfig
mlrun_context -- mlrun context (when running as a job), for internal use only
spark_context -- local spark session for spark ingestion; example for creating the spark context: spark = SparkSession.builder.appName("Spark function").getOrCreate(). For remote spark ingestion, this should contain the remote spark service name
overwrite -- delete the targets' data prior to ingestion (default: True for non scheduled ingest - deletes the targets that are about to be ingested. False for scheduled ingest - does not delete the target)
- Returns:
if return_df is True, a dataframe will be returned based on the graph
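The default infer_options value of 63 is consistent with a bitmask of six one-bit options (2**6 - 1), matching the six options listed for this parameter: entities, features, index, stats, histogram and preview. The sketch below decodes such a value with Python's enum.IntFlag; the class InferFlags and its bit assignments are assumptions for illustration, not a copy of mlrun's InferOptions.

```python
from enum import IntFlag


class InferFlags(IntFlag):
    """Hypothetical stand-in: one bit per infer option listed above."""

    ENTITIES = 1
    FEATURES = 2
    INDEX = 4
    STATS = 8
    HISTOGRAM = 16
    PREVIEW = 32


def decode(value: int) -> list:
    """Return the names of the options enabled in an integer bitmask."""
    return [flag.name for flag in InferFlags if value & flag]


print(decode(63))  # ['ENTITIES', 'FEATURES', 'INDEX', 'STATS', 'HISTOGRAM', 'PREVIEW']
print(decode(InferFlags.ENTITIES | InferFlags.FEATURES))  # ['ENTITIES', 'FEATURES']
```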
- mlrun.feature_store.preview(featureset: FeatureSet, source, entity_columns: list | None = None, namespace=None, options: InferOptions | None = None, verbose: bool = False, sample_size: int | None = None) DataFrame
Run the ingestion pipeline with local DataFrame/file data and infer the features schema and stats
example:
from mlrun.feature_store import Entity, FeatureSet, preview

quotes_set = FeatureSet("stock-quotes", entities=[Entity("ticker")])
quotes_set.add_aggregation("ask", ["sum", "max"], ["1h", "5h"], "10m")
quotes_set.add_aggregation("bid", ["min", "max"], ["1h"], "10m")
# quotes_df: a pandas DataFrame of raw quotes (defined elsewhere)
df = preview(
    quotes_set,
    quotes_df,
    entity_columns=["ticker"],
)
- Parameters:
featureset -- feature set object or uri
source -- source dataframe or csv/parquet file path
entity_columns -- list of entity (index) column names
namespace -- namespace or module containing graph classes
options -- schema (for discovery of entities, features in featureset), index, stats, histogram and preview infer options (
InferOptions
)
verbose -- verbose log
sample_size -- number of rows to sample from the dataset (for large datasets)
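The add_aggregation calls in the preview example define sliding-window aggregations per entity. As a rough pandas analogue only (an assumption for illustration, not how mlrun computes these windows), a per-ticker sum and max of ask over a trailing 1-hour window can be computed with groupby plus a time-based rolling window. The 10m period argument has no counterpart here, and the output column names are illustrative.

```python
import pandas as pd

quotes = pd.DataFrame(
    {
        "ticker": ["A", "A", "A", "B"],
        "time": pd.to_datetime(
            ["2024-01-01 10:00", "2024-01-01 10:30", "2024-01-01 11:15", "2024-01-01 11:30"]
        ),
        "ask": [1.0, 2.0, 4.0, 10.0],
    }
)

# Trailing 1-hour window per ticker: each row aggregates the rows in (time - 1h, time]
rolling = quotes.set_index("time").groupby("ticker")["ask"].rolling("1h")
windowed = pd.DataFrame({"ask_sum_1h": rolling.sum(), "ask_max_1h": rolling.max()})
print(windowed)
```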
- class mlrun.feature_store.feature_set.FeatureSetSpec(owner=None, description=None, entities=None, features=None, partition_keys=None, timestamp_key=None, label_column=None, relations=None, source=None, targets=None, graph=None, function=None, analysis=None, engine=None, output_path=None, passthrough=None)
Feature set spec object, defines the feature-set's configuration.
Warning
This class should not be modified directly. It is managed by the parent feature-set object or using feature-store APIs. Modifying the spec manually may result in unpredictable behaviour.
- Parameters:
description -- text description (copied from parent feature-set)
entities -- list of entity (index key) names or
Entity
features -- list of features -
Feature
partition_keys -- list of fields to partition results by (other than the default timestamp key)
timestamp_key -- timestamp column name
label_column -- name of the label column (the one holding the target (y) values)
targets -- list of data targets
graph -- the processing graph
function -- MLRun runtime to execute the feature-set in
engine -- name of the processing engine (storey, pandas, or spark), defaults to storey
output_path -- default location where to store results (defaults to MLRun's artifact path)
passthrough -- if True, ingest skips offline targets, and get_offline_features reads directly from the source
- class mlrun.feature_store.feature_set.FeatureSetStatus(state=None, targets=None, stats=None, preview=None, function_uri=None, run_uri=None)
Feature set status object, containing the current feature-set's status.
Warning
This class should not be modified directly. It is managed by the parent feature-set object or using feature-store APIs. Modifying the status manually may result in unpredictable behaviour.
- Parameters:
state -- object's current state
targets -- list of the data targets used in the last ingestion operation
stats -- feature statistics calculated in the last ingestion (if stats calculation was requested)
preview -- preview of the feature-set contents (if preview generation was requested)
function_uri -- function used to execute the feature-set graph
run_uri -- last run used for ingestion
- class mlrun.feature_store.steps.MLRunStep(**kwargs)
Abstract class for mlrun step. Can be used in pandas/storey/spark feature set ingestion. Extend this class and implement the relevant _do_XXX methods to support the required execution engines.
- _do_pandas(event)
The execution method for pandas engine.
- Parameters:
event -- Incoming event, a pandas.DataFrame object.
- _do_spark(event)
The execution method for spark engine.
- Parameters:
event -- Incoming event, a pyspark.sql.DataFrame object.
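A minimal sketch of a custom step that supports only the pandas engine by implementing _do_pandas. The class AddRatio and its parameters are hypothetical. The try/except fallback defines a trivial stand-in base class only so the snippet runs where mlrun is not installed; that stand-in is not mlrun's implementation.

```python
import pandas as pd

try:
    from mlrun.feature_store.steps import MLRunStep
except ImportError:
    # Minimal stand-in so the sketch runs without mlrun; NOT mlrun's MLRunStep
    class MLRunStep:
        def __init__(self, **kwargs):
            pass


class AddRatio(MLRunStep):
    """Hypothetical step: adds numerator / denominator as a new column."""

    def __init__(self, numerator: str, denominator: str, out_col: str = "ratio", **kwargs):
        super().__init__(**kwargs)
        self.numerator = numerator
        self.denominator = denominator
        self.out_col = out_col

    def _do_pandas(self, event: pd.DataFrame) -> pd.DataFrame:
        # With the pandas engine the event is a whole DataFrame (modified in place here)
        event[self.out_col] = event[self.numerator] / event[self.denominator]
        return event
```

In a feature set created with engine="pandas", such a step could be added the same way as the built-in steps in this module, e.g. feature_set.graph.to(AddRatio(numerator="a", denominator="b")).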
- class mlrun.feature_store.steps.DateExtractor(parts: dict[str, str] | list[str], timestamp_col: str | None = None, **kwargs)
Date Extractor extracts a date-time component into new columns
The extracted date part will appear as <timestamp_col>_<date_part> feature.
Supported part values include:
asm8: Return numpy datetime64 format in nanoseconds.
day_of_week: Return day of the week.
day_of_year: Return the day of the year.
dayofweek: Return day of the week.
dayofyear: Return the day of the year.
days_in_month: Return the number of days in the month.
daysinmonth: Return the number of days in the month.
freqstr: Return the frequency as a string, if set.
is_leap_year: Return True if year is a leap year.
is_month_end: Return True if date is last day of month.
is_month_start: Return True if date is first day of month.
is_quarter_end: Return True if date is last day of the quarter.
is_quarter_start: Return True if date is first day of the quarter.
is_year_end: Return True if date is last day of the year.
is_year_start: Return True if date is first day of the year.
quarter: Return the quarter of the year.
tz: Alias for tzinfo.
week: Return the week number of the year.
weekofyear: Return the week number of the year.
example:
# (taken from the fraud-detection end-to-end feature store demo)
import mlrun.feature_store as fstore

# Define the Transactions FeatureSet
transaction_set = fstore.FeatureSet(
    "transactions",
    entities=[fstore.Entity("source")],
    timestamp_key="timestamp",
    description="transactions feature set",
)

# Get FeatureSet computation graph
transaction_graph = transaction_set.graph

# Add the custom `DateExtractor` step
# to the computation graph
transaction_graph.to(
    class_name="DateExtractor",
    name="Extract Dates",
    parts=["hour", "day_of_week"],
    timestamp_col="timestamp",
)
- Parameters:
parts -- list of pandas style date-time parts you want to extract.
timestamp_col -- The name of the column containing the timestamps to extract from, by default "timestamp"
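The naming rule above (<timestamp_col>_<date_part>) can be mimicked in plain pandas through the Series.dt accessor, which exposes many of the same part names (for example hour, day_of_week, is_leap_year). This is an illustrative sketch, not the DateExtractor implementation; the helper extract_date_parts is hypothetical and only handles a list of parts.

```python
import pandas as pd


def extract_date_parts(df: pd.DataFrame, parts: list, timestamp_col: str = "timestamp") -> pd.DataFrame:
    """Hypothetical helper: add one <timestamp_col>_<part> column per requested part."""
    out = df.copy()
    ts = pd.to_datetime(out[timestamp_col])
    for part in parts:
        # e.g. "day_of_week" -> ts.dt.day_of_week (Monday=0)
        out[f"{timestamp_col}_{part}"] = getattr(ts.dt, part)
    return out


events = pd.DataFrame({"timestamp": ["2024-02-29 13:45:00"]})
print(extract_date_parts(events, ["hour", "day_of_week", "is_leap_year"]))
```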
- class mlrun.feature_store.steps.DropFeatures(features: list[str], **kwargs)
Drop all the features in the given feature list
- Parameters:
features -- string list of the features names to drop
example:
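import mlrun.feature_store as fstore
from mlrun.feature_store.steps import DropFeatures

feature_set = fstore.FeatureSet(
    "fs-new",
    entities=[fstore.Entity("id")],
    description="feature set",
    engine="pandas",
)
# Pre-processing graph steps
feature_set.graph.to(DropFeatures(features=["age"]))
# data: a pandas DataFrame to ingest (defined elsewhere)
df_pandas = feature_set.ingest(data)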
- class mlrun.feature_store.steps.FeaturesetValidator(featureset=None, columns=None, name=None, **kwargs)
Validate feature values according to the feature set validation policy
- Parameters:
featureset -- feature set uri (or "." for current feature set pipeline)
columns -- names of the columns/fields to validate
name -- step name
kwargs -- optional kwargs (for storey)
- class mlrun.feature_store.steps.Imputer(method: str = 'avg', default_value=None, mapping: dict[str, Any] | None = None, **kwargs)
Replace None values with default values
- Parameters:
method -- for future use
default_value -- default value if not specified per column
mapping -- a dict of per-column default values
kwargs -- optional kwargs (for storey)
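As a plain-pandas illustration of the documented behaviour (a per-column value from mapping, falling back to default_value), the hypothetical helper below calls DataFrame.fillna column by column. It is a sketch, not the Imputer implementation, and it ignores the method argument, which is reserved for future use.

```python
from typing import Any, Optional

import pandas as pd


def impute(df: pd.DataFrame, default_value: Any = None, mapping: Optional[dict] = None) -> pd.DataFrame:
    """Hypothetical helper: replace missing values per column, else with default_value."""
    mapping = mapping or {}
    out = df.copy()
    for col in out.columns:
        # A per-column value from mapping wins over the global default
        value = mapping.get(col, default_value)
        if value is not None:
            out[col] = out[col].fillna(value)
    return out


people = pd.DataFrame({"age": [None, 30], "city": ["NY", None]})
print(impute(people, default_value="unknown", mapping={"age": 0}))
```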
- class mlrun.feature_store.steps.MapValues(mapping: dict[str, dict[Union[str, int, bool], Any]], with_original_features: bool = False, suffix: str = 'mapped', **kwargs)
Map column values to new values
example:
from math import inf

from mlrun.feature_store.steps import MapValues

# graph: the feature set's processing graph, e.g. feature_set.graph
# replace the value "U" with "0" in the age column
graph.to(MapValues(mapping={"age": {"U": "0"}}, with_original_features=True))

# replace integers, example
graph.to(MapValues(mapping={"not": {0: 1, 1: 0}}))

# replace by range, use -inf and inf for extended range
graph.to(
    MapValues(
        mapping={
            "numbers": {"ranges": {"negative": [-inf, 0], "positive": [0, inf]}}
        }
    )
)
- Parameters:
mapping -- a dict with entry per column and the associated old/new values map
with_original_features -- set to True to keep the original features
suffix -- the suffix added to the column name <column>_<suffix> (default is "mapped")
kwargs -- optional kwargs (for storey)
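For intuition, the hypothetical pandas sketch below applies the same mapping format: plain old/new value pairs, or a "ranges" entry mapping a label to [low, high]. It assumes half-open [low, high) ranges and assumes the mapped values go to <column>_<suffix> only when the original column is kept (otherwise they replace the original column); both are assumptions of this sketch, not documented MapValues behaviour.

```python
import math

import pandas as pd


def map_value(value, col_map: dict):
    """Hypothetical helper: map one value using a MapValues-style column map."""
    if "ranges" in col_map:
        # Range replacement, assumed to use half-open [low, high) intervals
        for label, (low, high) in col_map["ranges"].items():
            if low <= value < high:
                return label
        return value
    # Plain replacement; unmapped values pass through unchanged
    return col_map.get(value, value)


def map_values(df: pd.DataFrame, mapping: dict, with_original_features: bool = False, suffix: str = "mapped") -> pd.DataFrame:
    out = df.copy()
    for col, col_map in mapping.items():
        target = f"{col}_{suffix}" if with_original_features else col
        out[target] = out[col].map(lambda v, m=col_map: map_value(v, m))
    return out


data = pd.DataFrame({"age": ["U", "30"], "not": [0, 1], "numbers": [-2.5, 7.0]})
mapping = {
    "age": {"U": "0"},
    "not": {0: 1, 1: 0},
    "numbers": {"ranges": {"negative": [-math.inf, 0], "positive": [0, math.inf]}},
}
print(map_values(data, mapping))
```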
- class mlrun.feature_store.steps.OneHotEncoder(mapping: dict[str, list[Union[int, str]]], **kwargs)
Create new binary fields, one per category (one hot encoded)
example:
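from mlrun.feature_store.steps import OneHotEncoder

one_hot_encoder_mapping = {
    "category": ["food", "health", "transportation"],
    "gender": ["male", "female"],
}
# graph: the feature set's processing graph, e.g. feature_set.graph
graph.to(OneHotEncoder(mapping=one_hot_encoder_mapping))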
- Parameters:
mapping -- a dict of per column categories (to map to binary fields)
kwargs -- optional kwargs (for storey)
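For intuition, the pandas sketch below produces one binary column per listed category by calling pd.get_dummies on a Categorical whose categories are fixed by the mapping, so a category that never appears still gets a column and unlisted values get all zeros. The helper one_hot is hypothetical, and the <column>_<category> naming and dropping of the original column are assumptions of this sketch, not documented OneHotEncoder behaviour.

```python
import pandas as pd


def one_hot(df: pd.DataFrame, mapping: dict) -> pd.DataFrame:
    """Hypothetical helper: one binary column per category listed in mapping."""
    out = df.copy()
    for col, categories in mapping.items():
        # Fixing the categories guarantees a column for every listed value
        cat = pd.Categorical(out[col], categories=categories)
        dummies = pd.get_dummies(cat, prefix=col).astype(int)
        dummies.index = out.index
        out = pd.concat([out.drop(columns=[col]), dummies], axis=1)
    return out


rows = pd.DataFrame(
    {"category": ["food", "transportation", "toys"], "gender": ["male", "female", "male"]}
)
print(one_hot(rows, {"category": ["food", "health", "transportation"], "gender": ["male", "female"]}))
```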
- class mlrun.feature_store.steps.SetEventMetadata(id_path: str | None = None, key_path: str | None = None, random_id: bool | None = None, **kwargs)
Set the event metadata (id, key) from the event body
Set the event metadata fields (id and key) from the event body data structure. The xx_path attributes define the key or path to the value in the body dict; a "." in the path string indicates that the value is in a nested dict, e.g. "x.y" means {"x": {"y": value}}.
example:
from mlrun.feature_store.steps import SetEventMetadata

# function: an MLRun serving function (defined elsewhere)
flow = function.set_topology("flow")
# build a graph and use the SetEventMetadata step to extract the id and key from the event body
# ("myid" and "mykey" fields); the metadata will be used by the following data processing steps
# (e.g. feature store ops, key aggregations, write to databases/streams, etc.)
flow.to(SetEventMetadata(id_path="myid", key_path="mykey"))  # chain additional steps with .to(...)
server = function.to_mock_server()
event = {"myid": "34", "mykey": "123"}
resp = server.test(body=event)
- Parameters:
id_path -- path to the id value
key_path -- path to the key value
random_id -- if True will set the event.id to a random value
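The dotted path convention can be illustrated with a few lines of plain Python. The helper get_by_path is hypothetical and shows only how a path such as "x.y" walks nested dicts; it is not the SetEventMetadata implementation.

```python
from functools import reduce


def get_by_path(body: dict, path: str):
    """Hypothetical helper: resolve a dotted path such as "x.y" to body["x"]["y"]."""
    return reduce(lambda node, key: node[key], path.split("."), body)


event = {"myid": "34", "meta": {"key": "123"}}
event_id = get_by_path(event, "myid")       # "34"
event_key = get_by_path(event, "meta.key")  # "123"
print(event_id, event_key)
```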