Skip to content

table

EMPTY_COLUMN_NAME_MARKER

Classes

CreateTableModule (CreateFromModule)

Source code in tabular/modules/table/__init__.py
class CreateTableModule(CreateFromModule):

    _module_type_name = "create.table"
    _config_cls = CreateTableModuleConfig

    def create__table__from__csv_file(self, source_value: Value) -> Any:
        """Create a table from a csv_file value."""

        from pyarrow import csv

        input_file: FileModel = source_value.data
        imported_data = csv.read_csv(input_file.path)

        # import pandas as pd
        # df = pd.read_csv(input_file.path)
        # imported_data = pa.Table.from_pandas(df)

        return KiaraTable.create_table(imported_data)

    def create__table__from__text_file_bundle(self, source_value: Value) -> Any:
        """Create a table value from a text file_bundle.

        The resulting table will have (at a minimum) the following collumns:
        - id: an auto-assigned index
        - rel_path: the relative path of the file (from the provided base path)
        - content: the text file content
        """

        import pyarrow as pa

        bundle: FileBundle = source_value.data

        columns = FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS

        ignore_errors = self.get_config_value("ignore_errors")
        file_dict = bundle.read_text_file_contents(ignore_errors=ignore_errors)

        # TODO: use chunks to save on memory
        tabular: Dict[str, List[Any]] = {}
        for column in columns:
            for index, rel_path in enumerate(sorted(file_dict.keys())):

                if column == "content":
                    _value: Any = file_dict[rel_path]
                elif column == "id":
                    _value = index
                elif column == "rel_path":
                    _value = rel_path
                else:
                    file_model = bundle.included_files[rel_path]
                    _value = getattr(file_model, column)

                tabular.setdefault(column, []).append(_value)

        table = pa.Table.from_pydict(tabular)
        return KiaraTable.create_table(table)

Classes

_config_cls (CreateFromModuleConfig) private pydantic-model
Source code in tabular/modules/table/__init__.py
class CreateTableModuleConfig(CreateFromModuleConfig):

    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )
Attributes
ignore_errors: bool pydantic-field

Whether to ignore convert errors and omit the failed items.

Methods

create__table__from__csv_file(self, source_value)

Create a table from a csv_file value.

Source code in tabular/modules/table/__init__.py
def create__table__from__csv_file(self, source_value: Value) -> Any:
    """Create a table from a csv_file value."""

    from pyarrow import csv

    input_file: FileModel = source_value.data
    imported_data = csv.read_csv(input_file.path)

    # import pandas as pd
    # df = pd.read_csv(input_file.path)
    # imported_data = pa.Table.from_pandas(df)

    return KiaraTable.create_table(imported_data)
create__table__from__text_file_bundle(self, source_value)

Create a table value from a text file_bundle.

The resulting table will have (at a minimum) the following collumns: - id: an auto-assigned index - rel_path: the relative path of the file (from the provided base path) - content: the text file content

Source code in tabular/modules/table/__init__.py
def create__table__from__text_file_bundle(self, source_value: Value) -> Any:
    """Create a table value from a text file_bundle.

    The resulting table will have (at a minimum) the following collumns:
    - id: an auto-assigned index
    - rel_path: the relative path of the file (from the provided base path)
    - content: the text file content
    """

    import pyarrow as pa

    bundle: FileBundle = source_value.data

    columns = FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS

    ignore_errors = self.get_config_value("ignore_errors")
    file_dict = bundle.read_text_file_contents(ignore_errors=ignore_errors)

    # TODO: use chunks to save on memory
    tabular: Dict[str, List[Any]] = {}
    for column in columns:
        for index, rel_path in enumerate(sorted(file_dict.keys())):

            if column == "content":
                _value: Any = file_dict[rel_path]
            elif column == "id":
                _value = index
            elif column == "rel_path":
                _value = rel_path
            else:
                file_model = bundle.included_files[rel_path]
                _value = getattr(file_model, column)

            tabular.setdefault(column, []).append(_value)

    table = pa.Table.from_pydict(tabular)
    return KiaraTable.create_table(table)

CreateTableModuleConfig (CreateFromModuleConfig) pydantic-model

Source code in tabular/modules/table/__init__.py
class CreateTableModuleConfig(CreateFromModuleConfig):

    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )

Attributes

ignore_errors: bool pydantic-field

Whether to ignore convert errors and omit the failed items.

CutColumnModule (KiaraModule)

Cut off one column from a table, returning an array.

