Modules¶
array
special
¶
FORCE_NON_NULL_DOC
¶
MAX_INDEX_DOC
¶
MIN_INDEX_DOC
¶
REMOVE_TOKENS_DOC
¶
Classes¶
DeserializeArrayModule (DeserializeValueModule)
¶
Deserialize array data.
Source code in tabular/modules/array/__init__.py
class DeserializeArrayModule(DeserializeValueModule):
"""Deserialize array data."""
_module_type_name = "load.array"
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
return {"python_object": KiaraArray}
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "array"
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
return "feather"
def to__python_object(self, data: SerializedData, **config: Any):
assert "array.arrow" in data.get_keys() and len(list(data.get_keys())) == 1
chunks = data.get_serialized_data("array.arrow")
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1
array_file = files[0]
array = KiaraArray(data_path=array_file)
return array
retrieve_serialized_value_type()
classmethod
¶Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "array"
retrieve_supported_serialization_profile()
classmethod
¶Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
return "feather"
retrieve_supported_target_profiles()
classmethod
¶Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
return {"python_object": KiaraArray}
to__python_object(self, data, **config)
¶Source code in tabular/modules/array/__init__.py
def to__python_object(self, data: SerializedData, **config: Any):
assert "array.arrow" in data.get_keys() and len(list(data.get_keys())) == 1
chunks = data.get_serialized_data("array.arrow")
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1
array_file = files[0]
array = KiaraArray(data_path=array_file)
return array
ExtractDateConfig (KiaraInputsConfig)
pydantic-model
¶
Source code in tabular/modules/array/__init__.py
class ExtractDateConfig(KiaraInputsConfig):
force_non_null: bool = Field(description=FORCE_NON_NULL_DOC, default=True)
min_index: Union[None, int] = Field(
description=MIN_INDEX_DOC,
default=None,
)
max_index: Union[None, int] = Field(description=MAX_INDEX_DOC, default=None)
remove_tokens: List[str] = Field(
description=REMOVE_TOKENS_DOC, default_factory=list
)
Attributes¶
force_non_null: bool
pydantic-field
¶If set to 'True', raise an error if any of the strings in the array can't be parsed.
max_index: int
pydantic-field
¶The maximum index until which to parse the string(s).
min_index: int
pydantic-field
¶The minimum index from where to start parsing the string(s).
remove_tokens: List[str]
pydantic-field
¶A list of tokens/characters to replace with a single white-space before parsing the input.
ExtractDateModule (AutoInputsKiaraModule)
¶
Create an array of date objects from an array of strings.
This module is very simplistic at the moment; more functionality and options will be added in the future.
At its core, this module uses the standard parser from the dateutil package to parse strings into dates. As this parser can't handle complex strings, the input strings can be pre-processed in the following ways:
- 'cut' non-relevant parts of the string (using 'min_index' & 'max_index' input/config options)
- remove matching tokens from the string, and replace them with a single whitespace (using the 'remove_tokens' option)
By default, if an input string can't be parsed this module will raise an exception. This can be prevented by setting this module's 'force_non_null' config option or input to 'False', in which case un-parsable strings will appear as 'NULL' values in the resulting array.
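To make the pre-processing concrete, here is a minimal sketch that applies the same 'cut' and token-replacement steps with plain Python and dateutil. The example string, index values and tokens are made up for illustration and are not part of the module itself.
from dateutil import parser

# Illustrative values only: a hypothetical file name that embeds a date.
text = "scan_1820-05-12_page3.txt"
min_index, max_index = 5, 15  # keep only the "1820-05-12" part
remove_tokens = ["_"]

# 'cut' non-relevant parts (equivalent to text[min_index:][0:max_index - min_index])
cut = text[min_index:max_index]

# replace matching tokens with a single whitespace
for token in remove_tokens:
    cut = cut.replace(token, " ")

# fuzzy parsing, as the module does internally
print(parser.parse(cut, fuzzy=True))  # -> 1820-05-12 00:00:00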
Source code in tabular/modules/array/__init__.py
class ExtractDateModule(AutoInputsKiaraModule):
"""Create an array of date objects from an array of strings.
This module is very simplistic at the moment, more functionality and options will be added in the future.
At its core, this module uses the standard parser from the
[dateutil](https://github.com/dateutil/dateutil) package to parse strings into dates. As this parser can't handle
complex strings, the input strings can be pre-processed in the following ways:
- 'cut' non-relevant parts of the string (using 'min_index' & 'max_index' input/config options)
- remove matching tokens from the string, and replace them with a single whitespace (using the 'remove_tokens' option)
By default, if an input string can't be parsed this module will raise an exception. This can be prevented by
setting this modules 'force_non_null' config option or input to 'False', in which case un-parsable strings
will appear as 'NULL' value in the resulting array.
"""
_module_type_name = "parse.date_array"
_config_cls = ExtractDateConfig
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs = {"array": {"type": "array", "doc": "The input array."}}
return inputs
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {
"date_array": {
"type": "array",
"doc": "The resulting array with items of a date data type.",
}
}
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog):
import polars as pl
import pyarrow as pa
from dateutil import parser
force_non_null: bool = self.get_data_for_field(
field_name="force_non_null", inputs=inputs
)
min_pos: Union[None, int] = self.get_data_for_field(
field_name="min_index", inputs=inputs
)
if min_pos is None:
min_pos = 0
max_pos: Union[None, int] = self.get_data_for_field(
field_name="max_index", inputs=inputs
)
remove_tokens: Iterable[str] = self.get_data_for_field(
field_name="remove_tokens", inputs=inputs
)
def parse_date(_text: str):
text = _text
if min_pos:
try:
text = text[min_pos:] # type: ignore
except Exception:
return None
if max_pos:
try:
text = text[0 : max_pos - min_pos] # type: ignore # noqa
except Exception:
pass
if remove_tokens:
for t in remove_tokens:
text = text.replace(t, " ")
try:
d_obj = parser.parse(text, fuzzy=True)
except Exception as e:
if force_non_null:
raise KiaraProcessingException(e)
return None
if d_obj is None:
if force_non_null:
raise KiaraProcessingException(
f"Can't parse date from string: {text}"
)
return None
return d_obj
value = inputs.get_value_obj("array")
array: KiaraArray = value.data
series = pl.Series(name="tokens", values=array.arrow_array)
job_log.add_log(f"start parsing date for {len(array)} items")
result = series.apply(parse_date)
job_log.add_log(f"finished parsing date for {len(array)} items")
result_array = result.to_arrow()
# TODO: remove this cast once the array data type can handle non-chunked arrays
chunked = pa.chunked_array(result_array)
outputs.set_values(date_array=chunked)
Classes¶
_config_cls (KiaraInputsConfig)
private
pydantic-model
¶Source code in tabular/modules/array/__init__.py
class ExtractDateConfig(KiaraInputsConfig):
force_non_null: bool = Field(description=FORCE_NON_NULL_DOC, default=True)
min_index: Union[None, int] = Field(
description=MIN_INDEX_DOC,
default=None,
)
max_index: Union[None, int] = Field(description=MAX_INDEX_DOC, default=None)
remove_tokens: List[str] = Field(
description=REMOVE_TOKENS_DOC, default_factory=list
)
force_non_null: bool
pydantic-field
¶If set to 'True', raise an error if any of the strings in the array can't be parsed.
max_index: int
pydantic-field
¶The maximum index until which to parse the string(s).
min_index: int
pydantic-field
¶The minimum index from where to start parsing the string(s).
remove_tokens: List[str]
pydantic-field
¶A list of tokens/characters to replace with a single white-space before parsing the input.
Methods¶
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in tabular/modules/array/__init__.py
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs = {"array": {"type": "array", "doc": "The input array."}}
return inputs
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in tabular/modules/array/__init__.py
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {
"date_array": {
"type": "array",
"doc": "The resulting array with items of a date data type.",
}
}
process(self, inputs, outputs, job_log)
¶Source code in tabular/modules/array/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog):
import polars as pl
import pyarrow as pa
from dateutil import parser
force_non_null: bool = self.get_data_for_field(
field_name="force_non_null", inputs=inputs
)
min_pos: Union[None, int] = self.get_data_for_field(
field_name="min_index", inputs=inputs
)
if min_pos is None:
min_pos = 0
max_pos: Union[None, int] = self.get_data_for_field(
field_name="max_index", inputs=inputs
)
remove_tokens: Iterable[str] = self.get_data_for_field(
field_name="remove_tokens", inputs=inputs
)
def parse_date(_text: str):
text = _text
if min_pos:
try:
text = text[min_pos:] # type: ignore
except Exception:
return None
if max_pos:
try:
text = text[0 : max_pos - min_pos] # type: ignore # noqa
except Exception:
pass
if remove_tokens:
for t in remove_tokens:
text = text.replace(t, " ")
try:
d_obj = parser.parse(text, fuzzy=True)
except Exception as e:
if force_non_null:
raise KiaraProcessingException(e)
return None
if d_obj is None:
if force_non_null:
raise KiaraProcessingException(
f"Can't parse date from string: {text}"
)
return None
return d_obj
value = inputs.get_value_obj("array")
array: KiaraArray = value.data
series = pl.Series(name="tokens", values=array.arrow_array)
job_log.add_log(f"start parsing date for {len(array)} items")
result = series.apply(parse_date)
job_log.add_log(f"finished parsing date for {len(array)} items")
result_array = result.to_arrow()
# TODO: remove this cast once the array data type can handle non-chunked arrays
chunked = pa.chunked_array(result_array)
outputs.set_values(date_array=chunked)
db
special
¶
Classes¶
CreateDatabaseModule (CreateFromModule)
¶
Source code in tabular/modules/db/__init__.py
class CreateDatabaseModule(CreateFromModule):
_module_type_name = "create.database"
_config_cls = CreateDatabaseModuleConfig
def create__database__from__csv_file(self, source_value: Value) -> Any:
"""Create a database from a csv_file value."""
temp_f = tempfile.mkdtemp()
db_path = os.path.join(temp_f, "db.sqlite")
def cleanup():
shutil.rmtree(db_path, ignore_errors=True)
atexit.register(cleanup)
file_item: FileModel = source_value.data
table_name = file_item.file_name_without_extension
table_name = table_name.replace("-", "_")
table_name = table_name.replace(".", "_")
try:
create_sqlite_table_from_tabular_file(
target_db_file=db_path, file_item=file_item, table_name=table_name
)
except Exception as e:
if self.get_config_value("ignore_errors") is True or True:
log_message("ignore.import_file", file=file_item.path, reason=str(e))
else:
raise KiaraProcessingException(e)
include_raw_content_in_file_info: bool = self.get_config_value(
"include_source_metadata"
)
if include_raw_content_in_file_info:
db = KiaraDatabase(db_file_path=db_path)
db.create_if_not_exists()
include_content: bool = self.get_config_value("include_source_file_content")
db._unlock_db()
included_files = {file_item.file_name: file_item}
file_bundle = FileBundle.create_from_file_models(
files=included_files, bundle_name=file_item.file_name
)
insert_db_table_from_file_bundle(
database=db,
file_bundle=file_bundle,
table_name="source_files_metadata",
include_content=include_content,
)
db._lock_db()
return db_path
def create__database__from__csv_file_bundle(self, source_value: Value) -> Any:
"""Create a database from a csv_file_bundle value.
Unless 'merge_into_single_table' is set to 'True', each csv file will create one table
in the resulting database. If this option is set, only a single table with all the values of all
csv files will be created. For this to work, all csv files should follow the same schema.
"""
merge_into_single_table = self.get_config_value("merge_into_single_table")
if merge_into_single_table:
raise NotImplementedError("Not supported (yet).")
include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
"include_source_metadata"
)
temp_f = tempfile.mkdtemp()
db_path = os.path.join(temp_f, "db.sqlite")
def cleanup():
shutil.rmtree(db_path, ignore_errors=True)
atexit.register(cleanup)
db = KiaraDatabase(db_file_path=db_path)
db.create_if_not_exists()
# TODO: check whether/how to add indexes
bundle: FileBundle = source_value.data
table_names: List[str] = []
for rel_path in sorted(bundle.included_files.keys()):
file_item = bundle.included_files[rel_path]
table_name = find_free_id(
stem=file_item.file_name_without_extension, current_ids=table_names
)
try:
table_names.append(table_name)
create_sqlite_table_from_tabular_file(
target_db_file=db_path, file_item=file_item, table_name=table_name
)
except Exception as e:
if self.get_config_value("ignore_errors") is True or True:
log_message("ignore.import_file", file=rel_path, reason=str(e))
continue
raise KiaraProcessingException(e)
if include_raw_content_in_file_info in [None, True]:
include_content: bool = self.get_config_value("include_source_file_content")
db._unlock_db()
insert_db_table_from_file_bundle(
database=db,
file_bundle=source_value.data,
table_name="source_files_metadata",
include_content=include_content,
)
db._lock_db()
return db_path
def create_optional_inputs(
self, source_type: str, target_type
) -> Union[Mapping[str, Mapping[str, Any]], None]:
if target_type == "database" and source_type == "table":
return {
"table_name": {
"type": "string",
"doc": "The name of the table in the new database.",
"default": "imported_table",
}
}
else:
return None
def create__database__from__table(
self, source_value: Value, optional: ValueMap
) -> Any:
"""Create a database value from a table."""
table_name = optional.get_value_data("table_name")
if not table_name:
table_name = "imported_table"
table: KiaraTable = source_value.data
arrow_table = table.arrow_table
column_map = None
index_columns = None
sqlite_schema = create_sqlite_schema_data_from_arrow_table(
table=arrow_table, index_columns=index_columns, column_map=column_map
)
db = KiaraDatabase.create_in_temp_dir()
db._unlock_db()
engine = db.get_sqlalchemy_engine()
_table = sqlite_schema.create_table(table_name=table_name, engine=engine)
with engine.connect() as conn:
for batch in arrow_table.to_batches(
max_chunksize=DEFAULT_TABULAR_DATA_CHUNK_SIZE
):
conn.execute(insert(_table), batch.to_pylist())
conn.commit()
db._lock_db()
return db
Classes¶
_config_cls (CreateFromModuleConfig)
private
pydantic-model
¶Source code in tabular/modules/db/__init__.py
class CreateDatabaseModuleConfig(CreateFromModuleConfig):
ignore_errors: bool = Field(
description="Whether to ignore convert errors and omit the failed items.",
default=False,
)
merge_into_single_table: bool = Field(
description="Whether to merge all csv files into a single table.", default=False
)
include_source_metadata: Union[bool, None] = Field(
description="Whether to include a table with metadata about the source files.",
default=None,
)
include_source_file_content: bool = Field(
description="When including source metadata, whether to also include the original raw (string) content.",
default=False,
)
ignore_errors: bool
pydantic-field
¶Whether to ignore convert errors and omit the failed items.
include_source_file_content: bool
pydantic-field
¶When including source metadata, whether to also include the original raw (string) content.
include_source_metadata: bool
pydantic-field
¶Whether to include a table with metadata about the source files.
merge_into_single_table: bool
pydantic-field
¶Whether to merge all csv files into a single table.
Methods¶
create__database__from__csv_file(self, source_value)
¶Create a database from a csv_file value.
Source code in tabular/modules/db/__init__.py
def create__database__from__csv_file(self, source_value: Value) -> Any:
"""Create a database from a csv_file value."""
temp_f = tempfile.mkdtemp()
db_path = os.path.join(temp_f, "db.sqlite")
def cleanup():
shutil.rmtree(db_path, ignore_errors=True)
atexit.register(cleanup)
file_item: FileModel = source_value.data
table_name = file_item.file_name_without_extension
table_name = table_name.replace("-", "_")
table_name = table_name.replace(".", "_")
try:
create_sqlite_table_from_tabular_file(
target_db_file=db_path, file_item=file_item, table_name=table_name
)
except Exception as e:
if self.get_config_value("ignore_errors") is True or True:
log_message("ignore.import_file", file=file_item.path, reason=str(e))
else:
raise KiaraProcessingException(e)
include_raw_content_in_file_info: bool = self.get_config_value(
"include_source_metadata"
)
if include_raw_content_in_file_info:
db = KiaraDatabase(db_file_path=db_path)
db.create_if_not_exists()
include_content: bool = self.get_config_value("include_source_file_content")
db._unlock_db()
included_files = {file_item.file_name: file_item}
file_bundle = FileBundle.create_from_file_models(
files=included_files, bundle_name=file_item.file_name
)
insert_db_table_from_file_bundle(
database=db,
file_bundle=file_bundle,
table_name="source_files_metadata",
include_content=include_content,
)
db._lock_db()
return db_path
create__database__from__csv_file_bundle(self, source_value)
¶Create a database from a csv_file_bundle value.
Unless 'merge_into_single_table' is set to 'True', each csv file will create one table in the resulting database. If this option is set, only a single table with all the values of all csv files will be created. For this to work, all csv files should follow the same schema.
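As a rough idea of the result, the sketch below lists the tables of a database created this way, using only the standard library's sqlite3 module; the path is a placeholder, not something returned by this module under that name.
import sqlite3

db_path = "db.sqlite"  # placeholder path to the sqlite file produced by this operation
con = sqlite3.connect(db_path)
tables = [row[0] for row in con.execute("SELECT name FROM sqlite_master WHERE type='table'")]
con.close()

# Expect one table per csv file, plus 'source_files_metadata' when source metadata is included.
print(tables)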
Source code in tabular/modules/db/__init__.py
def create__database__from__csv_file_bundle(self, source_value: Value) -> Any:
"""Create a database from a csv_file_bundle value.
Unless 'merge_into_single_table' is set to 'True', each csv file will create one table
in the resulting database. If this option is set, only a single table with all the values of all
csv files will be created. For this to work, all csv files should follow the same schema.
"""
merge_into_single_table = self.get_config_value("merge_into_single_table")
if merge_into_single_table:
raise NotImplementedError("Not supported (yet).")
include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
"include_source_metadata"
)
temp_f = tempfile.mkdtemp()
db_path = os.path.join(temp_f, "db.sqlite")
def cleanup():
shutil.rmtree(db_path, ignore_errors=True)
atexit.register(cleanup)
db = KiaraDatabase(db_file_path=db_path)
db.create_if_not_exists()
# TODO: check whether/how to add indexes
bundle: FileBundle = source_value.data
table_names: List[str] = []
for rel_path in sorted(bundle.included_files.keys()):
file_item = bundle.included_files[rel_path]
table_name = find_free_id(
stem=file_item.file_name_without_extension, current_ids=table_names
)
try:
table_names.append(table_name)
create_sqlite_table_from_tabular_file(
target_db_file=db_path, file_item=file_item, table_name=table_name
)
except Exception as e:
if self.get_config_value("ignore_errors") is True or True:
log_message("ignore.import_file", file=rel_path, reason=str(e))
continue
raise KiaraProcessingException(e)
if include_raw_content_in_file_info in [None, True]:
include_content: bool = self.get_config_value("include_source_file_content")
db._unlock_db()
insert_db_table_from_file_bundle(
database=db,
file_bundle=source_value.data,
table_name="source_files_metadata",
include_content=include_content,
)
db._lock_db()
return db_path
create__database__from__table(self, source_value, optional)
¶Create a database value from a table.
Source code in tabular/modules/db/__init__.py
def create__database__from__table(
self, source_value: Value, optional: ValueMap
) -> Any:
"""Create a database value from a table."""
table_name = optional.get_value_data("table_name")
if not table_name:
table_name = "imported_table"
table: KiaraTable = source_value.data
arrow_table = table.arrow_table
column_map = None
index_columns = None
sqlite_schema = create_sqlite_schema_data_from_arrow_table(
table=arrow_table, index_columns=index_columns, column_map=column_map
)
db = KiaraDatabase.create_in_temp_dir()
db._unlock_db()
engine = db.get_sqlalchemy_engine()
_table = sqlite_schema.create_table(table_name=table_name, engine=engine)
with engine.connect() as conn:
for batch in arrow_table.to_batches(
max_chunksize=DEFAULT_TABULAR_DATA_CHUNK_SIZE
):
conn.execute(insert(_table), batch.to_pylist())
conn.commit()
db._lock_db()
return db
create_optional_inputs(self, source_type, target_type)
¶Source code in tabular/modules/db/__init__.py
def create_optional_inputs(
self, source_type: str, target_type
) -> Union[Mapping[str, Mapping[str, Any]], None]:
if target_type == "database" and source_type == "table":
return {
"table_name": {
"type": "string",
"doc": "The name of the table in the new database.",
"default": "imported_table",
}
}
else:
return None
CreateDatabaseModuleConfig (CreateFromModuleConfig)
pydantic-model
¶
Source code in tabular/modules/db/__init__.py
class CreateDatabaseModuleConfig(CreateFromModuleConfig):
ignore_errors: bool = Field(
description="Whether to ignore convert errors and omit the failed items.",
default=False,
)
merge_into_single_table: bool = Field(
description="Whether to merge all csv files into a single table.", default=False
)
include_source_metadata: Union[bool, None] = Field(
description="Whether to include a table with metadata about the source files.",
default=None,
)
include_source_file_content: bool = Field(
description="When including source metadata, whether to also include the original raw (string) content.",
default=False,
)
Attributes¶
ignore_errors: bool
pydantic-field
¶Whether to ignore convert errors and omit the failed items.
include_source_file_content: bool
pydantic-field
¶When including source metadata, whether to also include the original raw (string) content.
include_source_metadata: bool
pydantic-field
¶Whether to include a table with metadata about the source files.
merge_into_single_table: bool
pydantic-field
¶Whether to merge all csv files into a single table.
LoadDatabaseFromDiskModule (DeserializeValueModule)
¶
Source code in tabular/modules/db/__init__.py
class LoadDatabaseFromDiskModule(DeserializeValueModule):
_module_type_name = "load.database"
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
return {"python_object": KiaraDatabase}
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "database"
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
return "copy"
def to__python_object(self, data: SerializedData, **config: Any):
assert "db.sqlite" in data.get_keys() and len(list(data.get_keys())) == 1
chunks = data.get_serialized_data("db.sqlite")
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1
db_file = files[0]
db = KiaraDatabase(db_file_path=db_file)
return db
retrieve_serialized_value_type()
classmethod
¶Source code in tabular/modules/db/__init__.py
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "database"
retrieve_supported_serialization_profile()
classmethod
¶Source code in tabular/modules/db/__init__.py
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
return "copy"
retrieve_supported_target_profiles()
classmethod
¶Source code in tabular/modules/db/__init__.py
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
return {"python_object": KiaraDatabase}
to__python_object(self, data, **config)
¶Source code in tabular/modules/db/__init__.py
def to__python_object(self, data: SerializedData, **config: Any):
assert "db.sqlite" in data.get_keys() and len(list(data.get_keys())) == 1
chunks = data.get_serialized_data("db.sqlite")
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1
db_file = files[0]
db = KiaraDatabase(db_file_path=db_file)
return db
QueryDatabaseConfig (KiaraModuleConfig)
pydantic-model
¶
QueryDatabaseModule (KiaraModule)
¶
Execute a sql query against a (sqlite) database.
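The module collects the query result column-wise and converts it into an Arrow table. Below is a minimal sketch of that pattern using an in-memory sqlite database and the standard library, not the kiara API itself; table and column names are made up.
import sqlite3
import pyarrow as pa

# Throwaway in-memory database with a few example rows (illustrative data only).
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE items (name TEXT, amount INTEGER)")
con.executemany("INSERT INTO items VALUES (?, ?)", [("a", 1), ("b", 2), ("c", 3)])

# Collect the result column-wise, then build an Arrow table from the dict of columns.
cursor = con.execute("SELECT name, amount FROM items WHERE amount > 1")
column_names = [d[0] for d in cursor.description]
result_columns = {}
for row in cursor:
    for key, value in zip(column_names, row):
        result_columns.setdefault(key, []).append(value)

print(pa.Table.from_pydict(result_columns))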
Source code in tabular/modules/db/__init__.py
class QueryDatabaseModule(KiaraModule):
"""Execute a sql query against a (sqlite) database."""
_config_cls = QueryDatabaseConfig
_module_type_name = "query.database"
def create_inputs_schema(
self,
) -> ValueMapSchema:
result: Dict[str, Dict[str, Any]] = {
"database": {"type": "database", "doc": "The database to query."}
}
if not self.get_config_value("query"):
result["query"] = {"type": "string", "doc": "The query to execute."}
return result
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {"query_result": {"type": "table", "doc": "The query result."}}
def process(self, inputs: ValueMap, outputs: ValueMap):
import pyarrow as pa
database: KiaraDatabase = inputs.get_value_data("database")
query = self.get_config_value("query")
if query is None:
query = inputs.get_value_data("query")
# TODO: make this memory efficent
result_columns: Dict[str, List[Any]] = {}
with database.get_sqlalchemy_engine().connect() as con:
result = con.execute(text(query))
for r in result:
for k, v in dict(r).items():
result_columns.setdefault(k, []).append(v)
table = pa.Table.from_pydict(result_columns)
outputs.set_value("query_result", table)
Classes¶
_config_cls (KiaraModuleConfig)
private
pydantic-model
¶Methods¶
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in tabular/modules/db/__init__.py
def create_inputs_schema(
self,
) -> ValueMapSchema:
result: Dict[str, Dict[str, Any]] = {
"database": {"type": "database", "doc": "The database to query."}
}
if not self.get_config_value("query"):
result["query"] = {"type": "string", "doc": "The query to execute."}
return result
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in tabular/modules/db/__init__.py
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {"query_result": {"type": "table", "doc": "The query result."}}
process(self, inputs, outputs)
¶Source code in tabular/modules/db/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap):
import pyarrow as pa
database: KiaraDatabase = inputs.get_value_data("database")
query = self.get_config_value("query")
if query is None:
query = inputs.get_value_data("query")
# TODO: make this memory efficent
result_columns: Dict[str, List[Any]] = {}
with database.get_sqlalchemy_engine().connect() as con:
result = con.execute(text(query))
for r in result:
for k, v in dict(r).items():
result_columns.setdefault(k, []).append(v)
table = pa.Table.from_pydict(result_columns)
outputs.set_value("query_result", table)
RenderDatabaseModule (RenderDatabaseModuleBase)
¶
Source code in tabular/modules/db/__init__.py
class RenderDatabaseModule(RenderDatabaseModuleBase):
_module_type_name = "render.database"
def render__database__as__string(
self, value: Value, render_config: Mapping[str, Any]
):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
table_name = render_config.get("table_name", None)
wrap, data_related_scenes = self.preprocess_database(
value=value,
table_name=table_name,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_string(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
rendered=pretty,
related_scenes=data_related_scenes,
render_config=render_config,
render_manifest=self.manifest.manifest_hash,
)
def render__database__as__terminal_renderable(
self, value: Value, render_config: Mapping[str, Any]
):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
table_name = render_config.get("table_name", None)
wrap, data_related_scenes = self.preprocess_database(
value=value,
table_name=table_name,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_terminal_renderable(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
rendered=pretty,
related_scenes=data_related_scenes,
render_manifest=self.manifest.manifest_hash,
)
render__database__as__string(self, value, render_config)
¶Source code in tabular/modules/db/__init__.py
def render__database__as__string(
self, value: Value, render_config: Mapping[str, Any]
):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
table_name = render_config.get("table_name", None)
wrap, data_related_scenes = self.preprocess_database(
value=value,
table_name=table_name,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_string(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
rendered=pretty,
related_scenes=data_related_scenes,
render_config=render_config,
render_manifest=self.manifest.manifest_hash,
)
render__database__as__terminal_renderable(self, value, render_config)
¶Source code in tabular/modules/db/__init__.py
def render__database__as__terminal_renderable(
self, value: Value, render_config: Mapping[str, Any]
):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
table_name = render_config.get("table_name", None)
wrap, data_related_scenes = self.preprocess_database(
value=value,
table_name=table_name,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_terminal_renderable(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
rendered=pretty,
related_scenes=data_related_scenes,
render_manifest=self.manifest.manifest_hash,
)
RenderDatabaseModuleBase (RenderValueModule)
¶
Source code in tabular/modules/db/__init__.py
class RenderDatabaseModuleBase(RenderValueModule):
_module_type_name: str = None # type: ignore
def preprocess_database(
self,
value: Value,
table_name: Union[str, None],
input_number_of_rows: int,
input_row_offset: int,
):
database: KiaraDatabase = value.data
table_names = database.table_names
if not table_name:
table_name = list(table_names)[0]
if table_name not in table_names:
raise Exception(
f"Invalid table name: {table_name}. Available: {', '.join(table_names)}"
)
related_scenes_tables: Dict[str, Union[RenderScene, None]] = {
t: RenderScene.construct(
title=t,
description=f"Display the '{t}' table.",
manifest_hash=self.manifest.manifest_hash,
render_config={"table_name": t},
)
for t in database.table_names
}
query = f"""SELECT * FROM {table_name} LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""
result: Dict[str, List[Any]] = {}
# TODO: this could be written much more efficient
with database.get_sqlalchemy_engine().connect() as con:
num_rows_result = con.execute(text(f"SELECT count(*) from {table_name}"))
table_num_rows = num_rows_result.fetchone()[0]
rs = con.execute(text(query))
for r in rs:
for k, v in dict(r).items():
result.setdefault(k, []).append(v)
wrap = DictTabularWrap(data=result)
row_offset = table_num_rows - input_number_of_rows
related_scenes: Dict[str, Union[RenderScene, None]] = {}
if row_offset > 0:
if input_row_offset > 0:
related_scenes["first"] = RenderScene.construct(
title="first",
description=f"Display the first {input_number_of_rows} rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": 0,
"number_of_rows": input_number_of_rows,
"table_name": table_name,
},
)
p_offset = input_row_offset - input_number_of_rows
if p_offset < 0:
p_offset = 0
previous = {
"row_offset": p_offset,
"number_of_rows": input_number_of_rows,
"table_name": table_name,
}
related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous) # type: ignore
else:
related_scenes["first"] = None
related_scenes["previous"] = None
n_offset = input_row_offset + input_number_of_rows
if n_offset < table_num_rows:
next = {
"row_offset": n_offset,
"number_of_rows": input_number_of_rows,
"table_name": table_name,
}
related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next) # type: ignore
else:
related_scenes["next"] = None
last_page = int(table_num_rows / input_number_of_rows)
current_start = last_page * input_number_of_rows
if (input_row_offset + input_number_of_rows) > table_num_rows:
related_scenes["last"] = None
else:
related_scenes["last"] = RenderScene.construct(
title="last",
description="Display the final rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": current_start, # type: ignore
"number_of_rows": input_number_of_rows, # type: ignore
"table_name": table_name,
},
)
related_scenes_tables[table_name].disabled = True # type: ignore
related_scenes_tables[table_name].related_scenes = related_scenes # type: ignore
return wrap, related_scenes_tables
preprocess_database(self, value, table_name, input_number_of_rows, input_row_offset)
¶Source code in tabular/modules/db/__init__.py
def preprocess_database(
self,
value: Value,
table_name: Union[str, None],
input_number_of_rows: int,
input_row_offset: int,
):
database: KiaraDatabase = value.data
table_names = database.table_names
if not table_name:
table_name = list(table_names)[0]
if table_name not in table_names:
raise Exception(
f"Invalid table name: {table_name}. Available: {', '.join(table_names)}"
)
related_scenes_tables: Dict[str, Union[RenderScene, None]] = {
t: RenderScene.construct(
title=t,
description=f"Display the '{t}' table.",
manifest_hash=self.manifest.manifest_hash,
render_config={"table_name": t},
)
for t in database.table_names
}
query = f"""SELECT * FROM {table_name} LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""
result: Dict[str, List[Any]] = {}
# TODO: this could be written much more efficient
with database.get_sqlalchemy_engine().connect() as con:
num_rows_result = con.execute(text(f"SELECT count(*) from {table_name}"))
table_num_rows = num_rows_result.fetchone()[0]
rs = con.execute(text(query))
for r in rs:
for k, v in dict(r).items():
result.setdefault(k, []).append(v)
wrap = DictTabularWrap(data=result)
row_offset = table_num_rows - input_number_of_rows
related_scenes: Dict[str, Union[RenderScene, None]] = {}
if row_offset > 0:
if input_row_offset > 0:
related_scenes["first"] = RenderScene.construct(
title="first",
description=f"Display the first {input_number_of_rows} rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": 0,
"number_of_rows": input_number_of_rows,
"table_name": table_name,
},
)
p_offset = input_row_offset - input_number_of_rows
if p_offset < 0:
p_offset = 0
previous = {
"row_offset": p_offset,
"number_of_rows": input_number_of_rows,
"table_name": table_name,
}
related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous) # type: ignore
else:
related_scenes["first"] = None
related_scenes["previous"] = None
n_offset = input_row_offset + input_number_of_rows
if n_offset < table_num_rows:
next = {
"row_offset": n_offset,
"number_of_rows": input_number_of_rows,
"table_name": table_name,
}
related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next) # type: ignore
else:
related_scenes["next"] = None
last_page = int(table_num_rows / input_number_of_rows)
current_start = last_page * input_number_of_rows
if (input_row_offset + input_number_of_rows) > table_num_rows:
related_scenes["last"] = None
else:
related_scenes["last"] = RenderScene.construct(
title="last",
description="Display the final rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": current_start, # type: ignore
"number_of_rows": input_number_of_rows, # type: ignore
"table_name": table_name,
},
)
related_scenes_tables[table_name].disabled = True # type: ignore
related_scenes_tables[table_name].related_scenes = related_scenes # type: ignore
return wrap, related_scenes_tables
table
special
¶
EMPTY_COLUMN_NAME_MARKER
¶
Classes¶
CreateTableModule (CreateFromModule)
¶
Source code in tabular/modules/table/__init__.py
class CreateTableModule(CreateFromModule):
_module_type_name = "create.table"
_config_cls = CreateTableModuleConfig
def create__table__from__csv_file(self, source_value: Value) -> Any:
"""Create a table from a csv_file value."""
from pyarrow import csv
input_file: FileModel = source_value.data
imported_data = csv.read_csv(input_file.path)
# import pandas as pd
# df = pd.read_csv(input_file.path)
# imported_data = pa.Table.from_pandas(df)
return KiaraTable.create_table(imported_data)
def create__table__from__text_file_bundle(self, source_value: Value) -> Any:
"""Create a table value from a text file_bundle.
The resulting table will have (at a minimum) the following collumns:
- id: an auto-assigned index
- rel_path: the relative path of the file (from the provided base path)
- content: the text file content
"""
import pyarrow as pa
bundle: FileBundle = source_value.data
columns = FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS
ignore_errors = self.get_config_value("ignore_errors")
file_dict = bundle.read_text_file_contents(ignore_errors=ignore_errors)
# TODO: use chunks to save on memory
tabular: Dict[str, List[Any]] = {}
for column in columns:
for index, rel_path in enumerate(sorted(file_dict.keys())):
if column == "content":
_value: Any = file_dict[rel_path]
elif column == "id":
_value = index
elif column == "rel_path":
_value = rel_path
else:
file_model = bundle.included_files[rel_path]
_value = getattr(file_model, column)
tabular.setdefault(column, []).append(_value)
table = pa.Table.from_pydict(tabular)
return KiaraTable.create_table(table)
Classes¶
_config_cls (CreateFromModuleConfig)
private
pydantic-model
¶Source code in tabular/modules/table/__init__.py
class CreateTableModuleConfig(CreateFromModuleConfig):
ignore_errors: bool = Field(
description="Whether to ignore convert errors and omit the failed items.",
default=False,
)
Methods¶
create__table__from__csv_file(self, source_value)
¶Create a table from a csv_file value.
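The import itself is a single pyarrow call; a small sketch with a hypothetical file path:
from pyarrow import csv

table = csv.read_csv("data/my_file.csv")  # hypothetical path to a csv file
print(table.num_rows, table.column_names)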
Source code in tabular/modules/table/__init__.py
def create__table__from__csv_file(self, source_value: Value) -> Any:
"""Create a table from a csv_file value."""
from pyarrow import csv
input_file: FileModel = source_value.data
imported_data = csv.read_csv(input_file.path)
# import pandas as pd
# df = pd.read_csv(input_file.path)
# imported_data = pa.Table.from_pandas(df)
return KiaraTable.create_table(imported_data)
create__table__from__text_file_bundle(self, source_value)
¶Create a table value from a text file_bundle.
The resulting table will have (at a minimum) the following columns:
- id: an auto-assigned index
- rel_path: the relative path of the file (from the provided base path)
- content: the text file content
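A minimal sketch of that layout, building the same three columns from a made-up dict of file contents keyed by relative path (plain pyarrow, not the FileBundle API):
import pyarrow as pa

file_contents = {"letters/a.txt": "Dear ...", "letters/b.txt": "Hello ..."}  # made-up files

tabular = {"id": [], "rel_path": [], "content": []}
for index, rel_path in enumerate(sorted(file_contents)):
    tabular["id"].append(index)
    tabular["rel_path"].append(rel_path)
    tabular["content"].append(file_contents[rel_path])

print(pa.Table.from_pydict(tabular))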
Source code in tabular/modules/table/__init__.py
def create__table__from__text_file_bundle(self, source_value: Value) -> Any:
"""Create a table value from a text file_bundle.
The resulting table will have (at a minimum) the following collumns:
- id: an auto-assigned index
- rel_path: the relative path of the file (from the provided base path)
- content: the text file content
"""
import pyarrow as pa
bundle: FileBundle = source_value.data
columns = FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS
ignore_errors = self.get_config_value("ignore_errors")
file_dict = bundle.read_text_file_contents(ignore_errors=ignore_errors)
# TODO: use chunks to save on memory
tabular: Dict[str, List[Any]] = {}
for column in columns:
for index, rel_path in enumerate(sorted(file_dict.keys())):
if column == "content":
_value: Any = file_dict[rel_path]
elif column == "id":
_value = index
elif column == "rel_path":
_value = rel_path
else:
file_model = bundle.included_files[rel_path]
_value = getattr(file_model, column)
tabular.setdefault(column, []).append(_value)
table = pa.Table.from_pydict(tabular)
return KiaraTable.create_table(table)
CreateTableModuleConfig (CreateFromModuleConfig)
pydantic-model
¶
Source code in tabular/modules/table/__init__.py
class CreateTableModuleConfig(CreateFromModuleConfig):
ignore_errors: bool = Field(
description="Whether to ignore convert errors and omit the failed items.",
default=False,
)
CutColumnModule (KiaraModule)
¶
Cut off one column from a table, returning an array.
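Internally this boils down to a single pyarrow call; a small sketch with a made-up table:
import pyarrow as pa

# Made-up example table; 'value' stands in for the column to cut off.
table = pa.table({"name": ["a", "b", "c"], "value": [1, 2, 3]})

column = table.column("value")  # the column, as a (chunked) array
print(column)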
Source code in tabular/modules/table/__init__.py
class CutColumnModule(KiaraModule):
"""Cut off one column from a table, returning an array."""
_module_type_name = "table.cut_column"
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs: Mapping[str, Any] = {
"table": {"type": "table", "doc": "A table."},
"column_name": {
"type": "string",
"doc": "The name of the column to extract.",
},
}
return inputs
def create_outputs_schema(
self,
) -> ValueMapSchema:
outputs: Mapping[str, Any] = {"array": {"type": "array", "doc": "The column."}}
return outputs
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
import pyarrow as pa
column_name: str = inputs.get_value_data("column_name")
table_value: Value = inputs.get_value_obj("table")
table_metadata: KiaraTableMetadata = table_value.get_property_data(
"metadata.table"
)
available = table_metadata.table.column_names
if column_name not in available:
raise KiaraProcessingException(
f"Invalid column name '{column_name}'. Available column names: {', '.join(available)}"
)
table: pa.Table = table_value.data.arrow_table
column = table.column(column_name)
outputs.set_value("array", column)
Methods¶
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs: Mapping[str, Any] = {
"table": {"type": "table", "doc": "A table."},
"column_name": {
"type": "string",
"doc": "The name of the column to extract.",
},
}
return inputs
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
self,
) -> ValueMapSchema:
outputs: Mapping[str, Any] = {"array": {"type": "array", "doc": "The column."}}
return outputs
process(self, inputs, outputs)
¶Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
import pyarrow as pa
column_name: str = inputs.get_value_data("column_name")
table_value: Value = inputs.get_value_obj("table")
table_metadata: KiaraTableMetadata = table_value.get_property_data(
"metadata.table"
)
available = table_metadata.table.column_names
if column_name not in available:
raise KiaraProcessingException(
f"Invalid column name '{column_name}'. Available column names: {', '.join(available)}"
)
table: pa.Table = table_value.data.arrow_table
column = table.column(column_name)
outputs.set_value("array", column)
DeserializeTableModule (DeserializeValueModule)
¶
Source code in tabular/modules/table/__init__.py
class DeserializeTableModule(DeserializeValueModule):
_module_type_name = "load.table"
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
return {"python_object": KiaraTable}
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "table"
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
return "feather"
def to__python_object(self, data: SerializedData, **config: Any):
import pyarrow as pa
columns = {}
for column_name in data.get_keys():
chunks = data.get_serialized_data(column_name)
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1
file = files[0]
with pa.memory_map(file, "r") as column_chunk:
loaded_arrays: pa.Table = pa.ipc.open_file(column_chunk).read_all()
column = loaded_arrays.column(column_name)
if column_name == EMPTY_COLUMN_NAME_MARKER:
columns[""] = column
else:
columns[column_name] = column
arrow_table = pa.table(columns)
table = KiaraTable.create_table(arrow_table)
return table
retrieve_serialized_value_type()
classmethod
¶Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "table"
retrieve_supported_serialization_profile()
classmethod
¶Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
return "feather"
retrieve_supported_target_profiles()
classmethod
¶Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
return {"python_object": KiaraTable}
to__python_object(self, data, **config)
¶Source code in tabular/modules/table/__init__.py
def to__python_object(self, data: SerializedData, **config: Any):
import pyarrow as pa
columns = {}
for column_name in data.get_keys():
chunks = data.get_serialized_data(column_name)
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1
file = files[0]
with pa.memory_map(file, "r") as column_chunk:
loaded_arrays: pa.Table = pa.ipc.open_file(column_chunk).read_all()
column = loaded_arrays.column(column_name)
if column_name == EMPTY_COLUMN_NAME_MARKER:
columns[""] = column
else:
columns[column_name] = column
arrow_table = pa.table(columns)
table = KiaraTable.create_table(arrow_table)
return table
ExportTableModule (DataExportModule)
¶
Export table data items.
Source code in tabular/modules/table/__init__.py
class ExportTableModule(DataExportModule):
"""Export table data items."""
_module_type_name = "export.table"
def export__table__as__csv_file(self, value: KiaraTable, base_path: str, name: str):
"""Export a table as csv file."""
import pyarrow.csv as csv
target_path = os.path.join(base_path, f"{name}.csv")
csv.write_csv(value.arrow_table, target_path)
return {"files": target_path}
# def export__table__as__sqlite_db(
# self, value: KiaraTable, base_path: str, name: str
# ):
#
# target_path = os.path.abspath(os.path.join(base_path, f"{name}.sqlite"))
#
# raise NotImplementedError()
# # shutil.copy2(value.db_file_path, target_path)
#
# return {"files": target_path}
Methods¶
export__table__as__csv_file(self, value, base_path, name)
¶Export a table as a csv file.
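A standalone version of the same call with plain pyarrow, using made-up names:
import pyarrow as pa
import pyarrow.csv as csv

table = pa.table({"a": [1, 2], "b": ["x", "y"]})  # made-up table
csv.write_csv(table, "exported.csv")              # target file name is illustrative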
Source code in tabular/modules/table/__init__.py
def export__table__as__csv_file(self, value: KiaraTable, base_path: str, name: str):
"""Export a table as csv file."""
import pyarrow.csv as csv
target_path = os.path.join(base_path, f"{name}.csv")
csv.write_csv(value.arrow_table, target_path)
return {"files": target_path}
MergeTableConfig (KiaraModuleConfig)
pydantic-model
¶
Source code in tabular/modules/table/__init__.py
class MergeTableConfig(KiaraModuleConfig):
inputs_schema: Dict[str, ValueSchema] = Field(
description="A dict describing the inputs for this merge process."
)
column_map: Dict[str, str] = Field(
description="A map describing", default_factory=dict
)
MergeTableModule (KiaraModule)
¶
Create a table from other tables and/or arrays.
This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary number of tables/arrays; all tables to be merged must be specified in the module configuration.
Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the desired column name as key, and a field name in the following format as value:
- '[inputs_schema key]' for inputs of type 'array'
- '[inputs_schema_key].orig_column_name' for inputs of type 'table'
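A hypothetical module configuration following that format, assuming one table input named 'left' and one array input named 'extra'; all names here are illustrative:
merge_config = {
    "inputs_schema": {
        "left": {"type": "table", "doc": "The main table."},
        "extra": {"type": "array", "doc": "An additional column."},
    },
    "column_map": {
        "id": "left.id",       # take the 'id' column from the 'left' table input
        "label": "left.name",  # rename the 'name' column of 'left' to 'label'
        "score": "extra",      # use the 'extra' array input as the 'score' column
    },
}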
Source code in tabular/modules/table/__init__.py
class MergeTableModule(KiaraModule):
"""Create a table from other tables and/or arrays.
This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary
number of tables/arrays, all tables to be merged must be specified in the module configuration.
Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the
desired column name as key, and a field-name in the following format as value:
- '[inputs_schema key]' for inputs of type 'array'
- '[inputs_schema_key].orig_column_name' for inputs of type 'table'
"""
_module_type_name = "table.merge"
_config_cls = MergeTableConfig
def create_inputs_schema(
self,
) -> ValueMapSchema:
input_schema_dict = self.get_config_value("inputs_schema")
return input_schema_dict
def create_outputs_schema(
self,
) -> ValueMapSchema:
outputs = {
"table": {
"type": "table",
"doc": "The merged table, including all source tables and columns.",
}
}
return outputs
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:
import pyarrow as pa
inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
column_map: Dict[str, str] = self.get_config_value("column_map")
sources = {}
for field_name in inputs_schema.keys():
sources[field_name] = inputs.get_value_data(field_name)
len_dict = {}
arrays = {}
column_map_final = dict(column_map)
for source_key, table_or_array in sources.items():
if isinstance(table_or_array, KiaraTable):
rows = table_or_array.num_rows
for name in table_or_array.column_names:
array_name = f"{source_key}.{name}"
if column_map and array_name not in column_map.values():
job_log.add_log(
f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
)
continue
column = table_or_array.arrow_table.column(name)
arrays[array_name] = column
if not column_map:
if name in column_map_final:
raise Exception(
f"Can't merge table, duplicate column name: {name}."
)
column_map_final[name] = array_name
elif isinstance(table_or_array, KiaraArray):
if column_map and source_key not in column_map.values():
job_log.add_log(
f"Ignoring array '{source_key}': not listed in column_map."
)
continue
rows = len(table_or_array)
arrays[source_key] = table_or_array.arrow_array
if not column_map:
if source_key in column_map_final.keys():
raise Exception(
f"Can't merge table, duplicate column name: {source_key}."
)
column_map_final[source_key] = source_key
else:
raise KiaraProcessingException(
f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
)
len_dict[source_key] = rows
all_rows = None
for source_key, rows in len_dict.items():
if all_rows is None:
all_rows = rows
else:
if all_rows != rows:
all_rows = None
break
if all_rows is None:
len_str = ""
for name, rows in len_dict.items():
len_str = f" {name} ({rows})"
raise KiaraProcessingException(
f"Can't merge table, sources have different lengths: {len_str}"
)
column_names = []
columns = []
for column_name, ref in column_map_final.items():
column_names.append(column_name)
column = arrays[ref]
columns.append(column)
table = pa.Table.from_arrays(arrays=columns, names=column_names)
outputs.set_value("table", table)
Classes¶
_config_cls (KiaraModuleConfig)
private
pydantic-model
¶Source code in tabular/modules/table/__init__.py
class MergeTableConfig(KiaraModuleConfig):
inputs_schema: Dict[str, ValueSchema] = Field(
description="A dict describing the inputs for this merge process."
)
column_map: Dict[str, str] = Field(
description="A map describing", default_factory=dict
)
Methods¶
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
self,
) -> ValueMapSchema:
input_schema_dict = self.get_config_value("inputs_schema")
return input_schema_dict
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
self,
) -> ValueMapSchema:
outputs = {
"table": {
"type": "table",
"doc": "The merged table, including all source tables and columns.",
}
}
return outputs
process(self, inputs, outputs, job_log)
¶Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:
import pyarrow as pa
inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
column_map: Dict[str, str] = self.get_config_value("column_map")
sources = {}
for field_name in inputs_schema.keys():
sources[field_name] = inputs.get_value_data(field_name)
len_dict = {}
arrays = {}
column_map_final = dict(column_map)
for source_key, table_or_array in sources.items():
if isinstance(table_or_array, KiaraTable):
rows = table_or_array.num_rows
for name in table_or_array.column_names:
array_name = f"{source_key}.{name}"
if column_map and array_name not in column_map.values():
job_log.add_log(
f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
)
continue
column = table_or_array.arrow_table.column(name)
arrays[array_name] = column
if not column_map:
if name in column_map_final:
raise Exception(
f"Can't merge table, duplicate column name: {name}."
)
column_map_final[name] = array_name
elif isinstance(table_or_array, KiaraArray):
if column_map and source_key not in column_map.values():
job_log.add_log(
f"Ignoring array '{source_key}': not listed in column_map."
)
continue
rows = len(table_or_array)
arrays[source_key] = table_or_array.arrow_array
if not column_map:
if source_key in column_map_final.keys():
raise Exception(
f"Can't merge table, duplicate column name: {source_key}."
)
column_map_final[source_key] = source_key
else:
raise KiaraProcessingException(
f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
)
len_dict[source_key] = rows
all_rows = None
for source_key, rows in len_dict.items():
if all_rows is None:
all_rows = rows
else:
if all_rows != rows:
all_rows = None
break
if all_rows is None:
len_str = ""
for name, rows in len_dict.items():
len_str = f" {name} ({rows})"
raise KiaraProcessingException(
f"Can't merge table, sources have different lengths: {len_str}"
)
column_names = []
columns = []
for column_name, ref in column_map_final.items():
column_names.append(column_name)
column = arrays[ref]
columns.append(column)
table = pa.Table.from_arrays(arrays=columns, names=column_names)
outputs.set_value("table", table)
QueryTableSQL (KiaraModule)
¶
Execute a sql query against an (Arrow) table.
The default relation name for the sql query is 'data', but it can be modified via the 'relation_name' config option/input.
If the 'query' module config option is not set, users can provide their own query; otherwise, the pre-set one will be used.
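A minimal sketch of the underlying duckdb call, using a made-up Arrow table and the default relation name 'data' (the same duckdb API the module source below relies on):
import duckdb
import pyarrow as pa

arrow_table = pa.table({"city": ["Berlin", "Luxembourg"], "population": [3_600_000, 650_000]})

# Register the Arrow table as a duckdb relation and query it under the name 'data'.
rel = duckdb.arrow(arrow_table)
result = rel.query("data", "SELECT city FROM data WHERE population > 1000000")
print(result.arrow())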
Source code in tabular/modules/table/__init__.py
class QueryTableSQL(KiaraModule):
"""Execute a sql query against an (Arrow) table.
The default relation name for the sql query is 'data', but can be modified by the 'relation_name' config option/input.
If the 'query' module config option is not set, users can provide their own query, otherwise the pre-set
one will be used.
"""
_module_type_name = "query.table"
_config_cls = QueryTableSQLModuleConfig
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs = {
"table": {
"type": "table",
"doc": "The table to query",
}
}
if self.get_config_value("query") is None:
inputs["query"] = {"type": "string", "doc": "The query."}
inputs["relation_name"] = {
"type": "string",
"doc": "The name the table is referred to in the sql query.",
"default": "data",
}
return inputs
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {"query_result": {"type": "table", "doc": "The query result."}}
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
import duckdb
if self.get_config_value("query") is None:
_query: str = inputs.get_value_data("query")
_relation_name: str = inputs.get_value_data("relation_name")
else:
_query = self.get_config_value("query")
_relation_name = self.get_config_value("relation_name")
if _relation_name.upper() in RESERVED_SQL_KEYWORDS:
raise KiaraProcessingException(
f"Invalid relation name '{_relation_name}': this is a reserved sql keyword, please select a different name."
)
_table: KiaraTable = inputs.get_value_data("table")
rel_from_arrow = duckdb.arrow(_table.arrow_table)
result: duckdb.DuckDBPyRelation = rel_from_arrow.query(_relation_name, _query)
outputs.set_value("query_result", result.arrow())
Classes¶
_config_cls (KiaraModuleConfig)
private
pydantic-model
¶Source code in tabular/modules/table/__init__.py
class QueryTableSQLModuleConfig(KiaraModuleConfig):
query: Union[str, None] = Field(
description="The query to execute. If not specified, the user will be able to provide their own.",
default=None,
)
relation_name: Union[str, None] = Field(
description="The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.",
default="data",
)
Methods¶
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs = {
"table": {
"type": "table",
"doc": "The table to query",
}
}
if self.get_config_value("query") is None:
inputs["query"] = {"type": "string", "doc": "The query."}
inputs["relation_name"] = {
"type": "string",
"doc": "The name the table is referred to in the sql query.",
"default": "data",
}
return inputs
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {"query_result": {"type": "table", "doc": "The query result."}}
process(self, inputs, outputs)
¶Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
import duckdb
if self.get_config_value("query") is None:
_query: str = inputs.get_value_data("query")
_relation_name: str = inputs.get_value_data("relation_name")
else:
_query = self.get_config_value("query")
_relation_name = self.get_config_value("relation_name")
if _relation_name.upper() in RESERVED_SQL_KEYWORDS:
raise KiaraProcessingException(
f"Invalid relation name '{_relation_name}': this is a reserved sql keyword, please select a different name."
)
_table: KiaraTable = inputs.get_value_data("table")
rel_from_arrow = duckdb.arrow(_table.arrow_table)
result: duckdb.DuckDBPyRelation = rel_from_arrow.query(_relation_name, _query)
outputs.set_value("query_result", result.arrow())
QueryTableSQLModuleConfig (KiaraModuleConfig)
pydantic-model
¶
Source code in tabular/modules/table/__init__.py
class QueryTableSQLModuleConfig(KiaraModuleConfig):
query: Union[str, None] = Field(
description="The query to execute. If not specified, the user will be able to provide their own.",
default=None,
)
relation_name: Union[str, None] = Field(
description="The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.",
default="data",
)
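A hypothetical configuration sketch for the two fields above: presetting 'query' fixes the query when the module is instantiated (so the 'query' input is no longer offered to users), while 'relation_name' sets the name the table goes by inside that query:
# hypothetical module config with a pre-set query; only the 'table' input remains
query_table_config = {
    "query": "SELECT * FROM data LIMIT 10",
    "relation_name": "data",
}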
RenderTableModule (RenderTableModuleBase)
¶
Source code in tabular/modules/table/__init__.py
class RenderTableModule(RenderTableModuleBase):
_module_type_name = "render.table"
def render__table__as__string(self, value: Value, render_config: Mapping[str, Any]):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
wrap, data_related_scenes = self.preprocess_table(
value=value,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_string(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
render_manifest=self.manifest.manifest_hash,
rendered=pretty,
related_scenes=data_related_scenes,
)
def render__table__as__terminal_renderable(
self, value: Value, render_config: Mapping[str, Any]
):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
wrap, data_related_scenes = self.preprocess_table(
value=value,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_terminal_renderable(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
render_manifest=self.manifest.manifest_hash,
rendered=pretty,
related_scenes=data_related_scenes,
)
render__table__as__string(self, value, render_config)
¶Source code in tabular/modules/table/__init__.py
def render__table__as__string(self, value: Value, render_config: Mapping[str, Any]):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
wrap, data_related_scenes = self.preprocess_table(
value=value,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_string(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
render_manifest=self.manifest.manifest_hash,
rendered=pretty,
related_scenes=data_related_scenes,
)
render__table__as__terminal_renderable(self, value, render_config)
¶Source code in tabular/modules/table/__init__.py
def render__table__as__terminal_renderable(
self, value: Value, render_config: Mapping[str, Any]
):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
wrap, data_related_scenes = self.preprocess_table(
value=value,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_terminal_renderable(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
render_manifest=self.manifest.manifest_hash,
rendered=pretty,
related_scenes=data_related_scenes,
)
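Both render methods above read the same two optional keys from render_config and fall back to the defaults shown here (key names and defaults taken from the source; the dict itself is illustrative):
# render_config accepted by render__table__as__string / ..._terminal_renderable
render_config = {
    "number_of_rows": 20,  # default: 20 rows per page
    "row_offset": 0,       # default: start at the first row
}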
RenderTableModuleBase (RenderValueModule)
¶
Source code in tabular/modules/table/__init__.py
class RenderTableModuleBase(RenderValueModule):
_module_type_name: str = None # type: ignore
def preprocess_table(
self, value: Value, input_number_of_rows: int, input_row_offset: int
):
import duckdb
import pyarrow as pa
if value.data_type_name == "array":
array: KiaraArray = value.data
arrow_table = pa.table(data=[array.arrow_array], names=["array"])
column_names: Iterable[str] = ["array"]
else:
table: KiaraTable = value.data
arrow_table = table.arrow_table
column_names = table.column_names
columns = [f'"{x}"' if not x.startswith('"') else x for x in column_names]
query = f"""SELECT {', '.join(columns)} FROM data LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""
rel_from_arrow = duckdb.arrow(arrow_table)
query_result: duckdb.DuckDBPyRelation = rel_from_arrow.query("data", query)
result_table = query_result.arrow()
wrap = ArrowTabularWrap(table=result_table)
related_scenes: Dict[str, Union[None, RenderScene]] = {}
row_offset = arrow_table.num_rows - input_number_of_rows
if row_offset > 0:
if input_row_offset > 0:
related_scenes["first"] = RenderScene.construct(
title="first",
description=f"Display the first {input_number_of_rows} rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": 0,
"number_of_rows": input_number_of_rows,
},
)
p_offset = input_row_offset - input_number_of_rows
if p_offset < 0:
p_offset = 0
previous = {
"row_offset": p_offset,
"number_of_rows": input_number_of_rows,
}
related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous) # type: ignore
else:
related_scenes["first"] = None
related_scenes["previous"] = None
n_offset = input_row_offset + input_number_of_rows
if n_offset < arrow_table.num_rows:
next = {"row_offset": n_offset, "number_of_rows": input_number_of_rows}
related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next) # type: ignore
else:
related_scenes["next"] = None
last_page = int(arrow_table.num_rows / input_number_of_rows)
current_start = last_page * input_number_of_rows
if (input_row_offset + input_number_of_rows) > arrow_table.num_rows:
related_scenes["last"] = None
else:
related_scenes["last"] = RenderScene.construct(
title="last",
description="Display the final rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": current_start, # type: ignore
"number_of_rows": input_number_of_rows, # type: ignore
},
)
else:
related_scenes["first"] = None
related_scenes["previous"] = None
related_scenes["next"] = None
related_scenes["last"] = None
return wrap, related_scenes
preprocess_table(self, value, input_number_of_rows, input_row_offset)
¶Source code in tabular/modules/table/__init__.py
def preprocess_table(
self, value: Value, input_number_of_rows: int, input_row_offset: int
):
import duckdb
import pyarrow as pa
if value.data_type_name == "array":
array: KiaraArray = value.data
arrow_table = pa.table(data=[array.arrow_array], names=["array"])
column_names: Iterable[str] = ["array"]
else:
table: KiaraTable = value.data
arrow_table = table.arrow_table
column_names = table.column_names
columns = [f'"{x}"' if not x.startswith('"') else x for x in column_names]
query = f"""SELECT {', '.join(columns)} FROM data LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""
rel_from_arrow = duckdb.arrow(arrow_table)
query_result: duckdb.DuckDBPyRelation = rel_from_arrow.query("data", query)
result_table = query_result.arrow()
wrap = ArrowTabularWrap(table=result_table)
related_scenes: Dict[str, Union[None, RenderScene]] = {}
row_offset = arrow_table.num_rows - input_number_of_rows
if row_offset > 0:
if input_row_offset > 0:
related_scenes["first"] = RenderScene.construct(
title="first",
description=f"Display the first {input_number_of_rows} rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": 0,
"number_of_rows": input_number_of_rows,
},
)
p_offset = input_row_offset - input_number_of_rows
if p_offset < 0:
p_offset = 0
previous = {
"row_offset": p_offset,
"number_of_rows": input_number_of_rows,
}
related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous) # type: ignore
else:
related_scenes["first"] = None
related_scenes["previous"] = None
n_offset = input_row_offset + input_number_of_rows
if n_offset < arrow_table.num_rows:
next = {"row_offset": n_offset, "number_of_rows": input_number_of_rows}
related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next) # type: ignore
else:
related_scenes["next"] = None
last_page = int(arrow_table.num_rows / input_number_of_rows)
current_start = last_page * input_number_of_rows
if (input_row_offset + input_number_of_rows) > arrow_table.num_rows:
related_scenes["last"] = None
else:
related_scenes["last"] = RenderScene.construct(
title="last",
description="Display the final rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": current_start, # type: ignore
"number_of_rows": input_number_of_rows, # type: ignore
},
)
else:
related_scenes["first"] = None
related_scenes["previous"] = None
related_scenes["next"] = None
related_scenes["last"] = None
return wrap, related_scenes
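To make the pagination arithmetic in preprocess_table concrete, here is a worked example for a hypothetical 105-row table viewed 20 rows at a time, starting at row 40 (all numbers invented):
# hypothetical table size and current view
num_rows = 105
number_of_rows = 20
row_offset = 40
previous_offset = max(row_offset - number_of_rows, 0)          # 20
next_offset = row_offset + number_of_rows                       # 60, still < 105, so a 'next' scene exists
last_offset = int(num_rows / number_of_rows) * number_of_rows   # 100, start of the 'last' scene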
filters
¶
TableFiltersModule (FilterModule)
¶Source code in tabular/modules/table/filters.py
class TableFiltersModule(FilterModule):
_module_type_name = "table.filters"
@classmethod
def retrieve_supported_type(cls) -> Union[Dict[str, Any], str]:
return "table"
def create_filter_inputs(self, filter_name: str) -> Union[None, ValueMapSchema]:
if filter_name in ["select_columns", "drop_columns"]:
return {
"columns": {
"type": "list",
"doc": "The name of the columns to include.",
"optional": True,
},
"ignore_invalid_column_names": {
"type": "boolean",
"doc": "Whether to ignore invalid column names.",
"default": True,
},
}
return None
def filter__select_columns(self, value: Value, filter_inputs: Mapping[str, Any]):
import pyarrow as pa
ignore_invalid = filter_inputs["ignore_invalid_column_names"]
column_names = filter_inputs["columns"]
if not column_names:
return value
table: KiaraTable = value.data
arrow_table = table.arrow_table
_column_names = []
_columns = []
for column_name in column_names:
if column_name not in arrow_table.column_names:
if ignore_invalid:
continue
else:
raise KiaraProcessingException(
f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
)
column = arrow_table.column(column_name)
_column_names.append(column_name)
_columns.append(column)
return pa.table(data=_columns, names=_column_names)
def filter__drop_columns(self, value: Value, filter_inputs: Mapping[str, Any]):
import pyarrow as pa
ignore_invalid = filter_inputs["ignore_invalid_column_names"]
column_names_to_ignore = filter_inputs["columns"]
if not column_names_to_ignore:
return value
table: KiaraTable = value.data
arrow_table = table.arrow_table
for column_name in column_names_to_ignore:
if column_name not in arrow_table.column_names:
if ignore_invalid:
continue
else:
raise KiaraProcessingException(
f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
)
_column_names = []
_columns = []
for column_name in arrow_table.column_names:
if column_name in column_names_to_ignore:
continue
column = arrow_table.column(column_name)
_column_names.append(column_name)
_columns.append(column)
return pa.table(data=_columns, names=_column_names)
def filter__select_rows(self, value: Value, filter_inputs: Mapping[str, Any]):
pass
create_filter_inputs(self, filter_name)
¶Source code in tabular/modules/table/filters.py
def create_filter_inputs(self, filter_name: str) -> Union[None, ValueMapSchema]:
if filter_name in ["select_columns", "drop_columns"]:
return {
"columns": {
"type": "list",
"doc": "The name of the columns to include.",
"optional": True,
},
"ignore_invalid_column_names": {
"type": "boolean",
"doc": "Whether to ignore invalid column names.",
"default": True,
},
}
return None
filter__drop_columns(self, value, filter_inputs)
¶Source code in tabular/modules/table/filters.py
def filter__drop_columns(self, value: Value, filter_inputs: Mapping[str, Any]):
import pyarrow as pa
ignore_invalid = filter_inputs["ignore_invalid_column_names"]
column_names_to_ignore = filter_inputs["columns"]
if not column_names_to_ignore:
return value
table: KiaraTable = value.data
arrow_table = table.arrow_table
for column_name in column_names_to_ignore:
if column_name not in arrow_table.column_names:
if ignore_invalid:
continue
else:
raise KiaraProcessingException(
f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
)
_column_names = []
_columns = []
for column_name in arrow_table.column_names:
if column_name in column_names_to_ignore:
continue
column = arrow_table.column(column_name)
_column_names.append(column_name)
_columns.append(column)
return pa.table(data=_columns, names=_column_names)
filter__select_columns(self, value, filter_inputs)
¶Source code in tabular/modules/table/filters.py
def filter__select_columns(self, value: Value, filter_inputs: Mapping[str, Any]):
import pyarrow as pa
ignore_invalid = filter_inputs["ignore_invalid_column_names"]
column_names = filter_inputs["columns"]
if not column_names:
return value
table: KiaraTable = value.data
arrow_table = table.arrow_table
_column_names = []
_columns = []
for column_name in column_names:
if column_name not in arrow_table.column_names:
if ignore_invalid:
continue
else:
raise KiaraProcessingException(
f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
)
column = arrow_table.column(column_name)
_column_names.append(column_name)
_columns.append(column)
return pa.table(data=_columns, names=_column_names)
filter__select_rows(self, value, filter_inputs)
¶Source code in tabular/modules/table/filters.py
def filter__select_rows(self, value: Value, filter_inputs: Mapping[str, Any]):
pass
retrieve_supported_type()
classmethod
¶Source code in tabular/modules/table/filters.py
@classmethod
def retrieve_supported_type(cls) -> Union[Dict[str, Any], str]:
return "table"