Skip to content

Assembling a kiara pipeline

Preparation

If you haven't already, it would make sense for you to go through the kiara getting started guide, as well as writing your own kiara module. We'll use the development environment set-up from the latter guide, as well as the module created there as a step in our pipeline.

Creating a pipeline

A kiara pipeline is a dict-like data structure that includes one or several processing steps (implemented by kiara operations), connected (or not) in a specific way so that some steps outputs feed into other steps' inputs.

A single-step pipeline

The simplest pipeline contains a single operation, and is not useful in any way, since it's easier to just kiara run the operation directly. Nonetheless, below is how that would look like, we'll be using the module we created in the writing your own kiara module guide:

steps:
  - module_type: filter.table
    step_id: filter_table_step

A pipeline step is a dictionary with 2 required keys (and some optional ones, which we'll cover later):

  • module_type: the name of the module or operation that should be used.
  • step_id: the name of the step, ideally a short, descriptive name outlining what the step does. It can't contain special characters except '_', and it must be unique within the pipeline.

An assembled pipeline has the same characteristics as a kiara module, and in fact is a perfectly valid operation, like any other one, and can be called the same way.

Create a new file my_first_pipeline.yaml, and copy and paste the above code into it. Then, run the operation explain command against the file:

kiara operation explain my_first_pipeline.yaml
╭─ Operation: my_first_pipeline.yaml.yaml ──────────────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                                                          │
│   Documentation   -- n/a --                                                                                                              │
│                                                                                                                                          │
│   Inputs                                                                                                                                 │
│                     field name                         type     description                              Required   Default              │
│                    ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────    │
│                     filter_table_step__table_input     table    The table to filter.                     yes        -- no default --     │
│                     filter_table_step__column_name     string   The column containing the element to     no         City                 │
│                                                                 use as filter.                                                           │
│                     filter_table_step__filter_string   string   The string to use as filter.             yes        -- no default --     │
│                                                                                                                                          │
│                                                                                                                                          │
│   Outputs                                                                                                                                │
│                     field name                        type    description                                                                │
│                    ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────    │
│                     filter_table_step__table_output   table   The filtered table.                                                        │
│                                                                                                                                          │
│                                                                                                                                          │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯

As you can see, kiara turned this (single-step) pipeline into an operation, and auto-generated some input- and output-fields, by assembling the step-id and step input-/output-field(s). Those long field names are a bit unwieldy, and we'll remedy that later, for now let's just ignore that.

Adding a second step

In the previous tutorial we pre-seeded the kiara data store with a csv file/tabular dataset, to help us with developing our table filter module. In this tutorial, we'll remove the requirement to do that, by adding a step to our pipeline that lets the user specify a path to a csv file, and import and convert that into a table value.

Previously, we've used the import.table.from.local_file_path operation to import the csv file, and we can do the same now. Edit the pipeline file you created so it looks like the following:

steps:
  - module_type: import.table.from.local_file_path
    step_id: import_table_step
  - module_type: filter.table
    step_id: filter_table_step
    input_links:
      table_input: import_table_step.table

What we did here:

  • add a new step with the id import_table_step, which will execute the import.table.from.local_file_path operation
  • leave our filter step in place, but connect the table_input input of this steps operation to the table output field of the import_table_step operation

!!! note: To find out the input/output field names of each step, use kiara operation explain <MODULE_TYPE>.

We can ask kiara again about what it thinks of this new pipeline/operation:

kiara operation explain my_first_pipeline.yaml
╭─ Operation: my_first_pipeline.yaml.yaml ──────────────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                                                          │
│   Documentation   -- n/a --                                                                                                              │
│                                                                                                                                          │
│   Inputs                                                                                                                                 │
│                     field name                         type     description                              Required   Default              │
│                    ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────    │
│                     import_table_step__path            string   The local path to the file.              yes        -- no default --     │
│                     filter_table_step__column_name     string   The column containing the element to     no         City                 │
│                                                                 use as filter.                                                           │
│                     filter_table_step__filter_string   string   The string to use as filter.             yes        -- no default --     │
│                                                                                                                                          │
│                                                                                                                                          │
│   Outputs                                                                                                                                │
│                     field name                         type    description                                                               │
│                    ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────    │
│                     import_table_step__imported_file   file    The loaded files.                                                         │
│                     import_table_step__table           table   The result value (of type 'table').                                       │
│                     filter_table_step__table_output    table   The filtered table.                                                       │
│                                                                                                                                          │
│                                                                                                                                          │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯

As you can see, the previously existing input with the field name filter_table_step__table_input (type: table) is gone now, replaced by a new one, with the field name import_table_step__path (type: string). The other two inputs remain the same (since we did not connect a step output to them).

Side-note: visualizing the pipeline

We can let kiara visualize our pipeline at each step in the development process. This is quite useful, as it can serve as a visual aid to debug and assemble pipelines and their steps.

If you want to do this in your own environment, you need to have Java installed, as well as an additional Python dependency in your virtual- or conda-environment:

pip install 'git+https://github.com/cosminbasca/asciinet.git#egg=asciinet&subdirectory=pyasciinet'

Currently, the following commands exist to print a pipeline as graph on the command-line:

  • kiara pipeline execution-graph <pipeline_file>: display the pipeline steps in the order they will be executed.
  • kiara pipeline data-flow-graph <pipeline_file>: display the the connections of inputs/outputs as well as processing steps.

As an example, let's look at the execution graph of our current pipeline:

kiara pipeline execution-graph my_first_pipeline.yaml
     ┌────────┐     
     │__root__│     
     └────┬───┘     
          │         
          v         
 ┌─────────────────┐
 │import_table_step│
 └────────┬────────┘
          │         
          v         
 ┌─────────────────┐
 │filter_table_step│
 └─────────────────┘

Adjusting the input-/output-field names

We could run our pipeline as is, but let's adjust its input- and output field names first. Mainly to make it more intuitive to use. To do that we can add one or both of the following keys to our pipeline description:

  • input_aliases: a mapping of pipeline inputs to more user-friendly names
  • output_aliases: a mapping of pipeline outputs to more user-friendly names

Lets start with our inputs. Add the following to your pipeline file:

input_aliases:
    import_table_step.path: csv_file_path
    filter_table_step.column_name: column_name
    filter_table_step.filter_string: filter_string

This is basically just a rename of one (or several, or all) pipeline-input-fields, to shorter names. If you specify the same value for several keys, then the user input for those fields will be re-used for all the keys that have that value (we'll cover that in a later tutorial).

Now let's do our outputs:

output_aliases:
    filter_table_step.table_output: filtered_table

Output aliases work a bit different to input aliases: for inputs, if we don't specify a field, kiara will just use the auto-generated name (since the need for the input to exist does not go away). For output aliases, if we don't specify a field, kiara will ignore that output, and not display it to the user. In our case, we are not really interested in the intermediate outputs of the first step, so we only add the filtered_table alias that represents our final, filtered result.

Lets see what kiara has to say about the 'API' of our pipelines now:

kiara operation explain my_first_pipeline.yaml
╭─ Operation: my_first_pipeline.yaml.yaml ──────────────────────────────────────────────────╮
│                                                                                                  │
│   Documentation   -- n/a --                                                                      │
│                                                                                                  │
│   Inputs                                                                                         │
│                     field name      type     description         Required   Default              │
│                    ──────────────────────────────────────────────────────────────────────────    │
│                     csv_file_path   string   The local path to   yes        -- no default --     │
│                                              the file.                                           │
│                     column_name     string   The column          no         City                 │
│                                              containing the                                      │
│                                              element to use as                                   │
│                                              filter.                                             │
│                     filter_string   string   The string to use   yes        -- no default --     │
│                                              as filter.                                          │
│                                                                                                  │
│                                                                                                  │
│   Outputs                                                                                        │
│                     field name       type    description                                         │
│                    ──────────────────────────────────────────────────────────────────────────    │
│                     filtered_table   table   The filtered table.                                 │
│                                                                                                  │
│                                                                                                  │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯

Much nicer!

Run the pipeline

Now, all that is left to do is run the pipeline:

kiara run --save filtered_table=amsterdam_journals my_first_pipeline.yaml csv_file_path=examples/data/journals/JournalNodes1902.csv filter_string=Amsterdam column_name=City
╭─ Result ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                                                          │
│   field            data_type   value                                                                                                     │
│  ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────  │
│   filtered_table   table                                                                                                                 │
│                                  Id   Label      JournalT   City       CountryN   PresentD   Latitude   Longitu   Language   __index     │
│                                 ─────────────────────────────────────────────────────────────────────────────────────────────────────    │
│                                  75   Psychiat   speciali   Amsterda   Netherla   Netherla   52.36666   4.9       Dutch      0           │
│                                                                                                                                          │
│                                                                                                                                          │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯

╭─ Stored result value ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                                                          │
│   field            data type   stored id                              alias(es)                                                          │
│  ────────────────────────────────────────────────────────────────────────────────────────                                                │
│   filtered_table   table       c1fc88d8-66af-4330-8bca-afa4b17faab3   amsterdam_journals                                                 │
│                                                                                                                                          │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯

And to confirm this worked, we ask kiara about the value we just stored (alias: amsterdam_journals), including it's lineage, which should give us the value ids of the intermediate results (in case we ever needed them -- they won't have an alias associated with it, but are still persisted in the kiara data store and can be looked up with kiara data explain <VALUE_ID> and/or `kiara data load ):

kiara data explain --lineage alias:amsterdam_journals
╭─ Value details for: alias:amsterdam_journals ────────────────────────────────────────────────────╮
│                                                                                                  │
│   value_id            c1fc88d8-66af-4330-8bca-afa4b17faab3                                       │
│   kiara_id            09ae1e2e-381d-4d42-a47d-522a3d557686                                       │
│                                                                                                  │
│                       ────────────────────────────────────────────────────────────────────────   │
│   data_type_info                                                                                 │
│                         data_type_name     table                                                 │
│                         data_type_config   {}                                                    │
│                         characteristics    {                                                     │
│                                              "is_scalar": false,                                 │
│                                              "is_json_serializable": false                       │
│                                            }                                                     │
│                         data_type_class                                                          │
│                                              python_class_name    TableType                      │
│                                              python_module_name   kiara_plugin.tabular.da…       │
│                                              full_name            kiara_plugin.tabular.da…       │
│                                                                                                  │
│                                                                                                  │
│   destiny_backlinks   {}                                                                         │
│   enviroments         None                                                                       │
│   property_links      {                                                                          │
│                         "metadata.python_class": "b5c9b9eb-e55c-4ee4-ac42-4d5469958932",         │
│                         "metadata.table": "0910a3d6-d171-4e2b-842b-025a2512335e"                 │
│                       }                                                                          │
│   value_hash          zdpuAmca9P3gSGC6XipcksvC5aArdVeBJd8WHcwa6hLB9gqaF                          │
│   value_schema                                                                                   │
│                         type          table                                                      │
│                         type_config   {}                                                         │
│                         default       __not_set__                                                │
│                         optional      False                                                      │
│                         is_constant   False                                                      │
│                         doc           The filtered table.                                        │
│                                                                                                  │
│   value_size          4.97 KB                                                                    │
│   value_status        -- set --                                                                  │
│                                                                                                  │
│                       ────────────────────────────────────────────────────────────────────────   │
│                                                                                                  │
│   lineage             filter.table                                                             │
│                       ├── input: column_name (string) = 39cc115a-108f-4d5e-a3ec-84e076a278db     │
│                       ├── input: filter_string (string) = a6dbd985-bfcc-4a3f-a513-4476474b57ed   │
│                       └── input: table_input (table) = 50852e81-e1c0-4bcb-90bb-e3d0da6ea1b7      │
│                           └── create.table                                                       │
│                               └── input: file (file) = a2638023-3e85-48ca-b380-b5fb29daf4e5      │
│                                   └── import.local.file                                          │
│                                       └── input: path (string) =                                 │
│                                           97ffdcef-4066-4cd4-b3c4-4961afb7012e                   │
│                                                                                                  │
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯

Making the pipeline discoverable

One of the main concepts that kiara is trying to facilitate is a modular approach to assembling data workflows. The main ingredient here is the fact that assembled pipelines behave like any other operation in kiara, which means that pipelines can also be used as (single) steps in other pipelines.

To make that work in an intuitive and simple way for users, kiara needs to know about all the pipeline descriptions the user intends to use as building blocks. If we are only concerned about a single 'top-level' pipeline (as we have done in this tutorial so far), this is not a problem, and all we need to do is point kiara to a (yaml- or json-) file containing the pipeline description.

If we want to re-use our pipelines in other pipelines, we need to 'register' them in a kiara context, and give it its own operation id.

kiara tries to make this as simple as possible, so, if you don't do anything, kiara will use the file name that contains the pipeline description (without extension), and convert it to a valid operation id. In our case, we'd end up with an operation called 'my_first_pipeline'.

If we want to have more control over this, we can add a property pipeline_name to our description, that will override the auto-generated operation id.

Using the extra_pipeline property

Each kiara context has a property called extra_pipeline. On the command-line, you can specify one or several of those via the --pipelines (or: -p) argument (after the kiara command-name, before the sub-command name):

kiara --pipelines my_first_pipeline.yaml operation list my_first
╭─ Filtered operations ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                                                          │
│   Id                  Type(s)    Description                                                                                             │
│  ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────  │
│   my_first_pipeline   pipeline   -- n/a --                                                                                               │
│                                                                                                                                          │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯

Note

kiara accepts paths to files or folders as the argument to --pipelines. If you specify a folder, kiara will look for valid pipeline descriptions under that folder.

Let's test that again, but this time we give our new pipeline a descriptive name, as well as some documentation. Add those two properties to your existing pipeline file:

pipeline_name: "import.filtered_table"
doc: |
  Import a table from a csv file, then filter it.

  Filtering is done by matching a 'filter_string' against all cells of a column, both specified by the user.

Note

Here, we use a yaml literal multiline string to specify our pipeline documentation (the: '|'). This allows us to write as much documentation as we need to, without being restricted to a single line.

kiara --pipelines my_first_pipeline.yaml operation list import
╭─ Filtered operations ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│                                                                                                                                          │
│   Id                                          Type(s)    Description                                                                     │
│  ──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────  │
│   import.database.from.local_file_path        pipeline   Import a database from a csv file.                                              │
│   import.local.file                                      Import a file from the local filesystem.                                        │
│   import.local.file_bundle                               Import a folder (file_bundle) from the local filesystem.                        │
│   import.network_data.from.local_file_paths   pipeline   Onboard the edges and nodes from local files, create table values from them,    │
│                                                          then assemble those to the network_data result.                                 │
│   import.table.from.local_file_path           pipeline   Import a table from a file on the local filesystem.                             │
│   import.table.from.local_folder_path         pipeline   Import a table from a local folder containing text files.                       │
│   import.filtered_table                       pipeline   Import a table from a csv file, then filter it.                                 │
│                                                                                                                                          │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯

Going with the information contained in this output, instead of running our pipeline with the path to its file after the run sub-command, we could also do it like:

kiara --pipelines my_first_pipeline.yaml run import.filtered_table ... ... ...

Including the pipeline in a kiara plugin

In case we want to 'publish' our pipeline so it can be re-used as part of a kiara plugin (which may or may not contain native 'Python' modules/operations, custom data-types, etc.), this is also easy to do. If you have followed the 'writing your own kiara module' tutorial, you'd have created a kiara plugin project from a template. To add your pipeline to this plugin, simply copy/move it into the directory src/kiara_plugin/<YOUR_PLUGIN_NAME>/pipelines. Naming the pipeline and adding documentation works the same as outlined in the previous chapter.

If you add pipelines to a kiara context this way, nothing else should be necessary, kiara will auto-discover all the pipelines added like this, and you can use the assigned 'pipeline_name' as value of the module_type key in your step description, if you want to run it within another pipeline.