mlrun.datastore#

class mlrun.datastore.BigQuerySource(name: str = '', table: Optional[str] = None, max_results_for_table: Optional[int] = None, query: Optional[str] = None, materialization_dataset: Optional[str] = None, chunksize: Optional[int] = None, key_field: Optional[str] = None, time_field: Optional[str] = None, schedule: Optional[str] = None, start_time=None, end_time=None, gcp_project: Optional[str] = None, spark_options: Optional[dict] = None, **kwargs)[source]#

Bases: mlrun.datastore.sources.BaseSourceDriver

Reads Google BigQuery query results as an input source for a flow.

For authentication, set the GCP_CREDENTIALS project secret to the credentials JSON string.

Example:

# set the credentials
project.set_secrets({"GCP_CREDENTIALS": gcp_credentials_json})

# use sql query
query_string = "SELECT * FROM `the-psf.pypi.downloads20210328` LIMIT 5000"
source = BigQuerySource("bq1", query=query_string,
                        gcp_project="my_project",
                        materialization_dataset="dataviews")

# read a table
source = BigQuerySource("bq2", table="the-psf.pypi.downloads20210328", gcp_project="my_project")
Parameters
  • name – source name

  • table – table name/path, cannot be used together with query

  • query – sql query string

  • materialization_dataset – for a query with Spark, the target dataset for the materialized view. This dataset should be in the same location as the view or the queried tables, and must be a dataset where the GCP user has table-creation permission

  • chunksize – number of rows per chunk (default: a single large chunk)

  • key_field – the column to be used as the key for events. Can be a list of keys.

  • time_field – the column to be used for time filtering. Defaults to the feature set’s timestamp_key.

  • schedule – string to configure scheduling of the ingestion job. For example ‘*/30 * * * *’ will cause the job to run every 30 minutes

  • start_time – filters out data before this time

  • end_time – filters out data after this time

  • gcp_project – google cloud project name

  • spark_options – additional spark read options

is_iterator()[source]#
kind = 'bigquery'#
support_spark = True#
support_storey = False#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None)[source]#

return the source data as dataframe

to_spark_df(session, named_view=False, time_field=None, columns=None)[source]#
class mlrun.datastore.CSVSource(name: str = '', path: Optional[str] = None, attributes: Optional[Dict[str, str]] = None, key_field: Optional[str] = None, time_field: Optional[str] = None, schedule: Optional[str] = None, parse_dates: Union[None, int, str, List[int], List[str]] = None, **kwargs)[source]#

Bases: mlrun.datastore.sources.BaseSourceDriver

Reads a CSV file as an input source for a flow.

Parameters
  • name – name of the source

  • path – path to CSV file

  • key_field – the CSV field to be used as the key for events. May be an int (field index) or string (field name) if with_header is True. Defaults to None (no key). Can be a list of keys.

  • time_field – DEPRECATED. Use parse_dates to parse timestamps.

  • schedule – string to configure scheduling of the ingestion job.

  • attributes – additional parameters to pass to storey. For example: attributes={"timestamp_format": "%Y%m%d%H"}

  • parse_dates – Optional. List of columns (names or indices) to attempt to parse as date columns.
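
Example (a minimal sketch; the file path, column names, and timestamp format are hypothetical and assume a local CSV file with a "timestamp" column):

# build a CSV source and load it as a pandas DataFrame
from mlrun.datastore import CSVSource

source = CSVSource(
    "mycsv",
    path="data/transactions.csv",                 # hypothetical local path
    parse_dates=["timestamp"],                    # parse this column as datetime
    attributes={"timestamp_format": "%Y%m%d%H"},  # extra storey parameters
)
df = source.to_dataframe()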

get_spark_options()[source]#
is_iterator()[source]#
kind = 'csv'#
support_spark = True#
support_storey = True#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None)[source]#

return the source data as dataframe

to_spark_df(session, named_view=False, time_field=None, columns=None)[source]#
to_step(key_field=None, time_field=None, context=None)[source]#
class mlrun.datastore.CSVTarget(name: str = '', path=None, attributes: Optional[Dict[str, str]] = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: Optional[int] = None, partition_cols: Optional[List[str]] = None, time_partitioning_granularity: Optional[str] = None, max_events: Optional[int] = None, flush_after_seconds: Optional[int] = None, storage_options: Optional[Dict[str, str]] = None, schema: Optional[Dict[str, Any]] = None, credentials_prefix=None)[source]#

Bases: mlrun.datastore.targets.BaseStoreTarget

add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, **kwargs)[source]#

return the target data as dataframe

get_spark_options(key_column=None, timestamp_key=None, overwrite=True)[source]#
is_offline = True#
is_single_file()[source]#
kind: str = 'csv'#
prepare_spark_df(df, key_columns)[source]#
suffix = '.csv'#
support_spark = True#
support_storey = True#
class mlrun.datastore.DataItem(key: str, store: mlrun.datastore.base.DataStore, subpath: str, url: str = '', meta=None, artifact_url=None)[source]#

Bases: object

Data input/output class abstracting access to various local/remote data sources

DataItem objects are passed into functions and can be used inside the function. When a function run completes, users can access the run data via run.artifact(key), which returns a DataItem object. Users can also convert a data URL (e.g. s3://bucket/key.csv) to a DataItem using mlrun.get_dataitem(url).

Example:

# using data item inside a function
def my_func(context, data: DataItem):
    df = data.as_df()


# reading run results using DataItem (run.artifact())
train_run = train_iris_func.run(inputs={'dataset': dataset},
                                params={'label_column': 'label'})

train_run.artifact('confusion-matrix').show()
test_set = train_run.artifact('test_set').as_df()

# create and use DataItem from uri
data = mlrun.get_dataitem('http://xyz/data.json').get()
property artifact_url#

DataItem artifact URL (when it's an artifact), or the URL for simple data items

as_df(columns=None, df_module=None, format='', time_column=None, start_time=None, end_time=None, **kwargs)[source]#

return a dataframe object (generated from the dataitem).

Parameters
  • columns – optional, list of columns to select

  • df_module – optional, py module used to create the DataFrame (e.g. pd, dd, cudf, ..)

  • format – file format; if not specified, it is deduced from the suffix

  • start_time – filters out data before this time

  • end_time – filters out data after this time

  • time_column – the column used for time filtering; the store's timestamp_key is used if None. The results are filtered by this column together with start_time and end_time.
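
Example (a hedged sketch; the object URL and column names are hypothetical):

from datetime import datetime
import mlrun

# read a remote parquet object as a time-filtered DataFrame
item = mlrun.get_dataitem("s3://my-bucket/data/events.parquet")
df = item.as_df(
    columns=["user_id", "amount", "timestamp"],
    time_column="timestamp",
    start_time=datetime(2021, 1, 1),
    end_time=datetime(2021, 2, 1),
)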

delete()[source]#

delete the object from the datastore

download(target_path)[source]#

download to the target dir/path

Parameters

target_path – local target path for the downloaded item

get(size=None, offset=0, encoding=None)[source]#

read all or a byte range and return the content

Parameters
  • size – number of bytes to get

  • offset – fetch from offset (in bytes)

  • encoding – encoding (e.g. “utf-8”) for converting bytes to str

get_artifact_type() Optional[str][source]#

Check if the data item represents an Artifact (one of Artifact, DatasetArtifact, and ModelArtifact). If it does, it returns the store URI prefix (artifacts, datasets, or models); otherwise None.

Returns

The store prefix of the artifact if it is an artifact data item and None if not.

property key#

DataItem key

property kind#

DataItem store kind (file, s3, v3io, ..)

listdir()[source]#

return a list of child file names

local()[source]#

get the local path of the file, download to tmp first if it’s a remote object

ls()[source]#

return a list of child file names

property meta#

Artifact Metadata, when the DataItem is read from the artifacts store

open(mode)[source]#

return fsspec file handler, if supported

put(data, append=False)[source]#

write/upload the data, append is only supported by some datastores

Parameters
  • data – data (bytes/str) to write

  • append – append data to the end of the object, NOT SUPPORTED BY SOME OBJECT STORES!
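
Example (a hedged sketch of writing and reading raw content; the target path is hypothetical):

import mlrun

item = mlrun.get_dataitem("v3io:///projects/demo/notes.txt")
item.put("hello world")                # write/upload string content
text = item.get(encoding="utf-8")      # read it back as a str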

remove_local()[source]#

remove the local file if it exists and was downloaded from a remote object

show(format=None)[source]#

show the data object content in Jupyter

Parameters

format – format to use (when there is no/wrong suffix), e.g. ‘png’

stat()[source]#

return FileStats class (size, modified, content_type)

property store#

DataItem store object

property suffix#

DataItem suffix (file extension) e.g. ‘.png’

upload(src_path)[source]#

upload the source file (src_path)

Parameters

src_path – source file path to read from and upload

property url#

DataItem url e.g. /dir/path, s3://bucket/path

class mlrun.datastore.HttpSource(name: Optional[str] = None, path: Optional[str] = None, attributes: Optional[Dict[str, str]] = None, key_field: Optional[str] = None, time_field: Optional[str] = None, workers: Optional[int] = None)[source]#

Bases: mlrun.datastore.sources.OnlineSource

add_nuclio_trigger(function)[source]#
kind = 'http'#
class mlrun.datastore.KafkaSource(brokers=None, topics=None, group='serving', initial_offset='earliest', partitions=None, sasl_user=None, sasl_pass=None, attributes=None, **kwargs)[source]#

Bases: mlrun.datastore.sources.OnlineSource

Sets a Kafka source for the flow.

Parameters
  • brokers – list of broker IP addresses

  • topics – list of topic names on which to listen.

  • group – consumer group. Default “serving”

  • initial_offset – from where to consume the stream. Default earliest

  • partitions – Optional. A list of partition numbers from which the function receives events.

  • sasl_user – Optional, username to use for SASL authentication

  • sasl_pass – Optional, password to use for SASL authentication

  • attributes – Optional, extra attributes to be passed to kafka trigger
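
Example (a hedged sketch; the broker address, topic, feature set, and entity are hypothetical, and it assumes a configured MLRun project where mlrun.feature_store.deploy_ingestion_service is available):

import mlrun.feature_store as fstore
from mlrun.datastore import KafkaSource

# real-time ingestion: consume events from Kafka into a feature set
transactions_set = fstore.FeatureSet("transactions", entities=[fstore.Entity("id")])

source = KafkaSource(
    brokers=["broker.kafka.svc:9092"],
    topics=["transactions"],
    group="serving",
    initial_offset="earliest",
)
fstore.deploy_ingestion_service(featureset=transactions_set, source=source)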

add_nuclio_trigger(function)[source]#
kind = 'kafka'#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None)[source]#

return the source data as dataframe

class mlrun.datastore.NoSqlTarget(*args, **kwargs)[source]#

Bases: mlrun.datastore.targets.NoSqlBaseTarget

get_spark_options(key_column=None, timestamp_key=None, overwrite=True)[source]#
get_table_object()[source]#

get storey Table object

kind: str = 'nosql'#
prepare_spark_df(df, key_columns)[source]#
support_spark = True#
writer_step_name = 'NoSqlTarget'#
class mlrun.datastore.ParquetSource(name: str = '', path: Optional[str] = None, attributes: Optional[Dict[str, str]] = None, key_field: Optional[str] = None, time_field: Optional[str] = None, schedule: Optional[str] = None, start_time: Optional[Union[datetime.datetime, str]] = None, end_time: Optional[Union[datetime.datetime, str]] = None)[source]#

Bases: mlrun.datastore.sources.BaseSourceDriver

Reads Parquet file/dir as input source for a flow.

Parameters
  • name – name of the source

  • path – path to Parquet file or directory

  • key_field – the column to be used as the key for events. Can be a list of keys.

  • time_field – Optional. The column used for time filtering; the feature set’s timestamp_key is used if None. The results are filtered by this column together with start_time and end_time.

  • start_time – Optional, datetime or str. If set, the results are filtered by partitions and time_field > start_time. Default is None

  • end_time – Optional, datetime or str. If set, the results are filtered by partitions and time_field <= end_time. Default is None

  • schedule – string to configure scheduling of the ingestion job. For example ‘*/30 * * * *’ will cause the job to run every 30 minutes

  • attributes – additional parameters to pass to storey.
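
Example (a minimal sketch; the path and timestamp column are hypothetical):

from datetime import datetime
from mlrun.datastore import ParquetSource

# read a parquet source as a DataFrame, filtered to a time window
source = ParquetSource("events", path="v3io:///projects/demo/events.parquet")
df = source.to_dataframe(
    time_field="timestamp",
    start_time=datetime(2021, 1, 1),
    end_time=datetime(2021, 6, 30),
)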

property end_time#
get_spark_options()[source]#
kind = 'parquet'#
property start_time#
support_spark = True#
support_storey = True#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None)[source]#

return the source data as dataframe

to_step(key_field=None, time_field=None, start_time=None, end_time=None, context=None)[source]#
class mlrun.datastore.ParquetTarget(name: str = '', path=None, attributes: Optional[Dict[str, str]] = None, after_step=None, columns=None, partitioned: Optional[bool] = None, key_bucketing_number: Optional[int] = None, partition_cols: Optional[List[str]] = None, time_partitioning_granularity: Optional[str] = None, max_events: Optional[int] = 10000, flush_after_seconds: Optional[int] = 900, storage_options: Optional[Dict[str, str]] = None)[source]#

Bases: mlrun.datastore.targets.BaseStoreTarget

parquet target storage driver, used to materialize feature set/vector data into parquet files

Parameters
  • name – optional, target name. By default will be called ParquetTarget

  • path – optional, Output path. Can be either a file or directory. This parameter is forwarded as-is to pandas.DataFrame.to_parquet(). Default location v3io:///projects/{project}/FeatureStore/{name}/parquet/

  • attributes – optional, extra attributes for storey.ParquetTarget

  • after_step – optional, after what step in the graph to add the target

  • columns – optional, which columns from data to write

  • partitioned – optional, whether to partition the file. False by default. If True and no other partition field is passed, the data is partitioned by /year/month/day/hour

  • key_bucketing_number – optional. None (default) does not partition by key; 0 partitions by the key as-is; any other number X creates X partitions and hashes the keys into one of them

  • partition_cols – optional, name of columns from the data to partition by

  • time_partitioning_granularity – optional. The smallest time unit to partition the data by. For example “hour” will yield partitions of the format /year/month/day/hour

  • max_events – optional. Maximum number of events to write at a time. All events will be written on flow termination, or after flush_after_seconds (if flush_after_seconds is set). Default 10k events

  • flush_after_seconds – optional. Maximum number of seconds to hold events before they are written. All events will be written on flow termination, or after max_events are accumulated (if max_events is set). Default 15 minutes
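
Example (a hedged sketch of materializing a feature set into partitioned parquet files; the feature set, entity, and data below are hypothetical):

import pandas as pd
import mlrun.feature_store as fstore
from mlrun.datastore import ParquetTarget

measurements = fstore.FeatureSet(
    "measurements", entities=[fstore.Entity("sensor_id")], timestamp_key="timestamp"
)
df = pd.DataFrame(
    {
        "sensor_id": ["a", "b"],
        "value": [1.5, 2.5],
        "timestamp": pd.to_datetime(["2021-01-01 10:00", "2021-01-01 11:00"]),
    }
)

# partition the written files by /year/month/day
target = ParquetTarget("parquet_target", partitioned=True, time_partitioning_granularity="day")
fstore.ingest(measurements, df, targets=[target])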

add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, **kwargs)[source]#

return the target data as dataframe

get_dask_options()[source]#
get_spark_options(key_column=None, timestamp_key=None, overwrite=True)[source]#
is_offline = True#
is_single_file()[source]#
kind: str = 'parquet'#
support_append = True#
support_dask = True#
support_spark = True#
support_storey = True#
class mlrun.datastore.StreamSource(name='stream', group='serving', seek_to='earliest', shards=1, retention_in_hours=24, extra_attributes: Optional[dict] = None, **kwargs)[source]#

Bases: mlrun.datastore.sources.OnlineSource

Sets a stream source for the flow. If the stream doesn't exist, it is created.

Parameters
  • name – stream name. Default “stream”

  • group – consumer group. Default “serving”

  • seek_to – from where to consume the stream. Default earliest

  • shards – number of shards in the stream. Default 1

  • retention_in_hours – if the stream doesn’t exist and is created, sets its retention time in hours. Default 24h

  • extra_attributes – additional nuclio trigger attributes (key/value dict)

add_nuclio_trigger(function)[source]#
kind = 'v3ioStream'#
class mlrun.datastore.StreamTarget(name: str = '', path=None, attributes: Optional[Dict[str, str]] = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: Optional[int] = None, partition_cols: Optional[List[str]] = None, time_partitioning_granularity: Optional[str] = None, max_events: Optional[int] = None, flush_after_seconds: Optional[int] = None, storage_options: Optional[Dict[str, str]] = None, schema: Optional[Dict[str, Any]] = None, credentials_prefix=None)[source]#

Bases: mlrun.datastore.targets.BaseStoreTarget

add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
as_df(columns=None, df_module=None, **kwargs)[source]#

return the target data as dataframe

is_online = False#
is_table = False#
kind: str = 'stream'#
support_append = True#
support_spark = False#
support_storey = True#
mlrun.datastore.get_store_resource(uri, db=None, secrets=None, project=None, data_store_secrets=None)[source]#

get store resource object by uri
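
Example (a hedged sketch; the store URI is hypothetical and assumes an artifact logged under that project and key):

from mlrun.datastore import get_store_resource

# resolve a store:// URI into the resource object it points to
model_artifact = get_store_resource("store://artifacts/my-project/my-model")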