Remote execution#
You can chain functions together with remote execution. This allows you to:
Call existing functions from the graph and reuse them from other graphs
Scale up and down different components individually
Call remote functions either using HTTP or using a queue / streaming.
HTTP#
To call a function over HTTP, use the special $remote class. First, deploy the remote function:
remote_func_name = "serving-example-flow"
fn_remote = project.set_function(
name=remote_func_name, kind="serving", image="mlrun/mlrun"
)
fn_remote.add_model(
"model1",
class_name="ClassifierModel",
model_path="https://s3.wasabisys.com/iguazio/models/iris/model.pkl",
)
remote_addr = fn_remote.deploy()
> 2022-03-17 08:20:40,674 [info] Starting remote function deploy
2022-03-17 08:20:40 (info) Deploying function
2022-03-17 08:20:40 (info) Building
2022-03-17 08:20:40 (info) Staging files and preparing base images
2022-03-17 08:20:40 (info) Building processor image
2022-03-17 08:20:42 (info) Build complete
2022-03-17 08:20:47 (info) Function deploy complete
> 2022-03-17 08:20:48,289 [info] successfully deployed function: {'internal_invocation_urls': ['nuclio-graph-basic-concepts-serving-example-flow.default-tenant.svc.cluster.local:8080'], 'external_invocation_urls': ['graph-basic-concepts-serving-example-flow-graph-basic-concepts.default-tenant.app.maor-gcp2.iguazio-cd0.com/']}
Create a new function with a graph and call the remote function above:
fn_preprocess = project.set_function(name="preprocess", kind="serving")
graph_preprocessing = fn_preprocess.set_topology("flow")
graph_preprocessing.to("storey.Extend", name="enrich", _fn='({"tag": "something"})').to(
"$remote", "remote_func", url=f"{remote_addr}v2/models/model1/infer", method="put"
).respond()
graph_preprocessing.plot(rankdir="LR")
fn3_server = fn_preprocess.to_mock_server()
my_data = """{"inputs":[[5.1, 3.5, 1.4, 0.2],[7.7, 3.8, 6.7, 2.2]]}"""
result = fn3_server.test("/v2/models/my_model/infer", body=my_data)
fn3_server.wait_for_completion()
print(result)
> 2022-03-17 08:20:48,374 [warning] run command, file or code were not specified
{'id': '3a1dd36c-e7de-45af-a0c4-72e3163ba92a', 'model_name': 'model1', 'outputs': [0, 2]}
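The url passed to the $remote step above is built by plain string concatenation, so it only works when remote_addr ends with a slash (as the external invocation URL in the deploy log does). A minimal sketch of a more defensive way to build it follows; build_infer_url is a hypothetical helper for illustration, not part of MLRun, and the host name in the usage lines is made up.

```python
def build_infer_url(base_addr: str, model_name: str) -> str:
    """Join a deployed function address with the v2 inference path of one model.

    Hypothetical helper (not an MLRun API): it only normalizes the trailing
    slash so the result is the same whether or not base_addr ends with "/".
    """
    return f"{base_addr.rstrip('/')}/v2/models/{model_name}/infer"


# Both spellings of the (made-up) address produce the same URL.
with_slash = build_infer_url("http://my-host:8080/", "model1")
without_slash = build_infer_url("http://my-host:8080", "model1")
print(with_slash)
print(without_slash)
```

The result could then be passed as the url argument of the $remote step in place of the f-string.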
Queues and streams#
Queues and streams accept data from one or more source steps and publish to one or more output steps. You can use them to send events from one part of a graph to another and to decouple the processing of those parts.
The fundamental architectural difference between queues and streams is how consumed data is tracked: queues remove a message once it is acknowledged, while streams track each reader's position by offset. Streams are append-only logs that you can navigate; queues are FIFO structures that consume messages destructively. Queues cope well with bursts of events, since all events are stored in the queue until they are processed, and they are best used to connect independent functions/containers. A queue can run in-memory or be implemented using a stream, which allows it to span processes/containers. Currently, queues support Iguazio V3IO and Kafka streams.
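The difference between the two models can be illustrated with a toy, in-memory sketch. AckQueue and LogStream are hypothetical classes written for this illustration only; they are not MLRun, V3IO, or Kafka APIs, and they ignore persistence and concurrency.

```python
from collections import deque


class AckQueue:
    """FIFO queue: a message is removed for good once it is acknowledged."""

    def __init__(self):
        self._pending = deque()

    def publish(self, msg):
        self._pending.append(msg)

    def receive(self):
        # Look at the oldest message without removing it yet.
        return self._pending[0] if self._pending else None

    def ack(self):
        # Acknowledging the oldest message deletes it from the queue.
        self._pending.popleft()

    def __len__(self):
        return len(self._pending)


class LogStream:
    """Append-only log: reading never removes records, readers track offsets."""

    def __init__(self):
        self._records = []

    def append(self, record):
        self._records.append(record)

    def read(self, offset):
        # Return every record from the given offset onward.
        return self._records[offset:]


queue, log = AckQueue(), LogStream()
for event in ["a", "b", "c"]:
    queue.publish(event)
    log.append(event)

first = queue.receive()
queue.ack()                      # "a" is now gone from the queue
remaining_in_queue = len(queue)
replayed = log.read(0)           # the log can still be read from the start
print(first, remaining_in_queue, replayed)
```

After the acknowledgement the queue holds only the two unprocessed messages, while the log can still be replayed from any offset.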
Both streams and queues are added to a graph with the to method, where the class name is either >> or $queue to specify that the step is a queue/stream.
To configure a consumer group for the step, include the group in the to method.
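As a rough mental model of what a consumer group gives you, the sketch below keeps one read position per group name. GroupedStream is a hypothetical toy class, not an MLRun or broker API; real brokers also assign shards or partitions to group members, which is omitted here.

```python
class GroupedStream:
    """Toy stream that remembers one read position per consumer group."""

    def __init__(self):
        self._records = []
        self._next_offset = {}  # group name -> offset of the next unread record

    def append(self, record):
        self._records.append(record)

    def poll(self, group):
        """Return the next unread record for this group, or None if caught up."""
        offset = self._next_offset.get(group, 0)
        if offset >= len(self._records):
            return None
        self._next_offset[group] = offset + 1
        return self._records[offset]


stream = GroupedStream()
for event in ["e0", "e1", "e2"]:
    stream.append(event)

# Two polls in the same group advance one shared position.
mygroup_reads = [stream.poll("mygroup"), stream.poll("mygroup")]
# A different group starts from the beginning of the stream.
audit_reads = [stream.poll("audit")]
print(mygroup_reads, audit_reads)
```

Readers that share a group name split the events between them, while each distinct group sees every event.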
Kafka stream example#
This example uses Kafka topics as the streams.
%%writefile echo.py
def echo_handler(x):
    print(x)
    return x
Configure the streams:
import os
# replace this with the address of your Kafka broker
brokers = "<broker IP>"
# topic names only; the broker address is passed separately (kafka_brokers)
input_topic = "in-topic"
out_topic = "out-topic"
err_topic = "err-topic"
Create the graph. The >> steps are the Kafka streams, and the group argument sets the consumer group for the input stream step.
import mlrun
fn_preprocess2 = project.set_function("preprocess", kind="serving")
fn_preprocess2.add_child_function("echo_func", "./echo.py", "mlrun/mlrun")
graph_preprocess2 = fn_preprocess2.set_topology("flow")
graph_preprocess2.to("storey.Extend", name="enrich", _fn='({"tag": "something"})').to(
">>",
"input_stream",
path=input_topic,
group="mygroup",
kafka_brokers=brokers,
).to(name="echo", handler="echo_handler", function="echo_func").to(
">>", "output_stream", path=out_topic, kafka_brokers=brokers
)
graph_preprocess2.plot(rankdir="LR")
from echo import *
fn4_server = fn_preprocess2.to_mock_server(current_function="*")
fn4_server.set_error_stream(f"kafka://{brokers}/{err_topic}")
my_data = """{"inputs":[[5.1, 3.5, 1.4, 0.2],[7.7, 3.8, 6.7, 2.2]]}"""
result = fn4_server.test("/v2/models/my_model/infer", body=my_data)
fn4_server.wait_for_completion()
print(result)
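The example above registers an error stream with set_error_stream. The general idea of routing failures to a separate stream, instead of stopping the flow, can be sketched as follows. This is an illustration only, not MLRun's implementation: run_step, parse_inputs, and the layout of the error record are all hypothetical.

```python
def run_step(handler, events, error_stream):
    """Apply handler to each event; failures go to error_stream instead of raising."""
    results = []
    for event in events:
        try:
            results.append(handler(event))
        except Exception as exc:
            # Hypothetical error record: the failed event plus the error text.
            error_stream.append({"event": event, "error": repr(exc)})
    return results


def parse_inputs(event):
    # Fails with ValueError when a value cannot be converted to float.
    return [float(value) for value in event["inputs"]]


errors = []  # stands in for the error topic
ok = run_step(
    parse_inputs,
    [{"inputs": [5.1, 3.5]}, {"inputs": ["oops"]}],
    errors,
)
print(ok)
print(errors)
```

Here the valid event is processed normally, and the malformed one ends up in the error list, where a separate consumer could inspect it.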
V3IO stream example#
The example below uses a V3IO stream, which is a fast real-time implementation of a stream that allows processing of events at very low latency.
%%writefile echo.py
def echo_handler(x):
    print(x)
    return x
Overwriting echo.py
Configure the streams:
import os
streams_prefix = (
f"v3io:///users/{os.getenv('V3IO_USERNAME')}/examples/graph-basic-concepts"
)
input_stream = streams_prefix + "/in-stream"
out_stream = streams_prefix + "/out-stream"
err_stream = streams_prefix + "/err-stream"
Create the graph.
fn_preprocess2 = project.set_function("preprocess", kind="serving")
fn_preprocess2.add_child_function("echo_func", "./echo.py", "mlrun/mlrun")
graph_preprocess2 = fn_preprocess2.set_topology("flow")
graph_preprocess2.to("storey.Extend", name="enrich", _fn='({"tag": "something"})').to(
">>", "input_stream", path=input_stream, group="mygroup"
).to(name="echo", handler="echo_handler", function="echo_func").to(
">>", "output_stream", path=out_stream, sharding_func="partition"
)
graph_preprocess2.plot(rankdir="LR")
from echo import *
fn4_server = fn_preprocess2.to_mock_server(current_function="*")
my_data = """{"inputs": [[5.1, 3.5, 1.4, 0.2], [7.7, 3.8, 6.7, 2.2]], "partition": 0}"""
result = fn4_server.test("/v2/models/my_model/infer", body=my_data)
fn4_server.wait_for_completion()
print(result)
> 2022-03-17 08:20:55,182 [warning] run command, file or code were not specified
{'id': 'a6efe8217b024ec7a7e02cf0b7850b91'}
{'inputs': [[5.1, 3.5, 1.4, 0.2], [7.7, 3.8, 6.7, 2.2]], 'tag': 'something'}
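In the graph above, the output stream is created with sharding_func="partition", and the test event carries a "partition" field, which suggests that the field value selects the target shard. The sketch below shows one way such a selection could work. It is an assumption-laden illustration: pick_shard and num_shards are hypothetical names, and the real behavior of the stream step may differ.

```python
import zlib


def pick_shard(event: dict, sharding_func, num_shards: int) -> int:
    """Choose a shard index for an event (illustrative only).

    sharding_func is either a field name, as in the example above,
    or a callable that receives the event and returns a key.
    """
    if callable(sharding_func):
        key = sharding_func(event)
    else:
        key = event[sharding_func]
    if isinstance(key, int):
        return key % num_shards
    # Use a stable hash for other keys; the built-in hash() of str
    # is salted per process, so it would not be repeatable.
    return zlib.crc32(str(key).encode("utf-8")) % num_shards


event = {"inputs": [[5.1, 3.5, 1.4, 0.2], [7.7, 3.8, 6.7, 2.2]], "partition": 0}
by_field = pick_shard(event, "partition", num_shards=4)
by_callable = pick_shard(event, lambda e: len(e["inputs"]), num_shards=4)
print(by_field, by_callable)
```

Routing by an explicit field keeps related events on the same shard, which preserves their relative order for whichever consumer reads that shard.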