type_name |
table.filters |
The registered name for this item type. |
documentation |
-- n/a --
|
Documentation for the item. |
authors |
|
Information about authorship for the item. |
context |
tags |
tabular |
labels |
- package: kiara_plugin.tabular
|
references |
|
|
Generic properties of this item (description, tags, labels, references, ...). |
python_class |
python_class_name |
TableFiltersModule |
The name of the Python class. |
python_module_name |
kiara_plugin.tabular.modules.table.filters |
The name of the Python module this class lives in. |
full_name |
kiara_plugin.tabular.modules.table.filters.TableFiltersModule |
The full class namespace. |
|
The python class that implements this module type. |
process_src |
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    """Apply the configured filter to the 'value' input and set the 'value' output.

    The filter implementation is looked up as the method
    ``filter__<filter_name>`` of this module (``filter_name`` comes from the
    module config). It is called with the source value object and a dict
    holding the data of every *other* input. If the filter returns ``None``,
    the unchanged source value is passed through as the output.

    Raises:
        Exception: if this class has no ``filter__<filter_name>`` method.
    """
    filter_name: str = self.get_config_value("filter_name")

    # TODO: ensure value is of the right type?
    source_obj = inputs.get_value_obj("value")

    func_name = f"filter__{filter_name}"
    if not hasattr(self, func_name):
        raise Exception(
            f"Can't apply filter '{filter_name}': missing function '{func_name}' in class '{self.__class__.__name__}'. Please check this modules documentation or source code to determine which filters are supported."
        )

    func = getattr(self, func_name)
    # TODO: check signature?

    # The source is handed to the filter separately (as a value object), so it
    # is excluded here; otherwise its data would be loaded needlessly and
    # show up as an extra 'value' entry among the filter inputs.
    filter_inputs = {k: v.data for k, v in inputs.items() if k != "value"}

    result = func(value=source_obj, filter_inputs=filter_inputs)
    # A filter returning None means "no change": pass the source through.
    outputs.set_value("value", source_obj if result is None else result)
|
The source code of the process method of the module. |
type_name |
render.database |
The registered name for this item type. |
documentation |
-- n/a --
|
Documentation for the item. |
authors |
|
Information about authorship for the item. |
context |
tags |
tabular |
labels |
- package: kiara_plugin.tabular
|
references |
|
|
Generic properties of this item (description, tags, labels, references, ...). |
python_class |
python_class_name |
RenderDatabaseModule |
The name of the Python class. |
python_module_name |
kiara_plugin.tabular.modules.db |
The name of the Python module this class lives in. |
full_name |
kiara_plugin.tabular.modules.db.RenderDatabaseModule |
The full class namespace. |
|
The python class that implements this module type. |
process_src |
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    """Render the 'value' input and set the 'render_value_result' output.

    The renderer is the method ``render__<source_type>__as__<target_type>`` of
    this module (both names come from the module config). It receives the value
    object plus the plain dict of the 'render_config' input. A renderer result
    that is not already a ``RenderValueResult`` gets wrapped in one. Either way,
    this module's manifest is registered in the result's ``manifest_lookup``.
    """
    source_type = self.get_config_value("source_type")
    target_type = self.get_config_value("target_type")

    value: Value = inputs.get_value_obj("value")
    render_config_model: DictModel = inputs.get_value_data("render_config")

    renderer = getattr(self, f"render__{source_type}__as__{target_type}")
    rendered = renderer(value=value, render_config=render_config_model.dict_data)

    if not isinstance(rendered, RenderValueResult):
        # Plain renderer output: wrap it so the output is always a RenderValueResult.
        rendered = RenderValueResult(
            value_id=value.value_id,
            render_config=render_config_model,
            render_manifest=self.manifest.manifest_hash,
            rendered=rendered,
            related_scenes={},
        )

    rendered.manifest_lookup[self.manifest.manifest_hash] = self.manifest
    outputs.set_value("render_value_result", rendered)