Source code in tabular/modules/table/__init__.py
class CutColumnModule(KiaraModule):
    """Cut off one column from a table, returning an array."""

    _module_type_name = "table.cut_column"

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:

        inputs: Mapping[str, Any] = {
            "table": {"type": "table", "doc": "A table."},
            "column_name": {
                "type": "string",
                "doc": "The name of the column to extract.",
            },
        }
        return inputs

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:

        outputs: Mapping[str, Any] = {"array": {"type": "array", "doc": "The column."}}
        return outputs

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        import pyarrow as pa

        column_name: str = inputs.get_value_data("column_name")

        table_value: Value = inputs.get_value_obj("table")
        table_metadata: KiaraTableMetadata = table_value.get_property_data(
            "metadata.table"
        )

        available = table_metadata.table.column_names

        if column_name not in available:
            raise KiaraProcessingException(
                f"Invalid column name '{column_name}'. Available column names: {', '.join(available)}"
            )

        table: pa.Table = table_value.data.arrow_table
        column = table.column(column_name)

        outputs.set_value("array", column)

Methods

create_inputs_schema(self)

Return the schema for this types' inputs.

Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
    self,
) -> ValueMapSchema:

    inputs: Mapping[str, Any] = {
        "table": {"type": "table", "doc": "A table."},
        "column_name": {
            "type": "string",
            "doc": "The name of the column to extract.",
        },
    }
    return inputs
create_outputs_schema(self)

Return the schema for this types' outputs.

Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
    self,
) -> ValueMapSchema:

    outputs: Mapping[str, Any] = {"array": {"type": "array", "doc": "The column."}}
    return outputs
process(self, inputs, outputs)
Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

    import pyarrow as pa

    column_name: str = inputs.get_value_data("column_name")

    table_value: Value = inputs.get_value_obj("table")
    table_metadata: KiaraTableMetadata = table_value.get_property_data(
        "metadata.table"
    )

    available = table_metadata.table.column_names

    if column_name not in available:
        raise KiaraProcessingException(
            f"Invalid column name '{column_name}'. Available column names: {', '.join(available)}"
        )

    table: pa.Table = table_value.data.arrow_table
    column = table.column(column_name)

    outputs.set_value("array", column)

DeserializeTableModule (DeserializeValueModule)

Source code in tabular/modules/table/__init__.py
class DeserializeTableModule(DeserializeValueModule):

    _module_type_name = "load.table"

    @classmethod
    def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
        return {"python_object": KiaraTable}

    @classmethod
    def retrieve_serialized_value_type(cls) -> str:
        return "table"

    @classmethod
    def retrieve_supported_serialization_profile(cls) -> str:
        return "feather"

    def to__python_object(self, data: SerializedData, **config: Any):

        import pyarrow as pa

        columns = {}

        for column_name in data.get_keys():

            chunks = data.get_serialized_data(column_name)

            # TODO: support multiple chunks
            assert chunks.get_number_of_chunks() == 1
            files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
            assert len(files) == 1

            file = files[0]
            with pa.memory_map(file, "r") as column_chunk:
                loaded_arrays: pa.Table = pa.ipc.open_file(column_chunk).read_all()
                column = loaded_arrays.column(column_name)
                if column_name == EMPTY_COLUMN_NAME_MARKER:
                    columns[""] = column
                else:
                    columns[column_name] = column

        arrow_table = pa.table(columns)

        table = KiaraTable.create_table(arrow_table)
        return table
retrieve_serialized_value_type() classmethod
Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_serialized_value_type(cls) -> str:
    return "table"
retrieve_supported_serialization_profile() classmethod
Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
    return "feather"
retrieve_supported_target_profiles() classmethod
Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
    return {"python_object": KiaraTable}
to__python_object(self, data, **config)
Source code in tabular/modules/table/__init__.py
def to__python_object(self, data: SerializedData, **config: Any):

    import pyarrow as pa

    columns = {}

    for column_name in data.get_keys():

        chunks = data.get_serialized_data(column_name)

        # TODO: support multiple chunks
        assert chunks.get_number_of_chunks() == 1
        files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
        assert len(files) == 1

        file = files[0]
        with pa.memory_map(file, "r") as column_chunk:
            loaded_arrays: pa.Table = pa.ipc.open_file(column_chunk).read_all()
            column = loaded_arrays.column(column_name)
            if column_name == EMPTY_COLUMN_NAME_MARKER:
                columns[""] = column
            else:
                columns[column_name] = column

    arrow_table = pa.table(columns)

    table = KiaraTable.create_table(arrow_table)
    return table

