modules

Modules

array special

FORCE_NON_NULL_DOC
MAX_INDEX_DOC
MIN_INDEX_DOC
REMOVE_TOKENS_DOC

Classes

DeserializeArrayModule (DeserializeValueModule)

Deserialize array data.
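
The 'feather' serialization profile stores the array as a single Arrow IPC file. Conceptually, reading such a chunk back into memory looks roughly like the following minimal pyarrow sketch (the file name is a placeholder; in practice the chunk file is provided by kiara's data store):

import pyarrow as pa

# Placeholder path for a single serialized chunk.
with pa.memory_map("array.arrow", "r") as source:
    loaded: pa.Table = pa.ipc.open_file(source).read_all()
print(loaded.num_rows)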

Source code in tabular/modules/array/__init__.py
class DeserializeArrayModule(DeserializeValueModule):
    """Deserialize array data."""

    _module_type_name = "load.array"

    @classmethod
    def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
        return {"python_object": KiaraArray}

    @classmethod
    def retrieve_serialized_value_type(cls) -> str:
        return "array"

    @classmethod
    def retrieve_supported_serialization_profile(cls) -> str:
        return "feather"

    def to__python_object(self, data: SerializedData, **config: Any):

        assert "array.arrow" in data.get_keys() and len(list(data.get_keys())) == 1

        chunks = data.get_serialized_data("array.arrow")

        # TODO: support multiple chunks
        assert chunks.get_number_of_chunks() == 1
        files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
        assert len(files) == 1

        array_file = files[0]

        array = KiaraArray(data_path=array_file)
        return array
retrieve_serialized_value_type() classmethod
Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_serialized_value_type(cls) -> str:
    return "array"
retrieve_supported_serialization_profile() classmethod
Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
    return "feather"
retrieve_supported_target_profiles() classmethod
Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
    return {"python_object": KiaraArray}
to__python_object(self, data, **config)
Source code in tabular/modules/array/__init__.py
def to__python_object(self, data: SerializedData, **config: Any):

    assert "array.arrow" in data.get_keys() and len(list(data.get_keys())) == 1

    chunks = data.get_serialized_data("array.arrow")

    # TODO: support multiple chunks
    assert chunks.get_number_of_chunks() == 1
    files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
    assert len(files) == 1

    array_file = files[0]

    array = KiaraArray(data_path=array_file)
    return array
ExtractDateConfig (KiaraInputsConfig) pydantic-model
Source code in tabular/modules/array/__init__.py
class ExtractDateConfig(KiaraInputsConfig):

    force_non_null: bool = Field(description=FORCE_NON_NULL_DOC, default=True)
    min_index: Union[None, int] = Field(
        description=MIN_INDEX_DOC,
        default=None,
    )
    max_index: Union[None, int] = Field(description=MAX_INDEX_DOC, default=None)
    remove_tokens: List[str] = Field(
        description=REMOVE_TOKENS_DOC, default_factory=list
    )
Attributes
force_non_null: bool pydantic-field

If set to 'True', raise an error if any of the strings in the array can't be parsed.

max_index: int pydantic-field

The maximum index up to which to parse the string(s).

min_index: int pydantic-field

The minimum index from where to start parsing the string(s).

remove_tokens: List[str] pydantic-field

A list of tokens/characters to replace with a single white-space before parsing the input.

ExtractDateModule (AutoInputsKiaraModule)

Create an array of date objects from an array of strings.

This module is very simplistic at the moment; more functionality and options will be added in the future.

At its core, this module uses the standard parser from the dateutil package to parse strings into dates. As this parser can't handle complex strings, the input strings can be pre-processed in the following ways:

  • 'cut' non-relevant parts of the string (using 'min_index' & 'max_index' input/config options)
  • remove matching tokens from the string, and replace them with a single whitespace (using the 'remove_tokens' option)

By default, if an input string can't be parsed this module will raise an exception. This can be prevented by setting this module's 'force_non_null' config option or input to 'False', in which case un-parsable strings will appear as 'NULL' values in the resulting array.
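
For illustration, the pre-processing and parsing steps described above boil down to roughly the following stand-alone sketch (plain dateutil, independent of kiara; all names and the sample string are illustrative):

from dateutil import parser


def parse_date(text, min_index=None, max_index=None, remove_tokens=(), force_non_null=True):
    # 'Cut' non-relevant parts of the string.
    if min_index:
        text = text[min_index:]
    if max_index:
        text = text[: max_index - (min_index or 0)]
    # Replace unwanted tokens with a single whitespace.
    for token in remove_tokens:
        text = text.replace(token, " ")
    try:
        return parser.parse(text, fuzzy=True)
    except (ValueError, OverflowError):
        if force_non_null:
            raise
        return None


# "scan_1867-05-12_page3.txt" -> datetime.datetime(1867, 5, 12, 0, 0)
print(parse_date("scan_1867-05-12_page3.txt", min_index=5, max_index=15))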

Source code in tabular/modules/array/__init__.py
class ExtractDateModule(AutoInputsKiaraModule):
    """Create an array of date objects from an array of strings.

    This module is very simplistic at the moment, more functionality and options will be added in the future.

    At its core, this module uses the standard parser from the
    [dateutil](https://github.com/dateutil/dateutil) package to parse strings into dates. As this parser can't handle
     complex strings, the input strings can be pre-processed in the following ways:

    - 'cut' non-relevant parts of the string (using 'min_index' & 'max_index' input/config options)
    - remove matching tokens from the string, and replace them with a single whitespace (using the 'remove_tokens' option)

    By default, if an input string can't be parsed this module will raise an exception. This can be prevented by
    setting this module's 'force_non_null' config option or input to 'False', in which case un-parsable strings
    will appear as 'NULL' values in the resulting array.
    """

    _module_type_name = "parse.date_array"
    _config_cls = ExtractDateConfig

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:

        inputs = {"array": {"type": "array", "doc": "The input array."}}
        return inputs

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:

        return {
            "date_array": {
                "type": "array",
                "doc": "The resulting array with items of a date data type.",
            }
        }

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog):

        import polars as pl
        import pyarrow as pa
        from dateutil import parser

        force_non_null: bool = self.get_data_for_field(
            field_name="force_non_null", inputs=inputs
        )
        min_pos: Union[None, int] = self.get_data_for_field(
            field_name="min_index", inputs=inputs
        )
        if min_pos is None:
            min_pos = 0
        max_pos: Union[None, int] = self.get_data_for_field(
            field_name="max_index", inputs=inputs
        )
        remove_tokens: Iterable[str] = self.get_data_for_field(
            field_name="remove_tokens", inputs=inputs
        )

        def parse_date(_text: str):

            text = _text
            if min_pos:
                try:
                    text = text[min_pos:]  # type: ignore
                except Exception:
                    return None
            if max_pos:
                try:
                    text = text[0 : max_pos - min_pos]  # type: ignore  # noqa
                except Exception:
                    pass

            if remove_tokens:
                for t in remove_tokens:
                    text = text.replace(t, " ")

            try:
                d_obj = parser.parse(text, fuzzy=True)
            except Exception as e:
                if force_non_null:
                    raise KiaraProcessingException(e)
                return None

            if d_obj is None:
                if force_non_null:
                    raise KiaraProcessingException(
                        f"Can't parse date from string: {text}"
                    )
                return None

            return d_obj

        value = inputs.get_value_obj("array")
        array: KiaraArray = value.data

        series = pl.Series(name="tokens", values=array.arrow_array)
        job_log.add_log(f"start parsing date for {len(array)} items")
        result = series.apply(parse_date)
        job_log.add_log(f"finished parsing date for {len(array)} items")
        result_array = result.to_arrow()

        # TODO: remove this cast once the array data type can handle non-chunked arrays
        chunked = pa.chunked_array(result_array)
        outputs.set_values(date_array=chunked)
Classes
_config_cls (KiaraInputsConfig) private pydantic-model
Source code in tabular/modules/array/__init__.py
class ExtractDateConfig(KiaraInputsConfig):

    force_non_null: bool = Field(description=FORCE_NON_NULL_DOC, default=True)
    min_index: Union[None, int] = Field(
        description=MIN_INDEX_DOC,
        default=None,
    )
    max_index: Union[None, int] = Field(description=MAX_INDEX_DOC, default=None)
    remove_tokens: List[str] = Field(
        description=REMOVE_TOKENS_DOC, default_factory=list
    )
Attributes
force_non_null: bool pydantic-field

If set to 'True', raise an error if any of the strings in the array can't be parsed.

max_index: int pydantic-field

The maximum index up to which to parse the string(s).

min_index: int pydantic-field

The minimum index from where to start parsing the string(s).

remove_tokens: List[str] pydantic-field

A list of tokens/characters to replace with a single white-space before parsing the input.

Methods
create_inputs_schema(self)

Return the schema for this type's inputs.

Source code in tabular/modules/array/__init__.py
def create_inputs_schema(
    self,
) -> ValueMapSchema:

    inputs = {"array": {"type": "array", "doc": "The input array."}}
    return inputs
create_outputs_schema(self)

Return the schema for this type's outputs.

Source code in tabular/modules/array/__init__.py
def create_outputs_schema(
    self,
) -> ValueMapSchema:

    return {
        "date_array": {
            "type": "array",
            "doc": "The resulting array with items of a date data type.",
        }
    }
process(self, inputs, outputs, job_log)
Source code in tabular/modules/array/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog):

    import polars as pl
    import pyarrow as pa
    from dateutil import parser

    force_non_null: bool = self.get_data_for_field(
        field_name="force_non_null", inputs=inputs
    )
    min_pos: Union[None, int] = self.get_data_for_field(
        field_name="min_index", inputs=inputs
    )
    if min_pos is None:
        min_pos = 0
    max_pos: Union[None, int] = self.get_data_for_field(
        field_name="max_index", inputs=inputs
    )
    remove_tokens: Iterable[str] = self.get_data_for_field(
        field_name="remove_tokens", inputs=inputs
    )

    def parse_date(_text: str):

        text = _text
        if min_pos:
            try:
                text = text[min_pos:]  # type: ignore
            except Exception:
                return None
        if max_pos:
            try:
                text = text[0 : max_pos - min_pos]  # type: ignore  # noqa
            except Exception:
                pass

        if remove_tokens:
            for t in remove_tokens:
                text = text.replace(t, " ")

        try:
            d_obj = parser.parse(text, fuzzy=True)
        except Exception as e:
            if force_non_null:
                raise KiaraProcessingException(e)
            return None

        if d_obj is None:
            if force_non_null:
                raise KiaraProcessingException(
                    f"Can't parse date from string: {text}"
                )
            return None

        return d_obj

    value = inputs.get_value_obj("array")
    array: KiaraArray = value.data

    series = pl.Series(name="tokens", values=array.arrow_array)
    job_log.add_log(f"start parsing date for {len(array)} items")
    result = series.apply(parse_date)
    job_log.add_log(f"finished parsing date for {len(array)} items")
    result_array = result.to_arrow()

    # TODO: remove this cast once the array data type can handle non-chunked arrays
    chunked = pa.chunked_array(result_array)
    outputs.set_values(date_array=chunked)

db special

Classes

CreateDatabaseModule (CreateFromModule)
Source code in tabular/modules/db/__init__.py
class CreateDatabaseModule(CreateFromModule):

    _module_type_name = "create.database"
    _config_cls = CreateDatabaseModuleConfig

    def create__database__from__csv_file(self, source_value: Value) -> Any:
        """Create a database from a csv_file value."""

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")

        def cleanup():
            shutil.rmtree(db_path, ignore_errors=True)

        atexit.register(cleanup)

        file_item: FileModel = source_value.data
        table_name = file_item.file_name_without_extension

        table_name = table_name.replace("-", "_")
        table_name = table_name.replace(".", "_")

        try:
            create_sqlite_table_from_tabular_file(
                target_db_file=db_path, file_item=file_item, table_name=table_name
            )
        except Exception as e:
            if self.get_config_value("ignore_errors") is True or True:
                log_message("ignore.import_file", file=file_item.path, reason=str(e))
            else:
                raise KiaraProcessingException(e)

        include_raw_content_in_file_info: bool = self.get_config_value(
            "include_source_metadata"
        )
        if include_raw_content_in_file_info:
            db = KiaraDatabase(db_file_path=db_path)
            db.create_if_not_exists()
            include_content: bool = self.get_config_value("include_source_file_content")
            db._unlock_db()
            included_files = {file_item.file_name: file_item}
            file_bundle = FileBundle.create_from_file_models(
                files=included_files, bundle_name=file_item.file_name
            )
            insert_db_table_from_file_bundle(
                database=db,
                file_bundle=file_bundle,
                table_name="source_files_metadata",
                include_content=include_content,
            )
            db._lock_db()

        return db_path

    def create__database__from__csv_file_bundle(self, source_value: Value) -> Any:
        """Create a database from a csv_file_bundle value.

        Unless 'merge_into_single_table' is set to 'True', each csv file will create one table
        in the resulting database. If this option is set, only a single table with all the values of all
        csv files will be created. For this to work, all csv files should follow the same schema.
        """

        merge_into_single_table = self.get_config_value("merge_into_single_table")
        if merge_into_single_table:
            raise NotImplementedError("Not supported (yet).")

        include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
            "include_source_metadata"
        )

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")

        def cleanup():
            shutil.rmtree(db_path, ignore_errors=True)

        atexit.register(cleanup)

        db = KiaraDatabase(db_file_path=db_path)
        db.create_if_not_exists()

        # TODO: check whether/how to add indexes

        bundle: FileBundle = source_value.data
        table_names: List[str] = []
        for rel_path in sorted(bundle.included_files.keys()):

            file_item = bundle.included_files[rel_path]
            table_name = find_free_id(
                stem=file_item.file_name_without_extension, current_ids=table_names
            )
            try:
                table_names.append(table_name)
                create_sqlite_table_from_tabular_file(
                    target_db_file=db_path, file_item=file_item, table_name=table_name
                )
            except Exception as e:
                if self.get_config_value("ignore_errors") is True or True:
                    log_message("ignore.import_file", file=rel_path, reason=str(e))
                    continue
                raise KiaraProcessingException(e)

        if include_raw_content_in_file_info in [None, True]:
            include_content: bool = self.get_config_value("include_source_file_content")
            db._unlock_db()
            insert_db_table_from_file_bundle(
                database=db,
                file_bundle=source_value.data,
                table_name="source_files_metadata",
                include_content=include_content,
            )
            db._lock_db()

        return db_path

    def create_optional_inputs(
        self, source_type: str, target_type
    ) -> Union[Mapping[str, Mapping[str, Any]], None]:

        if target_type == "database" and source_type == "table":

            return {
                "table_name": {
                    "type": "string",
                    "doc": "The name of the table in the new database.",
                    "default": "imported_table",
                }
            }
        else:
            return None

    def create__database__from__table(
        self, source_value: Value, optional: ValueMap
    ) -> Any:
        """Create a database value from a table."""

        table_name = optional.get_value_data("table_name")
        if not table_name:
            table_name = "imported_table"

        table: KiaraTable = source_value.data
        arrow_table = table.arrow_table

        column_map = None
        index_columns = None

        sqlite_schema = create_sqlite_schema_data_from_arrow_table(
            table=arrow_table, index_columns=index_columns, column_map=column_map
        )

        db = KiaraDatabase.create_in_temp_dir()
        db._unlock_db()
        engine = db.get_sqlalchemy_engine()

        _table = sqlite_schema.create_table(table_name=table_name, engine=engine)

        with engine.connect() as conn:

            for batch in arrow_table.to_batches(
                max_chunksize=DEFAULT_TABULAR_DATA_CHUNK_SIZE
            ):
                conn.execute(insert(_table), batch.to_pylist())
                conn.commit()

        db._lock_db()
        return db
Classes
_config_cls (CreateFromModuleConfig) private pydantic-model
Source code in tabular/modules/db/__init__.py
class CreateDatabaseModuleConfig(CreateFromModuleConfig):

    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )
    merge_into_single_table: bool = Field(
        description="Whether to merge all csv files into a single table.", default=False
    )
    include_source_metadata: Union[bool, None] = Field(
        description="Whether to include a table with metadata about the source files.",
        default=None,
    )
    include_source_file_content: bool = Field(
        description="When including source metadata, whether to also include the original raw (string) content.",
        default=False,
    )
Attributes
ignore_errors: bool pydantic-field

Whether to ignore convert errors and omit the failed items.

include_source_file_content: bool pydantic-field

When including source metadata, whether to also include the original raw (string) content.

include_source_metadata: bool pydantic-field

Whether to include a table with metadata about the source files.

merge_into_single_table: bool pydantic-field

Whether to merge all csv files into a single table.

Methods
create__database__from__csv_file(self, source_value)

Create a database from a csv_file value.

Source code in tabular/modules/db/__init__.py
def create__database__from__csv_file(self, source_value: Value) -> Any:
    """Create a database from a csv_file value."""

    temp_f = tempfile.mkdtemp()
    db_path = os.path.join(temp_f, "db.sqlite")

    def cleanup():
        shutil.rmtree(db_path, ignore_errors=True)

    atexit.register(cleanup)

    file_item: FileModel = source_value.data
    table_name = file_item.file_name_without_extension

    table_name = table_name.replace("-", "_")
    table_name = table_name.replace(".", "_")

    try:
        create_sqlite_table_from_tabular_file(
            target_db_file=db_path, file_item=file_item, table_name=table_name
        )
    except Exception as e:
        if self.get_config_value("ignore_errors") is True or True:
            log_message("ignore.import_file", file=file_item.path, reason=str(e))
        else:
            raise KiaraProcessingException(e)

    include_raw_content_in_file_info: bool = self.get_config_value(
        "include_source_metadata"
    )
    if include_raw_content_in_file_info:
        db = KiaraDatabase(db_file_path=db_path)
        db.create_if_not_exists()
        include_content: bool = self.get_config_value("include_source_file_content")
        db._unlock_db()
        included_files = {file_item.file_name: file_item}
        file_bundle = FileBundle.create_from_file_models(
            files=included_files, bundle_name=file_item.file_name
        )
        insert_db_table_from_file_bundle(
            database=db,
            file_bundle=file_bundle,
            table_name="source_files_metadata",
            include_content=include_content,
        )
        db._lock_db()

    return db_path
create__database__from__csv_file_bundle(self, source_value)

Create a database from a csv_file_bundle value.

Unless 'merge_into_single_table' is set to 'True', each csv file will create one table in the resulting database. If this option is set, only a single table with all the values of all csv files will be created. For this to work, all csv files should follow the same schema.
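
As a rough stand-alone illustration of the same idea (using pandas and the standard library rather than kiara's internal helpers; all paths and names are placeholders):

import sqlite3
from pathlib import Path

import pandas as pd

# Placeholder folder of csv files; each file becomes one table in the target database.
con = sqlite3.connect("bundle.sqlite")
used_names: set = set()
for csv_path in sorted(Path("csv_bundle").glob("*.csv")):
    table_name = csv_path.stem.replace("-", "_").replace(".", "_")
    # Keep table names unique within the database.
    while table_name in used_names:
        table_name += "_"
    used_names.add(table_name)
    pd.read_csv(csv_path).to_sql(table_name, con, index=False)
con.close()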

Source code in tabular/modules/db/__init__.py
def create__database__from__csv_file_bundle(self, source_value: Value) -> Any:
    """Create a database from a csv_file_bundle value.

    Unless 'merge_into_single_table' is set to 'True', each csv file will create one table
    in the resulting database. If this option is set, only a single table with all the values of all
    csv files will be created. For this to work, all csv files should follow the same schema.
    """

    merge_into_single_table = self.get_config_value("merge_into_single_table")
    if merge_into_single_table:
        raise NotImplementedError("Not supported (yet).")

    include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
        "include_source_metadata"
    )

    temp_f = tempfile.mkdtemp()
    db_path = os.path.join(temp_f, "db.sqlite")

    def cleanup():
        shutil.rmtree(db_path, ignore_errors=True)

    atexit.register(cleanup)

    db = KiaraDatabase(db_file_path=db_path)
    db.create_if_not_exists()

    # TODO: check whether/how to add indexes

    bundle: FileBundle = source_value.data
    table_names: List[str] = []
    for rel_path in sorted(bundle.included_files.keys()):

        file_item = bundle.included_files[rel_path]
        table_name = find_free_id(
            stem=file_item.file_name_without_extension, current_ids=table_names
        )
        try:
            table_names.append(table_name)
            create_sqlite_table_from_tabular_file(
                target_db_file=db_path, file_item=file_item, table_name=table_name
            )
        except Exception as e:
            if self.get_config_value("ignore_errors") is True or True:
                log_message("ignore.import_file", file=rel_path, reason=str(e))
                continue
            raise KiaraProcessingException(e)

    if include_raw_content_in_file_info in [None, True]:
        include_content: bool = self.get_config_value("include_source_file_content")
        db._unlock_db()
        insert_db_table_from_file_bundle(
            database=db,
            file_bundle=source_value.data,
            table_name="source_files_metadata",
            include_content=include_content,
        )
        db._lock_db()

    return db_path
create__database__from__table(self, source_value, optional)

Create a database value from a table.
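
Conceptually this copies the rows of the Arrow table into a new SQLite table in batches. A minimal stand-alone sketch using the standard library (schema, table name and file name are illustrative; the module itself derives the SQLite schema from the Arrow table):

import sqlite3

import pyarrow as pa

arrow_table = pa.table({"name": ["a", "b", "c"], "value": [1, 2, 3]})

con = sqlite3.connect("imported.sqlite")  # illustrative target file
con.execute("CREATE TABLE imported_table (name TEXT, value INTEGER)")
for batch in arrow_table.to_batches(max_chunksize=1024):
    rows = [tuple(row.values()) for row in batch.to_pylist()]
    con.executemany("INSERT INTO imported_table VALUES (?, ?)", rows)
con.commit()
con.close()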

Source code in tabular/modules/db/__init__.py
def create__database__from__table(
    self, source_value: Value, optional: ValueMap
) -> Any:
    """Create a database value from a table."""

    table_name = optional.get_value_data("table_name")
    if not table_name:
        table_name = "imported_table"

    table: KiaraTable = source_value.data
    arrow_table = table.arrow_table

    column_map = None
    index_columns = None

    sqlite_schema = create_sqlite_schema_data_from_arrow_table(
        table=arrow_table, index_columns=index_columns, column_map=column_map
    )

    db = KiaraDatabase.create_in_temp_dir()
    db._unlock_db()
    engine = db.get_sqlalchemy_engine()

    _table = sqlite_schema.create_table(table_name=table_name, engine=engine)

    with engine.connect() as conn:

        for batch in arrow_table.to_batches(
            max_chunksize=DEFAULT_TABULAR_DATA_CHUNK_SIZE
        ):
            conn.execute(insert(_table), batch.to_pylist())
            conn.commit()

    db._lock_db()
    return db
create_optional_inputs(self, source_type, target_type)
Source code in tabular/modules/db/__init__.py
def create_optional_inputs(
    self, source_type: str, target_type
) -> Union[Mapping[str, Mapping[str, Any]], None]:

    if target_type == "database" and source_type == "table":

        return {
            "table_name": {
                "type": "string",
                "doc": "The name of the table in the new database.",
                "default": "imported_table",
            }
        }
    else:
        return None
CreateDatabaseModuleConfig (CreateFromModuleConfig) pydantic-model
Source code in tabular/modules/db/__init__.py
class CreateDatabaseModuleConfig(CreateFromModuleConfig):

    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )
    merge_into_single_table: bool = Field(
        description="Whether to merge all csv files into a single table.", default=False
    )
    include_source_metadata: Union[bool, None] = Field(
        description="Whether to include a table with metadata about the source files.",
        default=None,
    )
    include_source_file_content: bool = Field(
        description="When including source metadata, whether to also include the original raw (string) content.",
        default=False,
    )
Attributes
ignore_errors: bool pydantic-field

Whether to ignore convert errors and omit the failed items.

include_source_file_content: bool pydantic-field

When including source metadata, whether to also include the original raw (string) content.

include_source_metadata: bool pydantic-field

Whether to include a table with metadata about the source files.

merge_into_single_table: bool pydantic-field

Whether to merge all csv files into a single table.

LoadDatabaseFromDiskModule (DeserializeValueModule)
Source code in tabular/modules/db/__init__.py
class LoadDatabaseFromDiskModule(DeserializeValueModule):

    _module_type_name = "load.database"

    @classmethod
    def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
        return {"python_object": KiaraDatabase}

    @classmethod
    def retrieve_serialized_value_type(cls) -> str:
        return "database"

    @classmethod
    def retrieve_supported_serialization_profile(cls) -> str:
        return "copy"

    def to__python_object(self, data: SerializedData, **config: Any):

        assert "db.sqlite" in data.get_keys() and len(list(data.get_keys())) == 1

        chunks = data.get_serialized_data("db.sqlite")

        # TODO: support multiple chunks
        assert chunks.get_number_of_chunks() == 1
        files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
        assert len(files) == 1

        db_file = files[0]

        db = KiaraDatabase(db_file_path=db_file)
        return db
retrieve_serialized_value_type() classmethod
Source code in tabular/modules/db/__init__.py
@classmethod
def retrieve_serialized_value_type(cls) -> str:
    return "database"
retrieve_supported_serialization_profile() classmethod
Source code in tabular/modules/db/__init__.py
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
    return "copy"
retrieve_supported_target_profiles() classmethod
Source code in tabular/modules/db/__init__.py
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
    return {"python_object": KiaraDatabase}
to__python_object(self, data, **config)
Source code in tabular/modules/db/__init__.py
def to__python_object(self, data: SerializedData, **config: Any):

    assert "db.sqlite" in data.get_keys() and len(list(data.get_keys())) == 1

    chunks = data.get_serialized_data("db.sqlite")

    # TODO: support multiple chunks
    assert chunks.get_number_of_chunks() == 1
    files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
    assert len(files) == 1

    db_file = files[0]

    db = KiaraDatabase(db_file_path=db_file)
    return db
QueryDatabaseConfig (KiaraModuleConfig) pydantic-model
Source code in tabular/modules/db/__init__.py
class QueryDatabaseConfig(KiaraModuleConfig):

    query: Union[str, None] = Field(description="The query.", default=None)
Attributes
query: str pydantic-field

The query.

QueryDatabaseModule (KiaraModule)

Execute an SQL query against a (SQLite) database.
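
The query result is collected column-wise and returned as a 'table' value. A minimal stand-alone sketch of the same idea, using the standard library instead of SQLAlchemy (database file and table name are placeholders):

import sqlite3

import pyarrow as pa

con = sqlite3.connect("example.sqlite")  # placeholder database file
con.row_factory = sqlite3.Row
columns: dict = {}
for row in con.execute("SELECT * FROM my_table LIMIT 10"):  # 'my_table' is a placeholder
    for key in row.keys():
        columns.setdefault(key, []).append(row[key])
con.close()

query_result = pa.Table.from_pydict(columns)
print(query_result)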

Source code in tabular/modules/db/__init__.py
class QueryDatabaseModule(KiaraModule):
    """Execute a sql query against a (sqlite) database."""

    _config_cls = QueryDatabaseConfig
    _module_type_name = "query.database"

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:

        result: Dict[str, Dict[str, Any]] = {
            "database": {"type": "database", "doc": "The database to query."}
        }

        if not self.get_config_value("query"):
            result["query"] = {"type": "string", "doc": "The query to execute."}

        return result

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:

        return {"query_result": {"type": "table", "doc": "The query result."}}

    def process(self, inputs: ValueMap, outputs: ValueMap):

        import pyarrow as pa

        database: KiaraDatabase = inputs.get_value_data("database")
        query = self.get_config_value("query")
        if query is None:
            query = inputs.get_value_data("query")

        # TODO: make this memory efficient

        result_columns: Dict[str, List[Any]] = {}
        with database.get_sqlalchemy_engine().connect() as con:
            result = con.execute(text(query))
            for r in result:
                for k, v in dict(r).items():
                    result_columns.setdefault(k, []).append(v)

        table = pa.Table.from_pydict(result_columns)
        outputs.set_value("query_result", table)
Classes
_config_cls (KiaraModuleConfig) private pydantic-model
Source code in tabular/modules/db/__init__.py
class QueryDatabaseConfig(KiaraModuleConfig):

    query: Union[str, None] = Field(description="The query.", default=None)
Attributes
query: str pydantic-field

The query.

Methods
create_inputs_schema(self)

Return the schema for this type's inputs.

Source code in tabular/modules/db/__init__.py
def create_inputs_schema(
    self,
) -> ValueMapSchema:

    result: Dict[str, Dict[str, Any]] = {
        "database": {"type": "database", "doc": "The database to query."}
    }

    if not self.get_config_value("query"):
        result["query"] = {"type": "string", "doc": "The query to execute."}

    return result
create_outputs_schema(self)

Return the schema for this type's outputs.

Source code in tabular/modules/db/__init__.py
def create_outputs_schema(
    self,
) -> ValueMapSchema:

    return {"query_result": {"type": "table", "doc": "The query result."}}
process(self, inputs, outputs)
Source code in tabular/modules/db/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap):

    import pyarrow as pa

    database: KiaraDatabase = inputs.get_value_data("database")
    query = self.get_config_value("query")
    if query is None:
        query = inputs.get_value_data("query")

    # TODO: make this memory efficient

    result_columns: Dict[str, List[Any]] = {}
    with database.get_sqlalchemy_engine().connect() as con:
        result = con.execute(text(query))
        for r in result:
            for k, v in dict(r).items():
                result_columns.setdefault(k, []).append(v)

    table = pa.Table.from_pydict(result_columns)
    outputs.set_value("query_result", table)
RenderDatabaseModule (RenderDatabaseModuleBase)
Source code in tabular/modules/db/__init__.py
class RenderDatabaseModule(RenderDatabaseModuleBase):
    _module_type_name = "render.database"

    def render__database__as__string(
        self, value: Value, render_config: Mapping[str, Any]
    ):

        input_number_of_rows = render_config.get("number_of_rows", 20)
        input_row_offset = render_config.get("row_offset", 0)

        table_name = render_config.get("table_name", None)

        wrap, data_related_scenes = self.preprocess_database(
            value=value,
            table_name=table_name,
            input_number_of_rows=input_number_of_rows,
            input_row_offset=input_row_offset,
        )
        pretty = wrap.as_string(max_row_height=1)

        return RenderValueResult(
            value_id=value.value_id,
            rendered=pretty,
            related_scenes=data_related_scenes,
            render_config=render_config,
            render_manifest=self.manifest.manifest_hash,
        )

    def render__database__as__terminal_renderable(
        self, value: Value, render_config: Mapping[str, Any]
    ):

        input_number_of_rows = render_config.get("number_of_rows", 20)
        input_row_offset = render_config.get("row_offset", 0)

        table_name = render_config.get("table_name", None)

        wrap, data_related_scenes = self.preprocess_database(
            value=value,
            table_name=table_name,
            input_number_of_rows=input_number_of_rows,
            input_row_offset=input_row_offset,
        )
        pretty = wrap.as_terminal_renderable(max_row_height=1)

        return RenderValueResult(
            value_id=value.value_id,
            render_config=render_config,
            rendered=pretty,
            related_scenes=data_related_scenes,
            render_manifest=self.manifest.manifest_hash,
        )
render__database__as__string(self, value, render_config)
Source code in tabular/modules/db/__init__.py
def render__database__as__string(
    self, value: Value, render_config: Mapping[str, Any]
):

    input_number_of_rows = render_config.get("number_of_rows", 20)
    input_row_offset = render_config.get("row_offset", 0)

    table_name = render_config.get("table_name", None)

    wrap, data_related_scenes = self.preprocess_database(
        value=value,
        table_name=table_name,
        input_number_of_rows=input_number_of_rows,
        input_row_offset=input_row_offset,
    )
    pretty = wrap.as_string(max_row_height=1)

    return RenderValueResult(
        value_id=value.value_id,
        rendered=pretty,
        related_scenes=data_related_scenes,
        render_config=render_config,
        render_manifest=self.manifest.manifest_hash,
    )
render__database__as__terminal_renderable(self, value, render_config)
Source code in tabular/modules/db/__init__.py
def render__database__as__terminal_renderable(
    self, value: Value, render_config: Mapping[str, Any]
):

    input_number_of_rows = render_config.get("number_of_rows", 20)
    input_row_offset = render_config.get("row_offset", 0)

    table_name = render_config.get("table_name", None)

    wrap, data_related_scenes = self.preprocess_database(
        value=value,
        table_name=table_name,
        input_number_of_rows=input_number_of_rows,
        input_row_offset=input_row_offset,
    )
    pretty = wrap.as_terminal_renderable(max_row_height=1)

    return RenderValueResult(
        value_id=value.value_id,
        render_config=render_config,
        rendered=pretty,
        related_scenes=data_related_scenes,
        render_manifest=self.manifest.manifest_hash,
    )
RenderDatabaseModuleBase (RenderValueModule)
Source code in tabular/modules/db/__init__.py
class RenderDatabaseModuleBase(RenderValueModule):

    _module_type_name: str = None  # type: ignore

    def preprocess_database(
        self,
        value: Value,
        table_name: Union[str, None],
        input_number_of_rows: int,
        input_row_offset: int,
    ):

        database: KiaraDatabase = value.data
        table_names = database.table_names

        if not table_name:
            table_name = list(table_names)[0]

        if table_name not in table_names:
            raise Exception(
                f"Invalid table name: {table_name}. Available: {', '.join(table_names)}"
            )

        related_scenes_tables: Dict[str, Union[RenderScene, None]] = {
            t: RenderScene.construct(
                title=t,
                description=f"Display the '{t}' table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={"table_name": t},
            )
            for t in database.table_names
        }

        query = f"""SELECT * FROM {table_name} LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""
        result: Dict[str, List[Any]] = {}
        # TODO: this could be written much more efficiently
        with database.get_sqlalchemy_engine().connect() as con:
            num_rows_result = con.execute(text(f"SELECT count(*) from {table_name}"))
            table_num_rows = num_rows_result.fetchone()[0]
            rs = con.execute(text(query))
            for r in rs:
                for k, v in dict(r).items():
                    result.setdefault(k, []).append(v)

        wrap = DictTabularWrap(data=result)

        row_offset = table_num_rows - input_number_of_rows
        related_scenes: Dict[str, Union[RenderScene, None]] = {}
        if row_offset > 0:

            if input_row_offset > 0:
                related_scenes["first"] = RenderScene.construct(
                    title="first",
                    description=f"Display the first {input_number_of_rows} rows of this table.",
                    manifest_hash=self.manifest.manifest_hash,
                    render_config={
                        "row_offset": 0,
                        "number_of_rows": input_number_of_rows,
                        "table_name": table_name,
                    },
                )

                p_offset = input_row_offset - input_number_of_rows
                if p_offset < 0:
                    p_offset = 0
                previous = {
                    "row_offset": p_offset,
                    "number_of_rows": input_number_of_rows,
                    "table_name": table_name,
                }
                related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous)  # type: ignore
            else:
                related_scenes["first"] = None
                related_scenes["previous"] = None

            n_offset = input_row_offset + input_number_of_rows
            if n_offset < table_num_rows:
                next = {
                    "row_offset": n_offset,
                    "number_of_rows": input_number_of_rows,
                    "table_name": table_name,
                }
                related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next)  # type: ignore
            else:
                related_scenes["next"] = None

            last_page = int(table_num_rows / input_number_of_rows)
            current_start = last_page * input_number_of_rows
            if (input_row_offset + input_number_of_rows) > table_num_rows:
                related_scenes["last"] = None
            else:
                related_scenes["last"] = RenderScene.construct(
                    title="last",
                    description="Display the final rows of this table.",
                    manifest_hash=self.manifest.manifest_hash,
                    render_config={
                        "row_offset": current_start,  # type: ignore
                        "number_of_rows": input_number_of_rows,  # type: ignore
                        "table_name": table_name,
                    },
                )
        related_scenes_tables[table_name].disabled = True  # type: ignore
        related_scenes_tables[table_name].related_scenes = related_scenes  # type: ignore
        return wrap, related_scenes_tables
preprocess_database(self, value, table_name, input_number_of_rows, input_row_offset)
Source code in tabular/modules/db/__init__.py
def preprocess_database(
    self,
    value: Value,
    table_name: Union[str, None],
    input_number_of_rows: int,
    input_row_offset: int,
):

    database: KiaraDatabase = value.data
    table_names = database.table_names

    if not table_name:
        table_name = list(table_names)[0]

    if table_name not in table_names:
        raise Exception(
            f"Invalid table name: {table_name}. Available: {', '.join(table_names)}"
        )

    related_scenes_tables: Dict[str, Union[RenderScene, None]] = {
        t: RenderScene.construct(
            title=t,
            description=f"Display the '{t}' table.",
            manifest_hash=self.manifest.manifest_hash,
            render_config={"table_name": t},
        )
        for t in database.table_names
    }

    query = f"""SELECT * FROM {table_name} LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""
    result: Dict[str, List[Any]] = {}
    # TODO: this could be written much more efficiently
    with database.get_sqlalchemy_engine().connect() as con:
        num_rows_result = con.execute(text(f"SELECT count(*) from {table_name}"))
        table_num_rows = num_rows_result.fetchone()[0]
        rs = con.execute(text(query))
        for r in rs:
            for k, v in dict(r).items():
                result.setdefault(k, []).append(v)

    wrap = DictTabularWrap(data=result)

    row_offset = table_num_rows - input_number_of_rows
    related_scenes: Dict[str, Union[RenderScene, None]] = {}
    if row_offset > 0:

        if input_row_offset > 0:
            related_scenes["first"] = RenderScene.construct(
                title="first",
                description=f"Display the first {input_number_of_rows} rows of this table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={
                    "row_offset": 0,
                    "number_of_rows": input_number_of_rows,
                    "table_name": table_name,
                },
            )

            p_offset = input_row_offset - input_number_of_rows
            if p_offset < 0:
                p_offset = 0
            previous = {
                "row_offset": p_offset,
                "number_of_rows": input_number_of_rows,
                "table_name": table_name,
            }
            related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous)  # type: ignore
        else:
            related_scenes["first"] = None
            related_scenes["previous"] = None

        n_offset = input_row_offset + input_number_of_rows
        if n_offset < table_num_rows:
            next = {
                "row_offset": n_offset,
                "number_of_rows": input_number_of_rows,
                "table_name": table_name,
            }
            related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next)  # type: ignore
        else:
            related_scenes["next"] = None

        last_page = int(table_num_rows / input_number_of_rows)
        current_start = last_page * input_number_of_rows
        if (input_row_offset + input_number_of_rows) > table_num_rows:
            related_scenes["last"] = None
        else:
            related_scenes["last"] = RenderScene.construct(
                title="last",
                description="Display the final rows of this table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={
                    "row_offset": current_start,  # type: ignore
                    "number_of_rows": input_number_of_rows,  # type: ignore
                    "table_name": table_name,
                },
            )
    related_scenes_tables[table_name].disabled = True  # type: ignore
    related_scenes_tables[table_name].related_scenes = related_scenes  # type: ignore
    return wrap, related_scenes_tables

table special

EMPTY_COLUMN_NAME_MARKER

Classes

CreateTableModule (CreateFromModule)
Source code in tabular/modules/table/__init__.py
class CreateTableModule(CreateFromModule):

    _module_type_name = "create.table"
    _config_cls = CreateTableModuleConfig

    def create__table__from__csv_file(self, source_value: Value) -> Any:
        """Create a table from a csv_file value."""

        from pyarrow import csv

        input_file: FileModel = source_value.data
        imported_data = csv.read_csv(input_file.path)

        # import pandas as pd
        # df = pd.read_csv(input_file.path)
        # imported_data = pa.Table.from_pandas(df)

        return KiaraTable.create_table(imported_data)

    def create__table__from__text_file_bundle(self, source_value: Value) -> Any:
        """Create a table value from a text file_bundle.

        The resulting table will have (at a minimum) the following columns:
        - id: an auto-assigned index
        - rel_path: the relative path of the file (from the provided base path)
        - content: the text file content
        """

        import pyarrow as pa

        bundle: FileBundle = source_value.data

        columns = FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS

        ignore_errors = self.get_config_value("ignore_errors")
        file_dict = bundle.read_text_file_contents(ignore_errors=ignore_errors)

        # TODO: use chunks to save on memory
        tabular: Dict[str, List[Any]] = {}
        for column in columns:
            for index, rel_path in enumerate(sorted(file_dict.keys())):

                if column == "content":
                    _value: Any = file_dict[rel_path]
                elif column == "id":
                    _value = index
                elif column == "rel_path":
                    _value = rel_path
                else:
                    file_model = bundle.included_files[rel_path]
                    _value = getattr(file_model, column)

                tabular.setdefault(column, []).append(_value)

        table = pa.Table.from_pydict(tabular)
        return KiaraTable.create_table(table)
Classes
_config_cls (CreateFromModuleConfig) private pydantic-model
Source code in tabular/modules/table/__init__.py
class CreateTableModuleConfig(CreateFromModuleConfig):

    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )
Attributes
ignore_errors: bool pydantic-field

Whether to ignore convert errors and omit the failed items.

Methods
create__table__from__csv_file(self, source_value)

Create a table from a csv_file value.

Source code in tabular/modules/table/__init__.py
def create__table__from__csv_file(self, source_value: Value) -> Any:
    """Create a table from a csv_file value."""

    from pyarrow import csv

    input_file: FileModel = source_value.data
    imported_data = csv.read_csv(input_file.path)

    # import pandas as pd
    # df = pd.read_csv(input_file.path)
    # imported_data = pa.Table.from_pandas(df)

    return KiaraTable.create_table(imported_data)
create__table__from__text_file_bundle(self, source_value)

Create a table value from a text file_bundle.

The resulting table will have (at a minimum) the following columns:

  • id: an auto-assigned index
  • rel_path: the relative path of the file (from the provided base path)
  • content: the text file content
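
A stand-alone sketch of the same structure, built from a local folder of text files (the folder name and file pattern are illustrative; the module reads the files from the incoming 'file_bundle' value instead):

from pathlib import Path

import pyarrow as pa

base = Path("text_bundle")  # placeholder base path
rows: dict = {"id": [], "rel_path": [], "content": []}
for index, path in enumerate(sorted(base.rglob("*.txt"))):
    rows["id"].append(index)
    rows["rel_path"].append(str(path.relative_to(base)))
    rows["content"].append(path.read_text())

table = pa.Table.from_pydict(rows)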

Source code in tabular/modules/table/__init__.py
def create__table__from__text_file_bundle(self, source_value: Value) -> Any:
    """Create a table value from a text file_bundle.

    The resulting table will have (at a minimum) the following columns:
    - id: an auto-assigned index
    - rel_path: the relative path of the file (from the provided base path)
    - content: the text file content
    """

    import pyarrow as pa

    bundle: FileBundle = source_value.data

    columns = FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS

    ignore_errors = self.get_config_value("ignore_errors")
    file_dict = bundle.read_text_file_contents(ignore_errors=ignore_errors)

    # TODO: use chunks to save on memory
    tabular: Dict[str, List[Any]] = {}
    for column in columns:
        for index, rel_path in enumerate(sorted(file_dict.keys())):

            if column == "content":
                _value: Any = file_dict[rel_path]
            elif column == "id":
                _value = index
            elif column == "rel_path":
                _value = rel_path
            else:
                file_model = bundle.included_files[rel_path]
                _value = getattr(file_model, column)

            tabular.setdefault(column, []).append(_value)

    table = pa.Table.from_pydict(tabular)
    return KiaraTable.create_table(table)
CreateTableModuleConfig (CreateFromModuleConfig) pydantic-model
Source code in tabular/modules/table/__init__.py
class CreateTableModuleConfig(CreateFromModuleConfig):

    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )
Attributes
ignore_errors: bool pydantic-field

Whether to ignore convert errors and omit the failed items.

CutColumnModule (KiaraModule)

Cut off one column from a table, returning an array.
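
In pyarrow terms, this is simply a lookup of a single column, returned as an 'array' value. A minimal illustration (table contents are made up):

import pyarrow as pa

table = pa.table({"name": ["a", "b"], "value": [1, 2]})
column = table.column("value")  # a pyarrow ChunkedArray
print(column)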

Source code in tabular/modules/table/__init__.py
class CutColumnModule(KiaraModule):
    """Cut off one column from a table, returning an array."""

    _module_type_name = "table.cut_column"

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:

        inputs: Mapping[str, Any] = {
            "table": {"type": "table", "doc": "A table."},
            "column_name": {
                "type": "string",
                "doc": "The name of the column to extract.",
            },
        }
        return inputs

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:

        outputs: Mapping[str, Any] = {"array": {"type": "array", "doc": "The column."}}
        return outputs

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        import pyarrow as pa

        column_name: str = inputs.get_value_data("column_name")

        table_value: Value = inputs.get_value_obj("table")
        table_metadata: KiaraTableMetadata = table_value.get_property_data(
            "metadata.table"
        )

        available = table_metadata.table.column_names

        if column_name not in available:
            raise KiaraProcessingException(
                f"Invalid column name '{column_name}'. Available column names: {', '.join(available)}"
            )

        table: pa.Table = table_value.data.arrow_table
        column = table.column(column_name)

        outputs.set_value("array", column)
Methods
create_inputs_schema(self)

Return the schema for this type's inputs.

Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
    self,
) -> ValueMapSchema:

    inputs: Mapping[str, Any] = {
        "table": {"type": "table", "doc": "A table."},
        "column_name": {
            "type": "string",
            "doc": "The name of the column to extract.",
        },
    }
    return inputs
create_outputs_schema(self)

Return the schema for this type's outputs.

Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
    self,
) -> ValueMapSchema:

    outputs: Mapping[str, Any] = {"array": {"type": "array", "doc": "The column."}}
    return outputs
process(self, inputs, outputs)
Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

    import pyarrow as pa

    column_name: str = inputs.get_value_data("column_name")

    table_value: Value = inputs.get_value_obj("table")
    table_metadata: KiaraTableMetadata = table_value.get_property_data(
        "metadata.table"
    )

    available = table_metadata.table.column_names

    if column_name not in available:
        raise KiaraProcessingException(
            f"Invalid column name '{column_name}'. Available column names: {', '.join(available)}"
        )

    table: pa.Table = table_value.data.arrow_table
    column = table.column(column_name)

    outputs.set_value("array", column)
DeserializeTableModule (DeserializeValueModule)
Source code in tabular/modules/table/__init__.py
class DeserializeTableModule(DeserializeValueModule):

    _module_type_name = "load.table"

    @classmethod
    def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
        return {"python_object": KiaraTable}

    @classmethod
    def retrieve_serialized_value_type(cls) -> str:
        return "table"

    @classmethod
    def retrieve_supported_serialization_profile(cls) -> str:
        return "feather"

    def to__python_object(self, data: SerializedData, **config: Any):

        import pyarrow as pa

        columns = {}

        for column_name in data.get_keys():

            chunks = data.get_serialized_data(column_name)

            # TODO: support multiple chunks
            assert chunks.get_number_of_chunks() == 1
            files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
            assert len(files) == 1

            file = files[0]
            with pa.memory_map(file, "r") as column_chunk:
                loaded_arrays: pa.Table = pa.ipc.open_file(column_chunk).read_all()
                column = loaded_arrays.column(column_name)
                if column_name == EMPTY_COLUMN_NAME_MARKER:
                    columns[""] = column
                else:
                    columns[column_name] = column

        arrow_table = pa.table(columns)

        table = KiaraTable.create_table(arrow_table)
        return table
retrieve_serialized_value_type() classmethod
Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_serialized_value_type(cls) -> str:
    return "table"
retrieve_supported_serialization_profile() classmethod
Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
    return "feather"
retrieve_supported_target_profiles() classmethod
Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
    return {"python_object": KiaraTable}
to__python_object(self, data, **config)
Source code in tabular/modules/table/__init__.py
def to__python_object(self, data: SerializedData, **config: Any):

    import pyarrow as pa

    columns = {}

    for column_name in data.get_keys():

        chunks = data.get_serialized_data(column_name)

        # TODO: support multiple chunks
        assert chunks.get_number_of_chunks() == 1
        files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
        assert len(files) == 1

        file = files[0]
        with pa.memory_map(file, "r") as column_chunk:
            loaded_arrays: pa.Table = pa.ipc.open_file(column_chunk).read_all()
            column = loaded_arrays.column(column_name)
            if column_name == EMPTY_COLUMN_NAME_MARKER:
                columns[""] = column
            else:
                columns[column_name] = column

    arrow_table = pa.table(columns)

    table = KiaraTable.create_table(arrow_table)
    return table
ExportTableModule (DataExportModule)

Export table data items.

Source code in tabular/modules/table/__init__.py
class ExportTableModule(DataExportModule):
    """Export table data items."""

    _module_type_name = "export.table"

    def export__table__as__csv_file(self, value: KiaraTable, base_path: str, name: str):
        """Export a table as csv file."""

        import pyarrow.csv as csv

        target_path = os.path.join(base_path, f"{name}.csv")

        csv.write_csv(value.arrow_table, target_path)

        return {"files": target_path}

    # def export__table__as__sqlite_db(
    #     self, value: KiaraTable, base_path: str, name: str
    # ):
    #
    #     target_path = os.path.abspath(os.path.join(base_path, f"{name}.sqlite"))
    #
    #     raise NotImplementedError()
    #     # shutil.copy2(value.db_file_path, target_path)
    #
    #     return {"files": target_path}
Methods
export__table__as__csv_file(self, value, base_path, name)

Export a table as a CSV file.

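For illustration, the underlying pyarrow call can also be used standalone; a minimal sketch with hypothetical data and paths:

import os

import pyarrow as pa
import pyarrow.csv as csv

# A small table standing in for `value.arrow_table` (hypothetical data).
arrow_table = pa.table({"id": [1, 2, 3], "label": ["a", "b", "c"]})

# Mirror the module's behaviour: write the table to '<base_path>/<name>.csv'.
base_path = "."       # hypothetical export directory
name = "my_table"     # hypothetical file name (without extension)
target_path = os.path.join(base_path, f"{name}.csv")
csv.write_csv(arrow_table, target_path)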
Source code in tabular/modules/table/__init__.py
def export__table__as__csv_file(self, value: KiaraTable, base_path: str, name: str):
    """Export a table as csv file."""

    import pyarrow.csv as csv

    target_path = os.path.join(base_path, f"{name}.csv")

    csv.write_csv(value.arrow_table, target_path)

    return {"files": target_path}
MergeTableConfig (KiaraModuleConfig) pydantic-model
Source code in tabular/modules/table/__init__.py
class MergeTableConfig(KiaraModuleConfig):

    inputs_schema: Dict[str, ValueSchema] = Field(
        description="A dict describing the inputs for this merge process."
    )
    column_map: Dict[str, str] = Field(
        description="A map describing", default_factory=dict
    )
Attributes
column_map: Dict[str, str] pydantic-field

A map describing how the columns of the merged table relate to the source inputs: the desired column name is the key, and a source field reference (in the format described below) is the value.

inputs_schema: Dict[str, kiara.models.values.value_schema.ValueSchema] pydantic-field required

A dict describing the inputs for this merge process.

MergeTableModule (KiaraModule)

Create a table from other tables and/or arrays.

This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary number of tables/arrays; all tables to be merged must be specified in the module configuration.

Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the desired column name as key, and a field-name in the following format as value:

- '[inputs_schema key]' for inputs of type 'array'
- '[inputs_schema_key].orig_column_name' for inputs of type 'table'

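For illustration, a minimal configuration sketch for this module is shown below; the input field names ('edges', 'labels') and column names are hypothetical, but the 'column_map' values follow the format described above:

# Hypothetical configuration for a 'table.merge' module: one table input and one
# array input are merged into a two-column table with columns 'id' and 'label'.
merge_config = {
    "inputs_schema": {
        "edges": {"type": "table", "doc": "A table that contains an 'id' column."},
        "labels": {"type": "array", "doc": "An array with one label per row."},
    },
    "column_map": {
        "id": "edges.id",    # '[inputs_schema key].orig_column_name' for 'table' inputs
        "label": "labels",   # '[inputs_schema key]' for 'array' inputs
    },
}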
Source code in tabular/modules/table/__init__.py
class MergeTableModule(KiaraModule):
    """Create a table from other tables and/or arrays.

    This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary
    number of tables/arrays, all tables to be merged must be specified in the module configuration.

    Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the
    desired column name as key, and a field-name in the following format as value:
    - '[inputs_schema key]' for inputs of type 'array'
    - '[inputs_schema_key].orig_column_name' for inputs of type 'table'
    """

    _module_type_name = "table.merge"
    _config_cls = MergeTableConfig

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:

        input_schema_dict = self.get_config_value("inputs_schema")
        return input_schema_dict

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:

        outputs = {
            "table": {
                "type": "table",
                "doc": "The merged table, including all source tables and columns.",
            }
        }
        return outputs

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:

        import pyarrow as pa

        inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
        column_map: Dict[str, str] = self.get_config_value("column_map")

        sources = {}
        for field_name in inputs_schema.keys():
            sources[field_name] = inputs.get_value_data(field_name)

        len_dict = {}
        arrays = {}

        column_map_final = dict(column_map)

        for source_key, table_or_array in sources.items():

            if isinstance(table_or_array, KiaraTable):
                rows = table_or_array.num_rows
                for name in table_or_array.column_names:
                    array_name = f"{source_key}.{name}"
                    if column_map and array_name not in column_map.values():
                        job_log.add_log(
                            f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
                        )
                        continue

                    column = table_or_array.arrow_table.column(name)
                    arrays[array_name] = column
                    if not column_map:
                        if name in column_map_final:
                            raise Exception(
                                f"Can't merge table, duplicate column name: {name}."
                            )
                        column_map_final[name] = array_name

            elif isinstance(table_or_array, KiaraArray):

                if column_map and source_key not in column_map.values():
                    job_log.add_log(
                        f"Ignoring array '{source_key}': not listed in column_map."
                    )
                    continue

                rows = len(table_or_array)
                arrays[source_key] = table_or_array.arrow_array

                if not column_map:
                    if source_key in column_map_final.keys():
                        raise Exception(
                            f"Can't merge table, duplicate column name: {source_key}."
                        )
                    column_map_final[source_key] = source_key

            else:
                raise KiaraProcessingException(
                    f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
                )

            len_dict[source_key] = rows

        all_rows = None
        for source_key, rows in len_dict.items():
            if all_rows is None:
                all_rows = rows
            else:
                if all_rows != rows:
                    all_rows = None
                    break

        if all_rows is None:
            len_str = ""
            for name, rows in len_dict.items():
                len_str += f" {name} ({rows})"

            raise KiaraProcessingException(
                f"Can't merge table, sources have different lengths: {len_str}"
            )

        column_names = []
        columns = []
        for column_name, ref in column_map_final.items():
            column_names.append(column_name)
            column = arrays[ref]
            columns.append(column)

        table = pa.Table.from_arrays(arrays=columns, names=column_names)

        outputs.set_value("table", table)
Classes
_config_cls (KiaraModuleConfig) private pydantic-model
Source code in tabular/modules/table/__init__.py
class MergeTableConfig(KiaraModuleConfig):

    inputs_schema: Dict[str, ValueSchema] = Field(
        description="A dict describing the inputs for this merge process."
    )
    column_map: Dict[str, str] = Field(
        description="A map describing", default_factory=dict
    )
Attributes
column_map: Dict[str, str] pydantic-field

A map describing how the columns of the merged table relate to the source inputs: the desired column name is the key, and a source field reference (in the format described above) is the value.

inputs_schema: Dict[str, kiara.models.values.value_schema.ValueSchema] pydantic-field required

A dict describing the inputs for this merge process.

Methods
create_inputs_schema(self)

Return the schema for this type's inputs.

Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
    self,
) -> ValueMapSchema:

    input_schema_dict = self.get_config_value("inputs_schema")
    return input_schema_dict
create_outputs_schema(self)

Return the schema for this type's outputs.

Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
    self,
) -> ValueMapSchema:

    outputs = {
        "table": {
            "type": "table",
            "doc": "The merged table, including all source tables and columns.",
        }
    }
    return outputs
process(self, inputs, outputs, job_log)
Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:

    import pyarrow as pa

    inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
    column_map: Dict[str, str] = self.get_config_value("column_map")

    sources = {}
    for field_name in inputs_schema.keys():
        sources[field_name] = inputs.get_value_data(field_name)

    len_dict = {}
    arrays = {}

    column_map_final = dict(column_map)

    for source_key, table_or_array in sources.items():

        if isinstance(table_or_array, KiaraTable):
            rows = table_or_array.num_rows
            for name in table_or_array.column_names:
                array_name = f"{source_key}.{name}"
                if column_map and array_name not in column_map.values():
                    job_log.add_log(
                        f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
                    )
                    continue

                column = table_or_array.arrow_table.column(name)
                arrays[array_name] = column
                if not column_map:
                    if name in column_map_final:
                        raise Exception(
                            f"Can't merge table, duplicate column name: {name}."
                        )
                    column_map_final[name] = array_name

        elif isinstance(table_or_array, KiaraArray):

            if column_map and source_key not in column_map.values():
                job_log.add_log(
                    f"Ignoring array '{source_key}': not listed in column_map."
                )
                continue

            rows = len(table_or_array)
            arrays[source_key] = table_or_array.arrow_array

            if not column_map:
                if source_key in column_map_final.keys():
                    raise Exception(
                        f"Can't merge table, duplicate column name: {source_key}."
                    )
                column_map_final[source_key] = source_key

        else:
            raise KiaraProcessingException(
                f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
            )

        len_dict[source_key] = rows

    all_rows = None
    for source_key, rows in len_dict.items():
        if all_rows is None:
            all_rows = rows
        else:
            if all_rows != rows:
                all_rows = None
                break

    if all_rows is None:
        len_str = ""
        for name, rows in len_dict.items():
            len_str += f" {name} ({rows})"

        raise KiaraProcessingException(
            f"Can't merge table, sources have different lengths: {len_str}"
        )

    column_names = []
    columns = []
    for column_name, ref in column_map_final.items():
        column_names.append(column_name)
        column = arrays[ref]
        columns.append(column)

    table = pa.Table.from_arrays(arrays=columns, names=column_names)

    outputs.set_value("table", table)
QueryTableSQL (KiaraModule)

Execute a sql query against an (Arrow) table.

The default relation name for the sql query is 'data', but it can be modified via the 'relation_name' config option/input.

If the 'query' module config option is not set, users can provide their own query; otherwise the pre-set one will be used.

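For illustration, the query step can be reproduced standalone with duckdb and pyarrow, mirroring what this module does internally (hypothetical data; assumes a duckdb version that still provides the module-level duckdb.arrow() helper used in the source below):

import duckdb
import pyarrow as pa

# A small Arrow table standing in for the 'table' input (hypothetical data).
arrow_table = pa.table({"year": [1890, 1891, 1891], "num": [5, 3, 7]})

# Register the table under the default relation name 'data' and run a query against it.
rel_from_arrow = duckdb.arrow(arrow_table)
result = rel_from_arrow.query(
    "data", "SELECT year, SUM(num) AS total FROM data GROUP BY year ORDER BY year"
)

query_result = result.arrow()  # the result as a pyarrow.Table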
Source code in tabular/modules/table/__init__.py
class QueryTableSQL(KiaraModule):
    """Execute a sql query against an (Arrow) table.

    The default relation name for the sql query is 'data', but can be modified by the 'relation_name' config option/input.

    If the 'query' module config option is not set, users can provide their own query, otherwise the pre-set
    one will be used.
    """

    _module_type_name = "query.table"
    _config_cls = QueryTableSQLModuleConfig

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:

        inputs = {
            "table": {
                "type": "table",
                "doc": "The table to query",
            }
        }

        if self.get_config_value("query") is None:
            inputs["query"] = {"type": "string", "doc": "The query."}
            inputs["relation_name"] = {
                "type": "string",
                "doc": "The name the table is referred to in the sql query.",
                "default": "data",
            }

        return inputs

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:

        return {"query_result": {"type": "table", "doc": "The query result."}}

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        import duckdb

        if self.get_config_value("query") is None:
            _query: str = inputs.get_value_data("query")
            _relation_name: str = inputs.get_value_data("relation_name")
        else:
            _query = self.get_config_value("query")
            _relation_name = self.get_config_value("relation_name")

        if _relation_name.upper() in RESERVED_SQL_KEYWORDS:
            raise KiaraProcessingException(
                f"Invalid relation name '{_relation_name}': this is a reserved sql keyword, please select a different name."
            )

        _table: KiaraTable = inputs.get_value_data("table")
        rel_from_arrow = duckdb.arrow(_table.arrow_table)
        result: duckdb.DuckDBPyRelation = rel_from_arrow.query(_relation_name, _query)

        outputs.set_value("query_result", result.arrow())
Classes
_config_cls (KiaraModuleConfig) private pydantic-model
Source code in tabular/modules/table/__init__.py
class QueryTableSQLModuleConfig(KiaraModuleConfig):

    query: Union[str, None] = Field(
        description="The query to execute. If not specified, the user will be able to provide their own.",
        default=None,
    )
    relation_name: Union[str, None] = Field(
        description="The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.",
        default="data",
    )
Attributes
query: str pydantic-field

The query to execute. If not specified, the user will be able to provide their own.

relation_name: str pydantic-field

The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.

Methods
create_inputs_schema(self)

Return the schema for this type's inputs.

Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
    self,
) -> ValueMapSchema:

    inputs = {
        "table": {
            "type": "table",
            "doc": "The table to query",
        }
    }

    if self.get_config_value("query") is None:
        inputs["query"] = {"type": "string", "doc": "The query."}
        inputs["relation_name"] = {
            "type": "string",
            "doc": "The name the table is referred to in the sql query.",
            "default": "data",
        }

    return inputs
create_outputs_schema(self)

Return the schema for this type's outputs.

Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
    self,
) -> ValueMapSchema:

    return {"query_result": {"type": "table", "doc": "The query result."}}
process(self, inputs, outputs)
Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

    import duckdb

    if self.get_config_value("query") is None:
        _query: str = inputs.get_value_data("query")
        _relation_name: str = inputs.get_value_data("relation_name")
    else:
        _query = self.get_config_value("query")
        _relation_name = self.get_config_value("relation_name")

    if _relation_name.upper() in RESERVED_SQL_KEYWORDS:
        raise KiaraProcessingException(
            f"Invalid relation name '{_relation_name}': this is a reserved sql keyword, please select a different name."
        )

    _table: KiaraTable = inputs.get_value_data("table")
    rel_from_arrow = duckdb.arrow(_table.arrow_table)
    result: duckdb.DuckDBPyRelation = rel_from_arrow.query(_relation_name, _query)

    outputs.set_value("query_result", result.arrow())
QueryTableSQLModuleConfig (KiaraModuleConfig) pydantic-model
Source code in tabular/modules/table/__init__.py
class QueryTableSQLModuleConfig(KiaraModuleConfig):

    query: Union[str, None] = Field(
        description="The query to execute. If not specified, the user will be able to provide their own.",
        default=None,
    )
    relation_name: Union[str, None] = Field(
        description="The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.",
        default="data",
    )
Attributes
query: str pydantic-field

The query to execute. If not specified, the user will be able to provide their own.

relation_name: str pydantic-field

The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.

RenderTableModule (RenderTableModuleBase)
Source code in tabular/modules/table/__init__.py
class RenderTableModule(RenderTableModuleBase):
    _module_type_name = "render.table"

    def render__table__as__string(self, value: Value, render_config: Mapping[str, Any]):

        input_number_of_rows = render_config.get("number_of_rows", 20)
        input_row_offset = render_config.get("row_offset", 0)

        wrap, data_related_scenes = self.preprocess_table(
            value=value,
            input_number_of_rows=input_number_of_rows,
            input_row_offset=input_row_offset,
        )
        pretty = wrap.as_string(max_row_height=1)

        return RenderValueResult(
            value_id=value.value_id,
            render_config=render_config,
            render_manifest=self.manifest.manifest_hash,
            rendered=pretty,
            related_scenes=data_related_scenes,
        )

    def render__table__as__terminal_renderable(
        self, value: Value, render_config: Mapping[str, Any]
    ):

        input_number_of_rows = render_config.get("number_of_rows", 20)
        input_row_offset = render_config.get("row_offset", 0)

        wrap, data_related_scenes = self.preprocess_table(
            value=value,
            input_number_of_rows=input_number_of_rows,
            input_row_offset=input_row_offset,
        )
        pretty = wrap.as_terminal_renderable(max_row_height=1)

        return RenderValueResult(
            value_id=value.value_id,
            render_config=render_config,
            render_manifest=self.manifest.manifest_hash,
            rendered=pretty,
            related_scenes=data_related_scenes,
        )
render__table__as__string(self, value, render_config)
Source code in tabular/modules/table/__init__.py
def render__table__as__string(self, value: Value, render_config: Mapping[str, Any]):

    input_number_of_rows = render_config.get("number_of_rows", 20)
    input_row_offset = render_config.get("row_offset", 0)

    wrap, data_related_scenes = self.preprocess_table(
        value=value,
        input_number_of_rows=input_number_of_rows,
        input_row_offset=input_row_offset,
    )
    pretty = wrap.as_string(max_row_height=1)

    return RenderValueResult(
        value_id=value.value_id,
        render_config=render_config,
        render_manifest=self.manifest.manifest_hash,
        rendered=pretty,
        related_scenes=data_related_scenes,
    )
render__table__as__terminal_renderable(self, value, render_config)
Source code in tabular/modules/table/__init__.py
def render__table__as__terminal_renderable(
    self, value: Value, render_config: Mapping[str, Any]
):

    input_number_of_rows = render_config.get("number_of_rows", 20)
    input_row_offset = render_config.get("row_offset", 0)

    wrap, data_related_scenes = self.preprocess_table(
        value=value,
        input_number_of_rows=input_number_of_rows,
        input_row_offset=input_row_offset,
    )
    pretty = wrap.as_terminal_renderable(max_row_height=1)

    return RenderValueResult(
        value_id=value.value_id,
        render_config=render_config,
        render_manifest=self.manifest.manifest_hash,
        rendered=pretty,
        related_scenes=data_related_scenes,
    )
RenderTableModuleBase (RenderValueModule)
Source code in tabular/modules/table/__init__.py
class RenderTableModuleBase(RenderValueModule):

    _module_type_name: str = None  # type: ignore

    def preprocess_table(
        self, value: Value, input_number_of_rows: int, input_row_offset: int
    ):

        import duckdb
        import pyarrow as pa

        if value.data_type_name == "array":
            array: KiaraArray = value.data
            arrow_table = pa.table(data=[array.arrow_array], names=["array"])
            column_names: Iterable[str] = ["array"]
        else:
            table: KiaraTable = value.data
            arrow_table = table.arrow_table
            column_names = table.column_names

        columnns = [f'"{x}"' if not x.startswith('"') else x for x in column_names]

        query = f"""SELECT {', '.join(columnns)} FROM data LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""

        rel_from_arrow = duckdb.arrow(arrow_table)
        query_result: duckdb.DuckDBPyRelation = rel_from_arrow.query("data", query)

        result_table = query_result.arrow()
        wrap = ArrowTabularWrap(table=result_table)

        related_scenes: Dict[str, Union[None, RenderScene]] = {}

        row_offset = arrow_table.num_rows - input_number_of_rows

        if row_offset > 0:

            if input_row_offset > 0:
                related_scenes["first"] = RenderScene.construct(
                    title="first",
                    description=f"Display the first {input_number_of_rows} rows of this table.",
                    manifest_hash=self.manifest.manifest_hash,
                    render_config={
                        "row_offset": 0,
                        "number_of_rows": input_number_of_rows,
                    },
                )

                p_offset = input_row_offset - input_number_of_rows
                if p_offset < 0:
                    p_offset = 0
                previous = {
                    "row_offset": p_offset,
                    "number_of_rows": input_number_of_rows,
                }
                related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous)  # type: ignore
            else:
                related_scenes["first"] = None
                related_scenes["previous"] = None

            n_offset = input_row_offset + input_number_of_rows
            if n_offset < arrow_table.num_rows:
                next = {"row_offset": n_offset, "number_of_rows": input_number_of_rows}
                related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next)  # type: ignore
            else:
                related_scenes["next"] = None

            last_page = int(arrow_table.num_rows / input_number_of_rows)
            current_start = last_page * input_number_of_rows
            if (input_row_offset + input_number_of_rows) > arrow_table.num_rows:
                related_scenes["last"] = None
            else:
                related_scenes["last"] = RenderScene.construct(
                    title="last",
                    description="Display the final rows of this table.",
                    manifest_hash=self.manifest.manifest_hash,
                    render_config={
                        "row_offset": current_start,  # type: ignore
                        "number_of_rows": input_number_of_rows,  # type: ignore
                    },
                )
        else:
            related_scenes["first"] = None
            related_scenes["previous"] = None
            related_scenes["next"] = None
            related_scenes["last"] = None

        return wrap, related_scenes
preprocess_table(self, value, input_number_of_rows, input_row_offset)
Source code in tabular/modules/table/__init__.py
def preprocess_table(
    self, value: Value, input_number_of_rows: int, input_row_offset: int
):

    import duckdb
    import pyarrow as pa

    if value.data_type_name == "array":
        array: KiaraArray = value.data
        arrow_table = pa.table(data=[array.arrow_array], names=["array"])
        column_names: Iterable[str] = ["array"]
    else:
        table: KiaraTable = value.data
        arrow_table = table.arrow_table
        column_names = table.column_names

    columnns = [f'"{x}"' if not x.startswith('"') else x for x in column_names]

    query = f"""SELECT {', '.join(columnns)} FROM data LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""

    rel_from_arrow = duckdb.arrow(arrow_table)
    query_result: duckdb.DuckDBPyRelation = rel_from_arrow.query("data", query)

    result_table = query_result.arrow()
    wrap = ArrowTabularWrap(table=result_table)

    related_scenes: Dict[str, Union[None, RenderScene]] = {}

    row_offset = arrow_table.num_rows - input_number_of_rows

    if row_offset > 0:

        if input_row_offset > 0:
            related_scenes["first"] = RenderScene.construct(
                title="first",
                description=f"Display the first {input_number_of_rows} rows of this table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={
                    "row_offset": 0,
                    "number_of_rows": input_number_of_rows,
                },
            )

            p_offset = input_row_offset - input_number_of_rows
            if p_offset < 0:
                p_offset = 0
            previous = {
                "row_offset": p_offset,
                "number_of_rows": input_number_of_rows,
            }
            related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous)  # type: ignore
        else:
            related_scenes["first"] = None
            related_scenes["previous"] = None

        n_offset = input_row_offset + input_number_of_rows
        if n_offset < arrow_table.num_rows:
            next = {"row_offset": n_offset, "number_of_rows": input_number_of_rows}
            related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next)  # type: ignore
        else:
            related_scenes["next"] = None

        last_page = int(arrow_table.num_rows / input_number_of_rows)
        current_start = last_page * input_number_of_rows
        if (input_row_offset + input_number_of_rows) > arrow_table.num_rows:
            related_scenes["last"] = None
        else:
            related_scenes["last"] = RenderScene.construct(
                title="last",
                description="Display the final rows of this table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={
                    "row_offset": current_start,  # type: ignore
                    "number_of_rows": input_number_of_rows,  # type: ignore
                },
            )
    else:
        related_scenes["first"] = None
        related_scenes["previous"] = None
        related_scenes["next"] = None
        related_scenes["last"] = None

    return wrap, related_scenes
filters
TableFiltersModule (FilterModule)
Source code in tabular/modules/table/filters.py
class TableFiltersModule(FilterModule):

    _module_type_name = "table.filters"

    @classmethod
    def retrieve_supported_type(cls) -> Union[Dict[str, Any], str]:

        return "table"

    def create_filter_inputs(self, filter_name: str) -> Union[None, ValueMapSchema]:

        if filter_name in ["select_columns", "drop_columns"]:

            return {
                "columns": {
                    "type": "list",
                    "doc": "The name of the columns to include.",
                    "optional": True,
                },
                "ignore_invalid_column_names": {
                    "type": "boolean",
                    "doc": "Whether to ignore invalid column names.",
                    "default": True,
                },
            }

        return None

    def filter__select_columns(self, value: Value, filter_inputs: Mapping[str, Any]):

        import pyarrow as pa

        ignore_invalid = filter_inputs["ignore_invalid_column_names"]
        column_names = filter_inputs["columns"]

        if not column_names:
            return value

        table: KiaraTable = value.data
        arrow_table = table.arrow_table
        _column_names = []
        _columns = []

        for column_name in column_names:
            if column_name not in arrow_table.column_names:
                if ignore_invalid:
                    continue
                else:
                    raise KiaraProcessingException(
                        f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                    )

            column = arrow_table.column(column_name)
            _column_names.append(column_name)
            _columns.append(column)

        return pa.table(data=_columns, names=_column_names)

    def filter__drop_columns(self, value: Value, filter_inputs: Mapping[str, Any]):

        import pyarrow as pa

        ignore_invalid = filter_inputs["ignore_invalid_column_names"]
        column_names_to_ignore = filter_inputs["columns"]

        if not column_names_to_ignore:
            return value

        table: KiaraTable = value.data
        arrow_table = table.arrow_table

        for column_name in column_names_to_ignore:
            if column_name not in arrow_table.column_names:
                if ignore_invalid:
                    continue
                else:
                    raise KiaraProcessingException(
                        f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                    )

        _column_names = []
        _columns = []
        for column_name in arrow_table.column_names:

            if column_name in column_names_to_ignore:
                continue

            column = arrow_table.column(column_name)
            _column_names.append(column_name)
            _columns.append(column)

        return pa.table(data=_columns, names=_column_names)

    def filter__select_rows(self, value: Value, filter_inputs: Mapping[str, Any]):

        pass
create_filter_inputs(self, filter_name)
Source code in tabular/modules/table/filters.py
def create_filter_inputs(self, filter_name: str) -> Union[None, ValueMapSchema]:

    if filter_name in ["select_columns", "drop_columns"]:

        return {
            "columns": {
                "type": "list",
                "doc": "The name of the columns to include.",
                "optional": True,
            },
            "ignore_invalid_column_names": {
                "type": "boolean",
                "doc": "Whether to ignore invalid column names.",
                "default": True,
            },
        }

    return None
filter__drop_columns(self, value, filter_inputs)
Source code in tabular/modules/table/filters.py
def filter__drop_columns(self, value: Value, filter_inputs: Mapping[str, Any]):

    import pyarrow as pa

    ignore_invalid = filter_inputs["ignore_invalid_column_names"]
    column_names_to_ignore = filter_inputs["columns"]

    if not column_names_to_ignore:
        return value

    table: KiaraTable = value.data
    arrow_table = table.arrow_table

    for column_name in column_names_to_ignore:
        if column_name not in arrow_table.column_names:
            if ignore_invalid:
                continue
            else:
                raise KiaraProcessingException(
                    f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                )

    _column_names = []
    _columns = []
    for column_name in arrow_table.column_names:

        if column_name in column_names_to_ignore:
            continue

        column = arrow_table.column(column_name)
        _column_names.append(column_name)
        _columns.append(column)

    return pa.table(data=_columns, names=_column_names)
filter__select_columns(self, value, filter_inputs)
Source code in tabular/modules/table/filters.py
def filter__select_columns(self, value: Value, filter_inputs: Mapping[str, Any]):

    import pyarrow as pa

    ignore_invalid = filter_inputs["ignore_invalid_column_names"]
    column_names = filter_inputs["columns"]

    if not column_names:
        return value

    table: KiaraTable = value.data
    arrow_table = table.arrow_table
    _column_names = []
    _columns = []

    for column_name in column_names:
        if column_name not in arrow_table.column_names:
            if ignore_invalid:
                continue
            else:
                raise KiaraProcessingException(
                    f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                )

        column = arrow_table.column(column_name)
        _column_names.append(column_name)
        _columns.append(column)

    return pa.table(data=_columns, names=_column_names)
filter__select_rows(self, value, filter_inputs)
Source code in tabular/modules/table/filters.py
def filter__select_rows(self, value: Value, filter_inputs: Mapping[str, Any]):

    pass
retrieve_supported_type() classmethod
Source code in tabular/modules/table/filters.py
@classmethod
def retrieve_supported_type(cls) -> Union[Dict[str, Any], str]:

    return "table"