mlrun.datastore#

class mlrun.datastore.BigQuerySource(name: str = '', table: str | None = None, max_results_for_table: int | None = None, query: str | None = None, materialization_dataset: str | None = None, chunksize: int | None = None, key_field: str | None = None, time_field: str | None = None, schedule: str | None = None, start_time=None, end_time=None, gcp_project: str | None = None, spark_options: dict | None = None, **kwargs)[source]#

Bases: BaseSourceDriver

Reads Google BigQuery query results as an input source for a flow.

For authentication, set the GCP_CREDENTIALS project secret to the credentials JSON string.

Example:

# set the credentials
project.set_secrets({"GCP_CREDENTIALS": gcp_credentials_json})

# use sql query
query_string = "SELECT * FROM `the-psf.pypi.downloads20210328` LIMIT 5000"
source = BigQuerySource(
    "bq1",
    query=query_string,
    gcp_project="my_project",
    materialization_dataset="dataviews",
)

# read a table
source = BigQuerySource(
    "bq2", table="the-psf.pypi.downloads20210328", gcp_project="my_project"
)
Parameters:
  • name -- source name

  • table -- table name/path, cannot be used together with query

  • query -- sql query string

  • materialization_dataset -- for query with Spark, the target dataset for the materialized view. This dataset should be in the same location as the view or the queried tables, and must be a dataset where the GCP user has table creation permission

  • chunksize -- number of rows per chunk (default large single chunk)

  • key_field -- the column to be used as the key for events. Can be a list of keys.

  • time_field -- the column to be used for time filtering. Defaults to the feature set's timestamp_key.

  • schedule -- string to configure scheduling of the ingestion job. For example '*/30 * * * *' will cause the job to run every 30 minutes

  • start_time -- filters out data before this time

  • end_time -- filters out data after this time

  • gcp_project -- google cloud project name

  • spark_options -- additional spark read options

is_iterator()[source]#
kind = 'bigquery'#
support_spark = True#
support_storey = False#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None)[source]#

return the source data as dataframe

to_spark_df(session, named_view=False, time_field=None, columns=None)[source]#
class mlrun.datastore.CSVSource(name: str = '', path: str | None = None, attributes: dict[str, str] | None = None, key_field: str | None = None, schedule: str | None = None, parse_dates: None | int | str | list[int] | list[str] = None, **kwargs)[source]#

Bases: BaseSourceDriver

Reads a CSV file as an input source for a flow.

Parameters:
  • name -- name of the source

  • path -- path to CSV file

  • key_field -- the CSV field to be used as the key for events. May be an int (field index) or string (field name) if with_header is True. Defaults to None (no key). Can be a list of keys.

  • schedule -- string to configure scheduling of the ingestion job.

  • attributes -- additional parameters to pass to storey. For example: attributes={"timestamp_format": '%Y%m%d%H'}

  • parse_dates -- Optional. List of columns (names or indices) to attempt to parse as date columns.
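
Example (a minimal sketch; the file path and column names below are illustrative, not part of the API):

from mlrun.datastore import CSVSource

# define a CSV source with a key column and date parsing
source = CSVSource(
    "mycsv",
    path="v3io:///projects/my-project/data/events.csv",
    key_field="id",
    parse_dates=["timestamp"],
)

# read the source into a dataframe
df = source.to_dataframe()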

get_spark_options()[source]#
is_iterator()[source]#
kind = 'csv'#
support_spark = True#
support_storey = True#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None)[source]#

return the source data as dataframe

to_spark_df(session, named_view=False, time_field=None, columns=None)[source]#
to_step(key_field=None, time_field=None, context=None)[source]#
class mlrun.datastore.CSVTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = None, flush_after_seconds: int | None = None, storage_options: dict[str, str] | None = None, schema: dict[str, Any] | None = None, credentials_prefix=None)[source]#

Bases: BaseStoreTarget

add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, **kwargs)[source]#

return the target data as dataframe

get_spark_options(key_column=None, timestamp_key=None, overwrite=True)[source]#
is_offline = True#
is_single_file()[source]#
kind: str = 'csv'#
prepare_spark_df(df, key_columns, timestamp_key=None, spark_options=None)[source]#
suffix = '.csv'#
support_spark = True#
support_storey = True#
class mlrun.datastore.DataItem(key: str, store: DataStore, subpath: str, url: str = '', meta=None, artifact_url=None)[source]#

Bases: object

Data input/output class abstracting access to various local/remote data sources

DataItem objects are passed into functions and can be used inside the function. When a function run completes, users can access the run data via run.artifact(key), which returns a DataItem object. Users can also convert a data URL (e.g. s3://bucket/key.csv) to a DataItem using mlrun.get_dataitem(url).

Example:

# using data item inside a function
def my_func(context, data: DataItem):
    df = data.as_df()


# reading run results using DataItem (run.artifact())
train_run = train_iris_func.run(
    inputs={"dataset": dataset}, params={"label_column": "label"}
)

train_run.artifact("confusion-matrix").show()
test_set = train_run.artifact("test_set").as_df()

# create and use DataItem from uri
data = mlrun.get_dataitem("http://xyz/data.json").get()
property artifact_url#

DataItem artifact URL (when it is an artifact) or URL for simple data items

as_df(columns=None, df_module=None, format='', time_column=None, start_time=None, end_time=None, **kwargs)[source]#

return a dataframe object (generated from the dataitem).

Parameters:
  • columns -- optional, list of columns to select

  • df_module -- optional, py module used to create the DataFrame (e.g. pd, dd, cudf, ..)

  • format -- file format, if not specified it will be deduced from the suffix

  • start_time -- filters out data before this time

  • end_time -- filters out data after this time

  • time_column -- Optional. The store's timestamp_key will be used if None. The results will be filtered by this column and start_time & end_time.
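
Example (illustrative; the URL and column names are hypothetical):

from datetime import datetime

import mlrun

item = mlrun.get_dataitem("s3://my-bucket/data/events.parquet")

# read selected columns, filtered by the "timestamp" column
df = item.as_df(
    columns=["user_id", "amount"],
    time_column="timestamp",
    start_time=datetime(2023, 1, 1),
    end_time=datetime(2023, 2, 1),
)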

delete()[source]#

delete the object from the datastore

download(target_path)[source]#

download to the target dir/path

Parameters:

target_path -- local target path for the downloaded item

get(size=None, offset=0, encoding=None)[source]#

read all or a byte range and return the content

Parameters:
  • size -- number of bytes to get

  • offset -- fetch from offset (in bytes)

  • encoding -- encoding (e.g. "utf-8") for converting bytes to str
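
Example (illustrative URL):

import mlrun

item = mlrun.get_dataitem("s3://my-bucket/data/notes.txt")

# read the full object as bytes
content = item.get()

# read the first 100 bytes and decode them to str
head = item.get(size=100, offset=0, encoding="utf-8")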

get_artifact_type() str | None[source]#

Check if the data item represents an Artifact (one of Artifact, DatasetArtifact, and ModelArtifact). If it does, it returns the store URI prefix (artifacts, datasets, or models); otherwise None.

Returns:

The store prefix of the artifact if it is an artifact data item and None if not.

property key#

DataItem key

property kind#

DataItem store kind (file, s3, v3io, ..)

listdir()[source]#

return a list of child file names

local()[source]#

get the local path of the file, download to tmp first if it's a remote object

ls()[source]#

return a list of child file names

property meta#

Artifact Metadata, when the DataItem is read from the artifacts store

open(mode)[source]#

return fsspec file handler, if supported

put(data, append=False)[source]#

write/upload the data; append is only supported by some datastores

Parameters:
  • data -- data (bytes/str) to write

  • append -- append data to the end of the object, NOT SUPPORTED BY SOME OBJECT STORES!
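
Example (illustrative URL; append only works on datastores that support it):

import mlrun

item = mlrun.get_dataitem("v3io:///projects/my-project/data/notes.txt")

# write (overwrite) the object
item.put("first line\n")

# append to the object, only on datastores that support append
item.put("second line\n", append=True)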

remove_local()[source]#

remove the local file if it exists and was downloaded from a remote object

show(format=None)[source]#

show the data object content in Jupyter

Parameters:

format -- format to use (when there is no/wrong suffix), e.g. 'png'

stat()[source]#

return FileStats class (size, modified, content_type)

property store#

DataItem store object

property suffix#

DataItem suffix (file extension) e.g. '.png'

upload(src_path)[source]#

upload the source file (src_path)

Parameters:

src_path -- source file path to read from and upload

property url#

DataItem URL, e.g. /dir/path, s3://bucket/path

class mlrun.datastore.DatabricksFileBugFixed(fs, path, mode='rb', block_size='default', autocommit=True, cache_type='readahead', cache_options=None, **kwargs)[source]#

Bases: DatabricksFile

Overrides DatabricksFile to add the following fix: fsspec/filesystem_spec#1278

Create a new instance of the DatabricksFile.

The blocksize needs to be the default one.

class mlrun.datastore.DatabricksFileSystemDisableCache(*args, **kwargs)[source]#

Bases: DatabricksFileSystem

Create a new DatabricksFileSystem.

protocol: ClassVar[str | tuple[str, ...]] = 'dbfs'#
root_marker = '/'#
class mlrun.datastore.HttpSource(name: str | None = None, path: str | None = None, attributes: dict[str, object] | None = None, key_field: str | None = None, time_field: str | None = None, workers: int | None = None)[source]#

Bases: OnlineSource

add_nuclio_trigger(function)[source]#
kind = 'http'#
class mlrun.datastore.KafkaSource(brokers=None, topics=None, group='serving', initial_offset='earliest', partitions=None, sasl_user=None, sasl_pass=None, attributes=None, **kwargs)[source]#

Bases: OnlineSource

Sets the Kafka source for the flow.

Parameters:
  • brokers -- list of broker IP addresses

  • topics -- list of topic names on which to listen.

  • group -- consumer group. Default "serving"

  • initial_offset -- from where to consume the stream. Default earliest

  • partitions -- Optional. A list of partition numbers from which the function receives events.

  • sasl_user -- Optional, username to use for SASL authentication

  • sasl_pass -- Optional, password to use for SASL authentication

  • attributes -- Optional, extra attributes to be passed to the Kafka trigger
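
Example (a minimal sketch; the broker address and topic name are illustrative):

from mlrun.datastore import KafkaSource

source = KafkaSource(
    brokers=["broker_host:9092"],
    topics=["my-topic"],
    group="serving",
    initial_offset="earliest",
)

# the source is typically attached to a feature set ingestion service or a serving graph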

add_nuclio_trigger(function)[source]#
kind = 'kafka'#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None)[source]#

return the source data as dataframe

to_spark_df(session, named_view=False, time_field=None, columns=None)[source]#
class mlrun.datastore.NoSqlTarget(*args, **kwargs)[source]#

Bases: NoSqlBaseTarget

get_spark_options(key_column=None, timestamp_key=None, overwrite=True)[source]#
get_table_object()[source]#

get storey Table object

kind: str = 'nosql'#
prepare_spark_df(df, key_columns, timestamp_key=None, spark_options=None)[source]#
support_spark = True#
writer_step_name = 'NoSqlTarget'#
class mlrun.datastore.ParquetSource(name: str = '', path: str | None = None, attributes: dict[str, str] | None = None, key_field: str | None = None, time_field: str | None = None, schedule: str | None = None, start_time: datetime | str | None = None, end_time: datetime | str | None = None)[source]#

Bases: BaseSourceDriver

Reads a Parquet file or directory as an input source for a flow.

Parameters:
  • name -- name of the source

  • path -- path to Parquet file or directory

  • key_field -- the column to be used as the key for events. Can be a list of keys.

  • time_field -- Optional. The feature set's timestamp_key will be used if None. The results will be filtered by this column and start_time & end_time.

  • schedule -- string to configure scheduling of the ingestion job. For example, '*/30 * * * *' will cause the job to run every 30 minutes

  • start_time -- datetime or str. If not None, filters out data before this time (the results are filtered by partitions and time_field > start_time). Default is None

  • end_time -- datetime or str. If not None, filters out data after this time (the results are filtered by partitions and time_field <= end_time). Default is None

  • attributes -- additional parameters to pass to storey.
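
Example (a minimal sketch; the path and column name are illustrative):

from datetime import datetime

from mlrun.datastore import ParquetSource

source = ParquetSource(
    "myparquet",
    path="v3io:///projects/my-project/data/events.parquet",
    time_field="timestamp",
    start_time=datetime(2023, 1, 1),
    end_time=datetime(2023, 2, 1),
)

# read the time-filtered source into a dataframe
df = source.to_dataframe()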

property end_time#
get_spark_options()[source]#
kind = 'parquet'#
property start_time#
support_spark = True#
support_storey = True#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None)[source]#

return the source data as dataframe

to_step(key_field=None, time_field=None, start_time=None, end_time=None, context=None)[source]#
class mlrun.datastore.ParquetTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool | None = None, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = 10000, flush_after_seconds: int | None = 900, storage_options: dict[str, str] | None = None)[source]#

Bases: BaseStoreTarget

Parquet target storage driver, used to materialize feature set/vector data into parquet files.

Parameters:
  • name -- optional, target name. By default will be called ParquetTarget

  • path -- optional, Output path. Can be either a file or directory. This parameter is forwarded as-is to pandas.DataFrame.to_parquet(). Default location v3io:///projects/{project}/FeatureStore/{name}/parquet/

  • attributes -- optional, extra attributes for storey.ParquetTarget

  • after_step -- optional, after what step in the graph to add the target

  • columns -- optional, which columns from data to write

  • partitioned -- optional, whether to partition the file. False by default. If True and no other partition field is passed, the data will be partitioned by /year/month/day/hour

  • key_bucketing_number -- optional, None by default will not partition by key, 0 will partition by the key as is, any other number X will create X partitions and hash the keys to one of them

  • partition_cols -- optional, name of columns from the data to partition by

  • time_partitioning_granularity -- optional. the smallest time unit to partition the data by. For example "hour" will yield partitions of the format /year/month/day/hour

  • max_events -- optional. Maximum number of events to write at a time. All events will be written on flow termination, or after flush_after_seconds (if flush_after_seconds is set). Default 10k events

  • flush_after_seconds -- optional. Maximum number of seconds to hold events before they are written. All events will be written on flow termination, or after max_events are accumulated (if max_events is set). Default 15 minutes
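
Example (a minimal sketch; the target name is illustrative, and how the target is attached to a feature set may vary by mlrun version):

from mlrun.datastore import ParquetTarget

# daily-partitioned parquet target, written under the default location
target = ParquetTarget(
    "myparquet",
    partitioned=True,
    time_partitioning_granularity="day",
)

# typically passed in the targets list of a feature set ingestion call, e.g. targets=[target]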

add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, **kwargs)[source]#

return the target data as dataframe

get_dask_options()[source]#
get_spark_options(key_column=None, timestamp_key=None, overwrite=True)[source]#
is_offline = True#
is_single_file()[source]#
kind: str = 'parquet'#
prepare_spark_df(df, key_columns, timestamp_key=None, spark_options=None)[source]#
support_append = True#
support_dask = True#
support_spark = True#
support_storey = True#
class mlrun.datastore.StreamSource(name='stream', group='serving', seek_to='earliest', shards=1, retention_in_hours=24, extra_attributes: dict | None = None, **kwargs)[source]#

Bases: OnlineSource

Sets the stream source for the flow. If the stream doesn't exist, it will be created.

Parameters:
  • name -- stream name. Default "stream"

  • group -- consumer group. Default "serving"

  • seek_to -- from where to consume the stream. Default earliest

  • shards -- number of shards in the stream. Default 1

  • retention_in_hours -- if the stream doesn't exist and is created, sets the stream retention time in hours. Default 24h

  • extra_attributes -- additional nuclio trigger attributes (key/value dict)
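
Example (a minimal sketch; the stream name and consumer group are illustrative):

from mlrun.datastore import StreamSource

source = StreamSource(
    name="mystream",
    group="serving",
    seek_to="earliest",
    shards=1,
    retention_in_hours=24,
)

# when used with a remote (nuclio) function, the source adds a stream trigger;
# see add_nuclio_trigger() below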

add_nuclio_trigger(function)[source]#
kind = 'v3ioStream'#
class mlrun.datastore.StreamTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = None, flush_after_seconds: int | None = None, storage_options: dict[str, str] | None = None, schema: dict[str, Any] | None = None, credentials_prefix=None)[source]#

Bases: BaseStoreTarget

add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
as_df(columns=None, df_module=None, **kwargs)[source]#

return the target data as dataframe

is_online = False#
is_table = False#
kind: str = 'stream'#
support_append = True#
support_spark = False#
support_storey = True#
mlrun.datastore.get_store_resource(uri, db=None, secrets=None, project=None, data_store_secrets=None)[source]#

get store resource object by uri
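
Example (illustrative; the store URI assumes an existing project artifact with this key):

from mlrun.datastore import get_store_resource

# fetch the artifact object behind a store:// uri
artifact = get_store_resource("store://artifacts/my-project/my-artifact")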

mlrun.datastore.get_stream_pusher(stream_path: str, **kwargs)[source]#

get a stream pusher object from URL.

common kwargs:

create:             create a new stream if it doesn't exist
shards:             number of shards
retention_in_hours: stream retention in hours
Parameters:

stream_path -- path/url of stream
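
Example (illustrative stream path; assumes the returned pusher exposes a push() method, as the v3io/kafka output streams do):

from mlrun.datastore import get_stream_pusher

pusher = get_stream_pusher(
    "v3io:///projects/my-project/streams/mystream",
    create=True,
    shards=1,
)

# push a record to the stream
pusher.push({"id": 1, "value": "hello"})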

mlrun.datastore.datastore_profile.register_temporary_client_datastore_profile(profile: DatastoreProfile)[source]#

Register the datastore profile. This profile is temporary and remains valid only for the duration of the caller's session. It's beneficial for testing purposes.