ExportTableModule (DataExportModule)

Export table data items.

Source code in tabular/modules/table/__init__.py
class ExportTableModule(DataExportModule):
    """Export table data items."""

    _module_type_name = "export.table"

    def export__table__as__csv_file(self, value: KiaraTable, base_path: str, name: str):
        """Export a table as csv file."""

        import pyarrow.csv as csv

        target_path = os.path.join(base_path, f"{name}.csv")

        csv.write_csv(value.arrow_table, target_path)

        return {"files": target_path}

    # def export__table__as__sqlite_db(
    #     self, value: KiaraTable, base_path: str, name: str
    # ):
    #
    #     target_path = os.path.abspath(os.path.join(base_path, f"{name}.sqlite"))
    #
    #     raise NotImplementedError()
    #     # shutil.copy2(value.db_file_path, target_path)
    #
    #     return {"files": target_path}

Methods

export__table__as__csv_file(self, value, base_path, name)

Export a table as csv file.

Source code in tabular/modules/table/__init__.py
def export__table__as__csv_file(self, value: KiaraTable, base_path: str, name: str):
    """Export a table as csv file."""

    import pyarrow.csv as csv

    target_path = os.path.join(base_path, f"{name}.csv")

    csv.write_csv(value.arrow_table, target_path)

    return {"files": target_path}

MergeTableConfig (KiaraModuleConfig) pydantic-model

Source code in tabular/modules/table/__init__.py
class MergeTableConfig(KiaraModuleConfig):

    inputs_schema: Dict[str, ValueSchema] = Field(
        description="A dict describing the inputs for this merge process."
    )
    column_map: Dict[str, str] = Field(
        description="A map describing", default_factory=dict
    )

Attributes

column_map: Dict[str, str] pydantic-field

A map describing

inputs_schema: Dict[str, kiara.models.values.value_schema.ValueSchema] pydantic-field required

A dict describing the inputs for this merge process.

MergeTableModule (KiaraModule)

Create a table from other tables and/or arrays.

This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary number of tables/arrays, all tables to be merged must be specified in the module configuration.

Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the desired column name as key, and a field-name in the following format as value: - '[inputs_schema key]' for inputs of type 'array' - '[inputs_schema_key].orig_column_name' for inputs of type 'table'

Source code in tabular/modules/table/__init__.py
class MergeTableModule(KiaraModule):
    """Create a table from other tables and/or arrays.

    This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary
    number of tables/arrays, all tables to be merged must be specified in the module configuration.

    Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the
    desired column name as key, and a field-name in the following format as value:
    - '[inputs_schema key]' for inputs of type 'array'
    - '[inputs_schema_key].orig_column_name' for inputs of type 'table'
    """

    _module_type_name = "table.merge"
    _config_cls = MergeTableConfig

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:

        input_schema_dict = self.get_config_value("inputs_schema")
        return input_schema_dict

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:

        outputs = {
            "table": {
                "type": "table",
                "doc": "The merged table, including all source tables and columns.",
            }
        }
        return outputs

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:

        import pyarrow as pa

        inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
        column_map: Dict[str, str] = self.get_config_value("column_map")

        sources = {}
        for field_name in inputs_schema.keys():
            sources[field_name] = inputs.get_value_data(field_name)

        len_dict = {}
        arrays = {}

        column_map_final = dict(column_map)

        for source_key, table_or_array in sources.items():

            if isinstance(table_or_array, KiaraTable):
                rows = table_or_array.num_rows
                for name in table_or_array.column_names:
                    array_name = f"{source_key}.{name}"
                    if column_map and array_name not in column_map.values():
                        job_log.add_log(
                            f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
                        )
                        continue

                    column = table_or_array.arrow_table.column(name)
                    arrays[array_name] = column
                    if not column_map:
                        if name in column_map_final:
                            raise Exception(
                                f"Can't merge table, duplicate column name: {name}."
                            )
                        column_map_final[name] = array_name

            elif isinstance(table_or_array, KiaraArray):

                if column_map and source_key not in column_map.values():
                    job_log.add_log(
                        f"Ignoring array '{source_key}': not listed in column_map."
                    )
                    continue

                rows = len(table_or_array)
                arrays[source_key] = table_or_array.arrow_array

                if not column_map:
                    if source_key in column_map_final.keys():
                        raise Exception(
                            f"Can't merge table, duplicate column name: {source_key}."
                        )
                    column_map_final[source_key] = source_key

            else:
                raise KiaraProcessingException(
                    f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
                )

            len_dict[source_key] = rows

        all_rows = None
        for source_key, rows in len_dict.items():
            if all_rows is None:
                all_rows = rows
            else:
                if all_rows != rows:
                    all_rows = None
                    break

        if all_rows is None:
            len_str = ""
            for name, rows in len_dict.items():
                len_str = f" {name} ({rows})"

            raise KiaraProcessingException(
                f"Can't merge table, sources have different lengths: {len_str}"
            )

        column_names = []
        columns = []
        for column_name, ref in column_map_final.items():
            column_names.append(column_name)
            column = arrays[ref]
            columns.append(column)

        table = pa.Table.from_arrays(arrays=columns, names=column_names)

        outputs.set_value("table", table)