|
The source code of the process method of the module. |
type_name |
render.table |
The registered name for this item type. |
documentation |
-- n/a --
|
Documentation for the item. |
authors |
|
Information about authorship for the item. |
context |
tags |
tabular |
labels |
- package: kiara_plugin.tabular
|
references |
|
|
Generic properties of this item (description, tags, labels, references, ...). |
python_class |
python_class_name |
RenderTableModule |
The name of the Python class. |
python_module_name |
kiara_plugin.tabular.modules.table |
The name of the Python module this class lives in. |
full_name |
kiara_plugin.tabular.modules.table.RenderTableModule |
The full class namespace. |
|
The python class that implements this module type. |
process_src |
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    """Render the 'value' input and set the 'render_value_result' output.

    The renderer is the method ``render__<source_type>__as__<target_type>`` of
    this module (both names come from the module config). It receives the value
    object plus the plain dict of the 'render_config' input. A renderer result
    that is not already a ``RenderValueResult`` gets wrapped in one. Either way,
    this module's manifest is registered in the result's ``manifest_lookup``.
    """
    source_type = self.get_config_value("source_type")
    target_type = self.get_config_value("target_type")

    value: Value = inputs.get_value_obj("value")
    render_config_model: DictModel = inputs.get_value_data("render_config")

    renderer = getattr(self, f"render__{source_type}__as__{target_type}")
    rendered = renderer(value=value, render_config=render_config_model.dict_data)

    if not isinstance(rendered, RenderValueResult):
        # Plain renderer output: wrap it so the output is always a RenderValueResult.
        rendered = RenderValueResult(
            value_id=value.value_id,
            render_config=render_config_model,
            render_manifest=self.manifest.manifest_hash,
            rendered=rendered,
            related_scenes={},
        )

    rendered.manifest_lookup[self.manifest.manifest_hash] = self.manifest
    outputs.set_value("render_value_result", rendered)
|
The source code of the process method of the module. |
type_name |
export.table |
The registered name for this item type. |
documentation |
Export table data items.
|
Documentation for the item. |
authors |
|
Information about authorship for the item. |
context |
tags |
tabular |
labels |
- package: kiara_plugin.tabular
|
references |
|
|
Generic properties of this item (description, tags, labels, references, ...). |
python_class |
python_class_name |
ExportTableModule |
The name of the Python class. |
python_module_name |
kiara_plugin.tabular.modules.table |
The name of the Python module this class lives in. |
full_name |
kiara_plugin.tabular.modules.table.ExportTableModule |
The full class namespace. |
|
The python class that implements this module type. |
process_src |
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    """Export the source value to disk and set the 'export_details' output.

    The exporter is the method ``export__<source_type>__as__<target_profile>``
    of this module (names from the module config). It is called with the source
    data, an absolute base directory (the 'base_path' input, or the current
    working directory if unset; created if missing) and a file name stem (the
    'name' input, or the value id if empty). The exporter may return a
    ``DataExportResult``, a mapping of its fields, or a single file path string.
    If 'export_metadata' is set, the value info is additionally written as JSON
    to ``<base_path>/<name>.metadata`` and that file is appended to the result.

    Raises:
        Exception: if the exporter method does not exist.
        KiaraProcessingException: if the exporter returns an unsupported type.
    """
    target_profile: str = self.get_config_value("target_profile")
    source_type: str = self.get_config_value("source_type")
    export_metadata = inputs.get_value_data("export_metadata")

    source_obj = inputs.get_value_obj(source_type)
    source = source_obj.data

    func_name = f"export__{source_type}__as__{target_profile}"
    if not hasattr(self, func_name):
        raise Exception(
            f"Can't export '{source_type}' value: missing function '{func_name}' in class '{self.__class__.__name__}'. Please check this modules documentation or source code to determine which source types and profiles are supported."
        )

    base_path = inputs.get_value_data("base_path")
    if base_path is None:
        base_path = os.getcwd()
    name = inputs.get_value_data("name") or str(source_obj.value_id)

    exporter = getattr(self, func_name)
    # TODO: check signature?
    base_path = os.path.abspath(base_path)
    os.makedirs(base_path, exist_ok=True)

    result = exporter(value=source, base_path=base_path, name=name)

    # Normalize the supported shorthand return types into a DataExportResult.
    if isinstance(result, Mapping):
        result = DataExportResult(**result)
    elif isinstance(result, str):
        result = DataExportResult(files=[result])

    if not isinstance(result, DataExportResult):
        raise KiaraProcessingException(
            f"Can't export value: invalid result type '{type(result)}' from internal method. This is most likely a bug in the '{self.module_type_name}' module code."
        )

    if export_metadata:
        metadata_path = Path(os.path.join(base_path, f"{name}.metadata"))
        metadata_path.write_text(source_obj.create_info().json())
        result.files.append(metadata_path.as_posix())

    outputs.set_value("export_details", result)
