mlrun.datastore#

class mlrun.datastore.BigQuerySource(name: str = '', table: str | None = None, max_results_for_table: int | None = None, query: str | None = None, materialization_dataset: str | None = None, chunksize: int | None = None, key_field: str | None = None, time_field: str | None = None, schedule: str | None = None, start_time=None, end_time=None, gcp_project: str | None = None, spark_options: dict | None = None, **kwargs)[source]#

Bases: BaseSourceDriver

Reads Google BigQuery query results as input source for a flow.

For authentication, set the GCP_CREDENTIALS project secret to the credentials JSON string.

Example:

# set the credentials
project.set_secrets({"GCP_CREDENTIALS": gcp_credentials_json})

# use sql query
query_string = "SELECT * FROM `the-psf.pypi.downloads20210328` LIMIT 5000"
source = BigQuerySource(
    "bq1",
    query=query_string,
    gcp_project="my_project",
    materialization_dataset="dataviews",
)

# read a table
source = BigQuerySource(
    "bq2", table="the-psf.pypi.downloads20210328", gcp_project="my_project"
)
Parameters:
  • name -- source name

  • table -- table name/path, cannot be used together with query

  • query -- sql query string

  • materialization_dataset -- for query with Spark, the target dataset for the materialized view. This dataset should be in the same location as the view or the queried tables, and must be set to a dataset where the GCP user has table creation permission

  • chunksize -- number of rows per chunk (default: a single large chunk)

  • key_field -- the column to be used as the key for events. Can be a list of keys.

  • time_field -- the column to be used for time filtering. Defaults to the feature set's timestamp_key.

  • schedule -- string to configure scheduling of the ingestion job. For example '*/30 * * * *' will cause the job to run every 30 minutes

  • start_time -- filters out data before this time

  • end_time -- filters out data after this time

  • gcp_project -- google cloud project name

  • spark_options -- additional spark read options

is_iterator()[source]#
kind = 'bigquery'#
support_spark = True#
support_storey = False#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None, additional_filters=None)[source]#

return the source data as dataframe

to_spark_df(session, named_view=False, time_field=None, columns=None)[source]#
class mlrun.datastore.CSVSource(name: str = '', path: str | None = None, attributes: dict[str, object] | None = None, key_field: str | None = None, schedule: str | None = None, parse_dates: None | int | str | list[int] | list[str] = None, **kwargs)[source]#

Bases: BaseSourceDriver

Reads CSV file as input source for a flow.

Parameters:
  • name -- name of the source

  • path -- path to CSV file

  • key_field -- the CSV field to be used as the key for events. May be an int (field index) or string (field name) if with_header is True. Defaults to None (no key). Can be a list of keys.

  • schedule -- string to configure scheduling of the ingestion job.

  • attributes -- additional parameters to pass to storey. For example: attributes={"timestamp_format": '%Y%m%d%H'}

  • parse_dates -- Optional. List of columns (names or indices) to attempt to parse as date columns.
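
Example (a minimal sketch; the file path and column name are illustrative):

from mlrun.datastore import CSVSource

# read a CSV file and parse the "timestamp" column as dates
source = CSVSource(
    "mycsv",
    path="v3io:///projects/my-project/data/events.csv",
    parse_dates=["timestamp"],
)
df = source.to_dataframe()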

get_spark_options()[source]#
is_iterator()[source]#
kind = 'csv'#
support_spark = True#
support_storey = True#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None, additional_filters=None)[source]#

return the source data as dataframe

to_spark_df(session, named_view=False, time_field=None, columns=None)[source]#
to_step(key_field=None, time_field=None, context=None)[source]#
class mlrun.datastore.CSVTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = None, flush_after_seconds: int | None = None, storage_options: dict[str, str] | None = None, schema: dict[str, Any] | None = None, credentials_prefix=None)[source]#

Bases: BaseStoreTarget

add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, additional_filters=None, **kwargs)[source]#

return the target data as dataframe

get_spark_options(key_column=None, timestamp_key=None, overwrite=True)[source]#
is_offline = True#
is_single_file()[source]#
kind: str = 'csv'#
prepare_spark_df(df, key_columns, timestamp_key=None, spark_options=None)[source]#
suffix = '.csv'#
support_pandas = True#
support_spark = True#
support_storey = True#
class mlrun.datastore.DataItem(key: str, store: DataStore, subpath: str, url: str = '', meta=None, artifact_url=None)[source]#

Bases: object

Data input/output class abstracting access to various local/remote data sources

DataItem objects are passed into functions and can be used inside the function. When a function run completes, users can access the run data via run.artifact(key), which returns a DataItem object. Users can also convert a data URL (e.g. s3://bucket/key.csv) to a DataItem using mlrun.get_dataitem(url).

Example:

# using data item inside a function
def my_func(context, data: DataItem):
    df = data.as_df()


# reading run results using DataItem (run.artifact())
train_run = train_iris_func.run(
    inputs={"dataset": dataset}, params={"label_column": "label"}
)

train_run.artifact("confusion-matrix").show()
test_set = train_run.artifact("test_set").as_df()

# create and use DataItem from uri
data = mlrun.get_dataitem("http://xyz/data.json").get()
property artifact_url#

DataItem artifact url (when it's an artifact), or url for simple data items

as_df(columns=None, df_module=None, format='', time_column=None, start_time=None, end_time=None, additional_filters=None, **kwargs)[source]#

return a dataframe object (generated from the dataitem).

Parameters:
  • columns -- optional, list of columns to select

  • df_module -- optional, py module used to create the DataFrame (e.g. pd, dd, cudf, ..)

  • format -- file format, if not specified it will be deduced from the suffix

  • start_time -- filters out data before this time

  • end_time -- filters out data after this time

  • time_column -- Store timestamp_key will be used if None. The results will be filtered by this column and start_time & end_time.

  • additional_filters -- List of additional_filter conditions as tuples. Each tuple should be in the format (column_name, operator, value). Supported operators: "=", ">=", "<=", ">", "<". Example: [("Product", "=", "Computer")] For all supported filters, please see: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
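
Example (a minimal sketch; the data URL and column names are illustrative):

import mlrun

item = mlrun.get_dataitem("s3://my-bucket/sales.parquet")
df = item.as_df(
    columns=["Product", "Price"],
    additional_filters=[("Product", "=", "Computer")],
)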

delete()[source]#

delete the object from the datastore

download(target_path)[source]#

download to the target dir/path

Parameters:

target_path -- local target path for the downloaded item

get(size=None, offset=0, encoding=None)[source]#

read all or a byte range and return the content

Parameters:
  • size -- number of bytes to get

  • offset -- fetch from offset (in bytes)

  • encoding -- encoding (e.g. "utf-8") for converting bytes to str
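
Example (a minimal sketch; the data URL is illustrative):

import mlrun

item = mlrun.get_dataitem("s3://my-bucket/data.json")
text = item.get(encoding="utf-8")     # whole object, decoded to str
head = item.get(size=1024, offset=0)  # first 1024 bytes, returned as bytes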

get_artifact_type() str | None[source]#

Check if the data item represents an Artifact (one of Artifact, DatasetArtifact and ModelArtifact). If it does, it returns the store uri prefix (artifacts, datasets or models); otherwise, None.

Returns:

The store prefix of the artifact if it is an artifact data item and None if not.

property key#

DataItem key

property kind#

DataItem store kind (file, s3, v3io, ..)

listdir()[source]#

return a list of child file names

local()[source]#

get the local path of the file, download to tmp first if it's a remote object

ls()[source]#

return a list of child file names

property meta#

Artifact Metadata, when the DataItem is read from the artifacts store

open(mode)[source]#

return fsspec file handler, if supported

put(data, append=False)[source]#

write/upload the data; append is only supported by some datastores

Parameters:
  • data -- data (bytes/str) to write

  • append -- append data to the end of the object, NOT SUPPORTED BY SOME OBJECT STORES!
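
Example (a minimal sketch; the target URL is illustrative, and append support depends on the underlying datastore):

import mlrun

item = mlrun.get_dataitem("v3io:///projects/my-project/artifacts/notes.txt")
item.put("first line\n")
item.put("second line\n", append=True)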

remove_local()[source]#

remove the local file if it exists and was downloaded from a remote object

show(format: str | None = None) None[source]#

show the data object content in Jupyter

Parameters:

format -- format to use (when there is no/wrong suffix), e.g. 'png'

stat()[source]#

return FileStats class (size, modified, content_type)

property store#

DataItem store object

property suffix#

DataItem suffix (file extension) e.g. '.png'

upload(src_path)[source]#

upload the source file (src_path)

Parameters:

src_path -- source file path to read from and upload

property url#

DataItem url e.g. /dir/path, s3://bucket/path

class mlrun.datastore.DatabricksFileBugFixed(fs, path, mode='rb', block_size='default', autocommit=True, cache_type='readahead', cache_options=None, **kwargs)[source]#

Bases: DatabricksFile

Overrides DatabricksFile to add the following fix: fsspec/filesystem_spec#1278

Create a new instance of the DatabricksFile.

The blocksize needs to be the default one.

class mlrun.datastore.DatabricksFileSystemDisableCache(*args, **kwargs)[source]#

Bases: DatabricksFileSystem

Create a new DatabricksFileSystem.

protocol: ClassVar[str | tuple[str, ...]] = 'dbfs'#
root_marker = '/'#
storage_args: Tuple[Any, ...]#
storage_options: Dict[str, Any]#
class mlrun.datastore.HttpSource(name: str | None = None, path: str | None = None, attributes: dict[str, object] | None = None, key_field: str | None = None, time_field: str | None = None, workers: int | None = None)[source]#

Bases: OnlineSource

add_nuclio_trigger(function)[source]#
kind = 'http'#
class mlrun.datastore.KafkaSource(brokers=None, topics=None, group='serving', initial_offset='earliest', partitions=None, sasl_user=None, sasl_pass=None, attributes=None, **kwargs)[source]#

Bases: OnlineSource

Sets the Kafka source for the flow

Parameters:
  • brokers -- list of broker IP addresses

  • topics -- list of topic names on which to listen.

  • group -- consumer group. Default "serving"

  • initial_offset -- from where to consume the stream. Default earliest

  • partitions -- Optional. A list of partition numbers from which the function receives events.

  • sasl_user -- Optional. Username to use for SASL authentication

  • sasl_pass -- Optional. Password to use for SASL authentication

  • attributes -- Optional. Extra attributes to be passed to the Kafka trigger
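
Example (a minimal sketch; broker addresses and topic names are illustrative):

from mlrun.datastore import KafkaSource

source = KafkaSource(
    brokers=["broker-1:9092", "broker-2:9092"],
    topics=["my-topic"],
    group="serving",
    initial_offset="earliest",
)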

add_nuclio_trigger(function)[source]#
create_topics(num_partitions: int = 4, replication_factor: int = 1, topics: list[str] | None = None)[source]#

Create Kafka topics with the specified number of partitions and replication factor.

Parameters:
  • num_partitions -- number of partitions for the topics

  • replication_factor -- replication factor for the topics

  • topics -- list of topic names to create, if None, the topics will be taken from the source attributes
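
Continuing the sketch above, the source topics can be created explicitly (partition and replication values are illustrative):

source.create_topics(num_partitions=4, replication_factor=1)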

kind = 'kafka'#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None, additional_filters=None)[source]#

return the source data as dataframe

to_spark_df(session, named_view=False, time_field=None, columns=None)[source]#
class mlrun.datastore.NoSqlTarget(*args, **kwargs)[source]#

Bases: NoSqlBaseTarget

get_spark_options(key_column=None, timestamp_key=None, overwrite=True)[source]#
get_table_object()[source]#

get storey Table object

kind: str = 'nosql'#
prepare_spark_df(df, key_columns, timestamp_key=None, spark_options=None)[source]#
support_spark = True#
writer_step_name = 'NoSqlTarget'#
class mlrun.datastore.ParquetSource(name: str = '', path: str | None = None, attributes: dict[str, object] | None = None, key_field: str | None = None, time_field: str | None = None, schedule: str | None = None, start_time: datetime | str | None = None, end_time: datetime | str | None = None, additional_filters: list[Union[tuple, list]] | None = None)[source]#

Bases: BaseSourceDriver

Reads Parquet file/dir as input source for a flow.

Parameters:
  • name -- name of the source

  • path -- path to Parquet file or directory

  • key_field -- the column to be used as the key for events. Can be a list of keys.

  • time_field -- Optional. Feature set's timestamp_key will be used if None. The results will be filtered by this column together with start_time & end_time.

  • schedule -- string to configure scheduling of the ingestion job. For example '*/30 * * * *' will cause the job to run every 30 minutes

  • start_time -- datetime (or datetime string). If not None, the results are filtered by partitions and time_field > start_time, i.e. data before this time is filtered out. Default is None

  • end_time -- datetime (or datetime string). If not None, the results are filtered by partitions and time_field <= end_time, i.e. data after this time is filtered out. Default is None

  • attributes -- additional parameters to pass to storey.

  • additional_filters -- List of additional_filter conditions as tuples. Each tuple should be in the format (column_name, operator, value). Supported operators: "=", ">=", "<=", ">", "<". Example: [("Product", "=", "Computer")] For all supported filters, please see: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
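
Example (a minimal sketch; the path, time column, and filter values are illustrative):

from mlrun.datastore import ParquetSource

source = ParquetSource(
    "myparquet",
    path="v3io:///projects/my-project/data/events.parquet",
    time_field="timestamp",
    start_time="2024-01-01",
    end_time="2024-06-30",
    additional_filters=[("Product", "=", "Computer")],
)
df = source.to_dataframe()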

property additional_filters#
property end_time#
classmethod from_dict(struct=None, fields=None, deprecated_fields: dict | None = None)[source]#

create an object from a python dictionary

get_spark_options()[source]#
kind = 'parquet'#
property start_time#
support_spark = True#
support_storey = True#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None, additional_filters=None)[source]#

return the source data as dataframe

to_step(key_field=None, time_field=None, start_time=None, end_time=None, context=None, additional_filters=None)[source]#
class mlrun.datastore.ParquetTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool | None = None, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = 10000, flush_after_seconds: int | None = 900, storage_options: dict[str, str] | None = None)[source]#

Bases: BaseStoreTarget

Parquet target storage driver, used to materialize feature set/vector data into parquet files.

Parameters:
  • name -- optional, target name. By default will be called ParquetTarget

  • path -- optional, Output path. Can be either a file or directory. This parameter is forwarded as-is to pandas.DataFrame.to_parquet(). Default location v3io:///projects/{project}/FeatureStore/{name}/parquet/

  • attributes -- optional, extra attributes for storey.ParquetTarget

  • after_step -- optional, after what step in the graph to add the target

  • columns -- optional, which columns from data to write

  • partitioned -- optional, whether to partition the file. False by default. If True and no other partition field is passed, the data will be partitioned by /year/month/day/hour

  • key_bucketing_number -- optional, None by default will not partition by key, 0 will partition by the key as is, any other number X will create X partitions and hash the keys to one of them

  • partition_cols -- optional, name of columns from the data to partition by

  • time_partitioning_granularity -- optional. the smallest time unit to partition the data by. For example "hour" will yield partitions of the format /year/month/day/hour

  • max_events -- optional. Maximum number of events to write at a time. All events will be written on flow termination, or after flush_after_seconds (if flush_after_seconds is set). Default 10k events

  • flush_after_seconds -- optional. Maximum number of seconds to hold events before they are written. All events will be written on flow termination, or after max_events are accumulated (if max_events is set). Default 15 minutes
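
Example (a minimal sketch; the output path is illustrative):

from mlrun.datastore import ParquetTarget

# partition output files by /year/month/day/hour under the given path
target = ParquetTarget(
    "myparquet",
    path="v3io:///projects/my-project/FeatureStore/my-set/parquet/",
    partitioned=True,
    time_partitioning_granularity="hour",
)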

add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, additional_filters=None, **kwargs)[source]#

return the target data as dataframe

get_dask_options()[source]#
get_spark_options(key_column=None, timestamp_key=None, overwrite=True)[source]#
is_offline = True#
is_single_file()[source]#
kind: str = 'parquet'#
prepare_spark_df(df, key_columns, timestamp_key=None, spark_options=None)[source]#
support_append = True#
support_dask = True#
support_pandas = True#
support_spark = True#
support_storey = True#
class mlrun.datastore.StreamSource(name='stream', group='serving', seek_to='earliest', shards=1, retention_in_hours=24, extra_attributes: dict | None = None, **kwargs)[source]#

Bases: OnlineSource

Sets the stream source for the flow. If the stream doesn't exist it will create it.

Parameters:
  • name -- stream name. Default "stream"

  • group -- consumer group. Default "serving"

  • seek_to -- from where to consume the stream. Default earliest

  • shards -- number of shards in the stream. Default 1

  • retention_in_hours -- if the stream doesn't exist and is therefore created, the retention time (in hours) to set for it. Default 24h

  • extra_attributes -- additional nuclio trigger attributes (key/value dict)
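
Example (a minimal sketch; the stream settings are illustrative):

from mlrun.datastore import StreamSource

# the stream is created if it does not already exist
source = StreamSource(name="stream", group="serving", seek_to="earliest", shards=1)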

add_nuclio_trigger(function)[source]#
kind = 'v3ioStream'#
class mlrun.datastore.StreamTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = None, flush_after_seconds: int | None = None, storage_options: dict[str, str] | None = None, schema: dict[str, Any] | None = None, credentials_prefix=None)[source]#

Bases: BaseStoreTarget

add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
is_online = False#
is_table = False#
kind: str = 'stream'#
support_append = True#
support_spark = False#
support_storey = True#
mlrun.datastore.get_store_resource(uri, db=None, secrets=None, project=None, data_store_secrets=None)[source]#

get store resource object by uri

mlrun.datastore.get_stream_pusher(stream_path: str, **kwargs)[source]#

get a stream pusher object from URL.

common kwargs:

create:             create a new stream if it doesn't exist
shards:             number of shards
retention_in_hours: stream retention in hours
Parameters:

stream_path -- path/url of stream
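
Example (a minimal sketch; the stream path is illustrative, and the push() call assumes the returned pusher exposes that method):

from mlrun.datastore import get_stream_pusher

pusher = get_stream_pusher(
    "v3io:///projects/my-project/streams/my-stream", create=True, shards=1
)
pusher.push({"id": 1, "value": 0.5})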

class mlrun.datastore.datastore_profile.DatastoreProfile(*, type: str, name: str)[source]#

Bases: BaseModel

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

class Config[source]#

Bases: object

extra = 'forbid'#
static generate_secret_key(profile_name: str, project: str)[source]#
classmethod lower_case(v)[source]#
name: str#
secrets() dict[source]#
type: str#
url(subpath) str[source]#
class mlrun.datastore.datastore_profile.DatastoreProfile2Json[source]#

Bases: BaseModel

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

static create_from_json(public_json: str, private_json: str = '{}')[source]#
static get_json_private(profile: DatastoreProfile) str[source]#
static get_json_public(profile: DatastoreProfile) str[source]#
class mlrun.datastore.datastore_profile.DatastoreProfileAzureBlob(*, type: str = 'az', name: str, connection_string: str | None = None, account_name: str | None = None, account_key: str | None = None, tenant_id: str | None = None, client_id: str | None = None, client_secret: str | None = None, sas_token: str | None = None, credential: str | None = None, container: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

account_key: str | None#
account_name: str | None#
classmethod check_container(v)[source]#
client_id: str | None#
client_secret: str | None#
connection_string: str | None#
container: str | None#
credential: str | None#
sas_token: str | None#
secrets() dict[source]#
tenant_id: str | None#
type: str#
url(subpath) str[source]#
class mlrun.datastore.datastore_profile.DatastoreProfileBasic(*, type: str = 'basic', name: str, public: str, private: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

private: str | None#
public: str#
type: str#
class mlrun.datastore.datastore_profile.DatastoreProfileDBFS(*, type: str = 'dbfs', name: str, endpoint_url: str | None = None, token: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

endpoint_url: str | None#
secrets() dict[source]#
token: str | None#
type: str#
url(subpath) str[source]#
class mlrun.datastore.datastore_profile.DatastoreProfileGCS(*, type: str = 'gcs', name: str, credentials_path: str | None = None, gcp_credentials: str | dict | None = None, bucket: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

bucket: str | None#
classmethod check_bucket(v)[source]#
classmethod convert_dict_to_json(v)[source]#
credentials_path: str | None#
gcp_credentials: str | dict | None#
secrets() dict[source]#
type: str#
url(subpath) str[source]#
class mlrun.datastore.datastore_profile.DatastoreProfileHdfs(*, type: str = 'hdfs', name: str, host: str | None = None, port: int | None = None, http_port: int | None = None, user: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

host: str | None#
http_port: int | None#
port: int | None#
secrets() dict[source]#
type: str#
url(subpath)[source]#
user: str | None#
class mlrun.datastore.datastore_profile.DatastoreProfileKafkaSource(*, type: str = 'kafka_source', name: str, brokers: str | list[str], topics: str | list[str], group: str | None = 'serving', initial_offset: str | None = 'earliest', partitions: str | list[str] | None = None, sasl_user: str | None = None, sasl_pass: str | None = None, kwargs_public: dict | None = None, kwargs_private: dict | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

attributes()[source]#
brokers: str | list[str]#
group: str | None#
initial_offset: str | None#
kwargs_private: dict | None#
kwargs_public: dict | None#
partitions: str | list[str] | None#
sasl_pass: str | None#
sasl_user: str | None#
topics: str | list[str]#
type: str#
class mlrun.datastore.datastore_profile.DatastoreProfileKafkaTarget(*, type: str = 'kafka_target', name: str, bootstrap_servers: str | None = None, brokers: str | None = None, topic: str, kwargs_public: dict | None = None, kwargs_private: dict | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

attributes()[source]#
bootstrap_servers: str | None#
brokers: str | None#
kwargs_private: dict | None#
kwargs_public: dict | None#
topic: str#
type: str#
class mlrun.datastore.datastore_profile.DatastoreProfileRedis(*, type: str = 'redis', name: str, endpoint_url: str, username: str | None = None, password: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

endpoint_url: str#
password: str | None#
secrets() dict[source]#
type: str#
url(subpath)[source]#
url_with_credentials()[source]#
username: str | None#
class mlrun.datastore.datastore_profile.DatastoreProfileS3(*, type: str = 's3', name: str, endpoint_url: str | None = None, force_non_anonymous: str | None = None, profile_name: str | None = None, assume_role_arn: str | None = None, access_key_id: str | None = None, secret_key: str | None = None, bucket: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

access_key_id: str | None#
assume_role_arn: str | None#
bucket: str | None#
classmethod check_bucket(v)[source]#
endpoint_url: str | None#
force_non_anonymous: str | None#
profile_name: str | None#
secret_key: str | None#
secrets() dict[source]#
type: str#
url(subpath)[source]#
class mlrun.datastore.datastore_profile.DatastoreProfileV3io(*, type: str = 'v3io', name: str, v3io_access_key: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

secrets() dict[source]#
type: str#
url(subpath)[source]#
v3io_access_key: str | None#
class mlrun.datastore.datastore_profile.TemporaryClientDatastoreProfiles(*args, **kwargs)[source]#

Bases: object

add(profile: DatastoreProfile)[source]#
get(key)[source]#
remove(key)[source]#
mlrun.datastore.datastore_profile.datastore_profile_read(url, project_name='', secrets: dict | None = None)[source]#
mlrun.datastore.datastore_profile.register_temporary_client_datastore_profile(profile: DatastoreProfile)[source]#

Register the datastore profile. This profile is temporary and remains valid only for the duration of the caller's session. It's beneficial for testing purposes.
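
Example (a minimal sketch; the profile name, bucket, and credentials are illustrative, and the ds:// URL in the comment assumes the usual profile-by-name addressing):

from mlrun.datastore.datastore_profile import (
    DatastoreProfileS3,
    register_temporary_client_datastore_profile,
)

profile = DatastoreProfileS3(
    name="my-s3",
    access_key_id="<access-key>",
    secret_key="<secret-key>",
    bucket="my-bucket",
)
register_temporary_client_datastore_profile(profile)

# the profile can then be referenced by name in data URLs, e.g. "ds://my-s3/path/to/data.parquet"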

mlrun.datastore.datastore_profile.remove_temporary_client_datastore_profile(profile_name: str)[source]#