Classes

_config_cls (KiaraModuleConfig) private pydantic-model
Source code in tabular/modules/table/__init__.py
class MergeTableConfig(KiaraModuleConfig):

    inputs_schema: Dict[str, ValueSchema] = Field(
        description="A dict describing the inputs for this merge process."
    )
    column_map: Dict[str, str] = Field(
        description="A map describing", default_factory=dict
    )
Attributes
column_map: Dict[str, str] pydantic-field

A map describing

inputs_schema: Dict[str, kiara.models.values.value_schema.ValueSchema] pydantic-field required

A dict describing the inputs for this merge process.

Methods

create_inputs_schema(self)

Return the schema for this types' inputs.

Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
    self,
) -> ValueMapSchema:

    input_schema_dict = self.get_config_value("inputs_schema")
    return input_schema_dict
create_outputs_schema(self)

Return the schema for this types' outputs.

Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
    self,
) -> ValueMapSchema:

    outputs = {
        "table": {
            "type": "table",
            "doc": "The merged table, including all source tables and columns.",
        }
    }
    return outputs
process(self, inputs, outputs, job_log)
Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:

    import pyarrow as pa

    inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
    column_map: Dict[str, str] = self.get_config_value("column_map")

    sources = {}
    for field_name in inputs_schema.keys():
        sources[field_name] = inputs.get_value_data(field_name)

    len_dict = {}
    arrays = {}

    column_map_final = dict(column_map)

    for source_key, table_or_array in sources.items():

        if isinstance(table_or_array, KiaraTable):
            rows = table_or_array.num_rows
            for name in table_or_array.column_names:
                array_name = f"{source_key}.{name}"
                if column_map and array_name not in column_map.values():
                    job_log.add_log(
                        f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
                    )
                    continue

                column = table_or_array.arrow_table.column(name)
                arrays[array_name] = column
                if not column_map:
                    if name in column_map_final:
                        raise Exception(
                            f"Can't merge table, duplicate column name: {name}."
                        )
                    column_map_final[name] = array_name

        elif isinstance(table_or_array, KiaraArray):

            if column_map and source_key not in column_map.values():
                job_log.add_log(
                    f"Ignoring array '{source_key}': not listed in column_map."
                )
                continue

            rows = len(table_or_array)
            arrays[source_key] = table_or_array.arrow_array

            if not column_map:
                if source_key in column_map_final.keys():
                    raise Exception(
                        f"Can't merge table, duplicate column name: {source_key}."
                    )
                column_map_final[source_key] = source_key

        else:
            raise KiaraProcessingException(
                f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
            )

        len_dict[source_key] = rows

    all_rows = None
    for source_key, rows in len_dict.items():
        if all_rows is None:
            all_rows = rows
        else:
            if all_rows != rows:
                all_rows = None
                break

    if all_rows is None:
        len_str = ""
        for name, rows in len_dict.items():
            len_str = f" {name} ({rows})"

        raise KiaraProcessingException(
            f"Can't merge table, sources have different lengths: {len_str}"
        )

    column_names = []
    columns = []
    for column_name, ref in column_map_final.items():
        column_names.append(column_name)
        column = arrays[ref]
        columns.append(column)

    table = pa.Table.from_arrays(arrays=columns, names=column_names)

    outputs.set_value("table", table)

QueryTableSQL (KiaraModule)

Execute a sql query against an (Arrow) table.

The default relation name for the sql query is 'data', but can be modified by the 'relation_name' config option/input.

If the 'query' module config option is not set, users can provide their own query, otherwise the pre-set one will be used.

