module_types
table.filters
Documentation
-- n/a --
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
filter_n… string The name yes
of the
filter.
Python class
python_class_name TableFiltersModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class TableFiltersModule(FilterModule):
_module_type_name = "table.filters"
@classmethod
def retrieve_supported_type(cls) -> Union[Dict…
return "table"
def create_filter_inputs(self, filter_name: st…
if filter_name in ["select_columns", "drop…
return {
"columns": {
"type": "list",
"doc": "The name of the column…
"optional": True,
},
"ignore_invalid_column_names": {
"type": "boolean",
"doc": "Whether to ignore inva…
"default": True,
},
}
elif filter_name == "select_rows":
return {
"match": {
"type": "string",
"doc": "The string token to ma…
"optional": True,
},
"case_insensitive": {
"type": "boolean",
"doc": "Whether to ignore case…
"default": True,
},
}
return None
def filter__select_columns(self, value: Value,…
import pyarrow as pa
ignore_invalid = filter_inputs["ignore_inv…
column_names = filter_inputs["columns"]
if not column_names:
return value
table: KiaraTable = value.data
arrow_table = table.arrow_table
_column_names = []
_columns = []
for column_name in column_names:
if column_name not in arrow_table.colu…
if ignore_invalid:
continue
else:
raise KiaraProcessingException(
f"Can't select column '{co…
)
column = arrow_table.column(column_nam…
_column_names.append(column_name)
_columns.append(column)
return pa.table(data=_columns, names=_colu…
def filter__drop_columns(self, value: Value, f…
import pyarrow as pa
ignore_invalid = filter_inputs["ignore_inv…
column_names_to_ignore = filter_inputs["co…
if not column_names_to_ignore:
return value
table: KiaraTable = value.data
arrow_table = table.arrow_table
for column_name in column_names_to_ignore:
if column_name not in arrow_table.colu…
if ignore_invalid:
continue
else:
raise KiaraProcessingException(
f"Can't select column '{co…
)
_column_names = []
_columns = []
for column_name in arrow_table.column_name…
if column_name in column_names_to_igno…
continue
column = arrow_table.column(column_nam…
_column_names.append(column_name)
_columns.append(column)
return pa.table(data=_columns, names=_colu…
def filter__select_rows(self, value: Value, fi…
match = filter_inputs.get("match", None)
if not match:
return value
case_insensitive = filter_inputs.get("case…
import duckdb
_table: KiaraTable = value.data
rel_from_arrow = duckdb.arrow(_table.arrow…
if case_insensitive:
# query_tokens = [f"LOWER({c}) GLOB LO…
query_tokens = [
f"regexp_matches(LOWER({c}), LOWER…
for c in rel_from_arrow.columns
]
else:
query_tokens = [
f"regexp_matches({c}, '{match}')" …
]
query = " OR ".join(query_tokens)
result = rel_from_arrow.filter(query)
return result.arrow()
─────────────────────────────────────────────────────
render.database
Documentation
-- n/a --
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
source_t… string The yes
(kiara)
data type
to be
rendered.
target_t… string The yes
(kiara)
data type
of the
rendered
result.
Python class
python_class_name RenderDatabaseModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class RenderDatabaseModule(RenderDatabaseModuleBas…
_module_type_name = "render.database"
def render__database__as__string(
self, value: Value, render_config: Mapping…
):
input_number_of_rows = render_config.get("…
input_row_offset = render_config.get("row_…
table_name = render_config.get("table_name…
wrap, data_related_scenes = self.preproces…
value=value,
table_name=table_name,
input_number_of_rows=input_number_of_r…
input_row_offset=input_row_offset,
)
pretty = wrap.as_string(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
rendered=pretty,
related_scenes=data_related_scenes,
render_config=render_config,
render_manifest=self.manifest.manifest…
)
def render__database__as__terminal_renderable(
self, value: Value, render_config: Mapping…
):
input_number_of_rows = render_config.get("…
input_row_offset = render_config.get("row_…
table_name = render_config.get("table_name…
wrap, data_related_scenes = self.preproces…
value=value,
table_name=table_name,
input_number_of_rows=input_number_of_r…
input_row_offset=input_row_offset,
)
pretty = wrap.as_terminal_renderable(max_r…
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
rendered=pretty,
related_scenes=data_related_scenes,
render_manifest=self.manifest.manifest…
)
─────────────────────────────────────────────────────
render.table
Documentation
-- n/a --
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
source_t… string The yes
(kiara)
data type
to be
rendered.
target_t… string The yes
(kiara)
data type
of the
rendered
result.
Python class
python_class_name RenderTableModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class RenderTableModule(RenderTableModuleBase):
_module_type_name = "render.table"
def render__table__as__string(self, value: Val…
input_number_of_rows = render_config.get("…
input_row_offset = render_config.get("row_…
wrap, data_related_scenes = self.preproces…
value=value,
input_number_of_rows=input_number_of_r…
input_row_offset=input_row_offset,
)
pretty = wrap.as_string(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
render_manifest=self.manifest.manifest…
rendered=pretty,
related_scenes=data_related_scenes,
)
def render__table__as__terminal_renderable(
self, value: Value, render_config: Mapping…
):
input_number_of_rows = render_config.get("…
input_row_offset = render_config.get("row_…
wrap, data_related_scenes = self.preproces…
value=value,
input_number_of_rows=input_number_of_r…
input_row_offset=input_row_offset,
)
pretty = wrap.as_terminal_renderable(max_r…
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
render_manifest=self.manifest.manifest…
rendered=pretty,
related_scenes=data_related_scenes,
)
─────────────────────────────────────────────────────
export.database
Documentation
Export database values.
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
source_t… string The type yes
of the
source
data that
is going
to be
exported.
target_p… string The name yes
of the
target
profile.
Used to
distingu…
different
target
formats
for the
same data
type.
Python class
python_class_name ExportNetworkDataModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class ExportNetworkDataModule(DataExportModule):
"""Export database values."""
_module_type_name = "export.database"
def export__database__as__sqlite_db(
self, value: KiaraDatabase, base_path: str…
):
"""Export network data as a sqlite databas…
target_path = os.path.abspath(os.path.join…
shutil.copy2(value.db_file_path, target_pa…
return {"files": target_path}
def export__database__as__sql_dump(
self, value: KiaraDatabase, base_path: str…
):
"""Export network data as a sql dump file.…
import sqlite_utils
db = sqlite_utils.Database(value.db_file_p…
target_path = Path(os.path.join(base_path,…
with target_path.open("wt") as f:
for line in db.conn.iterdump():
f.write(line + "\n")
return {"files": target_path.as_posix()}
def export__database__as__csv_files(
self, value: KiaraDatabase, base_path: str…
):
"""Export network data as 2 csv files (one…
import sqlite3
files = []
for table_name in value.table_names:
target_path = os.path.join(base_path, …
os.makedirs(os.path.dirname(target_pat…
# copied from: https://stackoverflow.c…
con = sqlite3.connect(value.db_file_pa…
outfile = open(target_path, "wt")
outcsv = csv.writer(outfile)
cursor = con.execute(f"select * from {…
# dump column titles (optional)
outcsv.writerow(x[0] for x in cursor.d…
# dump rows
outcsv.writerows(cursor.fetchall())
outfile.close()
files.append(target_path)
return {"files": files}
─────────────────────────────────────────────────────
export.table
Documentation
Export table data items.
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
source_t… string The type yes
of the
source
data that
is going
to be
exported.
target_p… string The name yes
of the
target
profile.
Used to
distingu…
different
target
formats
for the
same data
type.
Python class
python_class_name ExportTableModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class ExportTableModule(DataExportModule):
"""Export table data items."""
_module_type_name = "export.table"
def export__table__as__csv_file(self, value: K…
"""Export a table as csv file."""
from pyarrow import csv
target_path = os.path.join(base_path, f"{n…
csv.write_csv(value.arrow_table, target_pa…
return {"files": target_path}
# def export__table__as__sqlite_db(
# self, value: KiaraTable, base_path: str,…
# ):
#
# target_path = os.path.abspath(os.path.jo…
#
# raise NotImplementedError()
# # shutil.copy2(value.db_file_path, targe…
#
# return {"files": target_path}
─────────────────────────────────────────────────────
export.tables
Documentation
Export network data items.
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
source_t… string The type yes
of the
source
data that
is going
to be
exported.
target_p… string The name yes
of the
target
profile.
Used to
distingu…
different
target
formats
for the
same data
type.
Python class
python_class_name ExportNetworkDataModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class ExportNetworkDataModule(DataExportModule):
"""Export network data items."""
_module_type_name = "export.tables"
# def export__network_data__as__graphml_file(
# self, value: NetworkData, base_path: str…
# ):
# """Export network data as graphml file."…
#
# import networkx as nx
#
# target_path = os.path.join(base_path, f"…
#
# # TODO: can't just assume digraph
# graph: nx.Graph = value.as_networkx_grap…
# nx.write_graphml(graph, target_path)
#
# return {"files": target_path}
#
def export__tables__as__sqlite_db(
self, value: KiaraTables, base_path: str, …
):
"""Export network data as a sqlite databas…
from kiara_plugin.tabular.utils.tables imp…
db = create_database_from_tables(tables=va…
target_path = os.path.abspath(os.path.join…
shutil.move(db.db_file_path, target_path)
return {"files": target_path}
def export__tables__as__sql_dump(
self, value: KiaraTables, base_path: str, …
):
"""Export network data as a sql dump file.…
import sqlite_utils
from kiara_plugin.tabular.utils.tables imp…
kiara_db = create_database_from_tables(tab…
db = sqlite_utils.Database(kiara_db.db_fil…
target_path = Path(os.path.join(base_path,…
with target_path.open("wt") as f:
for line in db.conn.iterdump():
f.write(line + "\n")
return {"files": target_path.as_posix()}
def export__tables__as__csv_files(
self, value: KiaraTables, base_path: str, …
):
"""Export network data as 2 csv files (one…
from pyarrow import csv
files = []
for table_name in value.table_names:
target_path = os.path.join(base_path, …
os.makedirs(os.path.dirname(target_pat…
table = value.get_table(table_name)
csv.write_csv(table.arrow_table, targe…
files.append(target_path)
return {"files": files}
─────────────────────────────────────────────────────
load.array
Documentation
Deserialize array data.
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
serializ… string The name yes
of the
serializ…
profile
used to
serialize
the
source
value.
target_p… string The yes
profile
name of
the
de-seria…
result
data.
value_ty… string The value yes
type of
the
actual
(unseria…
value.
Python class
python_class_name DeserializeArrayModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class DeserializeArrayModule(DeserializeValueModul…
"""Deserialize array data."""
_module_type_name = "load.array"
@classmethod
def retrieve_supported_target_profiles(cls) ->…
return {"python_object": KiaraArray}
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "array"
@classmethod
def retrieve_supported_serialization_profile(c…
return "feather"
def to__python_object(self, data: SerializedDa…
assert "array.arrow" in data.get_keys() an…
chunks = data.get_serialized_data("array.a…
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=Tr…
assert len(files) == 1
array_file = files[0]
array = KiaraArray(data_path=array_file)
return array
─────────────────────────────────────────────────────
load.database
Documentation
-- n/a --
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
serializ… string The name yes
of the
serializ…
profile
used to
serialize
the
source
value.
target_p… string The yes
profile
name of
the
de-seria…
result
data.
value_ty… string The value yes
type of
the
actual
(unseria…
value.
Python class
python_class_name LoadDatabaseFromDiskModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class LoadDatabaseFromDiskModule(DeserializeValueM…
_module_type_name = "load.database"
@classmethod
def retrieve_supported_target_profiles(cls) ->…
return {"python_object": KiaraDatabase}
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "database"
@classmethod
def retrieve_supported_serialization_profile(c…
return "copy"
def to__python_object(self, data: SerializedDa…
assert "db.sqlite" in data.get_keys() and …
chunks = data.get_serialized_data("db.sqli…
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=Tr…
assert len(files) == 1
db_file = files[0]
db = KiaraDatabase(db_file_path=db_file)
return db
─────────────────────────────────────────────────────
load.table
Documentation
-- n/a --
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
serializ… string The name yes
of the
serializ…
profile
used to
serialize
the
source
value.
target_p… string The yes
profile
name of
the
de-seria…
result
data.
value_ty… string The value yes
type of
the
actual
(unseria…
value.
Python class
python_class_name DeserializeTableModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class DeserializeTableModule(DeserializeValueModul…
_module_type_name = "load.table"
@classmethod
def retrieve_supported_target_profiles(cls) ->…
return {"python_object": KiaraTable}
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "table"
@classmethod
def retrieve_supported_serialization_profile(c…
return "feather"
def to__python_object(self, data: SerializedDa…
import pyarrow as pa
columns = {}
table_schema_chunks = data.get_serialized_…
chunks_generator = table_schema_chunks.get…
schema_chunk = next(chunks_generator) # t…
schema = pa.ipc.read_schema(pa.py_buffer(s…
for column_name in data.get_keys():
if column_name == TABLE_SCHEMA_CHUNKS_…
continue
chunks = data.get_serialized_data(colu…
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() =…
files = list(chunks.get_chunks(as_file…
assert len(files) == 1
file = files[0]
with pa.memory_map(file, "r") as colum…
loaded_arrays: pa.Table = pa.ipc.o…
column = loaded_arrays.column(colu…
if column_name == EMPTY_COLUMN_NAM…
columns[""] = column
else:
columns[column_name] = column
arrow_table = pa.table(columns, schema=sch…
table = KiaraTable.create_table(arrow_tabl…
return table
─────────────────────────────────────────────────────
load.tables
Documentation
-- n/a --
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
serializ… string The name yes
of the
serializ…
profile
used to
serialize
the
source
value.
target_p… string The yes
profile
name of
the
de-seria…
result
data.
value_ty… string The value yes
type of
the
actual
(unseria…
value.
Python class
python_class_name DeserializeTableModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class DeserializeTableModule(DeserializeValueModul…
_module_type_name = "load.tables"
@classmethod
def retrieve_supported_target_profiles(cls) ->…
return {"python_object": KiaraTables}
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "tables"
@classmethod
def retrieve_supported_serialization_profile(c…
return "feather"
def to__python_object(self, data: SerializedDa…
import pyarrow as pa
tables: Dict[str, Any] = {}
for column_id in data.get_keys():
if TABLE_COLUMN_SPLIT_MARKER not in co…
raise KiaraException(
f"Invalid serialized 'tables' …
)
table_id, column_name = column_id.spli…
TABLE_COLUMN_SPLIT_MARKER, maxspli…
)
chunks = data.get_serialized_data(colu…
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() =…
files = list(chunks.get_chunks(as_file…
assert len(files) == 1
file = files[0]
with pa.memory_map(file, "r") as colum…
loaded_arrays: pa.Table = pa.ipc.o…
column = loaded_arrays.column(colu…
tables.setdefault(table_id, {})[co…
table = KiaraTables.create_tables(tables)
return table
─────────────────────────────────────────────────────
parse.date_array
Documentation
Create an array of date objects from an array of
strings.
This module is very simplistic at the moment, more
functionality and options will be added in the
future.
At its core, this module uses the standard parser
from the dateutil package to parse strings into
dates. As this parser can't handle complex strings,
the input strings can be pre-processed in the
following ways:
• 'cut' non-relevant parts of the string (using
'min_index' & 'max_index' input/config options)
• remove matching tokens from the string, and
replace them with a single whitespace (using the
'remove_tokens' option)
By default, if an input string can't be parsed, this
module will raise an exception. This can be
prevented by setting this module's 'force_non_null'
config option or input to 'False', in which case
un-parsable strings will appear as 'NULL' values in
the resulting array.
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
add_inp… boolean If set to no true
'True',
parse
options
will be
available
as
inputs.
constan… object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
force_n… boolean If set to no true
'True',
raise an
error if
any of
the
strings
in the
array
can't be
parsed.
input_f… array If not no
empty,
only add
the
fields
specified
in here
to the
module
inputs
schema.
max_ind… integer The no
maximum
index
until
which to
parse the
string(s…
min_ind… integer The no
minimum
index
from
where to
start
parsing
the
string(s…
remove_… array A list of no
tokens/c…
to
replace
with a
single
white-sp…
before
parsing
the
input.
Python class
python_class_name ExtractDateModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class ExtractDateModule(AutoInputsKiaraModule):
"""Create an array of date objects from an arr…
This module is very simplistic at the moment, …
At its core, this module uses the standard par…
[dateutil](https://github.com/dateutil/dateuti…
complex strings, the input strings can be pre…
- 'cut' non-relevant parts of the string (usin…
- remove matching tokens from the string, and …
By default, if an input string can't be parsed…
setting this modules 'force_non_null' config o…
will appear as 'NULL' value in the resulting a…
"""
_module_type_name = "parse.date_array"
_config_cls = ExtractDateConfig
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs = {"array": {"type": "array", "doc"…
return inputs
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {
"date_array": {
"type": "array",
"doc": "The resulting array with i…
}
}
def process(self, inputs: ValueMap, outputs: V…
import polars as pl
import pyarrow as pa
from dateutil import parser
force_non_null: bool = self.get_data_for_f…
field_name="force_non_null", inputs=in…
)
min_pos: Union[None, int] = self.get_data_…
field_name="min_index", inputs=inputs
)
if min_pos is None:
min_pos = 0
max_pos: Union[None, int] = self.get_data_…
field_name="max_index", inputs=inputs
)
remove_tokens: Iterable[str] = self.get_da…
field_name="remove_tokens", inputs=inp…
)
def parse_date(_text: str):
text = _text
if min_pos:
try:
text = text[min_pos:] # type:…
except Exception:
return None
if max_pos:
try:
text = text[0 : max_pos - min_…
except Exception:
pass
if remove_tokens:
for t in remove_tokens:
text = text.replace(t, " ")
try:
d_obj = parser.parse(text, fuzzy=T…
except Exception as e:
if force_non_null:
raise KiaraProcessingException…
return None
if d_obj is None:
if force_non_null:
raise KiaraProcessingException(
f"Can't parse date from st…
)
return None
return d_obj
value = inputs.get_value_obj("array")
array: KiaraArray = value.data
series = pl.Series(name="tokens", values=a…
job_log.add_log(f"start parsing date for {…
result = series.apply(parse_date)
job_log.add_log(f"finished parsing date fo…
result_array = result.to_arrow()
# TODO: remove this cast once the array da…
chunked = pa.chunked_array(result_array)
outputs.set_values(date_array=chunked)
─────────────────────────────────────────────────────
create.database
Documentation
-- n/a --
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descrip… Required Default
─────────────────────────────────────────────────────
constants object Value no
constan…
for this
module.
defaults object Value no
defaults
for this
module.
ignore_e… boolean Whether no false
to
ignore
convert
errors
and omit
the
failed
items.
include_… boolean When no false
includi…
source
metadat…
whether
to also
include
the
original
raw
(string)
content.
include_… boolean Whether no
to
include
a table
with
metadata
about
the
source
files.
merge_in… boolean Whether no false
to merge
all csv
files
into a
single
table.
source_t… string The yes
value
type of
the
source
value.
target_t… string The yes
value
type of
the
target.
Python class
python_class_name CreateDatabaseModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class CreateDatabaseModule(CreateFromModule):
_module_type_name = "create.database"
_config_cls = CreateDatabaseModuleConfig
def create__database__from__file(
self, source_value: Value, optional: Value…
) -> Any:
"""Create a database from a file.
Currently, only csv files are supported.
"""
import csv as py_csv
temp_f = tempfile.mkdtemp()
db_path = os.path.join(temp_f, "db.sqlite")
def cleanup():
shutil.rmtree(db_path, ignore_errors=T…
atexit.register(cleanup)
file_item: KiaraFile = source_value.data
if not file_item.file_name.endswith(".csv"…
raise KiaraProcessingException(
"Only csv files are supported (at …
)
table_name = file_item.file_name_without_e…
table_name = table_name.replace("-", "_")
table_name = table_name.replace(".", "_")
has_header = optional.get_value_data("firs…
if has_header is None:
try:
has_header = True
with open(source_value.data.path, …
sniffer = py_csv.Sniffer()
has_header = sniffer.has_heade…
csvfile.seek(0)
except Exception as e:
# TODO: add this to the process log
log_message(
"csv_sniffer.error",
file=source_value.data.path,
error=str(e),
details="assuming csv file has…
)
try:
create_sqlite_table_from_tabular_file(
target_db_file=db_path,
file_item=file_item,
table_name=table_name,
no_headers=not has_header,
)
except Exception as e:
if self.get_config_value("ignore_error…
log_message("ignore.import_file", …
else:
raise KiaraProcessingException(e)
include_raw_content_in_file_info: bool = s…
"include_source_metadata"
)
if include_raw_content_in_file_info:
db = KiaraDatabase(db_file_path=db_pat…
db.create_if_not_exists()
include_content: bool = self.get_confi…
db._unlock_db()
included_files = {file_item.file_name:…
file_bundle = KiaraFileBundle.create_f…
files=included_files, bundle_name=…
)
insert_db_table_from_file_bundle(
database=db,
file_bundle=file_bundle,
table_name="source_files_metadata",
include_content=include_content,
)
db._lock_db()
return db_path
def create__database__from__file_bundle(
self, source_value: Value, job_log: JobLog
) -> Any:
"""Create a database from a file_bundle va…
Currently, only csv files are supported, f…
Unless 'merge_into_single_table' is set to…
in the resulting database. If this option …
csv files will be created. For this to wor…
"""
merge_into_single_table = self.get_config_…
if merge_into_single_table:
raise NotImplementedError("Not support…
include_raw_content_in_file_info: Union[bo…
"include_source_metadata"
)
temp_f = tempfile.mkdtemp()
db_path = os.path.join(temp_f, "db.sqlite")
def cleanup():
shutil.rmtree(db_path, ignore_errors=T…
atexit.register(cleanup)
db = KiaraDatabase(db_file_path=db_path)
db.create_if_not_exists()
# TODO: check whether/how to add indexes
bundle: KiaraFileBundle = source_value.data
table_names: List[str] = []
included_files: Dict[str, bool] = {}
errors: Dict[str, Union[None, str]] = {}
for rel_path in sorted(bundle.included_fil…
if not rel_path.endswith(".csv"):
job_log.add_log(
f"Ignoring file (not csv): {re…
)
included_files[rel_path] = False
errors[rel_path] = "Not a csv file…
continue
file_item = bundle.included_files[rel_…
table_name = find_free_id(
stem=file_item.file_name_without_e…
)
try:
table_names.append(table_name)
create_sqlite_table_from_tabular_f…
target_db_file=db_path, file_i…
)
included_files[rel_path] = True
except Exception as e:
included_files[rel_path] = False
errors[rel_path] = KiaraException.…
if self.get_config_value("ignore_e…
log_message("ignore.import_fil…
continue
raise KiaraProcessingException(e)
if include_raw_content_in_file_info in [No…
include_content: bool = self.get_confi…
db._unlock_db()
insert_db_table_from_file_bundle(
database=db,
file_bundle=source_value.data,
table_name="source_files_metadata",
include_content=include_content,
included_files=included_files,
errors=errors,
)
db._lock_db()
return db_path
def create_optional_inputs(
self, source_type: str, target_type
) -> Union[Mapping[str, Mapping[str, Any]], No…
inputs = {}
if source_type == "file":
inputs["first_row_is_header"] = {
"type": "boolean",
"optional": True,
"doc": "Whether the first row of t…
}
if target_type == "database" and source_ty…
inputs["table_name"] = {
"type": "string",
"doc": "The name of the table in t…
"default": "imported_table",
}
return inputs
def create__database__from__tables(
self, source_value: Value, optional: Value…
) -> Any:
"""Create a database value from a list of …
from kiara_plugin.tabular.utils.tables imp…
tables: KiaraTables = source_value.data
db = create_database_from_tables(tables=ta…
return db
# Convert a single table value into a database created in a temporary
# directory, and return that database object.
def create__database__from__table(
self, source_value: Value, optional: Value…
) -> Any:
"""Create a database value from a table."""
# Prefer the table name from the optional inputs; fall back to
# DEFAULT_TABLE_NAME when it is missing or empty.
table_name = optional.get_value_data("tabl…
if not table_name:
table_name = DEFAULT_TABLE_NAME
table: KiaraTable = source_value.data
arrow_table = table.arrow_table
# Neither a column mapping nor index columns are configured here.
column_map = None
index_columns = None
sqlite_schema = create_sqlite_schema_data_…
table=arrow_table, index_columns=index…
)
db = KiaraDatabase.create_in_temp_dir()
# The database is unlocked before the writes and locked again afterwards.
db._unlock_db()
engine = db.get_sqlalchemy_engine()
_table = sqlite_schema.create_table(table_…
with engine.connect() as conn:
# Insert the rows one Arrow record batch at a time.
for batch in arrow_table.to_batches(
max_chunksize=DEFAULT_TABULAR_DATA…
):
conn.execute(insert(_table), batch…
conn.commit()
db._lock_db()
return db
─────────────────────────────────────────────────────
create.table
Documentation
-- n/a --
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descrip… Required Default
─────────────────────────────────────────────────────
constants object Value no
constan…
for this
module.
defaults object Value no
defaults
for this
module.
ignore_e… boolean Whether no false
to
ignore
convert
errors
and omit
the
failed
items.
source_t… string The yes
value
type of
the
source
value.
target_t… string The yes
value
type of
the
target.
Python class
python_class_name CreateTableModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
# Module 'create.table': builds a table value from other value types. The
# conversions visible below are from a file and from a file bundle.
class CreateTableModule(CreateFromModule):
_module_type_name = "create.table"
_config_cls = CreateTableModuleConfig
# Offer the optional 'first_row_is_header' input for file sources only;
# every other source type gets no optional inputs (None is returned).
def create_optional_inputs(
self, source_type: str, target_type
) -> Union[Mapping[str, Mapping[str, Any]], No…
if source_type == "file":
return {
"first_row_is_header": {
"type": "boolean",
"optional": True,
"doc": "Whether the first row …
}
}
return None
# Read a csv file into a table with pyarrow. Raises KiaraProcessingException
# when nothing could be imported.
def create__table__from__file(self, source_val…
"""Create a table from a file, trying to a…
import csv as py_csv
from pyarrow import csv
input_file: KiaraFile = source_value.data
imported_data = None
errors = []
# An explicit header flag from the optional inputs wins; only when it is
# None is the header auto-detected with the stdlib csv sniffer.
has_header = optional.get_value_data("firs…
if has_header is None:
try:
# Default to True up front so a failing sniff still leaves a usable value.
has_header = True
with open(input_file.path, "rt") a…
sniffer = py_csv.Sniffer()
has_header = sniffer.has_heade…
csvfile.seek(0)
except Exception as e:
# TODO: add this to the process log
log_message(
"csv_sniffer.error",
file=input_file.path,
error=str(e),
details="assuming csv file has…
)
# Parse with pyarrow; a failure is collected in 'errors' instead of being
# raised right away.
try:
if has_header:
imported_data = csv.read_csv(input…
else:
# Header-less files are read with custom ReadOptions (the arguments are cut
# off in this listing).
read_options = csv.ReadOptions(aut…
imported_data = csv.read_csv(input…
except Exception as e:
errors.append(e)
# Nothing was imported: surface the failure as a processing error.
if imported_data is None:
raise KiaraProcessingException(
f"Failed to import file '{input_fi…
)
# import pandas as pd
# df = pd.read_csv(input_file.path)
# imported_data = pa.Table.from_pandas(df)
return KiaraTable.create_table(imported_da…
# def create__table__from__csv_file(self, sour…
# """Create a table from a csv_file value.…
#
# from pyarrow import csv
#
# input_file: FileModel = source_value.data
# imported_data = csv.read_csv(input_file.…
#
# # import pandas as pd
# # df = pd.read_csv(input_file.path)
# # imported_data = pa.Table.from_pandas(d…
#
# return KiaraTable.create_table(imported_…
def create__table__from__file_bundle(self, sou…
"""Create a table value from a text file_b…
The resulting table will have (at a minimu…
- id: an auto-assigned index
- rel_path: the relative path of the file …
- content: the text file content
"""
import pyarrow as pa
bundle: KiaraFileBundle = source_value.data
columns = FILE_BUNDLE_IMPORT_AVAILABLE_COL…
ignore_errors = self.get_config_value("ign…
file_dict = bundle.read_text_file_contents…
# TODO: use chunks to save on memory
tabular: Dict[str, List[Any]] = {}
# The table is assembled column by column: for every column, one value per
# file is appended in the iteration order of the inner loop.
for column in columns:
for index, rel_path in enumerate(sorte…
# 'content', 'id' and 'rel_path' are computed here; any other column is read
# as an attribute of the file's model object.
if column == "content":
_value: Any = file_dict[rel_pa…
elif column == "id":
_value = index
elif column == "rel_path":
_value = rel_path
else:
file_model = bundle.included_f…
_value = getattr(file_model, c…
tabular.setdefault(column, []).app…
table = pa.Table.from_pydict(tabular)
return KiaraTable.create_table(table)
─────────────────────────────────────────────────────
create.tables
Documentation
-- n/a --
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descrip… Required Default
─────────────────────────────────────────────────────
constants object Value no
constan…
for this
module.
defaults object Value no
defaults
for this
module.
ignore_e… boolean Whether no false
to
ignore
convert
errors
and omit
the
failed
items.
include_… boolean When no false
includi…
source
metadat…
whether
to also
include
the
original
raw
(string)
content.
include_… boolean Whether no
to
include
a table
with
metadata
about
the
source
files.
source_t… string The yes
value
type of
the
source
value.
target_t… string The yes
value
type of
the
target.
Python class
python_class_name CreateDatabaseModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
# Module 'create.tables': builds a dict of tables (one per csv file) from a
# file bundle.
# NOTE(review): the class is named CreateDatabaseModule although its module
# type is "create.tables" and its config class is CreateTablesModuleConfig;
# the name and the method docstring below look carried over from the
# database module - confirm and rename if so.
class CreateDatabaseModule(CreateFromModule):
_module_type_name = "create.tables"
_config_cls = CreateTablesModuleConfig
# Returns a dict mapping table names to tables read from the bundle's csv
# files, optionally plus a "file_items" table describing the source files.
def create__tables__from__file_bundle(
self, source_value: Value, job_log: JobLog
) -> Any:
"""Create a database from a file_bundle va…
Currently, only csv files are supported, f…
Unless 'merge_into_single_table' is set to…
in the resulting database. If this option …
csv files will be created. For this to wor…
"""
from pyarrow import csv as pa_csv
# NOTE(review): the variable name mentions raw content, but the config key
# read here is "include_source_metadata" - confirm which one is meant.
include_raw_content_in_file_info: Union[bo…
"include_source_metadata"
)
tables = {}
bundle: KiaraFileBundle = source_value.data
table_names: List[str] = []
# Per-file bookkeeping, keyed by relative path: whether the file was
# imported, and the error text if it was not.
included_files: Dict[str, bool] = {}
errors: Dict[str, Union[None, str]] = {}
for rel_path in sorted(bundle.included_fil…
# Files without a ".csv" suffix are logged, marked as not included, and
# skipped.
if not rel_path.endswith(".csv"):
job_log.add_log(
f"Ignoring file (not csv): {re…
)
included_files[rel_path] = False
errors[rel_path] = "Not a csv file…
continue
file_item = bundle.included_files[rel_…
# The table name is derived from the file name without its extension via
# find_free_id (remaining arguments are cut off in this listing).
table_name = find_free_id(
stem=file_item.file_name_without_e…
)
try:
# NOTE(review): the name is recorded before the read is attempted, so it
# stays in table_names even when reading the file fails.
table_names.append(table_name)
table = pa_csv.read_csv(file_item.…
tables[table_name] = table
included_files[rel_path] = True
except Exception as e:
included_files[rel_path] = False
errors[rel_path] = KiaraException.…
# Depending on a config value (key cut off here), the failure is logged and
# the file skipped; otherwise it is re-raised as a processing error.
if self.get_config_value("ignore_e…
log_message("ignore.import_fil…
continue
raise KiaraProcessingException(e)
# Source-file metadata table: "file_items" is a reserved table name, so a
# csv-derived table of that name is rejected before the metadata is added.
if include_raw_content_in_file_info in [No…
include_content: bool = self.get_confi…
if "file_items" in tables:
raise KiaraProcessingException(
"Can't create table: 'file_ite…
)
table = create_table_from_file_bundle(
file_bundle=source_value.data,
include_content=include_content,
included_files=included_files,
errors=errors,
)
tables["file_items"] = table
return tables
─────────────────────────────────────────────────────
query.database
Documentation
Execute a sql query against a (sqlite) database.
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
query string The no
query.
Python class
python_class_name QueryDatabaseModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class QueryDatabaseModule(KiaraModule):
"""Execute a sql query against a (sqlite) data…
_config_cls = QueryDatabaseConfig
_module_type_name = "query.database"
# Inputs: always a 'database'; a 'query' input is added only when the module
# config does not already provide a (truthy) query.
def create_inputs_schema(
self,
) -> ValueMapSchema:
result: Dict[str, Dict[str, Any]] = {
"database": {"type": "database", "doc"…
}
if not self.get_config_value("query"):
result["query"] = {"type": "string", "…
return result
# Output: a single 'query_result' table.
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {"query_result": {"type": "table", …
# Run the query and turn the result rows into an Arrow table.
def process(self, inputs: ValueMap, outputs: V…
import pyarrow as pa
database: KiaraDatabase = inputs.get_value…
# NOTE(review): create_inputs_schema tests the configured query for
# falsiness while this method tests 'is None'; with an empty-string config
# value the 'query' input would be declared but never read - confirm.
query = self.get_config_value("query")
if query is None:
query = inputs.get_value_data("query")
# TODO: make this memory efficient
# Rows are pivoted into per-column Python lists, keyed by column name.
result_columns: Dict[str, List[Any]] = {}
with database.get_sqlalchemy_engine().conn…
result = con.execute(text(query))
for r in result:
for k, v in dict(r).items():
result_columns.setdefault(k, […
table = pa.Table.from_pydict(result_column…
outputs.set_value("query_result", table)
─────────────────────────────────────────────────────
table.pick.column
Documentation
Pick one column from a table, returning an array.
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
column_n… string A no
hardcoded
column
name to
cut.
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
Python class
python_class_name PickColumnModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class PickColumnModule(KiaraModule):
"""Pick one column from a table, returning an …
_module_type_name = "table.pick.column"
_config_cls = PickColumnModuleConfig
# Inputs: always a 'table'; a 'column_name' input is added only when no
# column name is set in the module config.
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs: Dict[str, Any] = {"table": {"type"…
column_name = self.get_config_value("colum…
if not column_name:
inputs["column_name"] = {
"type": "string",
"doc": "The name of the column to …
}
return inputs
# Output: a single 'array' holding the picked column.
def create_outputs_schema(
self,
) -> ValueMapSchema:
outputs: Mapping[str, Any] = {"array": {"t…
return outputs
# Resolve the column name, validate it against the table's metadata, and
# emit that column. Raises KiaraProcessingException when no column name is
# available or the name is not among the table's columns.
def process(self, inputs: ValueMap, outputs: V…
import pyarrow as pa
# The configured column name takes precedence over the user input.
column_name: Union[str, None] = self.get_c…
if not column_name:
column_name = inputs.get_value_data("c…
if not column_name:
raise KiaraProcessingException(
"Could not cut column from table: …
)
# The name is checked against the "metadata.table" property of the value
# before the table data itself is accessed.
table_value: Value = inputs.get_value_obj(…
table_metadata: KiaraTableMetadata = table…
"metadata.table"
)
available = table_metadata.table.column_na…
if column_name not in available:
raise KiaraProcessingException(
f"Invalid column name '{column_nam…
)
table: pa.Table = table_value.data.arrow_t…
column = table.column(column_name)
outputs.set_value("array", column)
─────────────────────────────────────────────────────
table.merge
Documentation
Create a table from other tables and/or arrays.
This module needs configuration to be set (for
now). It's currently not possible to merge an
arbitrary number of tables/arrays, all tables to be
merged must be specified in the module
configuration.
Column names of the resulting table can be
controlled by the 'column_map' configuration, which
takes the desired column name as key, and a
field-name in the following format as value:
• '[inputs_schema key]' for inputs of type 'array'
• '[inputs_schema_key].orig_column_name' for
inputs of type 'table'
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
column_m… object A map no
describi…
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
inputs_s… object A dict yes
describi…
the
inputs
for this
merge
process.
Python class
python_class_name MergeTableModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class MergeTableModule(KiaraModule):
"""Create a table from other tables and/or arr…
This module needs configuration to be set (for…
number of tables/arrays, all tables to be merg…
Column names of the resulting table can be con…
desired column name as key, and a field-name i…
- '[inputs_schema key]' for inputs of type 'ar…
- '[inputs_schema_key].orig_column_name' for i…
"""
_module_type_name = "table.merge"
_config_cls = MergeTableConfig
# The inputs schema is taken from the module configuration.
def create_inputs_schema(
self,
) -> ValueMapSchema:
input_schema_dict = self.get_config_value(…
return input_schema_dict
# Output: a single merged 'table'.
def create_outputs_schema(
self,
) -> ValueMapSchema:
outputs = {
"table": {
"type": "table",
"doc": "The merged table, includin…
}
}
return outputs
# Collect the columns of all configured sources, check that every source has
# the same number of rows, and assemble the selected columns into one table.
def process(self, inputs: ValueMap, outputs: V…
import pyarrow as pa
inputs_schema: Dict[str, Any] = self.get_c…
column_map: Dict[str, str] = self.get_conf…
# One source per key of the configured inputs schema.
sources = {}
for field_name in inputs_schema.keys():
sources[field_name] = inputs.get_value…
# len_dict: row count per source; arrays: column data keyed by reference name.
len_dict = {}
arrays = {}
# Work on a copy so the auto-generated entries below do not end up in the
# configured mapping object.
column_map_final = dict(column_map)
for source_key, table_or_array in sources.…
if isinstance(table_or_array, KiaraTab…
rows = table_or_array.num_rows
for name in table_or_array.column_…
# Table columns are referenced by a name prefixed with the source key.
array_name = f"{source_key}.{n…
# With an explicit column_map, columns it does not reference are logged and
# skipped.
if column_map and array_name n…
job_log.add_log(
f"Ignoring column '{na…
)
continue
column = table_or_array.arrow_…
arrays[array_name] = column
# Without a column_map every column is kept under its own name, which
# therefore has to be unique across all sources.
if not column_map:
if name in column_map_fina…
# NOTE(review): a bare Exception is raised here, unlike the
# KiaraProcessingException used for the other failures in this method.
raise Exception(
f"Can't merge tabl…
)
column_map_final[name] = a…
elif isinstance(table_or_array, KiaraA…
# Arrays are referenced by their source key; same selection and
# duplicate rules as for table columns above.
if column_map and source_key not i…
job_log.add_log(
f"Ignoring array '{source_…
)
continue
rows = len(table_or_array)
arrays[source_key] = table_or_arra…
if not column_map:
if source_key in column_map_fi…
raise Exception(
f"Can't merge table, d…
)
column_map_final[source_key] =…
else:
raise KiaraProcessingException(
f"Can't merge table: invalid t…
)
len_dict[source_key] = rows
# all_rows ends up as the common row count, or None as soon as two sources
# disagree.
all_rows = None
for source_key, rows in len_dict.items():
if all_rows is None:
all_rows = rows
elif all_rows != rows:
all_rows = None
break
if all_rows is None:
len_str = ""
for name, rows in len_dict.items():
# NOTE(review): this assignment overwrites len_str on every iteration, so
# only the last source's row count survives; '+=' was probably intended.
len_str = f" {name} ({rows})"
raise KiaraProcessingException(
f"Can't merge table, sources have …
)
# Resolve the final mapping (result column name -> reference) into parallel
# lists of names and column data.
column_names = []
columns = []
for column_name, ref in column_map_final.i…
column_names.append(column_name)
column = arrays[ref]
columns.append(column)
table = pa.Table.from_arrays(arrays=column…
outputs.set_value("table", table)
─────────────────────────────────────────────────────
query.table
Documentation
Execute a sql query against an (Arrow) table.
The default relation name for the sql query is
'data', but can be modified by the 'relation_name'
config option/input.
If the 'query' module config option is not set,
users can provide their own query, otherwise the
pre-set one will be used.
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
query string The query no
to
execute.
If not
specifie…
the user
will be
able to
provide
their
own.
relation… string The name no "data"
the table
is
referred
to in the
sql
query. If
not
specifie…
the user
will be
able to
provide
their
own.
Python class
python_class_name QueryTableSQL
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class QueryTableSQL(KiaraModule):
"""Execute a sql query against an (Arrow) tabl…
The default relation name for the sql query is…
If the 'query' module config option is not set…
one will be used.
"""
_module_type_name = "query.table"
_config_cls = QueryTableSQLModuleConfig
# Inputs: always a 'table'. When no query is configured, the user-facing
# 'query' and 'relation_name' inputs are declared as well (presumably both
# inside the 'if' below; process() reads both from the inputs only in that
# case).
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs = {
"table": {
"type": "table",
"doc": "The table to query",
}
}
if self.get_config_value("query") is None:
inputs["query"] = {
"type": "string",
"doc": "The query, use the value o…
}
inputs["relation_name"] = {
"type": "string",
"doc": "The name the table is refe…
"default": "data",
}
return inputs
# Output: a single 'query_result' table.
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {"query_result": {"type": "table", …
# Run the sql query against the input table using duckdb.
def process(self, inputs: ValueMap, outputs: V…
import duckdb
# Query and relation name both come from the inputs, or both from the
# module config, depending on whether a query is configured.
if self.get_config_value("query") is None:
_query: str = inputs.get_value_data("q…
_relation_name: str = inputs.get_value…
else:
_query = self.get_config_value("query")
_relation_name = self.get_config_value…
# The upper-cased relation name is checked against a collection of reserved
# names (identifier cut off in this listing) and rejected on a match.
if _relation_name.upper() in RESERVED_SQL_…
raise KiaraProcessingException(
f"Invalid relation name '{_relatio…
)
_table: KiaraTable = inputs.get_value_data…
# The Arrow table is wrapped in a duckdb relation, which is then queried.
rel_from_arrow = duckdb.arrow(_table.arrow…
result: duckdb.DuckDBPyRelation = rel_from…
outputs.set_value("query_result", result.a…
─────────────────────────────────────────────────────
assemble.tables
Documentation
Assemble a 'tables' value from multiple tables.
Depending on the module configuration, 2 or more
tables can be merged into a single 'tables' value.
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags tabular
Labels package: kiara_plugin.tabular
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constan… object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
number_… integer How many no
tables
should be
merged.
If
'table_n…
is empty,
this
defaults
to '2',
otherwise
the
length of
the
'table_n…
input.
table_n… array A no
pre-defi…
list of
table
names. If
not
defined,
users
will be
asked for
the table
name(s).
Python class
python_class_name AssembleTablesModule
python_module_name kiara_plugin.tabular.modules.…
full_name kiara_plugin.tabular.modules.…
Processing source code ─────────────────────────────────────────────────────
class AssembleTablesModule(KiaraModule):
"""Assemble a 'tables' value from multiple tab…
Depending on the module configuration, 2 or mo…
"""
_module_type_name = "assemble.tables"
_config_cls = AssembleTablesConfig
# Resolve and validate the configured table count and table names once; the
# result is cached on the instance.
# Rules: with neither set the count defaults to 2; with only names set the
# count is the number of names; when both are set they must agree; the count
# must not be below 2. Violations raise KiaraException.
@functools.cached_property
def _table_details(self) -> Tuple[int, Union[L…
number_tables: Union[int, None] = self.get…
table_names: Union[None, List[str]] = self…
if not table_names:
if not number_tables:
number_tables = 2
elif not number_tables:
number_tables = len(table_names)
elif not number_tables == len(table_names):
raise KiaraException(
"The 'number_of_tables' and length…
)
if number_tables < 2:
raise KiaraException("The 'number_of_t…
return number_tables, table_names
# Resolved number of tables to assemble.
@property
def number_of_tables(self) -> int:
number_tables, _ = self._table_details
return number_tables
# Pre-defined table names, if any were configured.
@property
def table_names(self) -> Union[List[str], None…
_, table_names = self._table_details
return table_names
# Declare the inputs: without pre-defined names, a 'table_name_<i>' string
# and a 'table_<i>' table per slot (i starting at 1); with pre-defined names,
# one 'table_<name>' input per name.
def create_inputs_schema(
self,
) -> ValueMapSchema:
number_tables = self.number_of_tables
table_names = self.table_names
# NOTE(review): the checks below repeat the validation already performed in
# _table_details, whose resolved values are used here - likely redundant.
if not table_names:
if not number_tables:
number_tables = 2
elif not number_tables:
number_tables = len(table_names)
elif not number_tables == len(table_names):
raise KiaraException(
"The 'number_of_tables' and length…
)
if number_tables < 2:
raise KiaraException("The 'number_of_t…
inputs_schema = {}
if not table_names:
for i in range(1, number_tables + 1):
inputs_schema[f"table_name_{i}"] =…
"type": "string",
"doc": f"The alias for table #…
}
inputs_schema[f"table_{i}"] = {
"type": "table",
"doc": f"The table to merge (#…
}
else:
for table_name in table_names:
inputs_schema[f"table_{table_name}…
"type": "table",
"doc": f"The table to merge fo…
}
return inputs_schema
# Output: a single 'tables' value.
def create_outputs_schema(
self,
) -> ValueMapSchema:
outputs = {
"tables": {
"type": "tables",
"doc": "The assembled tables insta…
}
}
return outputs
# Collect the input tables into a dict keyed by table name and emit it as
# the 'tables' output.
def process(self, inputs: ValueMap, outputs: V…
number_tables = self.number_of_tables
table_names = self.table_names
tables: Dict[str, Any] = {}
if not table_names:
# User-supplied names: a name that was already used raises KiaraException.
for i in range(1, number_tables + 1):
table_name = inputs.get_value_data…
table = inputs.get_value_obj(f"tab…
if table_name in tables.keys():
raise KiaraException(f"Duplica…
tables[table_name] = table
else:
# Pre-defined names: each table is looked up via its configured name.
for table_name in table_names:
table = inputs.get_value_obj(f"tab…
tables[table_name] = table
outputs.set_value("tables", tables)
─────────────────────────────────────────────────────