Feature store example (stocks)
This notebook demonstrates the following:
Generate features and feature sets
Build complex transformations and ingest them into offline and real-time data stores
Fetch feature vectors for training
Save feature vectors for reuse in real-time pipelines
Access features and their statistics in real time
Note
By default, this demo works with the online feature store, which is currently not part of the Open Source MLRun default deployment.
Get started
Install the latest MLRun package and restart the notebook.
Set up the environment and project:
import mlrun
mlrun.get_or_create_project("stocks", "./")
> 2023-02-05 11:43:17,605 [info] Created and saved project stocks: {'from_template': None, 'overwrite': False, 'context': './', 'save': True}
> 2023-02-05 11:43:17,607 [info] created project stocks and saved in MLRun DB
<mlrun.projects.project.MlrunProject at 0x7f689811ea10>
Create sample data for the demo
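A minimal sketch of the three demo DataFrames, assuming pandas. The ticker, bid, ask, price, quantity, name, and exchange values are copied from the outputs shown later in this notebook, and the quote timestamps match the preview table. The trade timestamps are not displayed anywhere in the notebook; the values below are an assumption, chosen so that a backward as-of join reproduces the offline feature vector shown later.

```python
import pandas as pd

# Quote timestamps as they appear in the preview output later in this notebook
base = pd.Timestamp("2021-05-23 09:04:07.013574")
ms = pd.Timedelta(milliseconds=1)

quotes = pd.DataFrame(
    {
        "time": [base + n * ms for n in (0, 0, 7, 18, 25, 26, 49, 52)],
        "ticker": ["GOOG", "MSFT", "MSFT", "MSFT", "GOOG", "AAPL", "GOOG", "MSFT"],
        "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01],
        "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03],
    }
)

# Assumed trade timestamps (the original notebook does not show them)
trades = pd.DataFrame(
    {
        "time": [base + n * ms for n in (27, 42, 52, 52, 52)],
        "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
        "price": [51.95, 51.95, 720.77, 720.92, 98.00],
        "quantity": [75, 155, 100, 100, 100],
    }
)

# Static ticker metadata (no time column)
stocks = pd.DataFrame(
    {
        "ticker": ["MSFT", "GOOG", "AAPL"],
        "name": ["Microsoft Corporation", "Alphabet Inc", "Apple Inc"],
        "exchange": ["NASDAQ", "NASDAQ", "NASDAQ"],
    }
)
```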
View the demo data
quotes
trades
stocks
Define, infer and ingest feature sets
import mlrun.feature_store as fstore
from mlrun.feature_store.steps import *
from mlrun.features import MinMaxValidator
Build and ingest a simple feature set (stocks)
# add feature set without time column (stock ticker metadata)
stocks_set = fstore.FeatureSet("stocks", entities=[fstore.Entity("ticker")])
fstore.ingest(stocks_set, stocks, infer_options=fstore.InferOptions.default())
| ticker | name | exchange |
|---|---|---|
| MSFT | Microsoft Corporation | NASDAQ |
| GOOG | Alphabet Inc | NASDAQ |
| AAPL | Apple Inc | NASDAQ |
Build an advanced feature set with a feature engineering pipeline
Define a feature set with custom data processing and time aggregation functions:
# create a new feature set
quotes_set = fstore.FeatureSet("stock-quotes", entities=[fstore.Entity("ticker")])
Define a custom pipeline step (Python class):
class MyMap(MapClass):
    def __init__(self, multiplier=1, **kwargs):
        super().__init__(**kwargs)
        self._multiplier = multiplier

    def do(self, event):
        # add a "multi" feature: the bid price times the configured multiplier
        event["multi"] = event["bid"] * self._multiplier
        return event
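To see what do() does to a single event without running MLRun, the same logic can be exercised on a plain dict. This is a standalone sketch: MyMapSketch is a hypothetical stand-in that drops the MapClass base class and keeps only the transformation.

```python
class MyMapSketch:
    """Hypothetical stand-in for MyMap without the MLRun MapClass base."""

    def __init__(self, multiplier=1):
        self._multiplier = multiplier

    def do(self, event):
        # Add a derived "multi" feature; the event dict is modified in place
        event["multi"] = event["bid"] * self._multiplier
        return event


step = MyMapSketch(multiplier=3)
out = step.do({"ticker": "MSFT", "bid": 51.95, "ask": 51.96})
print(round(out["multi"], 2))  # 155.85, the first MSFT "multi" value in the preview
```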
Build and show the transformation pipeline
Use storey stream-processing classes together with MLRun library classes and the custom class:
quotes_set.graph.to("MyMap", multiplier=3).to(
"storey.Extend", _fn="({'extra': event['bid'] * 77})"
).to("storey.Filter", "filter", _fn="(event['bid'] > 51.92)").to(FeaturesetValidator())
quotes_set.add_aggregation("ask", ["sum", "max"], "1h", "10m", name="asks1")
quotes_set.add_aggregation("ask", ["sum", "max"], "5h", "10m", name="asks5")
quotes_set.add_aggregation("bid", ["min", "max"], "1h", "10m", name="bids")
# add feature validation policy
quotes_set["bid"] = fstore.Feature(validator=MinMaxValidator(min=52, severity="info"))
# add default target definitions and plot
quotes_set.set_targets()
quotes_set.plot(rankdir="LR", with_targets=True)
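Conceptually, each event flows through the steps in order: MyMap adds multi, storey.Extend merges the dict returned by its _fn expression into the event, and storey.Filter drops events for which its _fn expression is false. The plain-Python sketch below mimics that chain on individual events. It illustrates the data flow only, not storey's implementation; the function names are hypothetical, and the FeaturesetValidator step is left out.

```python
def my_map(event, multiplier=3):
    # Same logic as MyMap.do with multiplier=3
    event["multi"] = event["bid"] * multiplier
    return event


def extend(event):
    # Mirrors the Extend expression: add an "extra" field computed from "bid"
    event.update({"extra": event["bid"] * 77})
    return event


def keep(event):
    # Mirrors the Filter expression: keep only events with bid > 51.92
    return event["bid"] > 51.92


def run_chain(events):
    out = []
    for ev in events:
        ev = extend(my_map(dict(ev)))  # copy so the input is not mutated
        if keep(ev):
            out.append(ev)
    return out


result = run_chain([
    {"ticker": "MSFT", "bid": 51.95},
    {"ticker": "XYZ", "bid": 50.00},  # hypothetical event that the filter drops
])
for ev in result:
    print(ev["ticker"], round(ev["multi"], 2), round(ev["extra"], 2))
# MSFT 155.85 4000.15
```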
Test and show the pipeline results locally (this lets you develop and debug quickly):
fstore.preview(
quotes_set,
quotes,
entity_columns=["ticker"],
timestamp_key="time",
options=fstore.InferOptions.default(),
)
info! bid value is smaller than min, key=['MSFT'] time=2021-05-23 09:04:07.013574 args={'min': 52, 'value': 51.95}
info! bid value is smaller than min, key=['MSFT'] time=2021-05-23 09:04:07.020574 args={'min': 52, 'value': 51.97}
info! bid value is smaller than min, key=['MSFT'] time=2021-05-23 09:04:07.031574 args={'min': 52, 'value': 51.99}
| ticker | asks1_sum_1h | asks1_max_1h | asks5_sum_5h | asks5_max_5h | bids_min_1h | bids_max_1h | time | bid | ask | multi | extra |
|---|---|---|---|---|---|---|---|---|---|---|---|
| GOOG | 720.93 | 720.93 | 720.93 | 720.93 | 720.50 | 720.50 | 2021-05-23 09:04:07.013574 | 720.50 | 720.93 | 2161.50 | 55478.50 |
| MSFT | 51.96 | 51.96 | 51.96 | 51.96 | 51.95 | 51.95 | 2021-05-23 09:04:07.013574 | 51.95 | 51.96 | 155.85 | 4000.15 |
| MSFT | 103.94 | 51.98 | 103.94 | 51.98 | 51.95 | 51.97 | 2021-05-23 09:04:07.020574 | 51.97 | 51.98 | 155.91 | 4001.69 |
| MSFT | 155.94 | 52.00 | 155.94 | 52.00 | 51.95 | 51.99 | 2021-05-23 09:04:07.031574 | 51.99 | 52.00 | 155.97 | 4003.23 |
| GOOG | 1441.86 | 720.93 | 1441.86 | 720.93 | 720.50 | 720.50 | 2021-05-23 09:04:07.038574 | 720.50 | 720.93 | 2161.50 | 55478.50 |
| AAPL | 98.01 | 98.01 | 98.01 | 98.01 | 97.99 | 97.99 | 2021-05-23 09:04:07.039574 | 97.99 | 98.01 | 293.97 | 7545.23 |
| GOOG | 2162.74 | 720.93 | 2162.74 | 720.93 | 720.50 | 720.50 | 2021-05-23 09:04:07.062574 | 720.50 | 720.88 | 2161.50 | 55478.50 |
| MSFT | 207.97 | 52.03 | 207.97 | 52.03 | 51.95 | 52.01 | 2021-05-23 09:04:07.065574 | 52.01 | 52.03 | 156.03 | 4004.77 |
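The asks1_sum_1h and asks1_max_1h columns are running aggregates per ticker over a trailing one-hour window. All demo events fall within a few milliseconds of each other, so every event is inside the window and each value is the sum (or max) of all asks seen so far for that ticker. The plain-Python sketch below reproduces the MSFT values; it is a simplified trailing-window computation that ignores the "10m" period (window granularity) argument, and it is not how MLRun computes aggregations internally.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


def windowed_sum_max(events, window=timedelta(hours=1)):
    """Per-ticker running sum and max of "ask" over a trailing window (sketch)."""
    history = defaultdict(deque)  # ticker -> deque of (time, ask)
    rows = []
    for ev in events:  # events must be sorted by time
        q = history[ev["ticker"]]
        q.append((ev["time"], ev["ask"]))
        # Evict entries that have fallen out of the trailing window
        while q and q[0][0] <= ev["time"] - window:
            q.popleft()
        asks = [a for _, a in q]
        rows.append((ev["ticker"], round(sum(asks), 2), max(asks)))
    return rows


t0 = datetime(2021, 5, 23, 9, 4, 7, 13574)
ms = timedelta(milliseconds=1)
msft = [(0, 51.96), (7, 51.98), (18, 52.00), (52, 52.03)]  # (offset in ms, ask)
events = [{"ticker": "MSFT", "time": t0 + n * ms, "ask": a} for n, a in msft]
for row in windowed_sum_max(events):
    print(row)
# sums: 51.96, 103.94, 155.94, 207.97 (the MSFT asks1_sum_1h values above)
```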
# print the feature set object
print(quotes_set.to_yaml())
Ingest data into offline and online stores
This writes to both default targets: Parquet (offline) and NoSQL (online).
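Conceptually, the two targets hold data differently: the offline (Parquet) target keeps every ingested row for training and time-travel queries, while the online (NoSQL) target keeps one record per entity key with the latest feature values, which is why the online lookups later in this notebook return the most recent values. A plain-Python sketch of that difference, with hypothetical offline and online containers standing in for the real targets:

```python
offline = []  # append-only history, standing in for the Parquet table
online = {}   # key -> latest features, standing in for the NoSQL table


def write(event, key="ticker"):
    """Hypothetical writer that feeds both stand-in targets."""
    offline.append(dict(event))
    # Overwrite the stored record for this key with the newest values
    online.setdefault(event[key], {}).update(event)


for ev in [
    {"ticker": "MSFT", "bid": 51.95, "multi": 155.85},
    {"ticker": "MSFT", "bid": 52.01, "multi": 156.03},
    {"ticker": "GOOG", "bid": 720.50, "multi": 2161.50},
]:
    write(ev)

print(len(offline))             # 3
print(online["MSFT"]["multi"])  # 156.03
```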
# run the quotes through the pipeline and write the results to the targets
df = fstore.ingest(quotes_set, quotes)
info! bid value is smaller than min, key=['MSFT'] time=2021-05-23 09:04:07.013574 args={'min': 52, 'value': 51.95}
info! bid value is smaller than min, key=['MSFT'] time=2021-05-23 09:04:07.020574 args={'min': 52, 'value': 51.97}
info! bid value is smaller than min, key=['MSFT'] time=2021-05-23 09:04:07.031574 args={'min': 52, 'value': 51.99}
info! bid value is smaller than min, key=['MSFT'] time=2021-05-23 09:04:07.013574 args={'min': 52, 'value': 51.95}
info! bid value is smaller than min, key=['MSFT'] time=2021-05-23 09:04:07.020574 args={'min': 52, 'value': 51.97}
info! bid value is smaller than min, key=['MSFT'] time=2021-05-23 09:04:07.031574 args={'min': 52, 'value': 51.99}
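The info! lines come from the MinMaxValidator attached to bid. With severity="info", values below min=52 are reported but the events are still processed, as the MSFT rows with bids of 51.95 to 51.99 in the preview table show. A plain-Python sketch of that report-but-keep behavior, using a hypothetical check_min_max helper:

```python
def check_min_max(events, feature, min_value=None, max_value=None):
    """Report out-of-range values without dropping any events (sketch)."""
    reports = []
    for ev in events:
        value = ev[feature]
        if min_value is not None and value < min_value:
            reports.append((ev["ticker"], "smaller than min", value))
        elif max_value is not None and value > max_value:
            reports.append((ev["ticker"], "larger than max", value))
    return reports


bids = [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01]
tickers = ["GOOG", "MSFT", "MSFT", "MSFT", "GOOG", "AAPL", "GOOG", "MSFT"]
events = [{"ticker": t, "bid": b} for t, b in zip(tickers, bids)]
for report in check_min_max(events, "bid", min_value=52):
    print(report)
# Three MSFT reports (51.95, 51.97, 51.99), matching the info! lines above
```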
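Get an offline feature vector for training
This example combines features from three sources (the trades entity rows, the stock-quotes feature set, and the stocks feature set) in a time-travel join, so each trade gets the feature values that were current at its timestamp.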
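The feature references used in the next cell take the form feature-set.feature, optionally followed by "as alias" to rename the output column (asks5_sum_5h becomes total_ask), and feature-set.* selects every feature of a set. A sketch of splitting such a reference, with a hypothetical parse_feature_ref helper (MLRun parses these strings itself; this only illustrates the format):

```python
def parse_feature_ref(ref):
    """Split "<set>.<feature> [as <alias>]" into (set, feature, alias) (sketch)."""
    alias = None
    if " as " in ref:
        ref, alias = [part.strip() for part in ref.split(" as ", 1)]
    feature_set, feature = ref.split(".", 1)
    return feature_set, feature, alias


for ref in ["stock-quotes.multi", "stock-quotes.asks5_sum_5h as total_ask", "stocks.*"]:
    print(parse_feature_ref(ref))
```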
Specify a set of features and request the offline feature vector as a DataFrame:
features = [
"stock-quotes.multi",
"stock-quotes.asks5_sum_5h as total_ask",
"stock-quotes.bids_min_1h",
"stock-quotes.bids_max_1h",
"stocks.*",
]
vector = fstore.FeatureVector(
"stocks-vec", features, description="stocks demo feature vector"
)
vector.save()
resp = fstore.get_offline_features(
vector, entity_rows=trades, entity_timestamp_column="time"
)
resp.to_dataframe()
|  | price | quantity | multi | total_ask | bids_min_1h | bids_max_1h | name | exchange |
|---|---|---|---|---|---|---|---|---|
| 0 | 51.95 | 75 | 155.97 | 155.94 | 51.95 | 51.99 | Microsoft Corporation | NASDAQ |
| 1 | 51.95 | 155 | 155.97 | 155.94 | 51.95 | 51.99 | Microsoft Corporation | NASDAQ |
| 2 | 720.77 | 100 | 2161.50 | 2162.74 | 720.50 | 720.50 | Alphabet Inc | NASDAQ |
| 3 | 720.92 | 100 | 2161.50 | 2162.74 | 720.50 | 720.50 | Alphabet Inc | NASDAQ |
| 4 | 98.00 | 100 | 293.97 | 98.01 | 97.99 | 97.99 | Apple Inc | NASDAQ |
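The time-travel join matches each trade with the most recent quote features for the same ticker at or before the trade's timestamp, then adds the static stocks attributes by ticker. A stdlib sketch of that backward as-of join using bisect; the feature values are copied from the preview table, times are milliseconds after 09:04:07.013574, and the trade times passed in are assumptions (the notebook does not display them).

```python
from bisect import bisect_right

# Per-ticker quote feature history: (time offset in ms, features), sorted by time
quote_features = {
    "MSFT": [
        (0, {"multi": 155.85, "total_ask": 51.96}),
        (7, {"multi": 155.91, "total_ask": 103.94}),
        (18, {"multi": 155.97, "total_ask": 155.94}),
        (52, {"multi": 156.03, "total_ask": 207.97}),
    ],
    "GOOG": [
        (0, {"multi": 2161.50, "total_ask": 720.93}),
        (25, {"multi": 2161.50, "total_ask": 1441.86}),
        (49, {"multi": 2161.50, "total_ask": 2162.74}),
    ],
}
# Static attributes, joined on the key only (no time travel needed)
stock_info = {"MSFT": {"name": "Microsoft Corporation"}, "GOOG": {"name": "Alphabet Inc"}}


def as_of_join(ticker, t):
    """Latest quote features at or before time t, plus static stock info."""
    history = quote_features.get(ticker, [])
    times = [ts for ts, _ in history]
    i = bisect_right(times, t)  # number of quotes with time <= t
    features = dict(history[i - 1][1]) if i else {}  # empty if no earlier quote
    features.update(stock_info.get(ticker, {}))
    return features


# Hypothetical trade times in ms
print(as_of_join("MSFT", 27))  # uses the 18 ms quote: multi 155.97, total_ask 155.94 (row 0)
print(as_of_join("GOOG", 52))  # uses the 49 ms quote: multi 2161.5, total_ask 2162.74 (row 2)
```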
Initialize an online feature service and use it for real-time inference
service = fstore.get_online_feature_service("stocks-vec")
Request the feature vector statistics, which can be used for imputation or validation:
service.vector.get_stats_table()
|  | count | mean | min | max | std | hist | unique | top | freq |
|---|---|---|---|---|---|---|---|---|---|
| multi | 8.0 | 925.27875 | 155.85 | 2161.50 | 1024.751408 | [[4, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,... | NaN | NaN | NaN |
| total_ask | 8.0 | 617.91875 | 51.96 | 2162.74 | 784.877980 | [[4, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0,... | NaN | NaN | NaN |
| bids_min_1h | 8.0 | 308.41125 | 51.95 | 720.50 | 341.596673 | [[4, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,... | NaN | NaN | NaN |
| bids_max_1h | 8.0 | 308.42625 | 51.95 | 720.50 | 341.583803 | [[4, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,... | NaN | NaN | NaN |
| name | 3.0 | NaN | NaN | NaN | NaN | NaN | 3.0 | Alphabet Inc | 1.0 |
| exchange | 3.0 | NaN | NaN | NaN | NaN | NaN | 1.0 | NASDAQ | 3.0 |
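The numeric statistics can be cross-checked by hand. The multi row summarizes the eight multi values from the preview table; this stdlib sketch recomputes its count, mean, min, max, and std with the statistics module, using the sample standard deviation (n - 1 denominator), which matches the std shown above.

```python
import statistics

# The eight "multi" values from the preview table (bid * 3)
multi = [2161.50, 155.85, 155.91, 155.97, 2161.50, 293.97, 2161.50, 156.03]

summary = {
    "count": len(multi),
    "mean": statistics.mean(multi),
    "min": min(multi),
    "max": max(multi),
    "std": statistics.stdev(multi),  # sample standard deviation (n - 1)
}
for name, value in summary.items():
    print(f"{name}: {value:.6f}" if isinstance(value, float) else f"{name}: {value}")

# e.g., a missing "multi" value could be imputed with summary["mean"]
```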
Request feature vectors in real time:
service.get([{"ticker": "GOOG"}, {"ticker": "MSFT"}])
[{'asks5_sum_5h': 2162.74,
'bids_min_1h': 720.5,
'bids_max_1h': 720.5,
'multi': 2161.5,
'name': 'Alphabet Inc',
'exchange': 'NASDAQ',
'total_ask': None},
{'asks5_sum_5h': 207.97,
'bids_min_1h': 51.95,
'bids_max_1h': 52.01,
'multi': 156.03,
'name': 'Microsoft Corporation',
'exchange': 'NASDAQ',
'total_ask': None}]
service.get([{"ticker": "AAPL"}])
[{'asks5_sum_5h': 98.01,
'bids_min_1h': 97.99,
'bids_max_1h': 97.99,
'multi': 293.97,
'name': 'Apple Inc',
'exchange': 'NASDAQ',
'total_ask': None}]
service.close()
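Each service.get() call takes a list of entity dicts and returns one feature dict per entity, read from the online store. A plain-Python sketch of that request/response shape, with a hypothetical in-memory online_store holding a subset of the values returned above. This sketch returns None for an unknown key; that choice is an assumption, and the real service's behavior for missing keys may differ.

```python
online_store = {
    "GOOG": {"multi": 2161.5, "bids_min_1h": 720.5, "name": "Alphabet Inc"},
    "MSFT": {"multi": 156.03, "bids_min_1h": 51.95, "name": "Microsoft Corporation"},
}


def get_vectors(entity_rows, key="ticker"):
    """Return one feature dict per requested entity; None for unknown keys."""
    return [
        dict(online_store[row[key]]) if row[key] in online_store else None
        for row in entity_rows
    ]


print(get_vectors([{"ticker": "GOOG"}, {"ticker": "MSFT"}]))
```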