Source code in tabular/modules/table/__init__.py
class QueryTableSQL(KiaraModule):
    """Execute a sql query against an (Arrow) table.

    The default relation name for the sql query is 'data', but can be modified by the 'relation_name' config option/input.

    If the 'query' module config option is not set, users can provide their own query, otherwise the pre-set
    one will be used.
    """

    _module_type_name = "query.table"
    _config_cls = QueryTableSQLModuleConfig

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:

        inputs = {
            "table": {
                "type": "table",
                "doc": "The table to query",
            }
        }

        if self.get_config_value("query") is None:
            inputs["query"] = {"type": "string", "doc": "The query."}
            inputs["relation_name"] = {
                "type": "string",
                "doc": "The name the table is referred to in the sql query.",
                "default": "data",
            }

        return inputs

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:

        return {"query_result": {"type": "table", "doc": "The query result."}}

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        import duckdb

        if self.get_config_value("query") is None:
            _query: str = inputs.get_value_data("query")
            _relation_name: str = inputs.get_value_data("relation_name")
        else:
            _query = self.get_config_value("query")
            _relation_name = self.get_config_value("relation_name")

        if _relation_name.upper() in RESERVED_SQL_KEYWORDS:
            raise KiaraProcessingException(
                f"Invalid relation name '{_relation_name}': this is a reserved sql keyword, please select a different name."
            )

        _table: KiaraTable = inputs.get_value_data("table")
        rel_from_arrow = duckdb.arrow(_table.arrow_table)
        result: duckdb.DuckDBPyRelation = rel_from_arrow.query(_relation_name, _query)

        outputs.set_value("query_result", result.arrow())

Classes

_config_cls (KiaraModuleConfig) private pydantic-model
Source code in tabular/modules/table/__init__.py
class QueryTableSQLModuleConfig(KiaraModuleConfig):

    query: Union[str, None] = Field(
        description="The query to execute. If not specified, the user will be able to provide their own.",
        default=None,
    )
    relation_name: Union[str, None] = Field(
        description="The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.",
        default="data",
    )
Attributes
query: str pydantic-field

The query to execute. If not specified, the user will be able to provide their own.

relation_name: str pydantic-field

The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.

Methods

create_inputs_schema(self)

Return the schema for this types' inputs.

Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
    self,
) -> ValueMapSchema:

    inputs = {
        "table": {
            "type": "table",
            "doc": "The table to query",
        }
    }

    if self.get_config_value("query") is None:
        inputs["query"] = {"type": "string", "doc": "The query."}
        inputs["relation_name"] = {
            "type": "string",
            "doc": "The name the table is referred to in the sql query.",
            "default": "data",
        }

    return inputs
create_outputs_schema(self)

Return the schema for this types' outputs.

Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
    self,
) -> ValueMapSchema:

    return {"query_result": {"type": "table", "doc": "The query result."}}
process(self, inputs, outputs)
Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

    import duckdb

    if self.get_config_value("query") is None:
        _query: str = inputs.get_value_data("query")
        _relation_name: str = inputs.get_value_data("relation_name")
    else:
        _query = self.get_config_value("query")
        _relation_name = self.get_config_value("relation_name")

    if _relation_name.upper() in RESERVED_SQL_KEYWORDS:
        raise KiaraProcessingException(
            f"Invalid relation name '{_relation_name}': this is a reserved sql keyword, please select a different name."
        )

    _table: KiaraTable = inputs.get_value_data("table")
    rel_from_arrow = duckdb.arrow(_table.arrow_table)
    result: duckdb.DuckDBPyRelation = rel_from_arrow.query(_relation_name, _query)

    outputs.set_value("query_result", result.arrow())

QueryTableSQLModuleConfig (KiaraModuleConfig) pydantic-model

Source code in tabular/modules/table/__init__.py
class QueryTableSQLModuleConfig(KiaraModuleConfig):

    query: Union[str, None] = Field(
        description="The query to execute. If not specified, the user will be able to provide their own.",
        default=None,
    )
    relation_name: Union[str, None] = Field(
        description="The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.",
        default="data",
    )

Attributes

query: str pydantic-field

The query to execute. If not specified, the user will be able to provide their own.

relation_name: str pydantic-field

The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.

RenderTableModule (RenderTableModuleBase)

