mlrun.datastore#

class mlrun.datastore.BigQuerySource(name: str = '', table: Optional[str] = None, max_results_for_table: Optional[int] = None, query: Optional[str] = None, materialization_dataset: Optional[str] = None, chunksize: Optional[int] = None, key_field: Optional[str] = None, time_field: Optional[str] = None, schedule: Optional[str] = None, start_time=None, end_time=None, gcp_project: Optional[str] = None, spark_options: Optional[dict] = None)[source]#

Bases: mlrun.datastore.sources.BaseSourceDriver

Reads Google BigQuery query results as input source for a flow.

example:

# use sql query
query_string = "SELECT * FROM `the-psf.pypi.downloads20210328` LIMIT 5000"
source = BigQuerySource("bq1", query=query_string,
                        gcp_project="my_project",
                        materialization_dataset="dataviews")

# read a table
source = BigQuerySource("bq2", table="the-psf.pypi.downloads20210328", gcp_project="my_project")
Parameters
  • name – source name

  • table – table name/path, cannot be used together with query

  • query – sql query string

  • materialization_dataset – for queries run with Spark, the target dataset for the materialized view. This dataset should be in the same location as the view or the queried tables, and must be a dataset where the GCP user has table creation permission

  • chunksize – number of rows per chunk (default is a single large chunk)

  • key_field – the column to be used as the key for events. Can be a list of keys.

  • time_field – the column to be parsed as the timestamp for events. Defaults to None

  • schedule – string to configure scheduling of the ingestion job. For example ‘*/30 * * * *’ will cause the job to run every 30 minutes

  • start_time – filters out data before this time

  • end_time – filters out data after this time

  • gcp_project – google cloud project name

  • spark_options – additional Spark read options

is_iterator()[source]#
kind = 'bigquery'#
support_spark = True#
support_storey = False#
to_dataframe()[source]#
to_spark_df(session, named_view=False)[source]#
class mlrun.datastore.CSVSource(name: str = '', path: Optional[str] = None, attributes: Optional[Dict[str, str]] = None, key_field: Optional[str] = None, time_field: Optional[str] = None, schedule: Optional[str] = None, parse_dates: Optional[Union[List[int], List[str]]] = None)[source]#

Bases: mlrun.datastore.sources.BaseSourceDriver

Reads CSV file as input source for a flow.
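
example (a minimal sketch; the file path, timestamp column, and format string are illustrative):

source = CSVSource("mycsv",
                   path="v3io:///projects/my-project/mycsv.csv",
                   time_field="timestamp",
                   attributes={"timestamp_format": "%Y%m%d%H"})
df = source.to_dataframe()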

Parameters
  • name – name of the source

  • path – path to CSV file

  • key_field – the CSV field to be used as the key for events. May be an int (field index) or string (field name) if with_header is True. Defaults to None (no key). Can be a list of keys.

  • time_field – the CSV field to be parsed as the timestamp for events. May be an int (field index) or string (field name) if with_header is True. Defaults to None (no timestamp field). The field will be parsed from isoformat (ISO-8601 as defined in datetime.fromisoformat()). In case the format is not isoformat, timestamp_format (as defined in datetime.strptime()) should be passed in attributes.

  • schedule – string to configure scheduling of the ingestion job.

  • attributes – additional parameters to pass to storey. For example: attributes={“timestamp_format”: ‘%Y%m%d%H’}

  • parse_dates – Optional. List of columns (names or integers, other than time_field) to attempt to parse as date columns.

get_spark_options()[source]#
is_iterator()[source]#
kind = 'csv'#
support_spark = True#
support_storey = True#
to_dataframe()[source]#
to_spark_df(session, named_view=False)[source]#
to_step(key_field=None, time_field=None, context=None)[source]#
class mlrun.datastore.CSVTarget(name: str = '', path=None, attributes: Optional[Dict[str, str]] = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: Optional[int] = None, partition_cols: Optional[List[str]] = None, time_partitioning_granularity: Optional[str] = None, after_state=None, max_events: Optional[int] = None, flush_after_seconds: Optional[int] = None, storage_options: Optional[Dict[str, str]] = None)[source]#

Bases: mlrun.datastore.targets.BaseStoreTarget
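
A minimal construction sketch (the target name and path are illustrative):

target = CSVTarget("mycsv", path="v3io:///projects/my-project/mycsv.csv")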

add_writer_state(graph, after, features, key_columns=None, timestamp_key=None)[source]#
add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, **kwargs)[source]#

return the target data as dataframe

get_spark_options(key_column=None, timestamp_key=None, overwrite=True)[source]#
is_offline = True#
is_single_file()[source]#
kind: str = 'csv'#
prepare_spark_df(df)[source]#
suffix = '.csv'#
support_spark = True#
support_storey = True#
class mlrun.datastore.DataItem(key: str, store: mlrun.datastore.base.DataStore, subpath: str, url: str = '', meta=None, artifact_url=None)[source]#

Bases: object

Data input/output class abstracting access to various local/remote data sources

DataItem objects are passed into functions and can be used inside the function. When a function run completes, users can access the run data via run.artifact(key), which returns a DataItem object. Users can also convert a data url (e.g. s3://bucket/key.csv) to a DataItem using mlrun.get_dataitem(url).

Example:

# using data item inside a function
def my_func(context, data: DataItem):
    df = data.as_df()


# reading run results using DataItem (run.artifact())
train_run = train_iris_func.run(inputs={'dataset': dataset},
                                params={'label_column': 'label'})

train_run.artifact('confusion-matrix').show()
test_set = train_run.artifact('test_set').as_df()

# create and use DataItem from uri
data = mlrun.get_dataitem('http://xyz/data.json').get()
property artifact_url#

DataItem artifact url (when it is an artifact) or url for simple data items

as_df(columns=None, df_module=None, format='', **kwargs)[source]#

return a dataframe object (generated from the dataitem).

Parameters
  • columns – optional, list of columns to select

  • df_module – optional, py module used to create the DataFrame (e.g. pd, dd, cudf, ..)

  • format – file format, if not specified it will be deduced from the suffix

delete()[source]#

delete the object from the datastore

download(target_path)[source]#

download to the target dir/path

Parameters

target_path – local target path for the downloaded item

get(size=None, offset=0, encoding=None)[source]#

read all or a byte range and return the content

Parameters
  • size – number of bytes to get

  • offset – fetch from offset (in bytes)

  • encoding – encoding (e.g. “utf-8”) for converting bytes to str
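
A short sketch of a ranged read (the url is illustrative):

item = mlrun.get_dataitem("s3://my-bucket/data.txt")
first_kb = item.get(size=1024, offset=0, encoding="utf-8")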

property key#

DataItem key

property kind#

DataItem store kind (file, s3, v3io, ..)

listdir()[source]#

return a list of child file names

local()[source]#

get the local path of the file; download to a temporary location first if it is a remote object

ls()[source]#

return a list of child file names

property meta#

Artifact Metadata, when the DataItem is read from the artifacts store

open(mode)[source]#

return fsspec file handler, if supported

put(data, append=False)[source]#

write/upload the data, append is only supported by some datastores

Parameters
  • data – data (bytes/str) to write

  • append – append data to the end of the object, NOT SUPPORTED BY SOME OBJECT STORES!

show(format=None)[source]#

show the data object content in Jupyter

Parameters

format – format to use (when there is no/wrong suffix), e.g. ‘png’

stat()[source]#

return FileStats class (size, modified, content_type)

property store#

DataItem store object

property suffix#

DataItem suffix (file extension) e.g. ‘.png’

upload(src_path)[source]#

upload the source file (src_path)

Parameters

src_path – source file path to read from and upload

property url#

DataItem url, e.g. /dir/path, s3://bucket/path

class mlrun.datastore.HttpSource(name: Optional[str] = None, path: Optional[str] = None, attributes: Optional[Dict[str, str]] = None, key_field: Optional[str] = None, time_field: Optional[str] = None, workers: Optional[int] = None)[source]#

Bases: mlrun.datastore.sources.OnlineSource
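
A minimal construction sketch for this online (real-time) source; the parameter values are illustrative:

source = HttpSource(key_field="key", time_field="time", workers=1)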

add_nuclio_trigger(function)[source]#
kind = 'http'#
class mlrun.datastore.KafkaSource(brokers='localhost:9092', topics='topic', group='serving', initial_offset='earliest', partitions=None, sasl_user=None, sasl_pass=None, **kwargs)[source]#

Bases: mlrun.datastore.sources.OnlineSource

Sets kafka source for the flow
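
example (broker address, topic, and consumer group are illustrative):

source = KafkaSource(brokers=["broker_host:9092"],
                     topics=["my_topic"],
                     group="my_group",
                     initial_offset="earliest")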

Parameters
  • brokers – list of broker IP addresses

  • topics – list of topic names on which to listen.

  • group – consumer group. Default “serving”

  • initial_offset – from where to consume the stream. Default earliest

  • partitions – Optional, a list of partition numbers for which the function receives events.

  • sasl_user – Optional, username to use for SASL authentication

  • sasl_pass – Optional, password to use for SASL authentication

add_nuclio_trigger(function)[source]#
kind = 'kafka'#
class mlrun.datastore.NoSqlTarget(*args, **kwargs)[source]#

Bases: mlrun.datastore.targets.NoSqlBaseTarget
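
A minimal construction sketch (the name and path are illustrative; arguments are passed through to the base store target):

target = NoSqlTarget(name="mynosql", path="v3io:///projects/my-project/nosql/mynosql")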

get_table_object()[source]#

get storey Table object

kind: str = 'nosql'#
support_spark = True#
writer_step_name = 'NoSqlTarget'#
class mlrun.datastore.ParquetSource(name: str = '', path: Optional[str] = None, attributes: Optional[Dict[str, str]] = None, key_field: Optional[str] = None, time_field: Optional[str] = None, schedule: Optional[str] = None, start_time: Optional[Union[datetime.datetime, str]] = None, end_time: Optional[Union[datetime.datetime, str]] = None)[source]#

Bases: mlrun.datastore.sources.BaseSourceDriver

Reads Parquet file/dir as input source for a flow.
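
example (the path and time range are illustrative):

source = ParquetSource("myparquet",
                       path="v3io:///projects/my-project/data.parquet",
                       start_time="2021-01-01 00:00:00",
                       end_time="2021-12-31 23:59:59")
df = source.to_dataframe()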

Parameters
  • name – name of the source

  • path – path to Parquet file or directory

  • key_field – the column to be used as the key for events. Can be a list of keys.

  • time_field – the column to be parsed as the timestamp for events. Defaults to None

  • start_filter – datetime. If not None, the results will be filtered by partitions and ‘filter_column’ > start_filter. Default is None

  • end_filter – datetime. If not None, the results will be filtered by partitions and ‘filter_column’ <= end_filter. Default is None

  • filter_column – Optional. If not None, the results will be filtered by this column together with start_filter & end_filter

  • schedule – string to configure scheduling of the ingestion job. For example ‘*/30 * * * *’ will cause the job to run every 30 minutes

  • start_time – filters out data before this time

  • end_time – filters out data after this time

  • attributes – additional parameters to pass to storey.

property end_time#
get_spark_options()[source]#
kind = 'parquet'#
property start_time#
support_spark = True#
support_storey = True#
to_dataframe()[source]#
to_step(key_field=None, time_field=None, start_time=None, end_time=None, context=None)[source]#
class mlrun.datastore.ParquetTarget(name: str = '', path=None, attributes: Optional[Dict[str, str]] = None, after_step=None, columns=None, partitioned: Optional[bool] = None, key_bucketing_number: Optional[int] = None, partition_cols: Optional[List[str]] = None, time_partitioning_granularity: Optional[str] = None, after_state=None, max_events: Optional[int] = 10000, flush_after_seconds: Optional[int] = 900, storage_options: Optional[Dict[str, str]] = None)[source]#

Bases: mlrun.datastore.targets.BaseStoreTarget

parquet target storage driver, used to materialize feature set/vector data into parquet files
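
example (the target name and partitioning choices are illustrative):

target = ParquetTarget("mytarget",
                       partitioned=True,
                       time_partitioning_granularity="day")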

Parameters
  • name – optional, target name. By default will be called ParquetTarget

  • path – optional, output path. Can be either a file or directory. This parameter is forwarded as-is to pandas.DataFrame.to_parquet(). Default location: v3io:///projects/{project}/FeatureStore/{name}/parquet/

  • attributes – optional, extra attributes for storey.ParquetTarget

  • after_step – optional, after what step in the graph to add the target

  • columns – optional, which columns from data to write

  • partitioned – optional, whether to partition the file. False by default. If True and no other partition field is passed, the data will be partitioned by /year/month/day/hour

  • key_bucketing_number – optional. None (the default) will not partition by key; 0 will partition by the key as is; any other number X will create X partitions and hash the keys to one of them

  • partition_cols – optional, name of columns from the data to partition by

  • time_partitioning_granularity – optional. The smallest time unit to partition the data by. For example “hour” will yield partitions of the format /year/month/day/hour

  • max_events – optional. Maximum number of events to write at a time. All events will be written on flow termination, or after flush_after_seconds (if flush_after_seconds is set). Default 10k events

  • flush_after_seconds – optional. Maximum number of seconds to hold events before they are written. All events will be written on flow termination, or after max_events are accumulated (if max_events is set). Default 15 minutes

add_writer_state(graph, after, features, key_columns=None, timestamp_key=None)[source]#
add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, **kwargs)[source]#

return the target data as dataframe

get_dask_options()[source]#
get_spark_options(key_column=None, timestamp_key=None, overwrite=True)[source]#
is_offline = True#
is_single_file()[source]#
kind: str = 'parquet'#
support_append = True#
support_dask = True#
support_spark = True#
support_storey = True#
class mlrun.datastore.StreamSource(name='stream', group='serving', seek_to='earliest', shards=1, retention_in_hours=24, extra_attributes: Optional[dict] = None, **kwargs)[source]#

Bases: mlrun.datastore.sources.OnlineSource

Sets stream source for the flow. If the stream doesn’t exist, it will be created.
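
example (the stream name and shard count are illustrative):

source = StreamSource(name="my_stream", group="serving",
                      seek_to="earliest", shards=1)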

Parameters
  • name – stream name. Default “stream”

  • group – consumer group. Default “serving”

  • seek_to – from where to consume the stream. Default earliest

  • shards – number of shards in the stream. Default 1

  • retention_in_hours – if the stream doesn’t exist and is created, sets its retention time. Default 24h

  • extra_attributes – additional nuclio trigger attributes (key/value dict)

add_nuclio_trigger(function)[source]#
kind = 'v3ioStream'#
class mlrun.datastore.StreamTarget(name: str = '', path=None, attributes: Optional[Dict[str, str]] = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: Optional[int] = None, partition_cols: Optional[List[str]] = None, time_partitioning_granularity: Optional[str] = None, after_state=None, max_events: Optional[int] = None, flush_after_seconds: Optional[int] = None, storage_options: Optional[Dict[str, str]] = None)[source]#

Bases: mlrun.datastore.targets.BaseStoreTarget
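
A minimal construction sketch (the target name and stream path are illustrative):

target = StreamTarget("mystream", path="v3io:///projects/my-project/streams/mystream")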

add_writer_state(graph, after, features, key_columns=None, timestamp_key=None)[source]#
add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
as_df(columns=None, df_module=None, **kwargs)[source]#

return the target data as dataframe

is_online = False#
is_table = False#
kind: str = 'stream'#
support_append = True#
support_spark = False#
support_storey = True#
mlrun.datastore.get_store_resource(uri, db=None, secrets=None, project=None)[source]#

get store resource object by uri
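
example (the store uri is illustrative):

# resolve a store uri to its resource object (e.g. a feature set or artifact)
resource = mlrun.datastore.get_store_resource("store://feature-sets/my-project/my-features")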