|
The source code of the process method of the module. |
type_name |
load.array |
The registered name for this item type. |
documentation |
Deserialize array data.
|
Documentation for the item. |
authors |
|
Information about authorship for the item. |
context |
tags |
tabular |
labels |
- package: kiara_plugin.tabular
|
references |
|
|
Generic properties of this item (description, tags, labels, references, ...). |
python_class |
python_class_name |
DeserializeArrayModule |
The name of the Python class. |
python_module_name |
kiara_plugin.tabular.modules.array |
The name of the Python module this class lives in. |
full_name |
kiara_plugin.tabular.modules.array.DeserializeArrayModule |
The full class namespace. |
|
The python class that implements this module type. |
process_src |
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    """Deserialize the serialized input value and set the 'python_object' output.

    The input field is named after the configured 'value_type'. The
    deserializer is the method ``to__<target_profile>`` of this module. It
    receives the serialized data plus, if the 'deserialization_config' input
    is set, that config's entries as keyword arguments.
    """
    value_type = self.get_config_value("value_type")
    serialized_value = inputs.get_value_obj(value_type)
    deserialization_config = inputs.get_value_obj("deserialization_config")
    target_profile = self.get_config_value("target_profile")

    deserializer = getattr(self, f"to__{target_profile}")
    extra_kwargs = deserialization_config.data if deserialization_config.is_set else {}

    result: Any = deserializer(data=serialized_value.serialized_data, **extra_kwargs)
    outputs.set_value("python_object", result)
|
The source code of the process method of the module. |
type_name |
load.database |
The registered name for this item type. |
documentation |
-- n/a --
|
Documentation for the item. |
authors |
|
Information about authorship for the item. |
context |
tags |
tabular |
labels |
- package: kiara_plugin.tabular
|
references |
|
|
Generic properties of this item (description, tags, labels, references, ...). |
python_class |
python_class_name |
LoadDatabaseFromDiskModule |
The name of the Python class. |
python_module_name |
kiara_plugin.tabular.modules.db |
The name of the Python module this class lives in. |
full_name |
kiara_plugin.tabular.modules.db.LoadDatabaseFromDiskModule |
The full class namespace. |
|
The python class that implements this module type. |
process_src |
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    """Deserialize the serialized input value and set the 'python_object' output.

    The input field is named after the configured 'value_type'. The
    deserializer is the method ``to__<target_profile>`` of this module. It
    receives the serialized data plus, if the 'deserialization_config' input
    is set, that config's entries as keyword arguments.
    """
    value_type = self.get_config_value("value_type")
    serialized_value = inputs.get_value_obj(value_type)
    deserialization_config = inputs.get_value_obj("deserialization_config")
    target_profile = self.get_config_value("target_profile")

    deserializer = getattr(self, f"to__{target_profile}")
    extra_kwargs = deserialization_config.data if deserialization_config.is_set else {}

    result: Any = deserializer(data=serialized_value.serialized_data, **extra_kwargs)
    outputs.set_value("python_object", result)
|
The source code of the process method of the module. |
type_name |
load.table |
The registered name for this item type. |
documentation |
-- n/a --
|
Documentation for the item. |
authors |
|
Information about authorship for the item. |
context |
tags |
tabular |
labels |
- package: kiara_plugin.tabular
|
references |
|
|
Generic properties of this item (description, tags, labels, references, ...). |
python_class |
python_class_name |
DeserializeTableModule |
The name of the Python class. |
python_module_name |
kiara_plugin.tabular.modules.table |
The name of the Python module this class lives in. |
full_name |
kiara_plugin.tabular.modules.table.DeserializeTableModule |
The full class namespace. |
|
The python class that implements this module type. |
process_src |
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    """Deserialize the serialized input value and set the 'python_object' output.

    The input field is named after the configured 'value_type'. The
    deserializer is the method ``to__<target_profile>`` of this module. It
    receives the serialized data plus, if the 'deserialization_config' input
    is set, that config's entries as keyword arguments.
    """
    value_type = self.get_config_value("value_type")
    serialized_value = inputs.get_value_obj(value_type)
    deserialization_config = inputs.get_value_obj("deserialization_config")
    target_profile = self.get_config_value("target_profile")

    deserializer = getattr(self, f"to__{target_profile}")
    extra_kwargs = deserialization_config.data if deserialization_config.is_set else {}

    result: Any = deserializer(data=serialized_value.serialized_data, **extra_kwargs)
    outputs.set_value("python_object", result)