Source code in tabular/modules/table/__init__.py
class RenderTableModule(RenderTableModuleBase):
    _module_type_name = "render.table"

    def render__table__as__string(self, value: Value, render_config: Mapping[str, Any]):

        input_number_of_rows = render_config.get("number_of_rows", 20)
        input_row_offset = render_config.get("row_offset", 0)

        wrap, data_related_scenes = self.preprocess_table(
            value=value,
            input_number_of_rows=input_number_of_rows,
            input_row_offset=input_row_offset,
        )
        pretty = wrap.as_string(max_row_height=1)

        return RenderValueResult(
            value_id=value.value_id,
            render_config=render_config,
            render_manifest=self.manifest.manifest_hash,
            rendered=pretty,
            related_scenes=data_related_scenes,
        )

    def render__table__as__terminal_renderable(
        self, value: Value, render_config: Mapping[str, Any]
    ):

        input_number_of_rows = render_config.get("number_of_rows", 20)
        input_row_offset = render_config.get("row_offset", 0)

        wrap, data_related_scenes = self.preprocess_table(
            value=value,
            input_number_of_rows=input_number_of_rows,
            input_row_offset=input_row_offset,
        )
        pretty = wrap.as_terminal_renderable(max_row_height=1)

        return RenderValueResult(
            value_id=value.value_id,
            render_config=render_config,
            render_manifest=self.manifest.manifest_hash,
            rendered=pretty,
            related_scenes=data_related_scenes,
        )
render__table__as__string(self, value, render_config)
Source code in tabular/modules/table/__init__.py
def render__table__as__string(self, value: Value, render_config: Mapping[str, Any]):

    input_number_of_rows = render_config.get("number_of_rows", 20)
    input_row_offset = render_config.get("row_offset", 0)

    wrap, data_related_scenes = self.preprocess_table(
        value=value,
        input_number_of_rows=input_number_of_rows,
        input_row_offset=input_row_offset,
    )
    pretty = wrap.as_string(max_row_height=1)

    return RenderValueResult(
        value_id=value.value_id,
        render_config=render_config,
        render_manifest=self.manifest.manifest_hash,
        rendered=pretty,
        related_scenes=data_related_scenes,
    )
render__table__as__terminal_renderable(self, value, render_config)
Source code in tabular/modules/table/__init__.py
def render__table__as__terminal_renderable(
    self, value: Value, render_config: Mapping[str, Any]
):

    input_number_of_rows = render_config.get("number_of_rows", 20)
    input_row_offset = render_config.get("row_offset", 0)

    wrap, data_related_scenes = self.preprocess_table(
        value=value,
        input_number_of_rows=input_number_of_rows,
        input_row_offset=input_row_offset,
    )
    pretty = wrap.as_terminal_renderable(max_row_height=1)

    return RenderValueResult(
        value_id=value.value_id,
        render_config=render_config,
        render_manifest=self.manifest.manifest_hash,
        rendered=pretty,
        related_scenes=data_related_scenes,
    )

RenderTableModuleBase (RenderValueModule)

