Using data sources and items#

In this section

Connecting to data sources#

Accessing data from multiple source types is possible through MLRun’s DataItem object. This object plugs into the data-stores framework to connect to various types of data sources and download content. For example, to download data which is stored on S3 and load it into a DataFrame, use the following code:

# Access object in AWS S3, in the "input-data" bucket 
import mlrun

# Access credentials
os.environ["AWS_ACCESS_KEY_ID"] = "<access key ID>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<access key>"

source_url = "s3://input-data/input_data.csv"

input_data = mlrun.get_dataitem(source_url).as_df()

This code runs locally (for example, in Jupyter) and relies on environment variables to supply credentials for data access. See Data stores for more info on the available data-stores, accessing them locally and remotely, and how to provide credentials for connecting.

Running the code locally is very useful for easy debugging and development of the code. When the code moves to a stable status, it is usually recommended to run it “remotely” on a pod running in the Kubernetes cluster. This allows setting up specific resources to the processing pod (such as memory, CPU and execution priority).

MLRun provides facilities to create DataItem objects as inputs to running code. For example, this is a basic data ingestion function:

def ingest_data(context, source_url: mlrun.DataItem):
    # Load the data from its source, and convert to a DataFrame
    df = source_url.as_df()
    # Perform data cleaning and processing
    # ...
    # Save the processed data to the artifact store
    context.log_dataset('cleaned_data', df=df, format='csv')

This code can be placed in a python file, or as a cell in the Python notebook. For example, if the code above was saved to a file, the following code creates an MLRun function from it and executes it remotely in a pod:

# create a function from py or notebook (ipynb) file, specify the default function handler
ingest_func = mlrun.code_to_function(name='ingest_data', filename='./', 
                                     kind='job', image='mlrun/mlrun')

source_url = "s3://input-data/input_data.csv"

ingest_data_run ='ingest_data',
                                  inputs={'source_url': source_url},

As the source_url is part of the function’s inputs, MLRun automatically wraps it up with a DataItem. The output is logged to the function’s artifact_path, and can be obtained from the run result:

cleaned_data_frame = ingest_data_run.artifact('cleaned_data').as_df()

Note that running the function remotely may require attaching storage to the function, as well as passing storage credentials through project secrets. See the following pages for more details:

  1. Attach storage to functions

  2. Working with secrets

Data processing#

Once the data is imported from its source, it can be processed using any framework. MLRun natively supports working with Pandas DataFrames and converting from and to its DataItem object.

For distributed processing of very large datasets, MLRun integrates with the Spark processing engine, and provides facilities for executing pySpark code using a Spark service (which can be deployed by the platform when running MLRun as part of an Iguazio system) or through submitting the processing task to Spark-operator. The following page provides additional details and code-samples:

  1. Spark operator

In a similar manner, Dask can be used for parallel processing of the data. To read data as a Dask DataFrame, use the following code:

import dask.dataframe as dd

data_item = mlrun.get_dataitem(source_url)
dask_df: dd.DataFrame = data_item.as_df(df_module=dd)