|
The source code of the process method of the module. |
type_name |
parse.date_array |
The registered name for this item type. |
documentation |
Create an array of date objects from an array of strings.
This module is very simplistic at the moment; more functionality and options will be added in the future.
At its core, this module uses the standard parser from the
dateutil package to parse strings into dates. As this parser can't handle
complex strings, the input strings can be pre-processed in the following ways:
- 'cut' non-relevant parts of the string (using 'min_index' & 'max_index' input/config options)
- remove matching tokens from the string, and replace them with a single whitespace (using the 'remove_tokens' option)
By default, if an input string can't be parsed, this module will raise an exception. This can be prevented by
setting this module's 'force_non_null' config option or input to 'False', in which case un-parsable strings
will appear as 'NULL' values in the resulting array.
|
Documentation for the item. |
authors |
|
Information about authorship for the item. |
context |
tags |
tabular |
labels |
- package: kiara_plugin.tabular
|
references |
|
|
Generic properties of this item (description, tags, labels, references, ...). |
python_class |
python_class_name |
ExtractDateModule |
The name of the Python class. |
python_module_name |
kiara_plugin.tabular.modules.array |
The name of the Python module this class lives in. |
full_name |
kiara_plugin.tabular.modules.array.ExtractDateModule |
The full class namespace. |
|
The python class that implements this module type. |
process_src |
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog):
    """Parse the strings of the 'array' input into dates; set 'date_array'.

    Each string is optionally cut to the ``[min_index, max_index)`` range
    (indices refer to the original string), then every token listed in
    'remove_tokens' is replaced with a single space. The result is parsed with
    ``dateutil.parser.parse(..., fuzzy=True)``. Options are read via
    ``get_data_for_field``, so each may come from the module config or inputs.

    Raises:
        KiaraProcessingException: if 'force_non_null' is set and a string
            cannot be parsed (the parser error is chained as the cause).
            Otherwise un-parsable strings become nulls.
    """
    import polars as pl
    import pyarrow as pa
    from dateutil import parser

    force_non_null: bool = self.get_data_for_field(
        field_name="force_non_null", inputs=inputs
    )
    min_pos: Union[None, int] = self.get_data_for_field(
        field_name="min_index", inputs=inputs
    )
    if min_pos is None:
        min_pos = 0
    max_pos: Union[None, int] = self.get_data_for_field(
        field_name="max_index", inputs=inputs
    )
    remove_tokens: Iterable[str] = self.get_data_for_field(
        field_name="remove_tokens", inputs=inputs
    )

    def parse_date(_text: str):
        """Parse one (pre-processed) string; return None when unparsable and allowed."""
        text = _text
        if min_pos:
            try:
                text = text[min_pos:]  # type: ignore
            except Exception:
                return None
        if max_pos:
            try:
                # The string was already shifted by min_pos, so the end index
                # has to be shifted by the same amount.
                text = text[0 : max_pos - min_pos]  # type: ignore # noqa
            except Exception:
                pass
        if remove_tokens:
            for t in remove_tokens:
                text = text.replace(t, " ")

        try:
            d_obj = parser.parse(text, fuzzy=True)
        except Exception as e:
            if force_non_null:
                # Chain the parser error so its traceback is kept as the cause.
                raise KiaraProcessingException(e) from e
            return None
        if d_obj is None:
            if force_non_null:
                raise KiaraProcessingException(
                    f"Can't parse date from string: {text}"
                )
            return None
        return d_obj

    value = inputs.get_value_obj("array")
    array: KiaraArray = value.data

    series = pl.Series(name="tokens", values=array.arrow_array)
    job_log.add_log(f"start parsing date for {len(array)} items")
    result = series.apply(parse_date)
    job_log.add_log(f"finished parsing date for {len(array)} items")
    result_array = result.to_arrow()

    # TODO: remove this cast once the array data type can handle non-chunked arrays
    chunked = pa.chunked_array(result_array)
    outputs.set_values(date_array=chunked)