Source code in tabular/modules/table/__init__.py
class RenderTableModuleBase(RenderValueModule):

    _module_type_name: str = None  # type: ignore

    def preprocess_table(
        self, value: Value, input_number_of_rows: int, input_row_offset: int
    ):

        import duckdb
        import pyarrow as pa

        if value.data_type_name == "array":
            array: KiaraArray = value.data
            arrow_table = pa.table(data=[array.arrow_array], names=["array"])
            column_names: Iterable[str] = ["array"]
        else:
            table: KiaraTable = value.data
            arrow_table = table.arrow_table
            column_names = table.column_names

        columnns = [f'"{x}"' if not x.startswith('"') else x for x in column_names]

        query = f"""SELECT {', '.join(columnns)} FROM data LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""

        rel_from_arrow = duckdb.arrow(arrow_table)
        query_result: duckdb.DuckDBPyRelation = rel_from_arrow.query("data", query)

        result_table = query_result.arrow()
        wrap = ArrowTabularWrap(table=result_table)

        related_scenes: Dict[str, Union[None, RenderScene]] = {}

        row_offset = arrow_table.num_rows - input_number_of_rows

        if row_offset > 0:

            if input_row_offset > 0:
                related_scenes["first"] = RenderScene.construct(
                    title="first",
                    description=f"Display the first {input_number_of_rows} rows of this table.",
                    manifest_hash=self.manifest.manifest_hash,
                    render_config={
                        "row_offset": 0,
                        "number_of_rows": input_number_of_rows,
                    },
                )

                p_offset = input_row_offset - input_number_of_rows
                if p_offset < 0:
                    p_offset = 0
                previous = {
                    "row_offset": p_offset,
                    "number_of_rows": input_number_of_rows,
                }
                related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous)  # type: ignore
            else:
                related_scenes["first"] = None
                related_scenes["previous"] = None

            n_offset = input_row_offset + input_number_of_rows
            if n_offset < arrow_table.num_rows:
                next = {"row_offset": n_offset, "number_of_rows": input_number_of_rows}
                related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next)  # type: ignore
            else:
                related_scenes["next"] = None

            last_page = int(arrow_table.num_rows / input_number_of_rows)
            current_start = last_page * input_number_of_rows
            if (input_row_offset + input_number_of_rows) > arrow_table.num_rows:
                related_scenes["last"] = None
            else:
                related_scenes["last"] = RenderScene.construct(
                    title="last",
                    description="Display the final rows of this table.",
                    manifest_hash=self.manifest.manifest_hash,
                    render_config={
                        "row_offset": current_start,  # type: ignore
                        "number_of_rows": input_number_of_rows,  # type: ignore
                    },
                )
        else:
            related_scenes["first"] = None
            related_scenes["previous"] = None
            related_scenes["next"] = None
            related_scenes["last"] = None

        return wrap, related_scenes
preprocess_table(self, value, input_number_of_rows, input_row_offset)
Source code in tabular/modules/table/__init__.py
def preprocess_table(
    self, value: Value, input_number_of_rows: int, input_row_offset: int
):

    import duckdb
    import pyarrow as pa

    if value.data_type_name == "array":
        array: KiaraArray = value.data
        arrow_table = pa.table(data=[array.arrow_array], names=["array"])
        column_names: Iterable[str] = ["array"]
    else:
        table: KiaraTable = value.data
        arrow_table = table.arrow_table
        column_names = table.column_names

    columnns = [f'"{x}"' if not x.startswith('"') else x for x in column_names]

    query = f"""SELECT {', '.join(columnns)} FROM data LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""

    rel_from_arrow = duckdb.arrow(arrow_table)
    query_result: duckdb.DuckDBPyRelation = rel_from_arrow.query("data", query)

    result_table = query_result.arrow()
    wrap = ArrowTabularWrap(table=result_table)

    related_scenes: Dict[str, Union[None, RenderScene]] = {}

    row_offset = arrow_table.num_rows - input_number_of_rows

    if row_offset > 0:

        if input_row_offset > 0:
            related_scenes["first"] = RenderScene.construct(
                title="first",
                description=f"Display the first {input_number_of_rows} rows of this table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={
                    "row_offset": 0,
                    "number_of_rows": input_number_of_rows,
                },
            )

            p_offset = input_row_offset - input_number_of_rows
            if p_offset < 0:
                p_offset = 0
            previous = {
                "row_offset": p_offset,
                "number_of_rows": input_number_of_rows,
            }
            related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous)  # type: ignore
        else:
            related_scenes["first"] = None
            related_scenes["previous"] = None

        n_offset = input_row_offset + input_number_of_rows
        if n_offset < arrow_table.num_rows:
            next = {"row_offset": n_offset, "number_of_rows": input_number_of_rows}
            related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next)  # type: ignore
        else:
            related_scenes["next"] = None

        last_page = int(arrow_table.num_rows / input_number_of_rows)
        current_start = last_page * input_number_of_rows
        if (input_row_offset + input_number_of_rows) > arrow_table.num_rows:
            related_scenes["last"] = None
        else:
            related_scenes["last"] = RenderScene.construct(
                title="last",
                description="Display the final rows of this table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={
                    "row_offset": current_start,  # type: ignore
                    "number_of_rows": input_number_of_rows,  # type: ignore
                },
            )
    else:
        related_scenes["first"] = None
        related_scenes["previous"] = None
        related_scenes["next"] = None
        related_scenes["last"] = None

    return wrap, related_scenes

filters

