Distributed (multi-function) pipeline example

This example demonstrates how to run a pipeline that consists of multiple serverless functions (connected using streams).

In the pipeline example, the request contains the URL of a file. The first function loads the content of the file, breaks it into paragraphs (using the FlatMap class), and pushes the results to a queue/stream. The second function picks up the paragraphs, runs the NLP flow to extract the entities, and pushes the results to the output stream.
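
The two-stage flow described above can be approximated without any serverless infrastructure. The following is a minimal sketch, assuming an in-process `queue.Queue` in place of the stream and a trivial capitalized-word matcher in place of the NLP step. The names `stage_one`, `stage_two`, and `find_capitalized` are hypothetical and are not part of MLRun.

```python
import queue
import re


def stage_one(doc, url, stream):
    """First function: split a document into paragraph events and push each one."""
    for i, paragraph in enumerate(doc):
        stream.put({"url": url, "paragraph_id": i, "paragraph": paragraph})


def find_capitalized(text):
    """Stand-in for the NLP step: capitalized words, not real entity recognition."""
    return re.findall(r"\b[A-Z][a-z]+\b", text)


def stage_two(stream):
    """Second function: drain the stream and emit one record per match."""
    results = []
    while not stream.empty():
        event = stream.get()
        for word in find_capitalized(event["paragraph"]):
            results.append(
                {
                    "url": event["url"],
                    "paragraph_id": event["paragraph_id"],
                    "entity_text": word,
                }
            )
    return results


internal = queue.Queue()  # plays the role of the internal stream
stage_one(["Alice met Bob.", "They visited Paris."], "in.json", internal)
records = stage_two(internal)
```

Because the only coupling between the stages is the queue, each stage can have its own dependencies, which is the property the real pipeline relies on.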

Set the stream URLs for the internal queue, the final output, and the error/exceptions stream:

streams_prefix = "v3io:///users/admin/"
internal_stream = streams_prefix + "in-stream"
out_stream = streams_prefix + "out-stream"
err_stream = streams_prefix + "err-stream"

Alternatively, using Kafka:

# kafka_prefix = f"kafka://{broker}/"
# internal_topic = kafka_prefix + "in-topic"
# out_topic = kafka_prefix + "out-topic"
# err_topic = kafka_prefix + "err-topic"
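
Both naming schemes follow the same pattern, so they can be generated by one helper. This is only a sketch: `build_paths` is a hypothetical function, and the Kafka broker address is a placeholder, not a value from this example.

```python
def build_paths(prefix, suffix):
    """Return the internal, output, and error paths for a given prefix and suffix."""
    return {role: f"{prefix}{role}-{suffix}" for role in ("in", "out", "err")}


v3io_paths = build_paths("v3io:///users/admin/", "stream")
# "localhost:9092" is a placeholder broker address (assumption)
kafka_paths = build_paths("kafka://localhost:9092/", "topic")
```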

In either case, continue with:

# set up the environment
import mlrun

project = mlrun.get_or_create_project("pipe", allow_cross_project=True)
> 2025-10-05 10:00:20,453 [info] Project loaded successfully: {"project_name":"pipe"}
# install spaCy and its English model locally (comment out if already installed)
!pip install spacy
!python -m spacy download en_core_web_sm
  Attempting uninstall: numpy
    Found existing installation: numpy 1.26.4
    Uninstalling numpy-1.26.4:
      Successfully uninstalled numpy-1.26.4
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
datasets 4.0.0 requires fsspec[http]<=2025.3.0,>=2023.1.0, but you have fsspec 2025.7.0 which is incompatible.
mlrun 1.10.0rc26 requires numpy<1.27.0,>=1.26.4, but you have numpy 2.3.3 which is incompatible.
pandas 2.1.4 requires numpy<2,>=1.23.2; python_version == "3.11", but you have numpy 2.3.3 which is incompatible.
storey 1.10.13 requires numpy<1.27,>=1.16.5, but you have numpy 2.3.3 which is incompatible.
unstructured 0.16.12 requires numpy<2, but you have numpy 2.3.3 which is incompatible.
Successfully installed blis-1.3.0 catalogue-2.0.10 cloudpathlib-0.22.0 confection-0.1.5 cymem-2.0.11 langcodes-3.5.0 language-data-1.3.0 marisa-trie-1.3.1 murmurhash-1.0.13 numpy-2.3.3 preshed-3.0.10 smart-open-7.3.1 spacy-3.8.7 spacy-legacy-3.0.12 spacy-loggers-1.0.5 srsly-2.5.1 thinc-8.3.6 wasabi-1.1.3 weasel-0.4.1
Collecting en-core-web-sm==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.8.0/en_core_web_sm-3.8.0-py3-none-any.whl (12.8 MB)
Installing collected packages: en-core-web-sm
Successfully installed en-core-web-sm-3.8.0
✔ Download and installation successful
You can now load the package via spacy.load('en_core_web_sm')

Create the pipeline

The pipeline consists of two functions: data-prep and NLP. Each one has different package dependencies.

Create a file with data-prep graph steps:

Note

The model, version and operation can also be specified in the message body to support streaming protocols (e.g. Kafka).

%%writefile data_prep.py
import mlrun
import json

# load struct from a json file (event points to the url)
def load_url(event):
    url = event["url"]
    data = mlrun.get_object(url).decode("utf-8")
    return {"url": url, "doc": json.loads(data)}

def to_paragraphs(event):
    paragraphs = []
    url = event["url"]
    for i, paragraph in enumerate(event["doc"]):
        paragraphs.append(
            {"url": url, "paragraph_id": i, "paragraph": paragraph}
        )
    return paragraphs
Overwriting data_prep.py
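
`to_paragraphs` returns a list, whereas the later steps handle one paragraph at a time. A flat-map step bridges the two by emitting each list element as its own event. The following is a minimal sketch of that fan-out in plain Python; `flat_map` is a hypothetical stand-in and is not the storey implementation.

```python
def to_paragraphs(event):
    # same logic as in data_prep.py, written as a comprehension
    return [
        {"url": event["url"], "paragraph_id": i, "paragraph": p}
        for i, p in enumerate(event["doc"])
    ]


def flat_map(events, fn):
    """Apply fn to each event and yield every element of the returned list."""
    for event in events:
        yield from fn(event)


# one upstream event whose body is a list of paragraph dicts
batch = [to_paragraphs({"url": "in.json", "doc": ["first", "second"]})]
# the identity function mirrors passing the event through unchanged
flattened = list(flat_map(batch, lambda event: event))
```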

Create a file with NLP graph steps (using spaCy):

%%writefile nlp.py
import json
import spacy

def myprint(x):
    print(x)
    return x

class ApplyNLP:
    def __init__(self, context=None, spacy_dict="en_core_web_sm"):

        self.nlp = spacy.load(spacy_dict)

    def do(self, paragraph: dict):
        tokenized_paragraphs = []
        if isinstance(paragraph, (str, bytes)):
            paragraph = json.loads(paragraph)
        tokenized = {
            "url": paragraph["url"],
            "paragraph_id": paragraph["paragraph_id"],
            "tokens": self.nlp(paragraph["paragraph"]),
        }
        tokenized_paragraphs.append(tokenized)

        return tokenized_paragraphs

def extract_entities(tokens):
    paragraph_entities = []
    for token in tokens:
        entities = token["tokens"].ents
        for entity in entities:
            paragraph_entities.append(
                {
                    "url": token["url"],
                    "paragraph_id": token["paragraph_id"],
                    "entity": entity.ents,
                }
            )
    return paragraph_entities

def enrich_entities(entities):
    enriched_entities = []
    for entity in entities:
        enriched_entities.append(
            {
                "url": entity["url"],
                "paragraph_id": entity["paragraph_id"],
                "entity_text": entity["entity"][0].text,
                "entity_start_char": entity["entity"][0].start_char,
                "entity_end_char": entity["entity"][0].end_char,
                "entity_label": entity["entity"][0].label_,
            }
        )
    return enriched_entities
Overwriting nlp.py
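
`ApplyNLP.do` accepts either a dict or a serialized JSON payload, presumably because an event that has passed through a stream may arrive as text or bytes. A minimal sketch of that normalization on its own, using a hypothetical helper named `as_dict`:

```python
import json


def as_dict(payload):
    """Return a dict whether the payload is already parsed or is still JSON text/bytes."""
    if isinstance(payload, (str, bytes)):
        return json.loads(payload)
    return payload


from_bytes = as_dict(b'{"url": "in.json", "paragraph_id": 0}')
from_dict = as_dict({"url": "in.json", "paragraph_id": 0})
```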

Build and show the graph:

Create the master function ("multi-func") with the data_prep.py source and an async graph topology. Add a pipeline of steps made of custom Python handlers, classes, and built-in classes (like storey.FlatMap).

The pipeline runs across two functions that are connected by a queue/stream (q1). Use the function= argument to specify which function runs a given step. End the flow by writing to the output stream.

# define a new real-time serving function (from code) with an async graph
fn = project.set_function(
    name="multi-func", func="./data_prep.py", kind="serving", image="mlrun/mlrun"
)
graph = fn.set_topology("flow", engine="async")

# define the graph steps (DAG)
graph.to(name="load_url", handler="load_url").to(
    name="to_paragraphs", handler="to_paragraphs"
).to("storey.FlatMap", "flatten_paragraphs", _fn="(event)").to(
    ">>", "q1", path=internal_stream
).to(name="nlp", class_name="ApplyNLP", function="enrich").to(
    name="extract_entities", handler="extract_entities", function="enrich"
).to(name="enrich_entities", handler="enrich_entities", function="enrich").to(
    "storey.FlatMap", "flatten_entities", _fn="(event)", function="enrich"
).to(name="printer", handler="myprint", function="enrich").to(
    ">>", "output_stream", path=out_stream
)
<mlrun.serving.states.QueueStep at 0x7f9d01234f10>
# specify the "enrich" child function, add extra package requirements
child = fn.add_child_function(name="enrich", url="./nlp.py", image="mlrun/mlrun")
child.spec.build.commands = [
    "python -m pip install spacy",
    "python -m spacy download en_core_web_sm",
]
graph.plot(rankdir="LR")
[Figure: graph plot of the multi-function pipeline]
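
To see how the steps divide between the two functions, the graph can be summarized as a list of (step, function) pairs and grouped. This is a sketch for illustration only: it assumes that steps defined without `function=` run in the parent function ("multi-func"), and it leaves out the two queue steps.

```python
from itertools import groupby

PARENT = "multi-func"  # assumption: untagged steps run in the parent function

# step names are taken from the graph definition; queue steps are omitted
steps = [
    ("load_url", PARENT),
    ("to_paragraphs", PARENT),
    ("flatten_paragraphs", PARENT),
    ("nlp", "enrich"),
    ("extract_entities", "enrich"),
    ("enrich_entities", "enrich"),
    ("flatten_entities", "enrich"),
    ("printer", "enrich"),
]

# groupby works here because the steps of each function are consecutive
by_function = {
    fn: [name for name, _ in group]
    for fn, group in groupby(steps, key=lambda step: step[1])
}
```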

Test the pipeline locally

Create an input file:

%%writefile in.json
["Born and raised in Queens, New York City, Trump attended Fordham University for two years and received a bachelor's degree in economics from the Wharton School of the University of Pennsylvania. He became president of his father Fred Trump's real estate business in 1971, renamed it The Trump Organization, and expanded its operations to building or renovating skyscrapers, hotels, casinos, and golf courses. Trump later started various side ventures, mostly by licensing his name. Trump and his businesses have been involved in more than 4,000 state and federal legal actions, including six bankruptcies. He owned the Miss Universe brand of beauty pageants from 1996 to 2015, and produced and hosted the reality television series The Apprentice from 2004 to 2015.", 
 "Trump's political positions have been described as populist, protectionist, isolationist, and nationalist. He entered the 2016 presidential race as a Republican and was elected in a surprise electoral college victory over Democratic nominee Hillary Clinton while losing the popular vote.[a] He became the oldest first-term U.S. president[b] and the first without prior military or government service. His election and policies have sparked numerous protests. Trump has made many false or misleading statements during his campaign and presidency. The statements have been documented by fact-checkers, and the media have widely described the phenomenon as unprecedented in American politics. Many of his comments and actions have been characterized as racially charged or racist."]
Overwriting in.json

Create a mock server (simulator) and test:

# toggle verbosity if needed
fn.verbose = True
from nlp import ApplyNLP

# create a mock server (simulator), specify to simulate all the functions in the pipeline ("*")
server = fn.to_mock_server(current_function="*")
{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'Queens', 'entity_start_char': 19, 'entity_end_char': 25, 'entity_label': 'GPE'}
{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'New York City', 'entity_start_char': 27, 'entity_end_char': 40, 'entity_label': 'GPE'}
{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'Trump', 'entity_start_char': 42, 'entity_end_char': 47, 'entity_label': 'ORG'}
{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'Fordham University', 'entity_start_char': 57, 'entity_end_char': 75, 'entity_label': 'ORG'}
{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'two years', 'entity_start_char': 80, 'entity_end_char': 89, 'entity_label': 'DATE'}
{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'the Wharton School of the University of Pennsylvania', 'entity_start_char': 141, 'entity_end_char': 193, 'entity_label': 'ORG'}
{'url': 'in.json', 'paragraph_id': 0, 'entity_text': "Fred Trump's", 'entity_start_char': 229, 'entity_end_char': 241, 'entity_label': 'PERSON'}
{'url': 'in.json', 'paragraph_id': 0, 'entity_text': '1971', 'entity_start_char': 266, 'entity_end_char': 270, 'entity_label': 'DATE'}
{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'The Trump Organization', 'entity_start_char': 283, 'entity_end_char': 305, 'entity_label': 'ORG'}
{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'more than 4,000', 'entity_start_char': 529, 'entity_end_char': 544, 'entity_label': 'CARDINAL'}
{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'six', 'entity_start_char': 588, 'entity_end_char': 591, 'entity_label': 'CARDINAL'}
{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'Universe', 'entity_start_char': 624, 'entity_end_char': 632, 'entity_label': 'PERSON'}
{'url': 'in.json', 'paragraph_id': 0, 'entity_text': '1996 to 2015', 'entity_start_char': 663, 'entity_end_char': 675, 'entity_label': 'DATE'}
{'url': 'in.json', 'paragraph_id': 0, 'entity_text': 'The Apprentice', 'entity_start_char': 731, 'entity_end_char': 745, 'entity_label': 'WORK_OF_ART'}
{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'Trump', 'entity_start_char': 0, 'entity_end_char': 5, 'entity_label': 'ORG'}
{'url': 'in.json', 'paragraph_id': 1, 'entity_text': '2016', 'entity_start_char': 122, 'entity_end_char': 126, 'entity_label': 'DATE'}
{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'Republican', 'entity_start_char': 150, 'entity_end_char': 160, 'entity_label': 'NORP'}
{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'Democratic', 'entity_start_char': 222, 'entity_end_char': 232, 'entity_label': 'NORP'}
{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'Hillary Clinton', 'entity_start_char': 241, 'entity_end_char': 256, 'entity_label': 'PERSON'}
{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'first', 'entity_start_char': 312, 'entity_end_char': 317, 'entity_label': 'ORDINAL'}
{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'U.S.', 'entity_start_char': 323, 'entity_end_char': 327, 'entity_label': 'GPE'}
{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'first', 'entity_start_char': 349, 'entity_end_char': 354, 'entity_label': 'ORDINAL'}
{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'Trump', 'entity_start_char': 459, 'entity_end_char': 464, 'entity_label': 'ORG'}
{'url': 'in.json', 'paragraph_id': 1, 'entity_text': 'American', 'entity_start_char': 671, 'entity_end_char': 679, 'entity_label': 'NORP'}
# push a sample request into the pipeline and see the results print out (by the printer step)
resp = server.test(body={"url": "in.json"})
server.wait_for_completion()
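
The printed records are plain dicts, so they are easy to summarize once collected. A minimal sketch that counts entities per label, using a few records copied from the output of this example (trimmed to two fields):

```python
from collections import Counter

# a small subset of the records printed by the pipeline
records = [
    {"entity_text": "Queens", "entity_label": "GPE"},
    {"entity_text": "New York City", "entity_label": "GPE"},
    {"entity_text": "Fordham University", "entity_label": "ORG"},
    {"entity_text": "Hillary Clinton", "entity_label": "PERSON"},
]

label_counts = Counter(record["entity_label"] for record in records)
```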

Deploy to the cluster

# add credentials to the data/streams
fn.apply(mlrun.platforms.v3io_cred())
child.apply(mlrun.platforms.v3io_cred())

# specify the error stream (to store exceptions from the functions)
fn.spec.error_stream = err_stream

# deploy as a set of serverless functions
fn.deploy()

Listen on the output stream

You can use the SDK or CLI to listen on the output stream. Listening should be done in a separate console/notebook. Run:

mlrun watch-stream v3io:///users/admin/out-stream -j

or use the SDK:

from mlrun.platforms import watch_stream
watch_stream("v3io:///users/admin/out-stream", is_json=True)

Test the live function:

Note

The URL must be a valid path to the input file.

fn.invoke("", body={"url": "v3io:///users/admin/pipe/in.json"})
{'id': '79354e45-a158-405f-811c-976e9cf4ab5e'}