|
The source code of the process method of the module. |
type_name |
create.database |
The registered name for this item type. |
documentation |
-- n/a --
|
Documentation for the item. |
authors |
|
Information about authorship for the item. |
context |
tags |
tabular |
labels |
- package: kiara_plugin.tabular
|
references |
|
|
Generic properties of this item (description, tags, labels, references, ...). |
python_class |
python_class_name |
CreateDatabaseModule |
The name of the Python class. |
python_module_name |
kiara_plugin.tabular.modules.db |
The name of the Python module this class lives in. |
full_name |
kiara_plugin.tabular.modules.db.CreateDatabaseModule |
The full class namespace. |
|
The python class that implements this module type. |
process_src |
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    """Create a value of the configured target type from the source input.

    The creator is the method ``create__<target_type>__from__<source_type>`` of
    this module. If that method declares a parameter named ``optional``, all
    inputs other than the source are passed to it as a ``ValueMapReadOnly``
    (with their schemas). The creator's result is set as the output named
    after ``target_type``.
    """
    source_type = self.get_config_value("source_type")
    target_type = self.get_config_value("target_type")

    creator = getattr(self, f"create__{target_type}__from__{source_type}")
    source_value = inputs.get_value_obj(source_type)

    if "optional" not in inspect.signature(creator).parameters:
        result = creator(source_value=source_value)
    else:
        optional: Dict[str, Value] = {}
        op_schemas = {}
        for field_name, field_schema in self.inputs_schema.items():
            if field_name != source_type:
                optional[field_name] = inputs.get_value_obj(field_name)
                op_schemas[field_name] = field_schema
        result = creator(
            source_value=source_value,
            optional=ValueMapReadOnly(
                value_items=optional, values_schema=op_schemas
            ),
        )

    outputs.set_value(target_type, result)
|
The source code of the process method of the module. |
type_name |
create.table |
The registered name for this item type. |
documentation |
-- n/a --
|
Documentation for the item. |
authors |
|
Information about authorship for the item. |
context |
tags |
tabular |
labels |
- package: kiara_plugin.tabular
|
references |
|
|
Generic properties of this item (description, tags, labels, references, ...). |
python_class |
python_class_name |
CreateTableModule |
The name of the Python class. |
python_module_name |
kiara_plugin.tabular.modules.table |
The name of the Python module this class lives in. |
full_name |
kiara_plugin.tabular.modules.table.CreateTableModule |
The full class namespace. |
|
The python class that implements this module type. |
process_src |
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    """Create a value of the configured target type from the source input.

    The creator is the method ``create__<target_type>__from__<source_type>`` of
    this module. If that method declares a parameter named ``optional``, all
    inputs other than the source are passed to it as a ``ValueMapReadOnly``
    (with their schemas). The creator's result is set as the output named
    after ``target_type``.
    """
    source_type = self.get_config_value("source_type")
    target_type = self.get_config_value("target_type")

    creator = getattr(self, f"create__{target_type}__from__{source_type}")
    source_value = inputs.get_value_obj(source_type)

    if "optional" not in inspect.signature(creator).parameters:
        result = creator(source_value=source_value)
    else:
        optional: Dict[str, Value] = {}
        op_schemas = {}
        for field_name, field_schema in self.inputs_schema.items():
            if field_name != source_type:
                optional[field_name] = inputs.get_value_obj(field_name)
                op_schemas[field_name] = field_schema
        result = creator(
            source_value=source_value,
            optional=ValueMapReadOnly(
                value_items=optional, values_schema=op_schemas
            ),
        )

    outputs.set_value(target_type, result)