TableFiltersModule (FilterModule)
Source code in tabular/modules/table/filters.py
class TableFiltersModule(FilterModule):

    _module_type_name = "table.filters"

    @classmethod
    def retrieve_supported_type(cls) -> Union[Dict[str, Any], str]:

        return "table"

    def create_filter_inputs(self, filter_name: str) -> Union[None, ValueMapSchema]:

        if filter_name in ["select_columns", "drop_columns"]:

            return {
                "columns": {
                    "type": "list",
                    "doc": "The name of the columns to include.",
                    "optional": True,
                },
                "ignore_invalid_column_names": {
                    "type": "boolean",
                    "doc": "Whether to ignore invalid column names.",
                    "default": True,
                },
            }

        return None

    def filter__select_columns(self, value: Value, filter_inputs: Mapping[str, Any]):

        import pyarrow as pa

        ignore_invalid = filter_inputs["ignore_invalid_column_names"]
        column_names = filter_inputs["columns"]

        if not column_names:
            return value

        table: KiaraTable = value.data
        arrow_table = table.arrow_table
        _column_names = []
        _columns = []

        for column_name in column_names:
            if column_name not in arrow_table.column_names:
                if ignore_invalid:
                    continue
                else:
                    raise KiaraProcessingException(
                        f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                    )

            column = arrow_table.column(column_name)
            _column_names.append(column_name)
            _columns.append(column)

        return pa.table(data=_columns, names=_column_names)

    def filter__drop_columns(self, value: Value, filter_inputs: Mapping[str, Any]):

        import pyarrow as pa

        ignore_invalid = filter_inputs["ignore_invalid_column_names"]
        column_names_to_ignore = filter_inputs["columns"]

        if not column_names_to_ignore:
            return value

        table: KiaraTable = value.data
        arrow_table = table.arrow_table

        for column_name in column_names_to_ignore:
            if column_name not in arrow_table.column_names:
                if ignore_invalid:
                    continue
                else:
                    raise KiaraProcessingException(
                        f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                    )

        _column_names = []
        _columns = []
        for column_name in arrow_table.column_names:

            if column_name in column_names_to_ignore:
                continue

            column = arrow_table.column(column_name)
            _column_names.append(column_name)
            _columns.append(column)

        return pa.table(data=_columns, names=_column_names)

    def filter__select_rows(self, value: Value, filter_inputs: Mapping[str, Any]):

        pass
create_filter_inputs(self, filter_name)
Source code in tabular/modules/table/filters.py
def create_filter_inputs(self, filter_name: str) -> Union[None, ValueMapSchema]:

    if filter_name in ["select_columns", "drop_columns"]:

        return {
            "columns": {
                "type": "list",
                "doc": "The name of the columns to include.",
                "optional": True,
            },
            "ignore_invalid_column_names": {
                "type": "boolean",
                "doc": "Whether to ignore invalid column names.",
                "default": True,
            },
        }

    return None
filter__drop_columns(self, value, filter_inputs)
Source code in tabular/modules/table/filters.py
def filter__drop_columns(self, value: Value, filter_inputs: Mapping[str, Any]):

    import pyarrow as pa

    ignore_invalid = filter_inputs["ignore_invalid_column_names"]
    column_names_to_ignore = filter_inputs["columns"]

    if not column_names_to_ignore:
        return value

    table: KiaraTable = value.data
    arrow_table = table.arrow_table

    for column_name in column_names_to_ignore:
        if column_name not in arrow_table.column_names:
            if ignore_invalid:
                continue
            else:
                raise KiaraProcessingException(
                    f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                )

    _column_names = []
    _columns = []
    for column_name in arrow_table.column_names:

        if column_name in column_names_to_ignore:
            continue

        column = arrow_table.column(column_name)
        _column_names.append(column_name)
        _columns.append(column)

    return pa.table(data=_columns, names=_column_names)
filter__select_columns(self, value, filter_inputs)
Source code in tabular/modules/table/filters.py
def filter__select_columns(self, value: Value, filter_inputs: Mapping[str, Any]):

    import pyarrow as pa

    ignore_invalid = filter_inputs["ignore_invalid_column_names"]
    column_names = filter_inputs["columns"]

    if not column_names:
        return value

    table: KiaraTable = value.data
    arrow_table = table.arrow_table
    _column_names = []
    _columns = []

    for column_name in column_names:
        if column_name not in arrow_table.column_names:
            if ignore_invalid:
                continue
            else:
                raise KiaraProcessingException(
                    f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                )

        column = arrow_table.column(column_name)
        _column_names.append(column_name)
        _columns.append(column)

    return pa.table(data=_columns, names=_column_names)
filter__select_rows(self, value, filter_inputs)
Source code in tabular/modules/table/filters.py
def filter__select_rows(self, value: Value, filter_inputs: Mapping[str, Any]):

    pass
retrieve_supported_type() classmethod
Source code in tabular/modules/table/filters.py
@classmethod
def retrieve_supported_type(cls) -> Union[Dict[str, Any], str]:

    return "table"