
module_types

table.filters

type_name table.filters The registered name for this item type.
documentation

-- n/a --

Documentation for the item.
authors Information about authorship for the item.
context
tags tabular
labels
  • package: kiara_plugin.tabular
references
Generic properties of this item (description, tags, labels, references, ...).
python_class
python_class_name TableFiltersModule The name of the Python class.
python_module_name kiara_plugin.tabular.modules.table.filters The name of the Python module this class lives in.
full_name kiara_plugin.tabular.modules.table.filters.TableFiltersModule The full class namespace.
The python class that implements this module type.
process_src

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        filter_name: str = self.get_config_value("filter_name")

        data_type_data = self.__class__.get_supported_type()
        data_type = data_type_data["type"]
        # data_type_config = data_type_data["type_config"]

        # TODO: ensure value is of the right type?

        source_obj = inputs.get_value_obj("value")

        func_name = f"filter__{filter_name}"
        if not hasattr(self, func_name):
            raise Exception(
                f"Can't apply filter '{filter_name}': missing function '{func_name}' in class '{self.__class__.__name__}'. Please check this modules documentation or source code to determine which filters are supported."
            )

        func = getattr(self, func_name)
        # TODO: check signature?

        filter_inputs = {}
        for k, v in inputs.items():
            if k == data_type:
                continue
            filter_inputs[k] = v.data

        result = func(value=source_obj, filter_inputs=filter_inputs)

        if result is None:
            outputs.set_value("value", source_obj)
        else:
            outputs.set_value("value", result)

The source code of the process method of the module.
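The process method above looks up a method named 'filter__[filter_name]' on the module instance and falls back to the source value when that method returns None. The following is a minimal, self-contained sketch of that dispatch pattern only. The class FilterDispatcher and the filters 'upper' and 'noop' are hypothetical names invented for illustration; they are not part of kiara_plugin.tabular.

```python
from typing import Any, Dict, Optional


class FilterDispatcher:
    """Hypothetical stand-in for a module that resolves filters by name."""

    def __init__(self, filter_name: str) -> None:
        self.filter_name = filter_name

    def filter__upper(self, value: str, filter_inputs: Dict[str, Any]) -> Optional[str]:
        # Illustrative filter that returns a new value.
        return value.upper()

    def filter__noop(self, value: str, filter_inputs: Dict[str, Any]) -> Optional[str]:
        # Returning None means "keep the source value unchanged".
        return None

    def process(self, value: str, **filter_inputs: Any) -> str:
        # Build the method name from the configured filter name.
        func_name = f"filter__{self.filter_name}"
        if not hasattr(self, func_name):
            raise ValueError(
                f"Can't apply filter '{self.filter_name}': missing function '{func_name}'."
            )
        result = getattr(self, func_name)(value=value, filter_inputs=filter_inputs)
        # Fall back to the unmodified input when the filter returns None.
        return value if result is None else result


upper_result = FilterDispatcher("upper").process("abc")
noop_result = FilterDispatcher("noop").process("abc")
```

With this layout, supporting a new filter only requires adding another 'filter__...' method; the dispatch code stays untouched.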

render.database

type_name render.database The registered name for this item type.
documentation

-- n/a --

Documentation for the item.
authors Information about authorship for the item.
context
tags tabular
labels
  • package: kiara_plugin.tabular
references
Generic properties of this item (description, tags, labels, references, ...).
python_class
python_class_name RenderDatabaseModule The name of the Python class.
python_module_name kiara_plugin.tabular.modules.db The name of the Python module this class lives in.
full_name kiara_plugin.tabular.modules.db.RenderDatabaseModule The full class namespace.
The python class that implements this module type.
process_src

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        source_type = self.get_config_value("source_type")
        target_type = self.get_config_value("target_type")

        value: Value = inputs.get_value_obj("value")
        render_scene: DictModel = inputs.get_value_data("render_config")

        func_name = f"render__{source_type}__as__{target_type}"

        func = getattr(self, func_name)
        result = func(value=value, render_config=render_scene.dict_data)
        if isinstance(result, RenderValueResult):
            render_scene_result: RenderValueResult = result
        else:
            render_scene_result = RenderValueResult(
                value_id=value.value_id,
                render_config=render_scene,
                render_manifest=self.manifest.manifest_hash,
                rendered=result,
                related_scenes={},
            )
        render_scene_result.manifest_lookup[self.manifest.manifest_hash] = self.manifest

        outputs.set_value("render_value_result", render_scene_result)

The source code of the process method of the module.

render.table

type_name render.table The registered name for this item type.
documentation

-- n/a --

Documentation for the item.
authors Information about authorship for the item.
context
tags tabular
labels
  • package: kiara_plugin.tabular
references
Generic properties of this item (description, tags, labels, references, ...).
python_class
python_class_name RenderTableModule The name of the Python class.
python_module_name kiara_plugin.tabular.modules.table The name of the Python module this class lives in.
full_name kiara_plugin.tabular.modules.table.RenderTableModule The full class namespace.
The python class that implements this module type.
process_src

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        source_type = self.get_config_value("source_type")
        target_type = self.get_config_value("target_type")

        value: Value = inputs.get_value_obj("value")
        render_scene: DictModel = inputs.get_value_data("render_config")

        func_name = f"render__{source_type}__as__{target_type}"

        func = getattr(self, func_name)
        result = func(value=value, render_config=render_scene.dict_data)
        if isinstance(result, RenderValueResult):
            render_scene_result: RenderValueResult = result
        else:
            render_scene_result = RenderValueResult(
                value_id=value.value_id,
                render_config=render_scene,
                render_manifest=self.manifest.manifest_hash,
                rendered=result,
                related_scenes={},
            )
        render_scene_result.manifest_lookup[self.manifest.manifest_hash] = self.manifest

        outputs.set_value("render_value_result", render_scene_result)

The source code of the process method of the module.

export.table

type_name export.table The registered name for this item type.
documentation

Export table data items.

Documentation for the item.
authors Information about authorship for the item.
context
tags tabular
labels
  • package: kiara_plugin.tabular
references
Generic properties of this item (description, tags, labels, references, ...).
python_class
python_class_name ExportTableModule The name of the Python class.
python_module_name kiara_plugin.tabular.modules.table The name of the Python module this class lives in.
full_name kiara_plugin.tabular.modules.table.ExportTableModule The full class namespace.
The python class that implements this module type.
process_src

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        target_profile: str = self.get_config_value("target_profile")
        source_type: str = self.get_config_value("source_type")

        export_metadata = inputs.get_value_data("export_metadata")

        source_obj = inputs.get_value_obj(source_type)
        source = source_obj.data

        func_name = f"export__{source_type}__as__{target_profile}"
        if not hasattr(self, func_name):
            raise Exception(
                f"Can't export '{source_type}' value: missing function '{func_name}' in class '{self.__class__.__name__}'. Please check this modules documentation or source code to determine which source types and profiles are supported."
            )

        base_path = inputs.get_value_data("base_path")
        if base_path is None:
            base_path = os.getcwd()
        name = inputs.get_value_data("name")
        if not name:
            name = str(source_obj.value_id)

        func = getattr(self, func_name)
        # TODO: check signature?

        base_path = os.path.abspath(base_path)
        os.makedirs(base_path, exist_ok=True)
        result = func(value=source, base_path=base_path, name=name)

        if isinstance(result, Mapping):
            result = DataExportResult(**result)
        elif isinstance(result, str):
            result = DataExportResult(files=[result])

        if not isinstance(result, DataExportResult):
            raise KiaraProcessingException(
                f"Can't export value: invalid result type '{type(result)}' from internal method. This is most likely a bug in the '{self.module_type_name}' module code."
            )

        if export_metadata:
            metadata_file = Path(os.path.join(base_path, f"{name}.metadata"))
            value_info = source_obj.create_info()
            value_json = value_info.json()
            metadata_file.write_text(value_json)

            result.files.append(metadata_file.as_posix())

        # schema = ValueSchema(type=self.get_target_value_type(), doc="Imported dataset.")

        # value_lineage = ValueLineage.from_module_and_inputs(
        #     module=self, output_name=output_key, inputs=inputs
        # )
        # value: Value = self._kiara.data_registry.register_data(
        #     value_data=result, value_schema=schema, lineage=None
        # )

        outputs.set_value("export_details", result)

The source code of the process method of the module.
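The export method above accepts three kinds of return values from the internal 'export__...' function: a mapping, a single file path as a string, or an already built result object. A rough sketch of that normalization step follows. ExportResult is a hypothetical stand-in for DataExportResult that models only the 'files' field seen in the source; the real class may carry more fields.

```python
from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import Any, List


@dataclass
class ExportResult:
    """Hypothetical stand-in for DataExportResult (only 'files' is modelled)."""

    files: List[str] = field(default_factory=list)


def normalize_export_result(result: Any) -> ExportResult:
    # A mapping is treated as keyword arguments for the result object.
    if isinstance(result, Mapping):
        result = ExportResult(**result)
    # A plain string is treated as the path of a single exported file.
    elif isinstance(result, str):
        result = ExportResult(files=[result])
    # Anything else that is not already a result object is rejected.
    if not isinstance(result, ExportResult):
        raise TypeError(f"Invalid export result type: {type(result)}")
    return result


from_string = normalize_export_result("/tmp/table.csv")
from_mapping = normalize_export_result({"files": ["a.csv", "b.csv"]})
```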

load.array

type_name load.array The registered name for this item type.
documentation

Deserialize array data.

Documentation for the item.
authors Information about authorship for the item.
context
tags tabular
labels
  • package: kiara_plugin.tabular
references
Generic properties of this item (description, tags, labels, references, ...).
python_class
python_class_name DeserializeArrayModule The name of the Python class.
python_module_name kiara_plugin.tabular.modules.array The name of the Python module this class lives in.
full_name kiara_plugin.tabular.modules.array.DeserializeArrayModule The full class namespace.
The python class that implements this module type.
process_src

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        value_type = self.get_config_value("value_type")
        serialized_value = inputs.get_value_obj(value_type)
        config = inputs.get_value_obj("deserialization_config")

        target_profile = self.get_config_value("target_profile")
        func_name = f"to__{target_profile}"
        func = getattr(self, func_name)

        if config.is_set:
            _config = config.data
        else:
            _config = {}

        result: Any = func(data=serialized_value.serialized_data, **_config)
        outputs.set_value("python_object", result)

The source code of the process method of the module.

load.database

type_name load.database The registered name for this item type.
documentation

-- n/a --

Documentation for the item.
authors Information about authorship for the item.
context
tags tabular
labels
  • package: kiara_plugin.tabular
references
Generic properties of this item (description, tags, labels, references, ...).
python_class
python_class_name LoadDatabaseFromDiskModule The name of the Python class.
python_module_name kiara_plugin.tabular.modules.db The name of the Python module this class lives in.
full_name kiara_plugin.tabular.modules.db.LoadDatabaseFromDiskModule The full class namespace.
The python class that implements this module type.
process_src

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        value_type = self.get_config_value("value_type")
        serialized_value = inputs.get_value_obj(value_type)
        config = inputs.get_value_obj("deserialization_config")

        target_profile = self.get_config_value("target_profile")
        func_name = f"to__{target_profile}"
        func = getattr(self, func_name)

        if config.is_set:
            _config = config.data
        else:
            _config = {}

        result: Any = func(data=serialized_value.serialized_data, **_config)
        outputs.set_value("python_object", result)

The source code of the process method of the module.

load.table

type_name load.table The registered name for this item type.
documentation

-- n/a --

Documentation for the item.
authors Information about authorship for the item.
context
tags tabular
labels
  • package: kiara_plugin.tabular
references
Generic properties of this item (description, tags, labels, references, ...).
python_class
python_class_name DeserializeTableModule The name of the Python class.
python_module_name kiara_plugin.tabular.modules.table The name of the Python module this class lives in.
full_name kiara_plugin.tabular.modules.table.DeserializeTableModule The full class namespace.
The python class that implements this module type.
process_src

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        value_type = self.get_config_value("value_type")
        serialized_value = inputs.get_value_obj(value_type)
        config = inputs.get_value_obj("deserialization_config")

        target_profile = self.get_config_value("target_profile")
        func_name = f"to__{target_profile}"
        func = getattr(self, func_name)

        if config.is_set:
            _config = config.data
        else:
            _config = {}

        result: Any = func(data=serialized_value.serialized_data, **_config)
        outputs.set_value("python_object", result)

The source code of the process method of the module.

parse.date_array

type_name parse.date_array The registered name for this item type.
documentation

Create an array of date objects from an array of strings.

This module is very simplistic at the moment; more functionality and options will be added in the future.

At its core, this module uses the standard parser from the dateutil package to parse strings into dates. As this parser can't handle complex strings, the input strings can be pre-processed in the following ways:

  • 'cut' non-relevant parts of the string (using 'min_index' & 'max_index' input/config options)
  • remove matching tokens from the string, and replace them with a single whitespace (using the 'remove_tokens' option)

By default, this module raises an exception if an input string can't be parsed. To prevent this, set the module's 'force_non_null' config option or input to 'False'; unparsable strings then appear as 'NULL' values in the resulting array.
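The pre-processing and the 'force_non_null' behaviour described above can be sketched in isolation. In this example the names preprocess and parse_or_null are hypothetical, and datetime.strptime with a fixed format is used as a standard-library stand-in for the dateutil parser, which accepts far more formats.

```python
from datetime import datetime
from typing import Iterable, Optional


def preprocess(
    text: str,
    min_index: int = 0,
    max_index: Optional[int] = None,
    remove_tokens: Iterable[str] = (),
) -> str:
    """Cut the string to [min_index:max_index], then blank out unwanted tokens."""
    if min_index:
        text = text[min_index:]
    if max_index:
        # max_index refers to the original string, so shift it by min_index.
        text = text[0 : max_index - min_index]
    for token in remove_tokens:
        text = text.replace(token, " ")
    return text


def parse_or_null(text: str, force_non_null: bool = True, **options) -> Optional[datetime]:
    cleaned = preprocess(text, **options)
    try:
        # Stand-in for dateutil's parser.parse(cleaned, fuzzy=True).
        return datetime.strptime(cleaned.strip(), "%Y-%m-%d")
    except ValueError:
        if force_non_null:
            raise
        # With force_non_null=False, unparsable strings become null values.
        return None


cut = preprocess("report_2021-03-04_final.txt", min_index=7, max_index=17)
parsed = parse_or_null("report_2021-03-04_final.txt", min_index=7, max_index=17)
skipped = parse_or_null("no date here", force_non_null=False)
```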

Documentation for the item.
authors Information about authorship for the item.
context
tags tabular
labels
  • package: kiara_plugin.tabular
references
Generic properties of this item (description, tags, labels, references, ...).
python_class
python_class_name ExtractDateModule The name of the Python class.
python_module_name kiara_plugin.tabular.modules.array The name of the Python module this class lives in.
full_name kiara_plugin.tabular.modules.array.ExtractDateModule The full class namespace.
The python class that implements this module type.
process_src

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog):

        import polars as pl
        import pyarrow as pa
        from dateutil import parser

        force_non_null: bool = self.get_data_for_field(
            field_name="force_non_null", inputs=inputs
        )
        min_pos: Union[None, int] = self.get_data_for_field(
            field_name="min_index", inputs=inputs
        )
        if min_pos is None:
            min_pos = 0
        max_pos: Union[None, int] = self.get_data_for_field(
            field_name="max_index", inputs=inputs
        )
        remove_tokens: Iterable[str] = self.get_data_for_field(
            field_name="remove_tokens", inputs=inputs
        )

        def parse_date(_text: str):

            text = _text
            if min_pos:
                try:
                    text = text[min_pos:]  # type: ignore
                except Exception:
                    return None
            if max_pos:
                try:
                    text = text[0 : max_pos - min_pos]  # type: ignore  # noqa
                except Exception:
                    pass

            if remove_tokens:
                for t in remove_tokens:
                    text = text.replace(t, " ")

            try:
                d_obj = parser.parse(text, fuzzy=True)
            except Exception as e:
                if force_non_null:
                    raise KiaraProcessingException(e)
                return None

            if d_obj is None:
                if force_non_null:
                    raise KiaraProcessingException(
                        f"Can't parse date from string: {text}"
                    )
                return None

            return d_obj

        value = inputs.get_value_obj("array")
        array: KiaraArray = value.data

        series = pl.Series(name="tokens", values=array.arrow_array)

        job_log.add_log(f"start parsing date for {len(array)} items")
        result = series.apply(parse_date)
        job_log.add_log(f"finished parsing date for {len(array)} items")
        result_array = result.to_arrow()

        # TODO: remove this cast once the array data type can handle non-chunked arrays
        chunked = pa.chunked_array(result_array)
        outputs.set_values(date_array=chunked)

The source code of the process method of the module.

create.database

type_name create.database The registered name for this item type.
documentation

-- n/a --

Documentation for the item.
authors Information about authorship for the item.
context
tags tabular
labels
  • package: kiara_plugin.tabular
references
Generic properties of this item (description, tags, labels, references, ...).
python_class
python_class_name CreateDatabaseModule The name of the Python class.
python_module_name kiara_plugin.tabular.modules.db The name of the Python module this class lives in.
full_name kiara_plugin.tabular.modules.db.CreateDatabaseModule The full class namespace.
The python class that implements this module type.
process_src

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        source_type = self.get_config_value("source_type")
        target_type = self.get_config_value("target_type")

        func_name = f"create__{target_type}__from__{source_type}"
        func = getattr(self, func_name)

        source_value = inputs.get_value_obj(source_type)

        signature = inspect.signature(func)
        if "optional" in signature.parameters:
            optional: Dict[str, Value] = {}
            op_schemas = {}
            for field, schema in self.inputs_schema.items():
                if field == source_type:
                    continue
                optional[field] = inputs.get_value_obj(field)
                op_schemas[field] = schema
            result = func(
                source_value=source_value,
                optional=ValueMapReadOnly(
                    value_items=optional, values_schema=op_schemas
                ),
            )
        else:
            result = func(source_value=source_value)
        outputs.set_value(target_type, result)

The source code of the process method of the module.
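The create method above inspects the signature of the resolved 'create__...' function and passes the remaining inputs only if the function declares a parameter named 'optional'. A small sketch of that check, using plain dictionaries instead of kiara value maps; call_creator, create_from_list, and create_from_csv are hypothetical names used only for illustration.

```python
import inspect
from typing import Any, Callable, Dict


def call_creator(
    func: Callable[..., Any], source_value: Any, extra_inputs: Dict[str, Any]
) -> Any:
    # Pass the extra inputs only to functions that ask for them by name.
    if "optional" in inspect.signature(func).parameters:
        return func(source_value=source_value, optional=extra_inputs)
    return func(source_value=source_value)


def create_from_list(source_value):
    # Takes no 'optional' parameter, so extra inputs are never passed.
    return list(source_value)


def create_from_csv(source_value, optional):
    # Declares 'optional', so it receives the remaining inputs.
    delimiter = optional.get("delimiter", ",")
    return source_value.split(delimiter)


plain = call_creator(create_from_list, (1, 2), {"delimiter": ";"})
with_optional = call_creator(create_from_csv, "a;b", {"delimiter": ";"})
```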

create.table

type_name create.table The registered name for this item type.
documentation

-- n/a --

Documentation for the item.
authors Information about authorship for the item.
context
tags tabular
labels
  • package: kiara_plugin.tabular
references
Generic properties of this item (description, tags, labels, references, ...).
python_class
python_class_name CreateTableModule The name of the Python class.
python_module_name kiara_plugin.tabular.modules.table The name of the Python module this class lives in.
full_name kiara_plugin.tabular.modules.table.CreateTableModule The full class namespace.
The python class that implements this module type.
process_src

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        source_type = self.get_config_value("source_type")
        target_type = self.get_config_value("target_type")

        func_name = f"create__{target_type}__from__{source_type}"
        func = getattr(self, func_name)

        source_value = inputs.get_value_obj(source_type)

        signature = inspect.signature(func)
        if "optional" in signature.parameters:
            optional: Dict[str, Value] = {}
            op_schemas = {}
            for field, schema in self.inputs_schema.items():
                if field == source_type:
                    continue
                optional[field] = inputs.get_value_obj(field)
                op_schemas[field] = schema
            result = func(
                source_value=source_value,
                optional=ValueMapReadOnly(
                    value_items=optional, values_schema=op_schemas
                ),
            )
        else:
            result = func(source_value=source_value)
        outputs.set_value(target_type, result)

The source code of the process method of the module.

query.database

type_name query.database The registered name for this item type.
documentation

Execute a SQL query against a (SQLite) database.

Documentation for the item.
authors Information about authorship for the item.
context
tags tabular
labels
  • package: kiara_plugin.tabular
references
Generic properties of this item (description, tags, labels, references, ...).
python_class
python_class_name QueryDatabaseModule The name of the Python class.
python_module_name kiara_plugin.tabular.modules.db The name of the Python module this class lives in.
full_name kiara_plugin.tabular.modules.db.QueryDatabaseModule The full class namespace.
The python class that implements this module type.
process_src

    def process(self, inputs: ValueMap, outputs: ValueMap):

        import pyarrow as pa

        database: KiaraDatabase = inputs.get_value_data("database")

        query = self.get_config_value("query")
        if query is None:
            query = inputs.get_value_data("query")

        # TODO: make this memory efficent

        result_columns: Dict[str, List[Any]] = {}
        with database.get_sqlalchemy_engine().connect() as con:
            result = con.execute(text(query))
            for r in result:
                for k, v in dict(r).items():
                    result_columns.setdefault(k, []).append(v)

        table = pa.Table.from_pydict(result_columns)
        outputs.set_value("query_result", table)

The source code of the process method of the module.
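The query method above turns row-wise query results into a column-wise dictionary before building the Arrow table. The sketch below shows only that pivoting step, using the standard-library sqlite3 module in place of the SQLAlchemy engine and stopping short of the pyarrow conversion. The function query_to_columns and the 'people' table are hypothetical.

```python
import sqlite3
from typing import Any, Dict, List


def query_to_columns(con: sqlite3.Connection, query: str) -> Dict[str, List[Any]]:
    """Run a query and collect the result as {column name: list of values}."""
    cursor = con.execute(query)
    names = [description[0] for description in cursor.description]
    result_columns: Dict[str, List[Any]] = {}
    for row in cursor:
        for name, value in zip(names, row):
            # Same accumulation idiom as in the module source.
            result_columns.setdefault(name, []).append(value)
    return result_columns


con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE people (name TEXT, age INTEGER)")
con.executemany("INSERT INTO people VALUES (?, ?)", [("ada", 36), ("bob", 41)])
columns = query_to_columns(con, "SELECT name, age FROM people ORDER BY age")
```

Because every value is appended to a Python list, the whole result set is held in memory, which is what the TODO comment in the module source refers to.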

table.cut_column

type_name table.cut_column The registered name for this item type.
documentation

Cut off one column from a table, returning an array.

Documentation for the item.
authors Information about authorship for the item.
context
tags tabular
labels
  • package: kiara_plugin.tabular
references
Generic properties of this item (description, tags, labels, references, ...).
python_class
python_class_name CutColumnModule The name of the Python class.
python_module_name kiara_plugin.tabular.modules.table The name of the Python module this class lives in.
full_name kiara_plugin.tabular.modules.table.CutColumnModule The full class namespace.
The python class that implements this module type.
process_src

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        import pyarrow as pa

        column_name: str = inputs.get_value_data("column_name")

        table_value: Value = inputs.get_value_obj("table")
        table_metadata: KiaraTableMetadata = table_value.get_property_data(
            "metadata.table"
        )

        available = table_metadata.table.column_names

        if column_name not in available:
            raise KiaraProcessingException(
                f"Invalid column name '{column_name}'. Available column names: {', '.join(available)}"
            )

        table: pa.Table = table_value.data.arrow_table
        column = table.column(column_name)

        outputs.set_value("array", column)

The source code of the process method of the module.

table.merge

type_name table.merge The registered name for this item type.
documentation

Create a table from other tables and/or arrays.

This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary number of tables/arrays; all tables to be merged must be specified in the module configuration.

Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the desired column name as key, and a field-name in the following format as value:

  • '[inputs_schema_key]' for inputs of type 'array'
  • '[inputs_schema_key].orig_column_name' for inputs of type 'table'
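As an illustration of the 'column_map' format, the sketch below resolves references against plain Python data: a dict of lists stands in for a table and a list stands in for an array. The function merge_columns and the input names 'people' and 'city' are hypothetical, and the row-count check done by the real module is left out.

```python
from typing import Any, Dict, List, Union

Table = Dict[str, List[Any]]  # column name -> values (stand-in for a table)
Array = List[Any]  # stand-in for an array


def merge_columns(
    sources: Dict[str, Union[Table, Array]], column_map: Dict[str, str]
) -> Table:
    arrays: Dict[str, List[Any]] = {}
    for source_key, item in sources.items():
        if isinstance(item, dict):
            # Table columns are addressed as '<input name>.<column name>'.
            for name, values in item.items():
                arrays[f"{source_key}.{name}"] = values
        else:
            # Arrays are addressed by the input name alone.
            arrays[source_key] = item
    # Keys of column_map become the column names of the merged table.
    return {new_name: arrays[ref] for new_name, ref in column_map.items()}


sources = {
    "people": {"name": ["ada", "bob"], "age": [36, 41]},
    "city": ["London", "Paris"],
}
column_map = {"person": "people.name", "home": "city"}
merged = merge_columns(sources, column_map)
```

The 'age' column is not referenced in column_map, so it does not appear in the merged result.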
Documentation for the item.
authors Information about authorship for the item.
context
tags tabular
labels
  • package: kiara_plugin.tabular
references
Generic properties of this item (description, tags, labels, references, ...).
python_class
python_class_name MergeTableModule The name of the Python class.
python_module_name kiara_plugin.tabular.modules.table The name of the Python module this class lives in.
full_name kiara_plugin.tabular.modules.table.MergeTableModule The full class namespace.
The python class that implements this module type.
process_src

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:

        import pyarrow as pa

        inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
        column_map: Dict[str, str] = self.get_config_value("column_map")

        sources = {}
        for field_name in inputs_schema.keys():
            sources[field_name] = inputs.get_value_data(field_name)

        len_dict = {}
        arrays = {}

        column_map_final = dict(column_map)

        for source_key, table_or_array in sources.items():

            if isinstance(table_or_array, KiaraTable):
                rows = table_or_array.num_rows
                for name in table_or_array.column_names:
                    array_name = f"{source_key}.{name}"
                    if column_map and array_name not in column_map.values():
                        job_log.add_log(
                            f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
                        )
                        continue

                    column = table_or_array.arrow_table.column(name)
                    arrays[array_name] = column
                    if not column_map:
                        if name in column_map_final:
                            raise Exception(
                                f"Can't merge table, duplicate column name: {name}."
                            )
                        column_map_final[name] = array_name

            elif isinstance(table_or_array, KiaraArray):

                if column_map and source_key not in column_map.values():
                    job_log.add_log(
                        f"Ignoring array '{source_key}': not listed in column_map."
                    )
                    continue

                rows = len(table_or_array)
                arrays[source_key] = table_or_array.arrow_array

                if not column_map:
                    if source_key in column_map_final.keys():
                        raise Exception(
                            f"Can't merge table, duplicate column name: {source_key}."
                        )
                    column_map_final[source_key] = source_key

            else:
                raise KiaraProcessingException(
                    f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
                )

            len_dict[source_key] = rows

        all_rows = None
        for source_key, rows in len_dict.items():
            if all_rows is None:
                all_rows = rows
            else:
                if all_rows != rows:
                    all_rows = None
                    break

        if all_rows is None:
            len_str = ""
            for name, rows in len_dict.items():
                len_str = f" {name} ({rows})"

            raise KiaraProcessingException(
                f"Can't merge table, sources have different lengths: {len_str}"
            )

        column_names = []
        columns = []
        for column_name, ref in column_map_final.items():
            column_names.append(column_name)
            column = arrays[ref]
            columns.append(column)

        table = pa.Table.from_arrays(arrays=columns, names=column_names)

        outputs.set_value("table", table)

The source code of the process method of the module.

query.table

type_name query.table The registered name for this item type.
documentation

Execute a SQL query against an (Arrow) table.

The default relation name for the SQL query is 'data', but it can be changed with the 'relation_name' config option/input.

If the 'query' module config option is not set, users can provide their own query; otherwise the pre-set one is used.
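The precedence between the configured and the user-supplied query, together with the relation name check, can be sketched as a small pure function. The function resolve_query and the RESERVED set below are hypothetical; the real module checks against its own RESERVED_SQL_KEYWORDS collection and then runs the query with duckdb.

```python
from typing import Optional, Tuple

# Hypothetical, deliberately tiny subset of reserved SQL keywords.
RESERVED = {"SELECT", "FROM", "WHERE", "TABLE"}


def resolve_query(
    config_query: Optional[str],
    config_relation: str,
    input_query: str,
    input_relation: str,
) -> Tuple[str, str]:
    """Pick the query and relation name, preferring the module configuration."""
    if config_query is None:
        # No pre-set query: use what the user provided as inputs.
        query, relation = input_query, input_relation
    else:
        # A pre-set query wins, together with the configured relation name.
        query, relation = config_query, config_relation
    if relation.upper() in RESERVED:
        raise ValueError(
            f"Invalid relation name '{relation}': this is a reserved SQL keyword."
        )
    return query, relation


user_defined = resolve_query(None, "data", "SELECT * FROM data", "data")
pre_set = resolve_query("SELECT count(*) FROM t", "t", "SELECT * FROM data", "data")
```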

Documentation for the item.
authors Information about authorship for the item.
context
tags tabular
labels
  • package: kiara_plugin.tabular
references
Generic properties of this item (description, tags, labels, references, ...).
python_class
python_class_name QueryTableSQL The name of the Python class.
python_module_name kiara_plugin.tabular.modules.table The name of the Python module this class lives in.
full_name kiara_plugin.tabular.modules.table.QueryTableSQL The full class namespace.
The python class that implements this module type.
process_src

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        import duckdb

        if self.get_config_value("query") is None:
            _query: str = inputs.get_value_data("query")
            _relation_name: str = inputs.get_value_data("relation_name")
        else:
            _query = self.get_config_value("query")
            _relation_name = self.get_config_value("relation_name")

        if _relation_name.upper() in RESERVED_SQL_KEYWORDS:
            raise KiaraProcessingException(
                f"Invalid relation name '{_relation_name}': this is a reserved sql keyword, please select a different name."
            )

        _table: KiaraTable = inputs.get_value_data("table")
        rel_from_arrow = duckdb.arrow(_table.arrow_table)
        result: duckdb.DuckDBPyResult = rel_from_arrow.query(_relation_name, _query)

        outputs.set_value("query_result", result.fetch_arrow_table())

The source code of the process method of the module.