|
The source code of the process method of the module. |
type_name |
query.database |
The registered name for this item type. |
documentation |
Execute a sql query against a (sqlite) database.
|
Documentation for the item. |
authors |
|
Information about authorship for the item. |
context |
tags |
tabular |
labels |
- package: kiara_plugin.tabular
|
references |
|
|
Generic properties of this item (description, tags, labels, references, ...). |
python_class |
python_class_name |
QueryDatabaseModule |
The name of the Python class. |
python_module_name |
kiara_plugin.tabular.modules.db |
The name of the Python module this class lives in. |
full_name |
kiara_plugin.tabular.modules.db.QueryDatabaseModule |
The full class namespace. |
|
The python class that implements this module type. |
process_src |
def process(self, inputs: ValueMap, outputs: ValueMap):
    """Run an SQL query against the 'database' input; set 'query_result'.

    The query comes from the 'query' module config if set, otherwise from the
    'query' input. Result rows are collected column-wise and converted into a
    pyarrow table. Column names are taken from the query result itself, so an
    empty result still yields a table with the queried columns.
    """
    import pyarrow as pa

    database: KiaraDatabase = inputs.get_value_data("database")
    query = self.get_config_value("query")
    if query is None:
        query = inputs.get_value_data("query")

    # TODO: make this memory efficient
    with database.get_sqlalchemy_engine().connect() as con:
        result = con.execute(text(query))
        column_names = list(result.keys())
        # Pre-create one list per column so zero-row results keep their columns.
        result_columns: Dict[str, List[Any]] = {name: [] for name in column_names}
        for row in result:
            # Rows are tuple-like: pair values with column names positionally
            # (``dict(row)`` is not supported by SQLAlchemy 2.x rows).
            for name, cell in zip(column_names, row):
                result_columns[name].append(cell)

    table = pa.Table.from_pydict(result_columns)
    outputs.set_value("query_result", table)
|
The source code of the process method of the module. |
type_name |
table.cut_column |
The registered name for this item type. |
documentation |
Cut off one column from a table, returning an array.
|
Documentation for the item. |
authors |
|
Information about authorship for the item. |
context |
tags |
tabular |
labels |
- package: kiara_plugin.tabular
|
references |
|
|
Generic properties of this item (description, tags, labels, references, ...). |
python_class |
python_class_name |
CutColumnModule |
The name of the Python class. |
python_module_name |
kiara_plugin.tabular.modules.table |
The name of the Python module this class lives in. |
full_name |
kiara_plugin.tabular.modules.table.CutColumnModule |
The full class namespace. |
|
The python class that implements this module type. |
process_src |
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    """Extract one column of the 'table' input and set it as the 'array' output.

    The requested 'column_name' is checked against the column names in the
    table's 'metadata.table' property before the table data is accessed.

    Raises:
        KiaraProcessingException: if the table has no column with that name.
    """
    import pyarrow as pa

    column_name: str = inputs.get_value_data("column_name")
    table_value: Value = inputs.get_value_obj("table")

    table_metadata: KiaraTableMetadata = table_value.get_property_data("metadata.table")
    available = table_metadata.table.column_names
    if column_name not in available:
        raise KiaraProcessingException(
            f"Invalid column name '{column_name}'. Available column names: {', '.join(available)}"
        )

    arrow_table: pa.Table = table_value.data.arrow_table
    outputs.set_value("array", arrow_table.column(column_name))
