mlrun.datastore#
- class mlrun.datastore.BigQuerySource(name: str = '', table: str | None = None, max_results_for_table: int | None = None, query: str | None = None, materialization_dataset: str | None = None, chunksize: int | None = None, key_field: str | None = None, time_field: str | None = None, schedule: str | None = None, start_time=None, end_time=None, gcp_project: str | None = None, spark_options: dict | None = None, **kwargs)[source]#
Bases:
BaseSourceDriver
Reads Google BigQuery query results as input source for a flow.
For authentication, set the GCP_CREDENTIALS project secret to the credentials JSON string.
example:
# set the credentials
project.set_secrets({"GCP_CREDENTIALS": gcp_credentials_json})

# use sql query
query_string = "SELECT * FROM `the-psf.pypi.downloads20210328` LIMIT 5000"
source = BigQuerySource(
    "bq1",
    query=query_string,
    gcp_project="my_project",
    materialization_dataset="dataviews",
)

# read a table
source = BigQuerySource(
    "bq2", table="the-psf.pypi.downloads20210328", gcp_project="my_project"
)
- Parameters:
name – source name
table – table name/path, cannot be used together with query
query – sql query string
materialization_dataset – for Spark queries, the target dataset for the materialized view. This dataset should be in the same location as the view or the queried tables, and must be a dataset where the GCP user has table creation permission
chunksize – number of rows per chunk (default large single chunk)
key_field – the column to be used as the key for events. Can be a list of keys.
time_field – the column to be used for time filtering. Defaults to the feature set’s timestamp_key.
schedule – string to configure scheduling of the ingestion job. For example ‘*/30 * * * *’ will cause the job to run every 30 minutes
start_time – filters out data before this time
end_time – filters out data after this time
gcp_project – google cloud project name
spark_options – additional spark read options
- kind = 'bigquery'#
- support_spark = True#
- support_storey = False#
- class mlrun.datastore.CSVSource(name: str = '', path: str | None = None, attributes: Dict[str, str] | None = None, key_field: str | None = None, schedule: str | None = None, parse_dates: None | int | str | List[int] | List[str] = None, **kwargs)[source]#
Bases:
BaseSourceDriver
Reads CSV file as input source for a flow.
- Parameters:
name – name of the source
path – path to CSV file
key_field – the CSV field to be used as the key for events. May be an int (field index) or string (field name) if with_header is True. Defaults to None (no key). Can be a list of keys.
schedule – string to configure scheduling of the ingestion job.
attributes – additional parameters to pass to storey. For example: attributes={“timestamp_format”: ‘%Y%m%d%H’}
parse_dates – Optional. List of columns (names or integer indices) to attempt to parse as date columns.
- kind = 'csv'#
- support_spark = True#
- support_storey = True#
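A minimal usage sketch (the path and feature-set definition below are illustrative, and the ingestion entry point may differ between mlrun versions):

import mlrun.feature_store as fstore
from mlrun.datastore import CSVSource

# define a CSV source; parse the "timestamp" column as dates
source = CSVSource(
    "mycsv",
    path="v3io:///projects/demo/data/events.csv",  # illustrative path
    parse_dates=["timestamp"],
)

# ingest into a feature set (newer versions: fset.ingest(...); older: fstore.ingest(fset, ...))
fset = fstore.FeatureSet("events", entities=[fstore.Entity("id")], timestamp_key="timestamp")
fset.ingest(source)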
- class mlrun.datastore.CSVTarget(name: str = '', path=None, attributes: Dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: int | None = None, partition_cols: List[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = None, flush_after_seconds: int | None = None, storage_options: Dict[str, str] | None = None, schema: Dict[str, Any] | None = None, credentials_prefix=None)[source]#
Bases:
BaseStoreTarget
- add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
- as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, **kwargs)[source]#
return the target data as a dataframe
- is_offline = True#
- kind: str = 'csv'#
- suffix = '.csv'#
- support_spark = True#
- support_storey = True#
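A hedged sketch of materializing a feature set into a CSV file, assuming an already defined feature set fset and source (e.g. from the CSVSource sketch above); the path is illustrative:

from mlrun.datastore import CSVTarget

target = CSVTarget("mycsv_target", path="v3io:///projects/demo/artifacts/events.csv")
fset.ingest(source, targets=[target])  # ingestion entry point may vary by mlrun version
# the materialized file can later be read back, e.g. via target.as_df()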
- class mlrun.datastore.DataItem(key: str, store: DataStore, subpath: str, url: str = '', meta=None, artifact_url=None)[source]#
Bases:
object
Data input/output class abstracting access to various local/remote data sources
DataItem objects are passed into functions and can be used inside the function. When a function run completes, users can access the run data via run.artifact(key), which returns a DataItem object. Users can also convert a data URL (e.g. s3://bucket/key.csv) to a DataItem using mlrun.get_dataitem(url).
Example:
# using data item inside a function
def my_func(context, data: DataItem):
    df = data.as_df()

# reading run results using DataItem (run.artifact())
train_run = train_iris_func.run(
    inputs={'dataset': dataset}, params={'label_column': 'label'}
)
train_run.artifact('confusion-matrix').show()
test_set = train_run.artifact('test_set').as_df()

# create and use DataItem from uri
data = mlrun.get_dataitem('http://xyz/data.json').get()
- property artifact_url#
DataItem artifact URL (when it is an artifact), or the URL for simple data items
- as_df(columns=None, df_module=None, format='', time_column=None, start_time=None, end_time=None, **kwargs)[source]#
return a dataframe object (generated from the dataitem).
- Parameters:
columns – optional, list of columns to select
df_module – optional, py module used to create the DataFrame (e.g. pd, dd, cudf, ..)
format – file format; if not specified it will be deduced from the suffix
start_time – filters out data before this time
end_time – filters out data after this time
time_column – Store timestamp_key will be used if None. The results will be filtered by this column and start_time & end_time.
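For example, a hedged sketch using only the parameters above (the URL and column names are illustrative):

from datetime import datetime
import mlrun

item = mlrun.get_dataitem("s3://my-bucket/data/events.parquet")  # illustrative URL
df = item.as_df(
    columns=["user_id", "amount", "timestamp"],  # select a subset of columns
    format="parquet",                            # optional, otherwise deduced from the suffix
    time_column="timestamp",
    start_time=datetime(2023, 1, 1),
    end_time=datetime(2023, 6, 30),
)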
- download(target_path)[source]#
download to the target dir/path
- Parameters:
target_path – local target path for the downloaded item
- get(size=None, offset=0, encoding=None)[source]#
read all or a byte range and return the content
- Parameters:
size – number of bytes to get
offset – fetch from offset (in bytes)
encoding – encoding (e.g. “utf-8”) for converting bytes to str
- get_artifact_type() str | None [source]#
Check if the data item represents an Artifact (one of Artifact, DatasetArtifact, and ModelArtifact). If it does, return the store URI prefix (artifacts, datasets, or models); otherwise return None.
- Returns:
The store prefix of the artifact if it is an artifact data item and None if not.
- property key#
DataItem key
- property kind#
DataItem store kind (file, s3, v3io, ..)
- property meta#
Artifact Metadata, when the DataItem is read from the artifacts store
- put(data, append=False)[source]#
write/upload the data; append is only supported by some datastores
- Parameters:
data – data (bytes/str) to write
append – append data to the end of the object, NOT SUPPORTED BY SOME OBJECT STORES!
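For example, a small sketch with an illustrative object path:

import mlrun

item = mlrun.get_dataitem("v3io:///projects/demo/artifacts/notes.txt")  # illustrative path
item.put("hello world")            # write/overwrite the object
text = item.get(encoding="utf-8")  # read it back as a string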
- show(format=None)[source]#
show the data object content in Jupyter
- Parameters:
format – format to use (when there is no/wrong suffix), e.g. ‘png’
- property store#
DataItem store object
- property suffix#
DataItem suffix (file extension) e.g. ‘.png’
- upload(src_path)[source]#
upload the source file (src_path)
- Parameters:
src_path – source file path to read from and upload
- property url#
DataItem url, e.g. /dir/path, s3://bucket/path
- class mlrun.datastore.DatabricksFileBugFixed(fs, path, mode='rb', block_size='default', autocommit=True, cache_type='readahead', cache_options=None, **kwargs)[source]#
Bases:
DatabricksFile
Overrides DatabricksFile to add the following fix: fsspec/filesystem_spec#1278
Create a new instance of the DatabricksFile.
The blocksize needs to be the default one.
- class mlrun.datastore.DatabricksFileSystemDisableCache(*args, **kwargs)[source]#
Bases:
DatabricksFileSystem
Create a new DatabricksFileSystem.
- Parameters:
instance (str) – The instance URL of the databricks cluster. For example for an Azure databricks cluster, this has the form adb-<some-number>.<two digits>.azuredatabricks.net.
token (str) – Your personal token. Find out more here: https://docs.databricks.com/dev-tools/api/latest/authentication.html
- protocol: ClassVar[str | tuple[str, ...]] = 'dbfs'#
- root_marker = '/'#
- class mlrun.datastore.HttpSource(name: str | None = None, path: str | None = None, attributes: Dict[str, object] | None = None, key_field: str | None = None, time_field: str | None = None, workers: int | None = None)[source]#
Bases:
OnlineSource
- kind = 'http'#
- class mlrun.datastore.KafkaSource(brokers=None, topics=None, group='serving', initial_offset='earliest', partitions=None, sasl_user=None, sasl_pass=None, attributes=None, **kwargs)[source]#
Bases:
OnlineSource
Sets a Kafka source for the flow.
- Parameters:
brokers – list of broker IP addresses
topics – list of topic names on which to listen.
group – consumer group. Default “serving”
initial_offset – from where to consume the stream. Default earliest
partitions – Optional, a list of partition numbers for which the function receives events.
sasl_user – Optional, username to use for SASL authentication
sasl_pass – Optional, password to use for SASL authentication
attributes – Optional, extra attributes to be passed to the Kafka trigger
- kind = 'kafka'#
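A construction sketch; broker addresses, topic names, and credentials are illustrative, and attaching the source to a real-time ingestion service depends on the mlrun version:

from mlrun.datastore import KafkaSource

source = KafkaSource(
    brokers=["broker_host:9092"],  # illustrative broker address
    topics=["my_topic"],
    group="serving",
    initial_offset="earliest",
    sasl_user="my_user",           # optional SASL credentials
    sasl_pass="my_password",
)
# the source is then typically attached to a real-time ingestion service,
# e.g. fset.deploy_ingestion_service(source=source) (API may vary by mlrun version)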
- class mlrun.datastore.NoSqlTarget(*args, **kwargs)[source]#
Bases:
NoSqlBaseTarget
- kind: str = 'nosql'#
- support_spark = True#
- writer_step_name = 'NoSqlTarget'#
- class mlrun.datastore.ParquetSource(name: str = '', path: str | None = None, attributes: Dict[str, str] | None = None, key_field: str | None = None, time_field: str | None = None, schedule: str | None = None, start_time: datetime | str | None = None, end_time: datetime | str | None = None)[source]#
Bases:
BaseSourceDriver
Reads Parquet file/dir as input source for a flow.
- Parameters:
name – name of the source
path – path to Parquet file or directory
key_field – the column to be used as the key for events. Can be a list of keys.
time_field – optional. The feature set's timestamp_key will be used if None. The results will be filtered by this column together with start_time and end_time.
start_time – datetime or string. If not None, the results will be filtered by partitions and rows where time_field > start_time. Default is None
end_time – datetime or string. If not None, the results will be filtered by partitions and rows where time_field <= end_time. Default is None
schedule – string to configure scheduling of the ingestion job. For example ‘*/30 * * * *’ will cause the job to run every 30 minutes
attributes – additional parameters to pass to storey.
- property end_time#
- kind = 'parquet'#
- property start_time#
- support_spark = True#
- support_storey = True#
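A minimal sketch; the path and time range are illustrative:

from datetime import datetime
from mlrun.datastore import ParquetSource

source = ParquetSource(
    "myparquet",
    path="v3io:///projects/demo/data/events.parquet",  # illustrative path
    time_field="timestamp",
    start_time=datetime(2023, 1, 1),  # keep rows with timestamp > start_time
    end_time=datetime(2023, 6, 30),   # ... and timestamp <= end_time
)
# the source is then passed to feature-set ingestion, e.g. fset.ingest(source)
# (the ingestion entry point may vary by mlrun version)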
- class mlrun.datastore.ParquetTarget(name: str = '', path=None, attributes: Dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool | None = None, key_bucketing_number: int | None = None, partition_cols: List[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = 10000, flush_after_seconds: int | None = 900, storage_options: Dict[str, str] | None = None)[source]#
Bases:
BaseStoreTarget
parquet target storage driver, used to materialize feature set/vector data into parquet files
- Parameters:
name – optional, target name. By default will be called ParquetTarget
path – optional, Output path. Can be either a file or directory. This parameter is forwarded as-is to pandas.DataFrame.to_parquet(). Default location v3io:///projects/{project}/FeatureStore/{name}/parquet/
attributes – optional, extra attributes for storey.ParquetTarget
after_step – optional, after what step in the graph to add the target
columns – optional, which columns from data to write
partitioned – optional, whether to partition the file, False by default, if True without passing any other partition field, the data will be partitioned by /year/month/day/hour
key_bucketing_number – optional, None by default will not partition by key, 0 will partition by the key as is, any other number X will create X partitions and hash the keys to one of them
partition_cols – optional, name of columns from the data to partition by
time_partitioning_granularity – optional. the smallest time unit to partition the data by. For example “hour” will yield partitions of the format /year/month/day/hour
max_events – optional. Maximum number of events to write at a time. All events will be written on flow termination, or after flush_after_seconds (if flush_after_seconds is set). Default 10k events
flush_after_seconds – optional. Maximum number of seconds to hold events before they are written. All events will be written on flow termination, or after max_events are accumulated (if max_events is set). Default 15 minutes
- add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
- as_df(columns=None, df_module=None, entities=None, start_time=None, end_time=None, time_column=None, **kwargs)[source]#
return the target data as a dataframe
- is_offline = True#
- kind: str = 'parquet'#
- support_append = True#
- support_dask = True#
- support_spark = True#
- support_storey = True#
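A hedged sketch of a partitioned parquet target, assuming an already defined feature set fset and source; the output directory is illustrative:

from mlrun.datastore import ParquetTarget

target = ParquetTarget(
    "events_parquet",
    path="v3io:///projects/demo/artifacts/events/",  # illustrative output directory
    partitioned=True,
    time_partitioning_granularity="hour",  # yields /year/month/day/hour partitions
)
fset.ingest(source, targets=[target])  # ingestion entry point may vary by mlrun version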
- class mlrun.datastore.StreamSource(name='stream', group='serving', seek_to='earliest', shards=1, retention_in_hours=24, extra_attributes: dict | None = None, **kwargs)[source]#
Bases:
OnlineSource
Sets a stream source for the flow. If the stream doesn't exist, it will be created.
- Parameters:
name – stream name. Default “stream”
group – consumer group. Default “serving”
seek_to – from where to consume the stream. Default earliest
shards – number of shards in the stream. Default 1
retention_in_hours – retention time in hours, used when the stream doesn't exist and is created. Default 24h
extra_attributes – additional nuclio trigger attributes (key/value dict)
- kind = 'v3ioStream'#
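A construction sketch (the stream name is illustrative; attaching it to a real-time ingestion service depends on the mlrun version):

from mlrun.datastore import StreamSource

source = StreamSource(
    name="mystream",        # the v3io stream is created if it does not exist
    group="serving",
    seek_to="earliest",
    shards=1,
    retention_in_hours=24,
)
# typically attached to a real-time ingestion service,
# e.g. fset.deploy_ingestion_service(source=source) (API may vary by mlrun version)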
- class mlrun.datastore.StreamTarget(name: str = '', path=None, attributes: Dict[str, str] | None = None, after_step=None, columns=None, partitioned: bool = False, key_bucketing_number: int | None = None, partition_cols: List[str] | None = None, time_partitioning_granularity: str | None = None, max_events: int | None = None, flush_after_seconds: int | None = None, storage_options: Dict[str, str] | None = None, schema: Dict[str, Any] | None = None, credentials_prefix=None)[source]#
Bases:
BaseStoreTarget
- add_writer_step(graph, after, features, key_columns=None, timestamp_key=None, featureset_status=None)[source]#
- is_online = False#
- is_table = False#
- kind: str = 'stream'#
- support_append = True#
- support_spark = False#
- support_storey = True#
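A hedged sketch of writing ingested events to a v3io stream, assuming an already defined feature set fset and source; the stream path is illustrative:

from mlrun.datastore import StreamTarget

target = StreamTarget("events_stream", path="v3io:///projects/demo/streams/events")
fset.ingest(source, targets=[target])  # ingestion entry point may vary by mlrun version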
- mlrun.datastore.get_store_resource(uri, db=None, secrets=None, project=None, data_store_secrets=None)[source]#
get store resource object by uri
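For example, a hedged sketch; the store URI below is illustrative:

import mlrun

# resolve a store URI (feature set, feature vector, artifact, ...) to its resource object
resource = mlrun.datastore.get_store_resource("store://feature-sets/demo/events")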