mlrun.datastore#
- class mlrun.datastore.BigQuerySource(name: str = '', table: str | None = None, max_results_for_table: int | None = None, query: str | None = None, materialization_dataset: str | None = None, chunksize: int | None = None, key_field: str | None = None, time_field: str | None = None, schedule: str | None = None, start_time=None, end_time=None, gcp_project: str | None = None, spark_options: dict | None = None, **kwargs)[source]#
Bases:
BaseSourceDriver
Reads Google BigQuery query results as input source for a flow.
For authentication, set the GCP_CREDENTIALS project secret to the credentials json string.
example:
# set the credentials project.set_secrets({"GCP_CREDENTIALS": gcp_credentials_json}) # use sql query query_string = "SELECT * FROM `the-psf.pypi.downloads20210328` LIMIT 5000" source = BigQuerySource( "bq1", query=query_string, gcp_project="my_project", materialization_dataset="dataviews", ) # read a table source = BigQuerySource( "bq2", table="the-psf.pypi.downloads20210328", gcp_project="my_project" )
- Parameters:
name -- source name
table -- table name/path, cannot be used together with query
query -- sql query string
materialization_dataset -- for query with spark, The target dataset for the materialized view. This dataset should be in same location as the view or the queried tables. must be set to a dataset where the GCP user has table creation permission
chunksize -- number of rows per chunk (default large single chunk)
key_field -- the column to be used as the key for events. Can be a list of keys.
time_field -- the column to be used for time filtering. Defaults to the feature set's timestamp_key.
schedule -- string to configure scheduling of the ingestion job. For example '*/30 * * * *' will cause the job to run every 30 minutes
start_time -- filters out data before this time
end_time -- filters out data after this time
gcp_project -- google cloud project name
spark_options -- additional spark read options
- kind = 'bigquery'#
- support_spark = True#
- support_storey = False#
- class mlrun.datastore.CSVSource(name: str = '', path: str | None = None, attributes: dict[str, object] | None = None, key_field: str | None = None, schedule: str | None = None, parse_dates: None | int | str | list[int] | list[str] = None, **kwargs)[source]#
Bases:
BaseSourceDriver
Reads CSV file as input source for a flow.
- Parameters:
name -- name of the source
path -- path to CSV file
key_field -- the CSV field to be used as the key for events. May be an int (field index) or string (field name) if with_header is True. Defaults to None (no key). Can be a list of keys.
schedule -- string to configure scheduling of the ingestion job.
attributes -- additional parameters to pass to storey. For example: attributes={"timestamp_format": '%Y%m%d%H'}
parse_dates -- Optional. List of columns (names or integers) that will be attempted to parse as date column.
- kind = 'csv'#
- support_spark = True#
- support_storey = True#
- class mlrun.datastore.CSVTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = None, flush_after_seconds: int | None = None, storage_options: dict[str, str] | None = None, schema: dict[str, Any] | None = None, credentials_prefix=None)[source]#
Bases:
BaseStoreTarget
- add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
- as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, additional_filters=None, **kwargs)[source]#
return the target data as dataframe
- is_offline = True#
- kind: str = 'csv'#
- suffix = '.csv'#
- support_pandas = True#
- support_spark = True#
- support_storey = True#
- class mlrun.datastore.DataItem(key: str, store: DataStore, subpath: str, url: str = '', meta=None, artifact_url=None)[source]#
Bases:
object
Data input/output class abstracting access to various local/remote data sources
DataItem objects are passed into functions and can be used inside the function, when a function run completes users can access the run data via the run.artifact(key) which returns a DataItem object. users can also convert a data url (e.g. s3://bucket/key.csv) to a DataItem using mlrun.get_dataitem(url).
Example:
# using data item inside a function def my_func(context, data: DataItem): df = data.as_df() # reading run results using DataItem (run.artifact()) train_run = train_iris_func.run( inputs={"dataset": dataset}, params={"label_column": "label"} ) train_run.artifact("confusion-matrix").show() test_set = train_run.artifact("test_set").as_df() # create and use DataItem from uri data = mlrun.get_dataitem("http://xyz/data.json").get()
- property artifact_url#
DataItem artifact url (when its an artifact) or url for simple dataitems
- as_df(columns=None, df_module=None, format='', time_column=None, start_time=None, end_time=None, additional_filters=None, **kwargs)[source]#
return a dataframe object (generated from the dataitem).
- Parameters:
columns -- optional, list of columns to select
df_module -- optional, py module used to create the DataFrame (e.g. pd, dd, cudf, ..)
format -- file format, if not specified it will be deducted from the suffix
start_time -- filters out data before this time
end_time -- filters out data after this time
time_column -- Store timestamp_key will be used if None. The results will be filtered by this column and start_time & end_time.
additional_filters -- List of additional_filter conditions as tuples. Each tuple should be in the format (column_name, operator, value). Supported operators: "=", ">=", "<=", ">", "<". Example: [("Product", "=", "Computer")] For all supported filters, please see: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
- download(target_path)[source]#
download to the target dir/path
- Parameters:
target_path -- local target path for the downloaded item
- get(size=None, offset=0, encoding=None)[source]#
read all or a byte range and return the content
- Parameters:
size -- number of bytes to get
offset -- fetch from offset (in bytes)
encoding -- encoding (e.g. "utf-8") for converting bytes to str
- get_artifact_type() str | None [source]#
Check if the data item represents an Artifact (one of Artifact, DatasetArtifact and ModelArtifact). If it does it return the store uri prefix (artifacts, datasets or models), otherwise None.
- Returns:
The store prefix of the artifact if it is an artifact data item and None if not.
- property key#
DataItem key
- property kind#
DataItem store kind (file, s3, v3io, ..)
- property meta#
Artifact Metadata, when the DataItem is read from the artifacts store
- put(data, append=False)[source]#
write/upload the data, append is only supported by some datastores
- Parameters:
data -- data (bytes/str) to write
append -- append data to the end of the object, NOT SUPPORTED BY SOME OBJECT STORES!
- show(format: str | None = None) None [source]#
show the data object content in Jupyter
- Parameters:
format -- format to use (when there is no/wrong suffix), e.g. 'png'
- property store#
DataItem store object
- property suffix#
DataItem suffix (file extension) e.g. '.png'
- upload(src_path)[source]#
upload the source file (src_path)
- Parameters:
src_path -- source file path to read from and upload
- property url#
//bucket/path
- Type:
DataItem url e.g. /dir/path, s3
- class mlrun.datastore.DatabricksFileBugFixed(fs, path, mode='rb', block_size='default', autocommit=True, cache_type='readahead', cache_options=None, **kwargs)[source]#
Bases:
DatabricksFile
Overrides DatabricksFile to add the following fix: fsspec/filesystem_spec#1278
Create a new instance of the DatabricksFile.
The blocksize needs to be the default one.
- class mlrun.datastore.DatabricksFileSystemDisableCache(*args, **kwargs)[source]#
Bases:
DatabricksFileSystem
Create a new DatabricksFileSystem.
- Parameters:
instance (str) -- The instance URL of the databricks cluster. For example for an Azure databricks cluster, this has the form adb-<some-number>.<two digits>.azuredatabricks.net.
token (str) -- Your personal token. Find out more here: https://docs.databricks.com/dev-tools/api/latest/authentication.html
- protocol: ClassVar[str | tuple[str, ...]] = 'dbfs'#
- root_marker = '/'#
- storage_args: Tuple[Any, ...]#
- storage_options: Dict[str, Any]#
- class mlrun.datastore.HttpSource(name: str | None = None, path: str | None = None, attributes: dict[str, object] | None = None, key_field: str | None = None, time_field: str | None = None, workers: int | None = None)[source]#
Bases:
OnlineSource
- kind = 'http'#
- class mlrun.datastore.KafkaSource(brokers=None, topics=None, group='serving', initial_offset='earliest', partitions=None, sasl_user=None, sasl_pass=None, attributes=None, **kwargs)[source]#
Bases:
OnlineSource
Sets kafka source for the flow
- Parameters:
brokers -- list of broker IP addresses
topics -- list of topic names on which to listen.
group -- consumer group. Default "serving"
initial_offset -- from where to consume the stream. Default earliest
partitions -- Optional, A list of partitions numbers for which the function receives events.
sasl_user -- Optional, user name to use for sasl authentications
sasl_pass -- Optional, password to use for sasl authentications
attributes -- Optional, extra attributes to be passed to kafka trigger
- create_topics(num_partitions: int = 4, replication_factor: int = 1, topics: list[str] | None = None)[source]#
Create Kafka topics with the specified number of partitions and replication factor.
- Parameters:
num_partitions -- number of partitions for the topics
replication_factor -- replication factor for the topics
topics -- list of topic names to create, if None, the topics will be taken from the source attributes
- kind = 'kafka'#
- class mlrun.datastore.NoSqlTarget(*args, **kwargs)[source]#
Bases:
NoSqlBaseTarget
- kind: str = 'nosql'#
- support_spark = True#
- writer_step_name = 'NoSqlTarget'#
- class mlrun.datastore.ParquetSource(name: str = '', path: str | None = None, attributes: dict[str, object] | None = None, key_field: str | None = None, time_field: str | None = None, schedule: str | None = None, start_time: datetime | str | None = None, end_time: datetime | str | None = None, additional_filters: list[Union[tuple, list]] | None = None)[source]#
Bases:
BaseSourceDriver
Reads Parquet file/dir as input source for a flow.
- Parameters:
name -- name of the source
path -- path to Parquet file or directory
key_field -- the column to be used as the key for events. Can be a list of keys.
time_field -- Optional. Feature set's timestamp_key will be used if None. The results will be filtered by this column and start_filter & end_filter.
start_filter -- datetime. If not None, the results will be filtered by partitions and 'filter_column' > start_filter. Default is None
end_filter -- datetime. If not None, the results will be filtered by partitions 'filter_column' <= end_filter. Default is None
schedule -- string to configure scheduling of the ingestion job. For example '*/30 * * * *' will cause the job to run every 30 minutes
start_time -- filters out data before this time
end_time -- filters out data after this time
attributes -- additional parameters to pass to storey.
additional_filters -- List of additional_filter conditions as tuples. Each tuple should be in the format (column_name, operator, value). Supported operators: "=", ">=", "<=", ">", "<". Example: [("Product", "=", "Computer")] For all supported filters, please see: https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
- property additional_filters#
- property end_time#
- classmethod from_dict(struct=None, fields=None, deprecated_fields: dict | None = None)[source]#
create an object from a python dictionary
- kind = 'parquet'#
- property start_time#
- support_spark = True#
- support_storey = True#
- class mlrun.datastore.ParquetTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool | None = None, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = 10000, flush_after_seconds: int | None = 900, storage_options: dict[str, str] | None = None)[source]#
Bases:
BaseStoreTarget
Parquet target storage driver, used to materialize feature set/vector data into parquet files.
- Parameters:
name -- optional, target name. By default will be called ParquetTarget
path -- optional, Output path. Can be either a file or directory. This parameter is forwarded as-is to pandas.DataFrame.to_parquet(). Default location v3io:///projects/{project}/FeatureStore/{name}/parquet/
attributes -- optional, extra attributes for storey.ParquetTarget
after_step -- optional, after what step in the graph to add the target
columns -- optional, which columns from data to write
partitioned -- optional, whether to partition the file, False by default, if True without passing any other partition field, the data will be partitioned by /year/month/day/hour
key_bucketing_number -- optional, None by default will not partition by key, 0 will partition by the key as is, any other number X will create X partitions and hash the keys to one of them
partition_cols -- optional, name of columns from the data to partition by
time_partitioning_granularity -- optional. the smallest time unit to partition the data by. For example "hour" will yield partitions of the format /year/month/day/hour
max_events -- optional. Maximum number of events to write at a time. All events will be written on flow termination, or after flush_after_seconds (if flush_after_seconds is set). Default 10k events
flush_after_seconds -- optional. Maximum number of seconds to hold events before they are written. All events will be written on flow termination, or after max_events are accumulated (if max_events is set). Default 15 minutes
- add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
- as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, additional_filters=None, **kwargs)[source]#
return the target data as dataframe
- is_offline = True#
- kind: str = 'parquet'#
- support_append = True#
- support_dask = True#
- support_pandas = True#
- support_spark = True#
- support_storey = True#
- class mlrun.datastore.StreamSource(name='stream', group='serving', seek_to='earliest', shards=1, retention_in_hours=24, extra_attributes: dict | None = None, **kwargs)[source]#
Bases:
OnlineSource
Sets the stream source for the flow. If the stream doesn't exist it will create it.
- Parameters:
name -- stream name. Default "stream"
group -- consumer group. Default "serving"
seek_to -- from where to consume the stream. Default earliest
shards -- number of shards in the stream. Default 1
retention_in_hours -- if stream doesn't exist and it will be created set retention time. Default 24h
extra_attributes -- additional nuclio trigger attributes (key/value dict)
- kind = 'v3ioStream'#
- class mlrun.datastore.StreamTarget(name: str = '', path=None, attributes: dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: int | None = None, partition_cols: list[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = None, flush_after_seconds: int | None = None, storage_options: dict[str, str] | None = None, schema: dict[str, Any] | None = None, credentials_prefix=None)[source]#
Bases:
BaseStoreTarget
- add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
- is_online = False#
- is_table = False#
- kind: str = 'stream'#
- support_append = True#
- support_spark = False#
- support_storey = True#
- mlrun.datastore.get_store_resource(uri, db=None, secrets=None, project=None, data_store_secrets=None)[source]#
get store resource object by uri
- mlrun.datastore.get_stream_pusher(stream_path: str, **kwargs)[source]#
get a stream pusher object from URL.
common kwargs:
create: create a new stream if doesnt exist shards: number of shards retention_in_hours: stream retention in hours
- Parameters:
stream_path -- path/url of stream
- class mlrun.datastore.datastore_profile.DatastoreProfile(*, type: str, name: str)[source]#
Bases:
BaseModel
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- name: str#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfile2Json[source]#
Bases:
BaseModel
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- static get_json_private(profile: DatastoreProfile) str [source]#
- static get_json_public(profile: DatastoreProfile) str [source]#
- class mlrun.datastore.datastore_profile.DatastoreProfileAzureBlob(*, type: str = 'az', name: str, connection_string: str | None = None, account_name: str | None = None, account_key: str | None = None, tenant_id: str | None = None, client_id: str | None = None, client_secret: str | None = None, sas_token: str | None = None, credential: str | None = None, container: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- account_key: str | None#
- account_name: str | None#
- client_id: str | None#
- client_secret: str | None#
- connection_string: str | None#
- container: str | None#
- credential: str | None#
- sas_token: str | None#
- tenant_id: str | None#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfileBasic(*, type: str = 'basic', name: str, public: str, private: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- private: str | None#
- public: str#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfileDBFS(*, type: str = 'dbfs', name: str, endpoint_url: str | None = None, token: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- endpoint_url: str | None#
- token: str | None#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfileGCS(*, type: str = 'gcs', name: str, credentials_path: str | None = None, gcp_credentials: str | dict | None = None, bucket: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- bucket: str | None#
- credentials_path: str | None#
- gcp_credentials: str | dict | None#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfileHdfs(*, type: str = 'hdfs', name: str, host: str | None = None, port: int | None = None, http_port: int | None = None, user: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- host: str | None#
- http_port: int | None#
- port: int | None#
- type: str#
- user: str | None#
- class mlrun.datastore.datastore_profile.DatastoreProfileKafkaSource(*, type: str = 'kafka_source', name: str, brokers: str | list[str], topics: str | list[str], group: str | None = 'serving', initial_offset: str | None = 'earliest', partitions: str | list[str] | None = None, sasl_user: str | None = None, sasl_pass: str | None = None, kwargs_public: dict | None = None, kwargs_private: dict | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- brokers: str | list[str]#
- group: str | None#
- initial_offset: str | None#
- kwargs_private: dict | None#
- kwargs_public: dict | None#
- partitions: str | list[str] | None#
- sasl_pass: str | None#
- sasl_user: str | None#
- topics: str | list[str]#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfileKafkaTarget(*, type: str = 'kafka_target', name: str, bootstrap_servers: str | None = None, brokers: str | None = None, topic: str, kwargs_public: dict | None = None, kwargs_private: dict | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- bootstrap_servers: str | None#
- brokers: str | None#
- kwargs_private: dict | None#
- kwargs_public: dict | None#
- topic: str#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfileRedis(*, type: str = 'redis', name: str, endpoint_url: str, username: str | None = None, password: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- endpoint_url: str#
- password: str | None#
- type: str#
- username: str | None#
- class mlrun.datastore.datastore_profile.DatastoreProfileS3(*, type: str = 's3', name: str, endpoint_url: str | None = None, force_non_anonymous: str | None = None, profile_name: str | None = None, assume_role_arn: str | None = None, access_key_id: str | None = None, secret_key: str | None = None, bucket: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- access_key_id: str | None#
- assume_role_arn: str | None#
- bucket: str | None#
- endpoint_url: str | None#
- force_non_anonymous: str | None#
- profile_name: str | None#
- secret_key: str | None#
- type: str#
- class mlrun.datastore.datastore_profile.DatastoreProfileV3io(*, type: str = 'v3io', name: str, v3io_access_key: str | None = None)[source]#
Bases:
DatastoreProfile
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
- type: str#
- v3io_access_key: str | None#
- class mlrun.datastore.datastore_profile.TemporaryClientDatastoreProfiles(*args, **kwargs)[source]#
Bases:
object
- add(profile: DatastoreProfile)[source]#
- mlrun.datastore.datastore_profile.datastore_profile_read(url, project_name='', secrets: dict | None = None)[source]#
- mlrun.datastore.datastore_profile.register_temporary_client_datastore_profile(profile: DatastoreProfile)[source]#
Register the datastore profile. This profile is temporary and remains valid only for the duration of the caller's session. It's beneficial for testing purposes.