mlrun.datastore#

class mlrun.datastore.BigQuerySource(name: str = '', table: str | None = None, max_results_for_table: int | None = None, query: str | None = None, materialization_dataset: str | None = None, chunksize: int | None = None, key_field: str | None = None, time_field: str | None = None, schedule: str | None = None, start_time=None, end_time=None, gcp_project: str | None = None, spark_options: dict | None = None, **kwargs)[source]#

Bases: BaseSourceDriver

Reads Google BigQuery query results as input source for a flow.

For authentication, set the GCP_CREDENTIALS project secret to the credentials JSON string.

Example:

# set the credentials
project.set_secrets({"GCP_CREDENTIALS": gcp_credentials_json})

# use sql query
query_string = "SELECT * FROM `the-psf.pypi.downloads20210328` LIMIT 5000"
source = BigQuerySource(
    "bq1",
    query=query_string,
    gcp_project="my_project",
    materialization_dataset="dataviews",
)

# read a table
source = BigQuerySource(
    "bq2", table="the-psf.pypi.downloads20210328", gcp_project="my_project"
)
Parameters:
  • name -- source name

  • table -- table name/path, cannot be used together with query

  • query -- sql query string

  • materialization_dataset -- for query with Spark, the target dataset for the materialized view. This dataset should be in the same location as the view or the queried tables, and must be a dataset where the GCP user has table creation permission

  • chunksize -- number of rows per chunk (default is a single large chunk)

  • key_field -- the column to be used as the key for events. Can be a list of keys.

  • time_field -- the column to be used for time filtering. Defaults to the feature set's timestamp_key.

  • schedule -- string to configure scheduling of the ingestion job. For example '*/30 * * * *' will cause the job to run every 30 minutes

  • start_time -- filters out data before this time

  • end_time -- filters out data after this time

  • gcp_project -- google cloud project name

  • spark_options -- additional spark read options

is_iterator()[source]#
kind = 'bigquery'#
support_spark = True#
support_storey = False#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None, additional_filters=None)[source]#

return the source data as dataframe

to_spark_df(session, named_view=False, time_field=None, columns=None)[source]#
class mlrun.datastore.CSVSource(name: str = '', path: str | None = None, attributes: dict[str, object] | None = None, key_field: str | None = None, schedule: str | None = None, parse_dates: None | int | str | list[int] | list[str] = None, **kwargs)[source]#

Bases: BaseSourceDriver

Reads CSV file as input source for a flow.

Parameters:
  • name -- name of the source

  • path -- path to CSV file

  • key_field -- the CSV field to be used as the key for events. May be an int (field index) or string (field name) if with_header is True. Defaults to None (no key). Can be a list of keys.

  • schedule -- string to configure scheduling of the ingestion job.

  • attributes -- additional parameters to pass to storey. For example: attributes={"timestamp_format": '%Y%m%d%H'}

  • parse_dates -- Optional. List of columns (names or integers) to attempt to parse as date columns.
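
Example (a minimal sketch; the file path, timestamp format, and column name are assumptions):

source = CSVSource(
    "csv1",
    path="./data/measurements.csv",
    attributes={"timestamp_format": "%Y%m%d%H"},
    parse_dates=["timestamp"],
)
df = source.to_dataframe()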

get_spark_options()[source]#
is_iterator()[source]#
kind = 'csv'#
support_spark = True#
support_storey = True#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None, additional_filters=None)[source]#

return the source data as dataframe

to_spark_df(session, named_view=False, time_field=None, columns=None)[source]#
to_step(key_field=None, time_field=None, context=None)[source]#
class mlrun.datastore.CSVTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = None, flush_after_seconds: int | None = None, storage_options: dict[str, str] | None = None, schema: dict[str, Any] | None = None, credentials_prefix=None)[source]#

Bases: BaseStoreTarget

add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, additional_filters=None, **kwargs)[source]#

return the target data as dataframe

get_spark_options(key_column=None, timestamp_key=None, overwrite=True)[source]#
is_offline = True#
is_single_file()[source]#
kind: str = 'csv'#
prepare_spark_df(df, key_columns, timestamp_key=None, spark_options=None)[source]#
suffix = '.csv'#
support_pandas = True#
support_spark = True#
support_storey = True#
class mlrun.datastore.DataItem(key: str, store: DataStore, subpath: str, url: str = '', meta=None, artifact_url=None)[source]#

Bases: object

Data input/output class abstracting access to various local/remote data sources

DataItem objects are passed into functions and can be used inside the function. When a function run completes, users can access the run data via run.artifact(key), which returns a DataItem object. Users can also convert a data URL (e.g. s3://bucket/key.csv) to a DataItem using mlrun.get_dataitem(url).

Example:

# using data item inside a function
def my_func(context, data: DataItem):
    df = data.as_df()


# reading run results using DataItem (run.artifact())
train_run = train_iris_func.run(
    inputs={"dataset": dataset}, params={"label_column": "label"}
)

train_run.artifact("confusion-matrix").show()
test_set = train_run.artifact("test_set").as_df()

# create and use DataItem from uri
data = mlrun.get_dataitem("http://xyz/data.json").get()
property artifact_url#

DataItem artifact url (when it's an artifact) or url for simple dataitems

as_df(columns=None, df_module=None, format='', time_column=None, start_time=None, end_time=None, additional_filters=None, **kwargs)[source]#

return a dataframe object (generated from the dataitem).

Parameters:
  • columns -- optional, list of columns to select

  • df_module -- optional, py module used to create the DataFrame (e.g. pd, dd, cudf, ..)

  • format -- file format, if not specified it will be deduced from the suffix

  • start_time -- filters out data before this time

  • end_time -- filters out data after this time

  • time_column -- Store timestamp_key will be used if None. The results will be filtered by this column and start_time & end_time.

  • additional_filters -- List of additional_filter conditions as tuples. Each tuple should be in the format (column_name, operator, value). Supported operators: "=", ">=", "<=", ">", "<". Example: [("Product", "=", "Computer")] For all supported filters, please see: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
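
Example (a minimal sketch; the URL and column names are assumptions):

import mlrun

item = mlrun.get_dataitem("s3://my-bucket/sales.parquet")
df = item.as_df(
    columns=["Product", "Price"],
    additional_filters=[("Product", "=", "Computer")],
)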

delete()[source]#

delete the object from the datastore

download(target_path)[source]#

download to the target dir/path

Parameters:

target_path -- local target path for the downloaded item

get(size: int | None = None, offset: int = 0, encoding: str | None = None) bytes | str[source]#

read all or a byte range and return the content

Parameters:
  • size -- number of bytes to get

  • offset -- fetch from offset (in bytes)

  • encoding -- encoding (e.g. "utf-8") for converting bytes to str

Returns:

the bytes/str content

get_artifact_type() str | None[source]#

Check if the data item represents an Artifact (one of Artifact, DatasetArtifact and ModelArtifact). If it does, it returns the store URI prefix (artifacts, datasets or models); otherwise None.

Returns:

The store prefix of the artifact if it is an artifact data item and None if not.

property key#

DataItem key

property kind#

DataItem store kind (file, s3, v3io, ..)

listdir()[source]#

return a list of child file names

local()[source]#

get the local path of the file, download to tmp first if it's a remote object

ls()[source]#

return a list of child file names

property meta#

Artifact Metadata, when the DataItem is read from the artifacts store

open(mode)[source]#

return fsspec file handler, if supported

put(data: bytes | str, append: bool = False) None[source]#

write/upload the data; append is only supported by some datastores

Parameters:
  • data -- data (bytes/str) to write

  • append -- append data to the end of the object, NOT SUPPORTED BY SOME OBJECT STORES!
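
Example (a minimal sketch; the target URL is an assumption):

import mlrun

item = mlrun.get_dataitem("v3io:///projects/my-project/notes.txt")
item.put("hello world")
text = item.get(encoding="utf-8")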

remove_local()[source]#

remove the local file if it exists and was downloaded from a remote object

show(format: str | None = None) None[source]#

show the data object content in Jupyter

Parameters:

format -- format to use (when there is no/wrong suffix), e.g. 'png'

stat()[source]#

return FileStats class (size, modified, content_type)

property store#

DataItem store object

property suffix#

DataItem suffix (file extension) e.g. '.png'

upload(src_path)[source]#

upload the source file (src_path)

Parameters:

src_path -- source file path to read from and upload

property url#

DataItem url e.g. /dir/path, s3://bucket/path

class mlrun.datastore.DatabricksFileBugFixed(fs, path, mode='rb', block_size='default', autocommit=True, cache_type='readahead', cache_options=None, **kwargs)[source]#

Bases: DatabricksFile

Overrides DatabricksFile to add the following fix: fsspec/filesystem_spec#1278

Create a new instance of the DatabricksFile.

The blocksize needs to be the default one.

class mlrun.datastore.DatabricksFileSystemDisableCache(*args, **kwargs)[source]#

Bases: DatabricksFileSystem

Create a new DatabricksFileSystem.

protocol: ClassVar[str | tuple[str, ...]] = 'dbfs'#
root_marker = '/'#
storage_args: Tuple[Any, ...]#
storage_options: Dict[str, Any]#
class mlrun.datastore.HttpSource(name: str | None = None, path: str | None = None, attributes: dict[str, object] | None = None, key_field: str | None = None, time_field: str | None = None, workers: int | None = None)[source]#

Bases: OnlineSource

add_nuclio_trigger(function)[source]#
kind = 'http'#
class mlrun.datastore.KafkaSource(brokers=None, topics=None, group='serving', initial_offset='earliest', partitions=None, sasl_user=None, sasl_pass=None, attributes=None, **kwargs)[source]#

Bases: OnlineSource

Sets the Kafka source for the flow.

Parameters:
  • brokers -- list of broker IP addresses

  • topics -- list of topic names on which to listen.

  • group -- consumer group. Default "serving"

  • initial_offset -- from where to consume the stream. Default earliest

  • partitions -- Optional. A list of partition numbers from which the function receives events.

  • sasl_user -- Optional, username to use for SASL authentication

  • sasl_pass -- Optional, password to use for SASL authentication

  • attributes -- Optional, extra attributes to be passed to kafka trigger
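
Example (a minimal sketch; the broker address and topic name are assumptions):

source = KafkaSource(
    brokers=["kafka-broker:9092"],
    topics=["my-topic"],
    group="serving",
    initial_offset="earliest",
)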

add_nuclio_trigger(function)[source]#
create_topics(num_partitions: int = 4, replication_factor: int = 1, topics: list[str] | None = None)[source]#

Create Kafka topics with the specified number of partitions and replication factor.

Parameters:
  • num_partitions -- number of partitions for the topics

  • replication_factor -- replication factor for the topics

  • topics -- list of topic names to create, if None, the topics will be taken from the source attributes

kind = 'kafka'#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None, additional_filters=None)[source]#

return the source data as dataframe

to_spark_df(session, named_view=False, time_field=None, columns=None)[source]#
class mlrun.datastore.NoSqlTarget(*args, **kwargs)[source]#

Bases: NoSqlBaseTarget

get_spark_options(key_column=None, timestamp_key=None, overwrite=True)[source]#
get_table_object()[source]#

get storey Table object

kind: str = 'nosql'#
prepare_spark_df(df, key_columns, timestamp_key=None, spark_options=None)[source]#
support_spark = True#
writer_step_name = 'NoSqlTarget'#
class mlrun.datastore.ParquetSource(name: str = '', path: str | None = None, attributes: dict[str, object] | None = None, key_field: str | None = None, time_field: str | None = None, schedule: str | None = None, start_time: datetime | str | None = None, end_time: datetime | str | None = None, additional_filters: list[Union[tuple, list]] | None = None)[source]#

Bases: BaseSourceDriver

Reads Parquet file/dir as input source for a flow.

Parameters:
  • name -- name of the source

  • path -- path to Parquet file or directory

  • key_field -- the column to be used as the key for events. Can be a list of keys.

  • time_field -- Optional. Feature set's timestamp_key will be used if None. The results will be filtered by this column and start_time & end_time.

  • schedule -- string to configure scheduling of the ingestion job. For example '*/30 * * * *' will cause the job to run every 30 minutes

  • start_time -- datetime or str. Filters out data before this time. If set, the results are filtered by partitions and time_field > start_time. Default is None

  • end_time -- datetime or str. Filters out data after this time. If set, the results are filtered by partitions and time_field <= end_time. Default is None

  • attributes -- additional parameters to pass to storey.

  • additional_filters -- List of additional_filter conditions as tuples. Each tuple should be in the format (column_name, operator, value). Supported operators: "=", ">=", "<=", ">", "<". Example: [("Product", "=", "Computer")] For all supported filters, please see: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
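
Example (a minimal sketch; the path, time range, and filter column are assumptions):

source = ParquetSource(
    "parquet1",
    path="v3io:///projects/my-project/data/events.parquet",
    start_time="2023-01-01",
    end_time="2023-12-31",
    additional_filters=[("Product", "=", "Computer")],
)
df = source.to_dataframe()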

property additional_filters#
property end_time#
classmethod from_dict(struct=None, fields=None, deprecated_fields: dict | None = None)[source]#

create an object from a python dictionary

get_spark_options()[source]#
kind = 'parquet'#
property start_time#
support_spark = True#
support_storey = True#
to_dataframe(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_field=None, additional_filters=None)[source]#

return the source data as dataframe

to_step(key_field=None, time_field=None, start_time=None, end_time=None, context=None, additional_filters=None)[source]#
class mlrun.datastore.ParquetTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool | None = None, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = 10000, flush_after_seconds: int | None = 900, storage_options: dict[str, str] | None = None)[source]#

Bases: BaseStoreTarget

Parquet target storage driver, used to materialize feature set/vector data into parquet files.

Parameters:
  • name -- optional, target name. By default will be called ParquetTarget

  • path -- optional, Output path. Can be either a file or directory. This parameter is forwarded as-is to pandas.DataFrame.to_parquet(). Default location v3io:///projects/{project}/FeatureStore/{name}/parquet/

  • attributes -- optional, extra attributes for storey.ParquetTarget

  • after_step -- optional, after what step in the graph to add the target

  • columns -- optional, which columns from data to write

  • partitioned -- optional, whether to partition the file. False by default. If True and no other partition field is passed, the data will be partitioned by /year/month/day/hour

  • key_bucketing_number -- optional, None by default will not partition by key, 0 will partition by the key as is, any other number X will create X partitions and hash the keys to one of them

  • partition_cols -- optional, name of columns from the data to partition by

  • time_partitioning_granularity -- optional. the smallest time unit to partition the data by. For example "hour" will yield partitions of the format /year/month/day/hour

  • max_events -- optional. Maximum number of events to write at a time. All events will be written on flow termination, or after flush_after_seconds (if flush_after_seconds is set). Default 10k events

  • flush_after_seconds -- optional. Maximum number of seconds to hold events before they are written. All events will be written on flow termination, or after max_events are accumulated (if max_events is set). Default 15 minutes
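
Example (a minimal sketch; the output path is an assumption). The target is typically passed in the targets list when materializing a feature set:

target = ParquetTarget(
    "pq1",
    path="v3io:///projects/my-project/output/",
    partitioned=True,
    time_partitioning_granularity="hour",
)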

add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, additional_filters=None, **kwargs)[source]#

return the target data as dataframe

get_dask_options()[source]#
get_spark_options(key_column=None, timestamp_key=None, overwrite=True)[source]#
is_offline = True#
is_single_file()[source]#
kind: str = 'parquet'#
prepare_spark_df(df, key_columns, timestamp_key=None, spark_options=None)[source]#
support_append = True#
support_dask = True#
support_pandas = True#
support_spark = True#
support_storey = True#
class mlrun.datastore.StreamSource(name='stream', group='serving', seek_to='earliest', shards=1, retention_in_hours=24, extra_attributes: dict | None = None, **kwargs)[source]#

Bases: OnlineSource

Sets the stream source for the flow. If the stream doesn't exist, it will be created.

Parameters:
  • name -- stream name. Default "stream"

  • group -- consumer group. Default "serving"

  • seek_to -- from where to consume the stream. Default earliest

  • shards -- number of shards in the stream. Default 1

  • retention_in_hours -- retention time, in hours, to set if the stream doesn't exist and is created. Default 24h

  • extra_attributes -- additional nuclio trigger attributes (key/value dict)
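
Example (a minimal sketch; the stream name, shard count, and retention are arbitrary values):

source = StreamSource(
    name="my-stream",
    group="serving",
    seek_to="earliest",
    shards=4,
    retention_in_hours=48,
)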

add_nuclio_trigger(function)[source]#
kind = 'v3ioStream'#
class mlrun.datastore.StreamTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = None, flush_after_seconds: int | None = None, storage_options: dict[str, str] | None = None, schema: dict[str, Any] | None = None, credentials_prefix=None)[source]#

Bases: BaseStoreTarget

add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
is_online = False#
is_table = False#
kind: str = 'stream'#
support_append = True#
support_spark = False#
support_storey = True#
mlrun.datastore.get_store_resource(uri, db=None, secrets=None, project=None, data_store_secrets=None)[source]#

get store resource object by uri

mlrun.datastore.get_stream_pusher(stream_path: str, **kwargs)[source]#

get a stream pusher object from URL.

common kwargs:

create:             create a new stream if it doesn't exist
shards:             number of shards
retention_in_hours: stream retention in hours
Parameters:

stream_path -- path/url of stream
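
Example (a minimal sketch; the stream path is an assumption, and the push() call is assumed from the returned pusher's interface):

import mlrun

pusher = mlrun.datastore.get_stream_pusher(
    "v3io:///projects/my-project/streams/events",
    create=True,
    shards=2,
    retention_in_hours=24,
)
pusher.push({"user": "joe", "event": "click"})  # push a record (method assumed)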

class mlrun.datastore.datastore_profile.ConfigProfile(*, type: str = 'config', name: str, public: dict | None = None, private: dict | None = None)[source]#

Bases: DatastoreProfile

A profile class for managing configuration data with nested public and private attributes. This class extends DatastoreProfile to handle configuration settings, separating them into public and private dictionaries. Both dictionaries support nested structures, and the class provides functionality to merge these attributes when needed.

Parameters:
  • public (Optional[dict]) -- Dictionary containing public configuration settings, supporting nested structures

  • private (Optional[dict]) -- Dictionary containing private/sensitive configuration settings, supporting nested structures

Example

>>> public = {
    "database": {
        "host": "localhost",
        "port": 5432
    },
    "api_version": "v1"
}
>>> private = {
    "database": {
        "password": "secret123",
        "username": "admin"
    },
    "api_key": "xyz789"
}
>>> config = ConfigProfile(name="myconfig", public=public, private=private)

# When attributes() is called, it merges public and private:
# {
#     "database": {
#         "host": "localhost",
#         "port": 5432,
#         "password": "secret123",
#         "username": "admin"
#     },
#     "api_version": "v1",
#     "api_key": "xyz789"
# }

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

attributes()[source]#
private: dict | None#
public: dict | None#
class mlrun.datastore.datastore_profile.DatastoreProfile(*, type: str, name: str)[source]#

Bases: BaseModel

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

class Config[source]#

Bases: object

extra = 'forbid'#
static generate_secret_key(profile_name: str, project: str)[source]#
classmethod lower_case(v)[source]#
name: str#
secrets() dict[source]#
type: str#
url(subpath) str[source]#
class mlrun.datastore.datastore_profile.DatastoreProfile2Json[source]#

Bases: BaseModel

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

static create_from_json(public_json: str, private_json: str = '{}')[source]#
static get_json_private(profile: DatastoreProfile) str[source]#
static get_json_public(profile: DatastoreProfile) str[source]#
class mlrun.datastore.datastore_profile.DatastoreProfileAzureBlob(*, type: str = 'az', name: str, connection_string: str | None = None, account_name: str | None = None, account_key: str | None = None, tenant_id: str | None = None, client_id: str | None = None, client_secret: str | None = None, sas_token: str | None = None, credential: str | None = None, container: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

account_key: str | None#
account_name: str | None#
classmethod check_container(v)[source]#
client_id: str | None#
client_secret: str | None#
connection_string: str | None#
container: str | None#
credential: str | None#
sas_token: str | None#
secrets() dict[source]#
tenant_id: str | None#
type: str#
url(subpath) str[source]#
class mlrun.datastore.datastore_profile.DatastoreProfileBasic(*, type: str = 'basic', name: str, public: str, private: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

private: str | None#
public: str#
type: str#
class mlrun.datastore.datastore_profile.DatastoreProfileDBFS(*, type: str = 'dbfs', name: str, endpoint_url: str | None = None, token: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

endpoint_url: str | None#
secrets() dict[source]#
token: str | None#
type: str#
url(subpath) str[source]#
class mlrun.datastore.datastore_profile.DatastoreProfileGCS(*, type: str = 'gcs', name: str, credentials_path: str | None = None, gcp_credentials: str | dict | None = None, bucket: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

bucket: str | None#
classmethod check_bucket(v)[source]#
classmethod convert_dict_to_json(v)[source]#
credentials_path: str | None#
gcp_credentials: str | dict | None#
secrets() dict[source]#
type: str#
url(subpath) str[source]#
class mlrun.datastore.datastore_profile.DatastoreProfileHdfs(*, type: str = 'hdfs', name: str, host: str | None = None, port: int | None = None, http_port: int | None = None, user: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

host: str | None#
http_port: int | None#
port: int | None#
secrets() dict[source]#
type: str#
url(subpath)[source]#
user: str | None#
class mlrun.datastore.datastore_profile.DatastoreProfileKafkaSource(*, type: str = 'kafka_source', name: str, brokers: str | list[str], topics: str | list[str], group: str | None = 'serving', initial_offset: str | None = 'earliest', partitions: str | list[str] | None = None, sasl_user: str | None = None, sasl_pass: str | None = None, kwargs_public: dict | None = None, kwargs_private: dict | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

attributes()[source]#
brokers: str | list[str]#
group: str | None#
initial_offset: str | None#
kwargs_private: dict | None#
kwargs_public: dict | None#
partitions: str | list[str] | None#
sasl_pass: str | None#
sasl_user: str | None#
topics: str | list[str]#
type: str#
class mlrun.datastore.datastore_profile.DatastoreProfileKafkaTarget(*, type: str = 'kafka_target', name: str, bootstrap_servers: str | None = None, brokers: str | None = None, topic: str, kwargs_public: dict | None = None, kwargs_private: dict | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

attributes()[source]#
bootstrap_servers: str | None#
brokers: str | None#
kwargs_private: dict | None#
kwargs_public: dict | None#
topic: str#
type: str#
class mlrun.datastore.datastore_profile.DatastoreProfileRedis(*, type: str = 'redis', name: str, endpoint_url: str, username: str | None = None, password: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

endpoint_url: str#
password: str | None#
secrets() dict[source]#
type: str#
url(subpath)[source]#
url_with_credentials()[source]#
username: str | None#
class mlrun.datastore.datastore_profile.DatastoreProfileS3(*, type: str = 's3', name: str, endpoint_url: str | None = None, force_non_anonymous: str | None = None, profile_name: str | None = None, assume_role_arn: str | None = None, access_key_id: str | None = None, secret_key: str | None = None, bucket: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

access_key_id: str | None#
assume_role_arn: str | None#
bucket: str | None#
classmethod check_bucket(v)[source]#
endpoint_url: str | None#
force_non_anonymous: str | None#
profile_name: str | None#
secret_key: str | None#
secrets() dict[source]#
type: str#
url(subpath)[source]#
class mlrun.datastore.datastore_profile.DatastoreProfileV3io(*, type: str = 'v3io', name: str, v3io_access_key: str | None = None)[source]#

Bases: DatastoreProfile

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

secrets() dict[source]#
type: str#
url(subpath)[source]#
v3io_access_key: str | None#
class mlrun.datastore.datastore_profile.TDEngineDatastoreProfile(*, type: str = 'taosws', name: str, user: str, password: str | None = None, host: str, port: int)[source]#

Bases: DatastoreProfile

A profile that holds the required parameters for a TDEngine database, with the websocket scheme. https://docs.tdengine.com/developer-guide/connecting-to-tdengine/#websocket-connection

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

dsn() str[source]#

Get the Data Source Name of the configured TDEngine profile.

classmethod from_dsn(dsn: str, profile_name: str) TDEngineDatastoreProfile[source]#

Construct a TDEngine profile from DSN (connection string) and a name for the profile.

Parameters:
  • dsn -- The DSN (Data Source Name) of the TDEngine database, e.g.: "taosws://root:taosdata@localhost:6041".

  • profile_name -- The new profile's name.

Returns:

The TDEngine profile.
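
Example (a minimal sketch, using the DSN format shown above; the profile name is an assumption):

profile = TDEngineDatastoreProfile.from_dsn(
    dsn="taosws://root:taosdata@localhost:6041",
    profile_name="my-tdengine",
)
print(profile.dsn())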

host: str#
password: str | None#
port: int#
type: str#
user: str#
class mlrun.datastore.datastore_profile.TemporaryClientDatastoreProfiles(*args, **kwargs)[source]#

Bases: object

add(profile: DatastoreProfile)[source]#
get(key)[source]#
remove(key)[source]#
mlrun.datastore.datastore_profile.datastore_profile_read(url, project_name='', secrets: dict | None = None)[source]#

Read and retrieve a datastore profile from a given URL.

This function retrieves a datastore profile either from temporary client storage, or from the MLRun database. It handles both client-side and server-side profile formats and performs necessary conversions.

Parameters:
  • url (str) -- A URL with 'ds' scheme pointing to the datastore profile (e.g., 'ds://profile-name').

  • project_name (str, optional) -- The project name where the profile is stored. Defaults to MLRun's default project.

  • secrets (dict, optional) -- Dictionary containing secrets needed for profile retrieval.

Returns:

The retrieved datastore profile object.

Return type:

DatastoreProfile

Raises:

MLRunInvalidArgumentError -- In the following cases:
  • If the URL scheme is not 'ds'
  • If the profile cannot be retrieved from either the server or the local environment

Note

When running from a client environment (outside MLRun pods), private profile information is not accessible. In this case, use register_temporary_client_datastore_profile() to register the profile with credentials for your local session. When running inside MLRun pods, the private information is automatically available and no temporary registration is needed.
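
Example (a minimal sketch; the profile name, bucket, and credential values are assumptions):

from mlrun.datastore.datastore_profile import (
    DatastoreProfileS3,
    datastore_profile_read,
    register_temporary_client_datastore_profile,
)

profile = DatastoreProfileS3(
    name="my-s3",
    access_key_id="my-access-key",
    secret_key="my-secret-key",
    bucket="my-bucket",
)
register_temporary_client_datastore_profile(profile)
profile = datastore_profile_read("ds://my-s3")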

mlrun.datastore.datastore_profile.register_temporary_client_datastore_profile(profile: DatastoreProfile)[source]#

Register the datastore profile. This profile is temporary and remains valid only for the duration of the caller's session. It's beneficial for testing purposes.

mlrun.datastore.datastore_profile.remove_temporary_client_datastore_profile(profile_name: str)[source]#
class mlrun.datastore.vectorstore.VectorStoreCollection(mlrun_context: MlrunProject | MLClientCtx, vector_store: VectorStore, collection_name: str | None = None)[source]#

Bases: object

A wrapper class for vector store collections with MLRun integration.

This class wraps a vector store implementation (like Milvus, Chroma) and provides integration with MLRun context for document and artifact management. It delegates most operations to the underlying vector store while handling MLRun-specific functionality.

The class implements attribute delegation through __getattr__ and __setattr__, allowing direct access to the underlying vector store's methods and attributes while maintaining MLRun integration.

add_artifacts(artifacts: list[mlrun.artifacts.document.DocumentArtifact], splitter=None, **kwargs)[source]#

Add a list of DocumentArtifact objects to the vector store collection.

Converts artifacts to LangChain documents, adds them to the vector store, and updates the MLRun context. If documents are split, the IDs are handled appropriately.

Parameters:
  • artifacts (list[DocumentArtifact]) -- List of DocumentArtifact objects to add

  • splitter (TextSplitter, optional) -- Document splitter to break artifacts into smaller chunks. If None, each artifact becomes a single document.

  • kwargs --

    Additional arguments passed to the underlying add_documents method. Special handling for 'ids' kwarg:

    • If provided and document is split, IDs are generated as "{original_id}_{i}" where i starts from 1 (e.g., "doc1_1", "doc1_2", etc.)

    • If provided and document isn't split, original IDs are used as-is

Returns:

List of IDs for all added documents. When no custom IDs are provided:

  • Without splitting: Vector store generates IDs automatically

  • With splitting: Vector store generates separate IDs for each chunk

When custom IDs are provided:

  • Without splitting: Uses provided IDs directly

  • With splitting: Generates sequential IDs as "{original_id}_{i}" for each chunk

Return type:

list
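
Example (a minimal sketch; it assumes collection is an existing VectorStoreCollection, doc_artifacts is a list of DocumentArtifact objects, and uses LangChain's RecursiveCharacterTextSplitter as an illustrative splitter):

from langchain_text_splitters import RecursiveCharacterTextSplitter

# collection and doc_artifacts are assumed to exist already
splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)

# with custom IDs and splitting, chunk IDs become "doc1_1", "doc1_2", ...
ids = collection.add_artifacts(doc_artifacts, splitter=splitter, ids=["doc1", "doc2"])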

add_documents(documents: list['Document'], **kwargs)[source]#

Add a list of documents to the collection.

If the instance has an MLRun context, it will update the MLRun artifacts associated with the documents.

Parameters:
  • documents (list[Document]) -- A list of Document objects to be added.

  • **kwargs -- Additional keyword arguments to be passed to the underlying collection implementation.

Returns:

The result of the underlying collection implementation's add_documents method.

delete(*args, **kwargs)[source]#
delete_artifacts(artifacts: list[mlrun.artifacts.document.DocumentArtifact])[source]#

Delete a list of DocumentArtifact objects from the collection.

This method removes the specified artifacts from the collection and updates the MLRun context. The deletion process varies depending on the type of the underlying collection implementation.

Parameters:

artifacts (list[DocumentArtifact]) -- A list of DocumentArtifact objects to be deleted.

Raises:

NotImplementedError -- If the delete operation is not supported for the collection implementation.

remove_from_artifact(artifact: DocumentArtifact)[source]#

Remove the current object from the given artifact's collection and update the artifact.

Parameters:

artifact (DocumentArtifact) -- The artifact from which the current object should be removed.

mlrun.datastore.vectorstore.find_existing_attribute(obj, base_name='name', parent_name='collection')[source]#