mlrun.datastore#
- class mlrun.datastore.BigQuerySource(name: str = '', table: str | None = None, max_results_for_table: int | None = None, query: str | None = None, materialization_dataset: str | None = None, chunksize: int | None = None, key_field: str | None = None, time_field: str | None = None, schedule: str | None = None, start_time=None, end_time=None, gcp_project: str | None = None, spark_options: dict | None = None, **kwargs)[source]#
Bases:
BaseSourceDriver
Reads Google BigQuery query results as input source for a flow.
For authentication, set the GCP_CREDENTIALS project secret to the credentials json string.
Example:
# set the credentials
project.set_secrets({"GCP_CREDENTIALS": gcp_credentials_json})

# use sql query
query_string = "SELECT * FROM `the-psf.pypi.downloads20210328` LIMIT 5000"
source = BigQuerySource(
    "bq1",
    query=query_string,
    gcp_project="my_project",
    materialization_dataset="dataviews",
)

# read a table
source = BigQuerySource(
    "bq2", table="the-psf.pypi.downloads20210328", gcp_project="my_project"
)
- Parameters:
name -- source name
table -- table name/path, cannot be used together with query
query -- sql query string
materialization_dataset -- for query with Spark, the target dataset for the materialized view. This dataset should be in the same location as the view or the queried tables, and must be set to a dataset where the GCP user has table-creation permission
chunksize -- number of rows per chunk (default: a single large chunk)
key_field -- the column to be used as the key for events. Can be a list of keys.
time_field -- the column to be used for time filtering. Defaults to the feature set's timestamp_key.
schedule -- string to configure scheduling of the ingestion job. For example '*/30 * * * *' will cause the job to run every 30 minutes
start_time -- filters out data before this time
end_time -- filters out data after this time
gcp_project -- google cloud project name
spark_options -- additional spark read options
- kind = 'bigquery'#
- support_spark = True#
- support_storey = False#
- class mlrun.datastore.CSVSource(name: str = '', path: str | None = None, attributes: dict[str, object] | None = None, key_field: str | None = None, schedule: str | None = None, parse_dates: None | int | str | list[int] | list[str] = None, **kwargs)[source]#
Bases:
BaseSourceDriver
Reads CSV file as input source for a flow.
- Parameters:
name -- name of the source
path -- path to CSV file
key_field -- the CSV field to be used as the key for events. May be an int (field index) or string (field name) if with_header is True. Defaults to None (no key). Can be a list of keys.
schedule -- string to configure scheduling of the ingestion job.
attributes -- additional parameters to pass to storey. For example: attributes={"timestamp_format": '%Y%m%d%H'}
parse_dates -- Optional. List of columns (names or integer indices) to attempt to parse as date columns.
- kind = 'csv'#
- support_spark = True#
- support_storey = True#
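Example (a minimal usage sketch; the file path and attribute values are placeholders, not part of the class documentation):
from mlrun.datastore import CSVSource

# define a CSV source that parses the "timestamp" column as a date
source = CSVSource(
    "mycsv",
    path="./data/events.csv",  # placeholder local path
    parse_dates=["timestamp"],
    attributes={"timestamp_format": "%Y%m%d%H"},
)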
- class mlrun.datastore.CSVTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = None, flush_after_seconds: int | None = None, storage_options: dict[str, str] | None = None, schema: dict[str, Any] | None = None, credentials_prefix=None)[source]#
Bases:
BaseStoreTarget
- add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
- as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, additional_filters=None, **kwargs)[source]#
return the target data as dataframe
- is_offline = True#
- kind: str = 'csv'#
- suffix = '.csv'#
- support_pandas = True#
- support_spark = True#
- support_storey = True#
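Example (a minimal sketch; the output path is a placeholder):
from mlrun.datastore import CSVTarget

# write materialized feature set data to a single CSV file
target = CSVTarget("csv_out", path="v3io:///projects/myproj/out/data.csv")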
- class mlrun.datastore.DataItem(key: str, store: DataStore, subpath: str, url: str = '', meta=None, artifact_url=None)[source]#
Bases:
object
Data input/output class abstracting access to various local/remote data sources
DataItem objects are passed into functions and can be used inside the function. When a function run completes, users can access the run data via run.artifact(key), which returns a DataItem object. Users can also convert a data URL (e.g. s3://bucket/key.csv) to a DataItem using mlrun.get_dataitem(url).
Example:
# using data item inside a function
def my_func(context, data: DataItem):
    df = data.as_df()


# reading run results using DataItem (run.artifact())
train_run = train_iris_func.run(
    inputs={"dataset": dataset}, params={"label_column": "label"}
)
train_run.artifact("confusion-matrix").show()
test_set = train_run.artifact("test_set").as_df()

# create and use DataItem from uri
data = mlrun.get_dataitem("http://xyz/data.json").get()
- property artifact_url#
DataItem artifact URL (when it's an artifact) or the URL for simple data items
- as_df(columns=None, df_module=None, format='', time_column=None, start_time=None, end_time=None, additional_filters=None, **kwargs)[source]#
return a dataframe object (generated from the dataitem).
- Parameters:
columns -- optional, list of columns to select
df_module -- optional, py module used to create the DataFrame (e.g. pd, dd, cudf, ..)
format -- file format; if not specified, it is deduced from the suffix
start_time -- filters out data before this time
end_time -- filters out data after this time
time_column -- Store timestamp_key will be used if None. The results will be filtered by this column and start_time & end_time.
additional_filters -- List of additional_filter conditions as tuples. Each tuple should be in the format (column_name, operator, value). Supported operators: "=", ">=", "<=", ">", "<". Example: [("Product", "=", "Computer")] For all supported filters, please see: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
- download(target_path)[source]#
download to the target dir/path
- Parameters:
target_path -- local target path for the downloaded item
- get(size: int | None = None, offset: int = 0, encoding: str | None = None) bytes | str [source]#
read all or a byte range and return the content
- Parameters:
size -- number of bytes to get
offset -- fetch from offset (in bytes)
encoding -- encoding (e.g. "utf-8") for converting bytes to str
- Returns:
the bytes/str content
- get_artifact_type() str | None [source]#
Check if the data item represents an Artifact (one of Artifact, DatasetArtifact, and ModelArtifact). If it does, it returns the store URI prefix (artifacts, datasets, or models), otherwise None.
- Returns:
The store prefix of the artifact if it is an artifact data item and None if not.
- property key#
DataItem key
- property kind#
DataItem store kind (file, s3, v3io, ..)
- property meta#
Artifact Metadata, when the DataItem is read from the artifacts store
- put(data: bytes | str, append: bool = False) None [source]#
write/upload the data; append is only supported by some datastores
- Parameters:
data -- data (bytes/str) to write
append -- append data to the end of the object, NOT SUPPORTED BY SOME OBJECT STORES!
- show(format: str | None = None) None [source]#
show the data object content in Jupyter
- Parameters:
format -- format to use (when there is no/wrong suffix), e.g. 'png'
- property store#
DataItem store object
- property suffix#
DataItem suffix (file extension) e.g. '.png'
- upload(src_path)[source]#
upload the source file (src_path)
- Parameters:
src_path -- source file path to read from and upload
- property url#
DataItem url, e.g. /dir/path or s3://bucket/path
- class mlrun.datastore.DatabricksFileBugFixed(fs, path, mode='rb', block_size='default', autocommit=True, cache_type='readahead', cache_options=None, **kwargs)[source]#
Bases:
DatabricksFile
Overrides DatabricksFile to add the following fix: fsspec/filesystem_spec#1278
Create a new instance of the DatabricksFile.
The blocksize needs to be the default one.
- class mlrun.datastore.DatabricksFileSystemDisableCache(*args, **kwargs)[source]#
Bases:
DatabricksFileSystem
Create a new DatabricksFileSystem.
- Parameters:
instance (str) -- The instance URL of the databricks cluster. For example for an Azure databricks cluster, this has the form adb-<some-number>.<two digits>.azuredatabricks.net.
token (str) -- Your personal token. Find out more here: https://docs.databricks.com/dev-tools/api/latest/authentication.html
- protocol: ClassVar[str | tuple[str, ...]] = 'dbfs'#
- root_marker = '/'#
- storage_args: Tuple[Any, ...]#
- storage_options: Dict[str, Any]#
- class mlrun.datastore.HttpSource(name: str | None = None, path: str | None = None, attributes: dict[str, object] | None = None, key_field: str | None = None, time_field: str | None = None, workers: int | None = None)[source]#
Bases:
OnlineSource
- kind = 'http'#
- class mlrun.datastore.KafkaSource(brokers=None, topics=None, group='serving', initial_offset='earliest', partitions=None, sasl_user=None, sasl_pass=None, attributes=None, **kwargs)[source]#
Bases:
OnlineSource
Sets the Kafka source for the flow
- Parameters:
brokers -- list of broker IP addresses
topics -- list of topic names on which to listen.
group -- consumer group. Default "serving"
initial_offset -- from where to consume the stream. Default earliest
partitions -- Optional. A list of partition numbers from which the function receives events.
sasl_user -- Optional. Username to use for SASL authentication.
sasl_pass -- Optional. Password to use for SASL authentication.
attributes -- Optional. Extra attributes to be passed to the Kafka trigger.
- create_topics(num_partitions: int = 4, replication_factor: int = 1, topics: list[str] | None = None)[source]#
Create Kafka topics with the specified number of partitions and replication factor.
- Parameters:
num_partitions -- number of partitions for the topics
replication_factor -- replication factor for the topics
topics -- list of topic names to create; if None, the topics are taken from the source attributes
- kind = 'kafka'#
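Example (a minimal sketch; the broker addresses, topic names, and credentials are placeholders):
from mlrun.datastore import KafkaSource

source = KafkaSource(
    brokers=["broker1:9092", "broker2:9092"],  # placeholder broker addresses
    topics=["my-topic"],
    group="serving",
    initial_offset="earliest",
    sasl_user="my-user",      # optional SASL credentials
    sasl_pass="my-password",
)
# optionally create the topics before consuming from them
source.create_topics(num_partitions=4, replication_factor=1)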
- class mlrun.datastore.NoSqlTarget(*args, **kwargs)[source]#
Bases:
NoSqlBaseTarget
- kind: str = 'nosql'#
- support_spark = True#
- writer_step_name = 'NoSqlTarget'#
- class mlrun.datastore.ParquetSource(name: str = '', path: str | None = None, attributes: dict[str, object] | None = None, key_field: str | None = None, time_field: str | None = None, schedule: str | None = None, start_time: datetime | str | None = None, end_time: datetime | str | None = None, additional_filters: list[Union[tuple, list]] | None = None)[source]#
Bases:
BaseSourceDriver
Reads Parquet file/dir as input source for a flow.
- Parameters:
name -- name of the source
path -- path to Parquet file or directory
key_field -- the column to be used as the key for events. Can be a list of keys.
time_field -- Optional. The feature set's timestamp_key is used if None. The results are filtered by this column together with start_time and end_time.
schedule -- string to configure scheduling of the ingestion job. For example, '*/30 * * * *' will cause the job to run every 30 minutes
start_time -- Optional datetime. Filters out data before this time (the results are filtered by partitions and time_field > start_time). Default is None
end_time -- Optional datetime. Filters out data after this time (the results are filtered by partitions and time_field <= end_time). Default is None
attributes -- additional parameters to pass to storey.
additional_filters -- List of additional_filter conditions as tuples. Each tuple should be in the format (column_name, operator, value). Supported operators: "=", ">=", "<=", ">", "<". Example: [("Product", "=", "Computer")] For all supported filters, please see: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
- property additional_filters#
- property end_time#
- classmethod from_dict(struct=None, fields=None, deprecated_fields: dict | None = None)[source]#
create an object from a python dictionary
- kind = 'parquet'#
- property start_time#
- support_spark = True#
- support_storey = True#
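Example (a minimal sketch; the path, column names, and filter values are placeholders):
from datetime import datetime
from mlrun.datastore import ParquetSource

source = ParquetSource(
    "events",
    path="v3io:///projects/myproj/data/events.parquet",  # placeholder path
    time_field="timestamp",
    start_time=datetime(2023, 1, 1),
    end_time=datetime(2023, 2, 1),
    additional_filters=[("Product", "=", "Computer")],
)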
- class mlrun.datastore.ParquetTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool | None = None, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = 10000, flush_after_seconds: int | None = 900, storage_options: dict[str, str] | None = None)[source]#
Bases:
BaseStoreTarget
Parquet target storage driver, used to materialize feature set/vector data into parquet files.
- Parameters:
name -- optional, target name. By default will be called ParquetTarget
path -- optional, Output path. Can be either a file or directory. This parameter is forwarded as-is to pandas.DataFrame.to_parquet(). Default location v3io:///projects/{project}/FeatureStore/{name}/parquet/
attributes -- optional, extra attributes for storey.ParquetTarget
after_step -- optional, after what step in the graph to add the target
columns -- optional, which columns from data to write
partitioned -- optional. Whether to partition the file. False by default. If True and no other partition field is passed, the data is partitioned by /year/month/day/hour
key_bucketing_number -- optional. None (the default) does not partition by key, 0 partitions by the key as-is, and any other number X creates X partitions and hashes the keys into one of them
partition_cols -- optional, name of columns from the data to partition by
time_partitioning_granularity -- optional. the smallest time unit to partition the data by. For example "hour" will yield partitions of the format /year/month/day/hour
max_events -- optional. Maximum number of events to write at a time. All events will be written on flow termination, or after flush_after_seconds (if flush_after_seconds is set). Default 10k events
flush_after_seconds -- optional. Maximum number of seconds to hold events before they are written. All events will be written on flow termination, or after max_events are accumulated (if max_events is set). Default 15 minutes
- add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
- as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, additional_filters=None, **kwargs)[source]#
return the target data as dataframe
- is_offline = True#
- kind: str = 'parquet'#
- support_append = True#
- support_dask = True#
- support_pandas = True#
- support_spark = True#
- support_storey = True#
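Example (a minimal sketch; the output directory is a placeholder):
from mlrun.datastore import ParquetTarget

# partition the output by /year/month/day/hour
target = ParquetTarget(
    "pq_out",
    path="v3io:///projects/myproj/out/parquet/",  # placeholder directory path
    partitioned=True,
    time_partitioning_granularity="hour",
)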
- class mlrun.datastore.StreamSource(name='stream', group='serving', seek_to='earliest', shards=1, retention_in_hours=24, extra_attributes: dict | None = None, **kwargs)[source]#
Bases:
OnlineSource
Sets the stream source for the flow. If the stream doesn't exist, it will be created.
- Parameters:
name -- stream name. Default "stream"
group -- consumer group. Default "serving"
seek_to -- from where to consume the stream. Default earliest
shards -- number of shards in the stream. Default 1
retention_in_hours -- retention time to set if the stream doesn't exist and is created. Default 24h
extra_attributes -- additional nuclio trigger attributes (key/value dict)
- kind = 'v3ioStream'#
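Example (a minimal sketch using the documented defaults):
from mlrun.datastore import StreamSource

source = StreamSource(
    name="stream",
    group="serving",
    seek_to="earliest",
    shards=1,
    retention_in_hours=24,
)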
- class mlrun.datastore.StreamTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = None, flush_after_seconds: int | None = None, storage_options: dict[str, str] | None = None, schema: dict[str, Any] | None = None, credentials_prefix=None)[source]#
Bases:
BaseStoreTarget
- add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
- is_online = False#
- is_table = False#
- kind: str = 'stream'#
- support_append = True#
- support_spark = False#
- support_storey = True#
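Example (a minimal sketch; the stream path is a placeholder):
from mlrun.datastore import StreamTarget

target = StreamTarget("stream_out", path="v3io:///projects/myproj/streams/out")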
- mlrun.datastore.get_store_resource(uri, db=None, secrets=None, project=None, data_store_secrets=None)[source]#
get store resource object by uri
- mlrun.datastore.get_stream_pusher(stream_path: str, **kwargs)[source]#
get a stream pusher object from a URL.
common kwargs:
create: create a new stream if it doesn't exist
shards: number of shards
retention_in_hours: stream retention in hours
- Parameters:
stream_path -- path/url of stream
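Example (a minimal sketch; the stream path is a placeholder):
from mlrun.datastore import get_stream_pusher

pusher = get_stream_pusher(
    "v3io:///projects/myproj/streams/mystream",  # placeholder stream path
    create=True,
    shards=1,
    retention_in_hours=24,
)
# the returned object can then be used to push records into the stream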
- class mlrun.datastore.datastore_profile.ConfigProfile(*, type: str = 'config', name: str, public: dict | None = None, private: dict | None = None)[source]#
Bases:
DatastoreProfile
A profile class for managing configuration data with nested public and private attributes. This class extends DatastoreProfile to handle configuration settings, separating them into public and private dictionaries. Both dictionaries support nested structures, and the class provides functionality to merge these attributes when needed.
- Parameters:
public (Optional[dict]) -- Dictionary containing public configuration settings, supporting nested structures
private (Optional[dict]) -- Dictionary containing private/sensitive configuration settings, supporting nested structures
Example
>>> public = {
...     "database": {"host": "localhost", "port": 5432},
...     "api_version": "v1",
... }
>>> private = {
...     "database": {"password": "secret123", "username": "admin"},
...     "api_key": "xyz789",
... }
>>> config = ConfigProfile(name="myconfig", public=public, private=private)
# When attributes() is called, it merges public and private:
# {
#     "database": {
#         "host": "localhost",
#         "port": 5432,
#         "password": "secret123",
#         "username": "admin"
#     },
#     "api_version": "v1",
#     "api_key": "xyz789"
# }
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- private: dict | None#
- public: dict | None#
- class mlrun.datastore.datastore_profile.DatastoreProfile(*, type: str, name: str)[source]#
Bases:
BaseModel
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- name: str#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfile2Json[source]#
Bases:
BaseModel
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- static get_json_private(profile: DatastoreProfile) str [source]#
- static get_json_public(profile: DatastoreProfile) str [source]#
- class mlrun.datastore.datastore_profile.DatastoreProfileAzureBlob(*, type: str = 'az', name: str, connection_string: str | None = None, account_name: str | None = None, account_key: str | None = None, tenant_id: str | None = None, client_id: str | None = None, client_secret: str | None = None, sas_token: str | None = None, credential: str | None = None, container: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- account_key: str | None#
- account_name: str | None#
- client_id: str | None#
- client_secret: str | None#
- connection_string: str | None#
- container: str | None#
- credential: str | None#
- sas_token: str | None#
- tenant_id: str | None#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfileBasic(*, type: str = 'basic', name: str, public: str, private: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- private: str | None#
- public: str#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfileDBFS(*, type: str = 'dbfs', name: str, endpoint_url: str | None = None, token: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- endpoint_url: str | None#
- token: str | None#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfileGCS(*, type: str = 'gcs', name: str, credentials_path: str | None = None, gcp_credentials: str | dict | None = None, bucket: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- bucket: str | None#
- credentials_path: str | None#
- gcp_credentials: str | dict | None#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfileHdfs(*, type: str = 'hdfs', name: str, host: str | None = None, port: int | None = None, http_port: int | None = None, user: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- host: str | None#
- http_port: int | None#
- port: int | None#
- type: str#
- user: str | None#
- class mlrun.datastore.datastore_profile.DatastoreProfileKafkaSource(*, type: str = 'kafka_source', name: str, brokers: str | list[str], topics: str | list[str], group: str | None = 'serving', initial_offset: str | None = 'earliest', partitions: str | list[str] | None = None, sasl_user: str | None = None, sasl_pass: str | None = None, kwargs_public: dict | None = None, kwargs_private: dict | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- brokers: str | list[str]#
- group: str | None#
- initial_offset: str | None#
- kwargs_private: dict | None#
- kwargs_public: dict | None#
- partitions: str | list[str] | None#
- sasl_pass: str | None#
- sasl_user: str | None#
- topics: str | list[str]#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfileKafkaTarget(*, type: str = 'kafka_target', name: str, bootstrap_servers: str | None = None, brokers: str | None = None, topic: str, kwargs_public: dict | None = None, kwargs_private: dict | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- bootstrap_servers: str | None#
- brokers: str | None#
- kwargs_private: dict | None#
- kwargs_public: dict | None#
- topic: str#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfileRedis(*, type: str = 'redis', name: str, endpoint_url: str, username: str | None = None, password: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- endpoint_url: str#
- password: str | None#
- type: str#
- username: str | None#
- class mlrun.datastore.datastore_profile.DatastoreProfileS3(*, type: str = 's3', name: str, endpoint_url: str | None = None, force_non_anonymous: str | None = None, profile_name: str | None = None, assume_role_arn: str | None = None, access_key_id: str | None = None, secret_key: str | None = None, bucket: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- access_key_id: str | None#
- assume_role_arn: str | None#
- bucket: str | None#
- endpoint_url: str | None#
- force_non_anonymous: str | None#
- profile_name: str | None#
- secret_key: str | None#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfileV3io(*, type: str = 'v3io', name: str, v3io_access_key: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- type: str#
- v3io_access_key: str | None#
- class mlrun.datastore.datastore_profile.TDEngineDatastoreProfile(*, type: str = 'taosws', name: str, user: str, password: str | None = None, host: str, port: int)[source]#
Bases:
DatastoreProfile
A profile that holds the required parameters for a TDEngine database, with the websocket scheme. https://docs.tdengine.com/developer-guide/connecting-to-tdengine/#websocket-connection
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- classmethod from_dsn(dsn: str, profile_name: str) TDEngineDatastoreProfile [source]#
Construct a TDEngine profile from DSN (connection string) and a name for the profile.
- Parameters:
dsn -- The DSN (Data Source Name) of the TDEngine database, e.g. "taosws://root:taosdata@localhost:6041"
profile_name -- The new profile's name.
- Returns:
The TDEngine profile.
- host: str#
- password: str | None#
- port: int#
- type: str#
- user: str#
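Example (a minimal sketch using the DSN form shown above; the credentials and host are placeholders):
from mlrun.datastore.datastore_profile import TDEngineDatastoreProfile

profile = TDEngineDatastoreProfile.from_dsn(
    dsn="taosws://root:taosdata@localhost:6041",  # placeholder credentials/host
    profile_name="my-tdengine",
)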
- class mlrun.datastore.datastore_profile.TemporaryClientDatastoreProfiles(*args, **kwargs)[source]#
Bases:
object
- add(profile: DatastoreProfile)[source]#
- mlrun.datastore.datastore_profile.datastore_profile_read(url, project_name='', secrets: dict | None = None)[source]#
Read and retrieve a datastore profile from a given URL.
This function retrieves a datastore profile either from temporary client storage, or from the MLRun database. It handles both client-side and server-side profile formats and performs necessary conversions.
- Parameters:
url (str) -- A URL with 'ds' scheme pointing to the datastore profile (e.g., 'ds://profile-name').
project_name (str, optional) -- The project name where the profile is stored. Defaults to MLRun's default project.
secrets (dict, optional) -- Dictionary containing secrets needed for profile retrieval.
- Returns:
The retrieved datastore profile object.
- Return type:
DatastoreProfile
- Raises:
MLRunInvalidArgumentError -- In the following cases:
- If the URL scheme is not 'ds'
- If the profile cannot be retrieved from either the server or the local environment
Note
When running from a client environment (outside MLRun pods), private profile information is not accessible. In this case, use register_temporary_client_datastore_profile() to register the profile with credentials for your local session. When running inside MLRun pods, the private information is automatically available and no temporary registration is needed.
- mlrun.datastore.datastore_profile.register_temporary_client_datastore_profile(profile: DatastoreProfile)[source]#
Register the datastore profile. This profile is temporary and remains valid only for the duration of the caller's session. It's beneficial for testing purposes.
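Example (a minimal sketch; the profile values are placeholders, and the ds:// reference follows the URL scheme described for datastore_profile_read above):
from mlrun.datastore.datastore_profile import (
    DatastoreProfileS3,
    register_temporary_client_datastore_profile,
)

profile = DatastoreProfileS3(
    name="my-s3",
    access_key_id="AKIA...",  # placeholder credentials
    secret_key="...",
    bucket="my-bucket",
)
register_temporary_client_datastore_profile(profile)

# the profile can now be referenced by a ds:// URL, e.g. "ds://my-s3/path/to/file.parquet"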
- mlrun.datastore.datastore_profile.remove_temporary_client_datastore_profile(profile_name: str)[source]#
- class mlrun.datastore.vectorstore.VectorStoreCollection(mlrun_context: MlrunProject | MLClientCtx, vector_store: VectorStore, collection_name: str | None = None)[source]#
Bases:
object
A wrapper class for vector store collections with MLRun integration.
This class wraps a vector store implementation (like Milvus, Chroma) and provides integration with MLRun context for document and artifact management. It delegates most operations to the underlying vector store while handling MLRun-specific functionality.
The class implements attribute delegation through __getattr__ and __setattr__, allowing direct access to the underlying vector store's methods and attributes while maintaining MLRun integration.
- add_artifacts(artifacts: list[mlrun.artifacts.document.DocumentArtifact], splitter=None, **kwargs)[source]#
Add a list of DocumentArtifact objects to the vector store collection.
Converts artifacts to LangChain documents, adds them to the vector store, and updates the MLRun context. If documents are split, the IDs are handled appropriately.
- Parameters:
artifacts (list[DocumentArtifact]) -- List of DocumentArtifact objects to add
splitter (TextSplitter, optional) -- Document splitter to break artifacts into smaller chunks. If None, each artifact becomes a single document.
kwargs --
Additional arguments passed to the underlying add_documents method. Special handling for 'ids' kwarg:
- If provided and the document is split, IDs are generated as "{original_id}_{i}", where i starts from 1 (e.g., "doc1_1", "doc1_2", etc.)
- If provided and the document isn't split, the original IDs are used as-is
- Returns:
List of IDs for all added documents. When no custom IDs are provided:
- Without splitting: the vector store generates IDs automatically
- With splitting: the vector store generates separate IDs for each chunk
When custom IDs are provided:
- Without splitting: the provided IDs are used directly
- With splitting: sequential IDs are generated as "{original_id}_{i}" for each chunk
- Return type:
list
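Example (a minimal sketch of add_artifacts; it assumes `project` is an MLRun project, `vector_store` is a LangChain-compatible vector store instance, and `doc_artifact` is a DocumentArtifact already logged on the project, none of which are defined by this class):
from mlrun.datastore.vectorstore import VectorStoreCollection

collection = VectorStoreCollection(
    mlrun_context=project,          # assumed MLRun project or run context
    vector_store=vector_store,      # assumed LangChain-compatible vector store
    collection_name="my-docs",
)

# add artifacts with a custom ID; if a splitter is passed and a document is split,
# the generated IDs become "doc1_1", "doc1_2", ...
ids = collection.add_artifacts([doc_artifact], ids=["doc1"])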
- add_documents(documents: list['Document'], **kwargs)[source]#
Add a list of documents to the collection.
If the instance has an MLRun context, it will update the MLRun artifacts associated with the documents.
- Parameters:
documents (list[Document]) -- A list of Document objects to be added.
**kwargs -- Additional keyword arguments to be passed to the underlying collection implementation.
- Returns:
The result of the underlying collection implementation's add_documents method.
- delete_artifacts(artifacts: list[mlrun.artifacts.document.DocumentArtifact])[source]#
Delete a list of DocumentArtifact objects from the collection.
This method removes the specified artifacts from the collection and updates the MLRun context. The deletion process varies depending on the type of the underlying collection implementation.
- Parameters:
artifacts (list[DocumentArtifact]) -- A list of DocumentArtifact objects to be deleted.
- Raises:
NotImplementedError -- If the delete operation is not supported for the collection implementation.
- remove_from_artifact(artifact: DocumentArtifact)[source]#
Remove the current object from the given artifact's collection and update the artifact.
- Parameters:
artifact (DocumentArtifact) -- The artifact from which the current object should be removed.