{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Part 1: Data ingestion\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```{admonition} Note\n",
"This demo works with the online feature store, which is currently not part of the Open Source default deployment.\n",
"```\n",
"This demo showcases financial fraud prevention using the MLRun feature store to define complex features that help identify \n",
"fraud. Fraud prevention specifically is a challenge because it requires processing raw transaction and events in real-time, and \n",
"being able to quickly respond and block transactions before they occur.\n",
"\n",
"To address this, you create a development pipeline and a production pipeline. Both pipelines share the same feature \n",
"engineering and model code, but serve data very differently. Furthermore, you automate the data and model monitoring \n",
"process, identify drift and trigger retraining in a CI/CD pipeline. This process is described in the diagram below:\n",
"\n",
"![Feature store demo diagram - fraud prevention](../../_static/images/feature_store_demo_diagram.png)\n",
"\n",
"By the end of this tutorial you’ll learn how to:\n",
"\n",
"- Create an ingestion pipeline for each data source.\n",
"- Define preprocessing, aggregation and validation of the pipeline.\n",
"- Run the pipeline locally within the notebook.\n",
"- Launch a real-time function to ingest live data.\n",
"- Schedule a cron to run the task when needed."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The raw data is described as follows:\n",
"\n",
"| TRANSACTIONS || ║ |USER EVENTS || \n",
"|-----------------|----------------------------------------------------------------|----------|-----------------|----------------------------------------------------------------|\n",
"| **age** | age group value 0-6. Some values are marked as U for unknown | ║ | **source** | The party/entity related to the event |\n",
"| **gender** | A character to define the gender | ║ | **event** | event, such as login or password change |\n",
"| **zipcodeOri** | ZIP code of the person originating the transaction | ║ | **timestamp** | The date and time of the event |\n",
"| **zipMerchant** | ZIP code of the merchant receiving the transaction | ║ | | |\n",
"| **category** | category of the transaction (e.g., transportation, food, etc.) | ║ | | |\n",
"| **amount** | the total amount of the transaction | ║ | | |\n",
"| **fraud** | whether the transaction is fraudulent | ║ | | |\n",
"| **timestamp** | the date and time in which the transaction took place | ║ | | |\n",
"| **source** | the ID of the party/entity performing the transaction | ║ | | |\n",
"| **target** | the ID of the party/entity receiving the transaction | ║ | | |\n",
"| **device** | the device ID used to perform the transaction | ║ | | |"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebook introduces how to **Ingest** different data sources to the **Feature Store**.\n",
"\n",
"The following FeatureSets are created:\n",
"- **Transactions**: Monetary transactions between a source and a target.\n",
"- **Events**: Account events such as account login or a password change.\n",
"- **Label**: Fraud label for the data."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Both server & client are aligned (1.3.0rc23).\n"
]
}
],
"source": [
"!/User/align_mlrun.sh"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"project_name = \"fraud-demo\""
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"> 2023-02-15 14:40:30,932 [info] loaded project fraud-demo from ./ and saved in MLRun DB\n"
]
}
],
"source": [
"import mlrun\n",
"\n",
"# Initialize the MLRun project object\n",
"project = mlrun.get_or_create_project(project_name, context=\"./\", user_project=True)"
]
},
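{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note: `user_project=True` appends the current username to the project name (e.g., `fraud-demo-<username>`), so each user gets a unique copy of the project."
]
},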
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 1 - Fetch, process and ingest the datasets"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1.1 - Transactions"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Transactions"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"tags": [
"hide-cell"
]
},
"outputs": [],
"source": [
"# Helper functions to adjust the timestamps of our data\n",
"# while keeping the order of the selected events and\n",
"# the relative distance from one event to the other\n",
"\n",
"\n",
"def date_adjustment(sample, data_max, new_max, old_data_period, new_data_period):\n",
" \"\"\"\n",
" Adjust a specific sample's date according to the original and new time periods\n",
" \"\"\"\n",
" sample_dates_scale = (data_max - sample) / old_data_period\n",
" sample_delta = new_data_period * sample_dates_scale\n",
" new_sample_ts = new_max - sample_delta\n",
" return new_sample_ts\n",
"\n",
"\n",
"def adjust_data_timespan(\n",
" dataframe, timestamp_col=\"timestamp\", new_period=\"2d\", new_max_date_str=\"now\"\n",
"):\n",
" \"\"\"\n",
" Adjust the dataframe timestamps to the new time period\n",
" \"\"\"\n",
" # Calculate old time period\n",
" data_min = dataframe.timestamp.min()\n",
" data_max = dataframe.timestamp.max()\n",
" old_data_period = data_max - data_min\n",
"\n",
" # Set new time period\n",
" new_time_period = pd.Timedelta(new_period)\n",
" new_max = pd.Timestamp(new_max_date_str)\n",
" new_min = new_max - new_time_period\n",
" new_data_period = new_max - new_min\n",
"\n",
" # Apply the timestamp change\n",
" df = dataframe.copy()\n",
" df[timestamp_col] = df[timestamp_col].apply(\n",
" lambda x: date_adjustment(\n",
" x, data_max, new_max, old_data_period, new_data_period\n",
" )\n",
" )\n",
" return df"
]
},
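{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (not part of the original demo flow), the hypothetical toy `DataFrame` below shows what `adjust_data_timespan` does: the three timestamps are rescaled to fall within the last 2 days while keeping their order and relative spacing."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"# Hypothetical toy data: three events whose timestamps span three days\n",
"toy = pd.DataFrame(\n",
"    {\"timestamp\": pd.to_datetime([\"2020-01-01\", \"2020-01-02\", \"2020-01-04\"])}\n",
")\n",
"\n",
"# After adjustment, all timestamps fall within the last 2 days,\n",
"# in the same order and with the same relative spacing\n",
"adjust_data_timespan(toy, new_period=\"2d\")"
]
},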
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"\n",
"
\n",
" \n",
"
\n",
"
\n",
"
step
\n",
"
age
\n",
"
gender
\n",
"
zipcodeOri
\n",
"
zipMerchant
\n",
"
category
\n",
"
amount
\n",
"
fraud
\n",
"
timestamp
\n",
"
source
\n",
"
target
\n",
"
device
\n",
"
\n",
" \n",
" \n",
"
\n",
"
274633
\n",
"
91
\n",
"
5
\n",
"
F
\n",
"
28007
\n",
"
28007
\n",
"
es_transportation
\n",
"
26.92
\n",
"
0
\n",
"
2023-02-13 14:41:37.388791000
\n",
"
C1022153336
\n",
"
M1823072687
\n",
"
33832bb8607545df97632a7ab02d69c4
\n",
"
\n",
"
\n",
"
286902
\n",
"
94
\n",
"
2
\n",
"
M
\n",
"
28007
\n",
"
28007
\n",
"
es_transportation
\n",
"
48.22
\n",
"
0
\n",
"
2023-02-13 14:41:55.682416913
\n",
"
C1006176917
\n",
"
M348934600
\n",
"
fadd829c49e74ffa86c8da3be75ada53
\n",
"
\n",
"
\n",
"
416998
\n",
"
131
\n",
"
3
\n",
"
M
\n",
"
28007
\n",
"
28007
\n",
"
es_transportation
\n",
"
17.56
\n",
"
0
\n",
"
2023-02-13 14:42:00.789586939
\n",
"
C1010936270
\n",
"
M348934600
\n",
"
58d0422a50bc40c89d2b4977b2f1beea
\n",
"
\n",
" \n",
"
\n",
"
"
],
"text/plain": [
" step age gender zipcodeOri zipMerchant category amount \\\n",
"274633 91 5 F 28007 28007 es_transportation 26.92 \n",
"286902 94 2 M 28007 28007 es_transportation 48.22 \n",
"416998 131 3 M 28007 28007 es_transportation 17.56 \n",
"\n",
" fraud timestamp source target \\\n",
"274633 0 2023-02-13 14:41:37.388791000 C1022153336 M1823072687 \n",
"286902 0 2023-02-13 14:41:55.682416913 C1006176917 M348934600 \n",
"416998 0 2023-02-13 14:42:00.789586939 C1010936270 M348934600 \n",
"\n",
" device \n",
"274633 33832bb8607545df97632a7ab02d69c4 \n",
"286902 fadd829c49e74ffa86c8da3be75ada53 \n",
"416998 58d0422a50bc40c89d2b4977b2f1beea "
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"\n",
"# Fetch the transactions dataset from the server\n",
"transactions_data = pd.read_csv(\n",
" \"https://s3.wasabisys.com/iguazio/data/fraud-demo-mlrun-fs-docs/data.csv\",\n",
" parse_dates=[\"timestamp\"],\n",
")\n",
"\n",
"# use only first 50k\n",
"transactions_data = transactions_data.sort_values(by=\"source\", axis=0)[:10000]\n",
"\n",
"# Adjust the samples timestamp for the past 2 days\n",
"transactions_data = adjust_data_timespan(transactions_data, new_period=\"2d\")\n",
"\n",
"# Sorting after adjusting timestamps\n",
"transactions_data = transactions_data.sort_values(by=\"timestamp\", axis=0)\n",
"\n",
"# Preview\n",
"transactions_data.head(3)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Transactions - create a feature set and preprocessing pipeline\n",
"Create the feature set (data pipeline) definition for the **credit transaction processing** that describes the \n",
"offline/online data transformations and aggregations. \n",
"The feature store automatically adds an offline `parquet` target and an online `NoSQL` target by using `set_targets()`.\n",
"\n",
"The data pipeline consists of:\n",
"\n",
"* **Extracting** the data components (hour, day of week)\n",
"* **Mapping** the age values\n",
"* **One hot encoding** for the transaction category and the gender\n",
"* **Aggregating** the amount (avg, sum, count, max over 2/12/24 hour time windows)\n",
"* **Aggregating** the transactions per category (over 14 days time windows)\n",
"* **Writing** the results to **offline** (Parquet) and **online** (NoSQL) targets "
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"# Import MLRun's Feature Store\n",
"import mlrun.feature_store as fstore\n",
"from mlrun.feature_store.steps import OneHotEncoder, MapValues, DateExtractor"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"# Define the transactions FeatureSet\n",
"transaction_set = fstore.FeatureSet(\n",
" \"transactions\",\n",
" entities=[fstore.Entity(\"source\")],\n",
" timestamp_key=\"timestamp\",\n",
" description=\"transactions feature set\",\n",
")"
]
},
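{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `Entity` (`source`, the party performing the transaction) is the key by which the feature set is indexed, joined, and queried online, and `timestamp_key` marks the event-time column used by the windowed aggregations defined below."
]
},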
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"image/svg+xml": [
"\n",
"\n",
"\n",
"\n",
"\n"
],
"text/plain": [
""
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Define and add value mapping\n",
"main_categories = [\n",
" \"es_transportation\",\n",
" \"es_health\",\n",
" \"es_otherservices\",\n",
" \"es_food\",\n",
" \"es_hotelservices\",\n",
" \"es_barsandrestaurants\",\n",
" \"es_tech\",\n",
" \"es_sportsandtoys\",\n",
" \"es_wellnessandbeauty\",\n",
" \"es_hyper\",\n",
" \"es_fashion\",\n",
" \"es_home\",\n",
" \"es_contents\",\n",
" \"es_travel\",\n",
" \"es_leisure\",\n",
"]\n",
"\n",
"# One Hot Encode the newly defined mappings\n",
"one_hot_encoder_mapping = {\n",
" \"category\": main_categories,\n",
" \"gender\": list(transactions_data.gender.unique()),\n",
"}\n",
"\n",
"# Define the graph steps\n",
"transaction_set.graph.to(\n",
" DateExtractor(parts=[\"hour\", \"day_of_week\"], timestamp_col=\"timestamp\")\n",
").to(MapValues(mapping={\"age\": {\"U\": \"0\"}}, with_original_features=True)).to(\n",
" OneHotEncoder(mapping=one_hot_encoder_mapping)\n",
")\n",
"\n",
"\n",
"# Add aggregations for 2, 12, and 24 hour time windows\n",
"transaction_set.add_aggregation(\n",
" name=\"amount\",\n",
" column=\"amount\",\n",
" operations=[\"avg\", \"sum\", \"count\", \"max\"],\n",
" windows=[\"2h\", \"12h\", \"24h\"],\n",
" period=\"1h\",\n",
")\n",
"\n",
"\n",
"# Add the category aggregations over a 14 day window\n",
"for category in main_categories:\n",
" transaction_set.add_aggregation(\n",
" name=category,\n",
" column=f\"category_{category}\",\n",
" operations=[\"sum\"],\n",
" windows=[\"14d\"],\n",
" period=\"1d\",\n",
" )\n",
"\n",
"# Add default (offline-parquet & online-nosql) targets\n",
"transaction_set.set_targets()\n",
"\n",
"# Plot the pipeline so you can see the different steps\n",
"transaction_set.plot(rankdir=\"LR\", with_targets=True)"
]
},
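{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each aggregation produces features named `<name>_<operation>_<window>`, so the definitions above generate features such as `amount_avg_2h`, `amount_max_24h`, and `es_transportation_sum_14d`."
]
},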
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Transactions - ingestion"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"