|
The source code of the process method of the module. |
type_name |
table.merge |
The registered name for this item type. |
documentation |
Create a table from other tables and/or arrays.
This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary
number of tables/arrays; all tables to be merged must be specified in the module configuration.
Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the
desired column name as key, and a field-name in the following format as value:
- '[inputs_schema key]' for inputs of type 'array'
- '[inputs_schema key].orig_column_name' for inputs of type 'table'
|
Documentation for the item. |
authors |
|
Information about authorship for the item. |
context |
tags |
tabular |
labels |
- package: kiara_plugin.tabular
|
references |
|
|
Generic properties of this item (description, tags, labels, references, ...). |
python_class |
python_class_name |
MergeTableModule |
The name of the Python class. |
python_module_name |
kiara_plugin.tabular.modules.table |
The name of the Python module this class lives in. |
full_name |
kiara_plugin.tabular.modules.table.MergeTableModule |
The full class namespace. |
|
The python class that implements this module type. |
process_src |
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:
    """Merge the configured table/array inputs into a single 'table' output.

    Every field of the 'inputs_schema' config is read as a source. Table
    columns are referenced as ``<source_key>.<column_name>``, arrays by their
    ``<source_key>``. With a non-empty 'column_map' config (target column name
    -> reference), only the referenced columns/arrays are used (others are
    logged and skipped). Without one, every column/array is used under its
    own name.

    Raises:
        KiaraProcessingException: if a source is neither a table nor an array,
            if column names clash (no column_map), if a column_map entry
            references an unknown column/array, or if the sources have
            different lengths.
    """
    import pyarrow as pa

    inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
    # Treat a missing column_map like an empty one.
    column_map: Dict[str, str] = self.get_config_value("column_map") or {}

    sources = {
        field_name: inputs.get_value_data(field_name)
        for field_name in inputs_schema.keys()
    }

    # Hoisted so membership checks below don't rescan the mapping per column.
    wanted_refs = set(column_map.values())

    len_dict: Dict[str, int] = {}
    arrays: Dict[str, Any] = {}
    column_map_final = dict(column_map)

    for source_key, table_or_array in sources.items():
        if isinstance(table_or_array, KiaraTable):
            rows = table_or_array.num_rows
            for name in table_or_array.column_names:
                array_name = f"{source_key}.{name}"
                if column_map and array_name not in wanted_refs:
                    job_log.add_log(
                        f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
                    )
                    continue
                arrays[array_name] = table_or_array.arrow_table.column(name)
                if not column_map:
                    if name in column_map_final:
                        raise KiaraProcessingException(
                            f"Can't merge table, duplicate column name: {name}."
                        )
                    column_map_final[name] = array_name
        elif isinstance(table_or_array, KiaraArray):
            if column_map and source_key not in wanted_refs:
                job_log.add_log(
                    f"Ignoring array '{source_key}': not listed in column_map."
                )
                continue
            rows = len(table_or_array)
            arrays[source_key] = table_or_array.arrow_array
            if not column_map:
                if source_key in column_map_final:
                    raise KiaraProcessingException(
                        f"Can't merge table, duplicate column name: {source_key}."
                    )
                column_map_final[source_key] = source_key
        else:
            raise KiaraProcessingException(
                f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
            )
        len_dict[source_key] = rows

    # All used sources must share exactly one length (this also rejects the
    # case where no source was used at all).
    if len(set(len_dict.values())) != 1:
        len_str = ", ".join(f"{name} ({rows})" for name, rows in len_dict.items())
        raise KiaraProcessingException(
            f"Can't merge table, sources have different lengths: {len_str}"
        )

    column_names = []
    columns = []
    for column_name, ref in column_map_final.items():
        if ref not in arrays:
            raise KiaraProcessingException(
                f"Can't merge table: column '{column_name}' references unknown source column/array '{ref}'. Available: {', '.join(arrays)}"
            )
        column_names.append(column_name)
        columns.append(arrays[ref])

    table = pa.Table.from_arrays(arrays=columns, names=column_names)
    outputs.set_value("table", table)
|
The source code of the process method of the module. |
type_name |
query.table |
The registered name for this item type. |
documentation |
Execute a sql query against an (Arrow) table.
The default relation name for the sql query is 'data', but can be modified by the 'relation_name' config option/input.
If the 'query' module config option is not set, users can provide their own query, otherwise the pre-set
one will be used.
|
Documentation for the item. |
authors |
|
Information about authorship for the item. |
context |
tags |
tabular |
labels |
- package: kiara_plugin.tabular
|
references |
|
|
Generic properties of this item (description, tags, labels, references, ...). |
python_class |
python_class_name |
QueryTableSQL |
The name of the Python class. |
python_module_name |
kiara_plugin.tabular.modules.table |
The name of the Python module this class lives in. |
full_name |
kiara_plugin.tabular.modules.table.QueryTableSQL |
The full class namespace. |
|
The python class that implements this module type. |
process_src |
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    """Run an SQL query against the 'table' input via duckdb; set 'query_result'.

    If the 'query' module config is unset, both the query and the relation
    name come from the inputs. Otherwise both come from the module config.
    The relation name is the name under which the table is visible to the
    query.

    Raises:
        KiaraProcessingException: if the relation name is a reserved SQL keyword.
    """
    import duckdb

    _query: str
    _relation_name: str
    if self.get_config_value("query") is not None:
        _query = self.get_config_value("query")
        _relation_name = self.get_config_value("relation_name")
    else:
        _query = inputs.get_value_data("query")
        _relation_name = inputs.get_value_data("relation_name")

    if _relation_name.upper() in RESERVED_SQL_KEYWORDS:
        raise KiaraProcessingException(
            f"Invalid relation name '{_relation_name}': this is a reserved sql keyword, please select a different name."
        )

    _table: KiaraTable = inputs.get_value_data("table")
    relation = duckdb.arrow(_table.arrow_table)
    query_result = relation.query(_relation_name, _query)
    outputs.set_value("query_result", query_result.fetch_arrow_table())
|
The source code of the process method of the module. |