kiara_modules.core.table

ConvertToTableModule

Convert an Arrow table.

This module currently supports two conversion targets:

  • bytes: a memoryview of the byte-representation of the Table
  • string: the base64-encoded byte-representation of the Table
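The two conversion targets can be illustrated with the standard library alone. This is a minimal sketch, not the module's implementation. It assumes the table has already been serialized to bytes (for example in Arrow's IPC format, which is not shown here), and the helper name `convert_serialized_table` is hypothetical.

```python
import base64
from typing import Union


def convert_serialized_table(data: bytes, target: str) -> Union[memoryview, str]:
    """Hypothetical helper: convert already-serialized table bytes to a target."""
    if target == "bytes":
        # A zero-copy view over the serialized table bytes.
        return memoryview(data)
    if target == "string":
        # Base64 text, which can be embedded in JSON or other text formats.
        return base64.b64encode(data).decode("ascii")
    raise ValueError(f"Unsupported conversion target: {target!r}")


serialized = b"placeholder-table-bytes"  # stand-in, not real Arrow data
as_view = convert_serialized_table(serialized, "bytes")
as_text = convert_serialized_table(serialized, "string")
print(bytes(as_view) == serialized)  # True
print(base64.b64decode(as_text) == serialized)  # True
```

The `string` target trades size (base64 adds roughly a third) for being safe to store in text-only contexts.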

CutColumnModule

Extract a single column from a table and return it as an array.

create_input_schema(self)

Abstract method to be implemented by child classes; returns a description of this module's input schema.

If it returns a dictionary of dictionaries, the return value has the following format (items marked with '*' are optional):

{
    "[input_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this input]",
        "optional*": [boolean: whether this input is optional or required (defaults to 'False')]
    },
    "[other_input_field_name]": {
        "type": ...,
        ...
    }
}

Source code in core/table/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    inputs: typing.Mapping[str, typing.Any] = {
        "table": {"type": "table", "doc": "A table."},
        "column_name": {
            "type": "string",
            "doc": "The name of the column to extract.",
        },
    }
    return inputs
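The dictionary returned above follows the input-schema format described in the docstring. As a rough illustration of that format, the sketch below checks a schema dict with plain Python. `check_input_schema` is a hypothetical helper; kiara's own validation (which builds `ValueSchema` objects) may differ. It also accepts a `default` key, which the ExportArrowTable and LoadArrowTable schemas on this page use.

```python
from typing import Any, Mapping

# Keys from the documented format, plus "default", which other schemas on this page use.
ALLOWED_KEYS = {"type", "doc", "optional", "default"}


def check_input_schema(schema: Mapping[str, Mapping[str, Any]]) -> None:
    """Hypothetical check that a schema dict follows the documented format."""
    for field_name, field in schema.items():
        if "type" not in field:
            raise ValueError(f"Input '{field_name}' is missing the required 'type' key")
        unknown = set(field) - ALLOWED_KEYS
        if unknown:
            raise ValueError(f"Input '{field_name}' has unknown keys: {sorted(unknown)}")
        if not isinstance(field.get("optional", False), bool):
            raise ValueError(f"Input '{field_name}': 'optional' must be a boolean")


# The CutColumnModule inputs from above pass the check silently.
check_input_schema(
    {
        "table": {"type": "table", "doc": "A table."},
        "column_name": {"type": "string", "doc": "The name of the column to extract."},
    }
)
```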

create_output_schema(self)

Abstract method to be implemented by child classes; returns a description of this module's output schema.

If it returns a dictionary of dictionaries, the return value has the following format (items marked with '*' are optional):

{
    "[output_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this output]"
    },
    "[other_output_field_name]": {
        "type": ...,
        ...
    }
}

Source code in core/table/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    outputs: typing.Mapping[str, typing.Any] = {
        "array": {"type": "array", "doc": "The column."}
    }
    return outputs
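Conceptually, the module looks up one column by name and returns it as a standalone array. The sketch below models a table as a plain mapping from column names to lists, which is an assumption for illustration only; with pyarrow, the equivalent lookup would be `table.column(name)` on a `pyarrow.Table`. The helper name `cut_column` is hypothetical.

```python
from typing import Any, Dict, List


def cut_column(table: Dict[str, List[Any]], column_name: str) -> List[Any]:
    """Hypothetical stand-in: return one column of a column-oriented table."""
    if column_name not in table:
        available = ", ".join(table)
        raise KeyError(f"No column '{column_name}'. Available columns: {available}")
    # Return a copy so callers can't mutate the source table through the result.
    return list(table[column_name])


table = {"id": [1, 2, 3], "name": ["a", "b", "c"]}
print(cut_column(table, "name"))  # ['a', 'b', 'c']
```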

ExportArrowTable

Export a table object to disk.

create_input_schema(self)

Abstract method to be implemented by child classes; returns a description of this module's input schema.

If it returns a dictionary of dictionaries, the return value has the following format (items marked with '*' are optional):

{
    "[input_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this input]",
        "optional*": [boolean: whether this input is optional or required (defaults to 'False')]
    },
    "[other_input_field_name]": {
        "type": ...,
        ...
    }
}

Source code in core/table/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    inputs: typing.Mapping[str, typing.Any] = {
        "table": {"type": "table", "doc": "The table object."},
        "path": {
            "type": "string",
            "doc": "The path to the file to write.",
        },
        "format": {
            "type": "string",
            "doc": "The format of the table file ('feather' or 'parquet').",
            "default": "feather",
        },
        "force_overwrite": {
            "type": "boolean",
            "doc": "Whether to overwrite an existing file.",
            "default": False,
        },
        "compression": {
            "type": "string",
            "doc": "The compression to use. Use either: 'zstd' (default), 'lz4', or 'uncompressed'.",
            "default": "zstd",
        },
    }
    return inputs
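Three of the export inputs above have defaults, and two are restricted to a small set of values. The sketch below shows one way such inputs could be resolved before writing; it is an illustration under assumptions, not kiara's input-handling code, and `resolve_export_inputs` is a hypothetical name. It does not check format/compression combinations, which the underlying writer may restrict.

```python
from typing import Any, Dict

# Defaults and allowed values as listed in the ExportArrowTable input schema above.
EXPORT_DEFAULTS = {"format": "feather", "force_overwrite": False, "compression": "zstd"}
ALLOWED_FORMATS = {"feather", "parquet"}
ALLOWED_COMPRESSION = {"zstd", "lz4", "uncompressed"}


def resolve_export_inputs(user_inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: fill in defaults and check enumerated values."""
    # "table" and "path" have no default, so they must be provided.
    for required in ("table", "path"):
        if required not in user_inputs:
            raise ValueError(f"Missing required input: '{required}'")
    resolved = {**EXPORT_DEFAULTS, **user_inputs}
    if resolved["format"] not in ALLOWED_FORMATS:
        raise ValueError(f"Unsupported format: {resolved['format']!r}")
    if resolved["compression"] not in ALLOWED_COMPRESSION:
        raise ValueError(f"Unsupported compression: {resolved['compression']!r}")
    return resolved


# None stands in for a real table object here.
resolved = resolve_export_inputs({"table": None, "path": "out.feather"})
print(resolved["format"], resolved["compression"], resolved["force_overwrite"])  # feather zstd False
```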

create_output_schema(self)

Abstract method to be implemented by child classes; returns a description of this module's output schema.

If it returns a dictionary of dictionaries, the return value has the following format (items marked with '*' are optional):

{
    "[output_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this output]"
    },
    "[other_output_field_name]": {
        "type": ...,
        ...
    }
}

Source code in core/table/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    outputs: typing.Mapping[str, typing.Any] = {
        "load_config": {
            "type": "load_config",
            "doc": "The configuration to use with kiara to load the saved value.",
        }
    }

    return outputs

LoadArrowTable

Load a table object from disk.

create_input_schema(self)

Abstract method to be implemented by child classes; returns a description of this module's input schema.

If it returns a dictionary of dictionaries, the return value has the following format (items marked with '*' are optional):

{
    "[input_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this input]",
        "optional*": [boolean: whether this input is optional or required (defaults to 'False')]
    },
    "[other_input_field_name]": {
        "type": ...,
        ...
    }
}

Source code in core/table/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    inputs: typing.Mapping[str, typing.Any] = {
        "base_path": {
            "type": "string",
            "doc": "The path to the folder that contains the table file.",
        },
        "rel_path": {
            "type": "string",
            "doc": "The relative path to the table file within base_path.",
        },
        "format": {
            "type": "string",
            "doc": "The format of the table file ('feather' or 'parquet').",
            "default": "feather",
        },
    }
    return inputs
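The loader locates the file by combining `base_path` and `rel_path`. A minimal sketch of that path resolution, with the format check taken from the schema above, follows; `resolve_table_path` is a hypothetical helper. With pyarrow installed, the resulting path could then be passed to `pyarrow.feather.read_table` or `pyarrow.parquet.read_table`, depending on the format.

```python
import os

SUPPORTED_FORMATS = ("feather", "parquet")


def resolve_table_path(base_path: str, rel_path: str, table_format: str = "feather") -> str:
    """Hypothetical helper: combine base_path and rel_path and check the format."""
    if table_format not in SUPPORTED_FORMATS:
        raise ValueError(f"Unsupported table format: {table_format!r}")
    if os.path.isabs(rel_path):
        # An absolute rel_path would make os.path.join discard base_path entirely.
        raise ValueError("rel_path must be relative to base_path")
    return os.path.join(base_path, rel_path)


print(resolve_table_path("/data/tables", "table.feather"))
```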

create_output_schema(self)

Abstract method to be implemented by child classes; returns a description of this module's output schema.

If it returns a dictionary of dictionaries, the return value has the following format (items marked with '*' are optional):

{
    "[output_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this output]"
    },
    "[other_output_field_name]": {
        "type": ...,
        ...
    }
}

Source code in core/table/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    outputs: typing.Mapping[str, typing.Any] = {
        "table": {"type": "table", "doc": "The pyarrow table object."}
    }
    return outputs

MapColumnModule

Map the items of one column of a table onto an array, using another module.

create_input_schema(self)

Abstract method to be implemented by child classes; returns a description of this module's input schema.

If it returns a dictionary of dictionaries, the return value has the following format (items marked with '*' are optional):

{
    "[input_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this input]",
        "optional*": [boolean: whether this input is optional or required (defaults to 'False')]
    },
    "[other_input_field_name]": {
        "type": ...,
        ...
    }
}

Source code in core/table/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    inputs: typing.Dict[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ] = {
        "table": {
            "type": "table",
            "doc": "The table to use as input.",
        },
        "column_name": {
            "type": "string",
            "doc": "The name of the table column to run the mapping operation on.",
        },
    }
    for input_name, schema in self.child_module.input_schemas.items():
        assert input_name != "table"
        assert input_name != "column_name"
        if input_name == self.module_input_name:
            continue
        inputs[input_name] = schema
    return inputs
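The loop above exposes every input of the child module except the one that is fed from the table column, and refuses child inputs that would clash with `table` or `column_name`. The stand-alone sketch below reproduces that logic outside kiara so it can be tried directly; `merge_child_inputs` and the example child inputs are hypothetical. It raises `ValueError` instead of using `assert`, so the check is not skipped when Python runs with `-O`.

```python
from typing import Any, Dict, Mapping

RESERVED_INPUTS = ("table", "column_name")


def merge_child_inputs(child_inputs: Mapping[str, Any], module_input_name: str) -> Dict[str, Any]:
    """Hypothetical stand-alone version of the schema-merging loop above."""
    inputs: Dict[str, Any] = {
        "table": {"type": "table", "doc": "The table to use as input."},
        "column_name": {
            "type": "string",
            "doc": "The name of the table column to run the mapping operation on.",
        },
    }
    for name, schema in child_inputs.items():
        if name in RESERVED_INPUTS:
            raise ValueError(f"Child module input '{name}' clashes with a reserved name")
        if name == module_input_name:
            # This input is fed from the table column, so it is not exposed.
            continue
        inputs[name] = schema
    return inputs


child_inputs = {
    "text": {"type": "string", "doc": "The text to process."},
    "strip": {"type": "boolean", "doc": "Whether to strip whitespace.", "default": True},
}
print(sorted(merge_child_inputs(child_inputs, module_input_name="text")))
# ['column_name', 'strip', 'table']
```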

create_output_schema(self)

Abstract method to be implemented by child classes; returns a description of this module's output schema.

If it returns a dictionary of dictionaries, the return value has the following format (items marked with '*' are optional):

{
    "[output_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this output]"
    },
    "[other_output_field_name]": {
        "type": ...,
        ...
    }
}

Source code in core/table/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    outputs = {
        "array": {
            "type": "array",
            "doc": "An array of equal length to the input array, containing the 'mapped' values.",
        }
    }
    return outputs
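The output schema promises an array of the same length as the input column. One way to picture the mapping is to run the child module once per column item, passing the item under the child's input name along with any other (constant) inputs, and to collect one output value per item. The sketch below models a child module as a plain function from an inputs dict to an outputs dict; that model, `map_column`, and `lowercase_module` are all hypothetical, not kiara's execution API.

```python
from typing import Any, Callable, Dict, List, Mapping

# A "child module" is modelled here as a function from an inputs dict to an outputs dict.
ChildModule = Callable[[Mapping[str, Any]], Dict[str, Any]]


def map_column(
    column: List[Any],
    child: ChildModule,
    input_name: str,
    output_name: str,
    other_inputs: Mapping[str, Any],
) -> List[Any]:
    """Run `child` once per column item and collect one output value per item."""
    result = []
    for item in column:
        outputs = child({**other_inputs, input_name: item})
        result.append(outputs[output_name])
    # One result per input item, so the lengths always match.
    return result


def lowercase_module(inputs: Mapping[str, Any]) -> Dict[str, Any]:
    # Hypothetical child module with inputs 'text' and 'strip', and output 'result'.
    text = inputs["text"].strip() if inputs["strip"] else inputs["text"]
    return {"result": text.lower()}


print(map_column([" A", "B "], lowercase_module, "text", "result", {"strip": True}))
# ['a', 'b']
```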

module_instance_doc(self)

Return documentation for this instance of the module.

If not overridden, returns the result of this class's doc() method.

Source code in core/table/__init__.py
def module_instance_doc(self) -> str:

    config: MapColumnsModuleConfig = self.config  # type: ignore

    module_type = config.module_type
    module_config = config.module_config

    m = self._kiara.create_module(
        id="map_column_child", module_type=module_type, module_config=module_config
    )
    type_md = m.get_type_metadata()
    doc = type_md.documentation.full_doc
    link = type_md.context.get_url_for_reference("module_doc")
    if not link:
        link_str = f"``{module_type}``"
    else:
        link_str = f"[``{module_type}``]({link})"

    result = f"""Map the values of the rows of an input table onto a new array of the same length, using the {link_str} module."""

    if doc and doc != "-- n/a --":
        result = result + f"\n\n``{module_type}`` documentation:\n\n{doc}"
    return result

MapColumnsModuleConfig pydantic-model

input_name: str pydantic-field

The name of the child module's input that will receive the rows of the input table. Can be omitted if the configured module has only a single input.

module_config: Dict[str, Any] pydantic-field

The configuration for the kiara module used for the mapping.

module_type: str pydantic-field required

The name of the kiara module used to map the items of the input column.

output_name: str pydantic-field

The name of the child module's output whose values are collected into the result array. Can be omitted if the configured module has only a single output.
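Both `input_name` and `output_name` can be omitted when the child module has exactly one input or output. A minimal sketch of that resolution rule, assuming the child's fields are available as a mapping of names to schemas, could look like this; `resolve_field_name` is a hypothetical helper, not part of kiara.

```python
from typing import Mapping, Optional


def resolve_field_name(configured: Optional[str], available: Mapping[str, object], kind: str) -> str:
    """Hypothetical helper: choose which child input/output the mapping uses."""
    if configured is not None:
        if configured not in available:
            raise ValueError(f"Unknown {kind} '{configured}'; available: {sorted(available)}")
        return configured
    if len(available) == 1:
        # With a single field, the name can be inferred safely.
        return next(iter(available))
    raise ValueError(f"Child module has several {kind}s {sorted(available)}; set '{kind}_name'")


print(resolve_field_name(None, {"text": {"type": "string"}}, "input"))  # text
print(resolve_field_name("result", {"result": {}, "log": {}}, "output"))  # result
```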

MergeTableModule

Create a table from other tables and/or arrays.

create_input_schema(self)

Abstract method to be implemented by child classes; returns a description of this module's input schema.

If it returns a dictionary of dictionaries, the return value has the following format (items marked with '*' are optional):

{
    "[input_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this input]",
        "optional*": [boolean: whether this input is optional or required (defaults to 'False')]
    },
    "[other_input_field_name]": {
        "type": ...,
        ...
    }
}

Source code in core/table/__init__.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    input_schema_dict = self.get_config_value("input_schema")
    return input_schema_dict

create_output_schema(self)

Abstract method to be implemented by child classes; returns a description of this module's output schema.

If it returns a dictionary of dictionaries, the return value has the following format (items marked with '*' are optional):

{
    "[output_field_name]": {
        "type": "[value_type]",
        "doc*": "[a description of this output]"
    },
    "[other_output_field_name]": {
        "type": ...,
        ...
    }
}

Source code in core/table/__init__.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:

    outputs = {
        "table": {
            "type": "table",
            "doc": "The merged table, including all source tables and columns.",
        }
    }
    return outputs

MergeTableModuleConfig pydantic-model

input_schema: Dict[str, Any] pydantic-field required

A dict describing the inputs for this merge process.
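Because the module's inputs come entirely from the `input_schema` config, a merge can be driven by that dict. The sketch below is a hypothetical illustration using column-oriented dicts instead of Arrow tables: inputs typed `table` contribute all of their columns, and inputs typed `array` become a column named after the input field. That naming rule, the example config, and `merge_tables` itself are assumptions, not the module's documented behaviour.

```python
from typing import Any, Dict, List, Mapping

Table = Dict[str, List[Any]]


def merge_tables(input_schema: Mapping[str, Mapping[str, Any]], sources: Mapping[str, Any]) -> Table:
    """Hypothetical sketch: build one column-oriented table from tables and arrays."""
    merged: Table = {}
    for field_name, field in input_schema.items():
        source = sources[field_name]
        if field["type"] == "table":
            columns = source
        elif field["type"] == "array":
            columns = {field_name: source}
        else:
            raise ValueError(f"Input '{field_name}' must be a table or an array")
        for col_name, values in columns.items():
            if col_name in merged:
                raise ValueError(f"Duplicate column name: '{col_name}'")
            merged[col_name] = list(values)
    # All columns of a table must have the same number of rows.
    lengths = {len(values) for values in merged.values()}
    if len(lengths) > 1:
        raise ValueError(f"Columns have different lengths: {sorted(lengths)}")
    return merged


# Hypothetical input_schema config: one table input and one array input.
input_schema = {
    "edges": {"type": "table", "doc": "The source table."},
    "weight": {"type": "array", "doc": "An extra column."},
}
merged = merge_tables(input_schema, {"edges": {"source": [1, 2], "target": [2, 3]}, "weight": [0.5, 1.0]})
print(merged)
# {'source': [1, 2], 'target': [2, 3], 'weight': [0.5, 1.0]}
```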

SampleTableModule

Sample a table.

Sampling randomly selects a subset of a dataset. This makes it possible to test queries and workflows on a smaller version of the original data and to adjust parameters before a full run.
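A row sample has to pick the same row positions in every column so that rows stay intact. The sketch below shows this on a column-oriented dict with the standard `random` module; `sample_rows`, the dict-based table, and the optional seed are assumptions for illustration and do not describe which sampling strategies the module actually offers.

```python
import random
from typing import Any, Dict, List, Optional

Table = Dict[str, List[Any]]


def sample_rows(table: Table, n: int, seed: Optional[int] = None) -> Table:
    """Hypothetical sketch: randomly select `n` rows, keeping the original row order."""
    num_rows = len(next(iter(table.values()), []))
    if n >= num_rows:
        return {name: list(values) for name, values in table.items()}
    rng = random.Random(seed)  # a fixed seed makes the sample reproducible
    indices = sorted(rng.sample(range(num_rows), n))
    # Use the same row indices for every column so rows are not scrambled.
    return {name: [values[i] for i in indices] for name, values in table.items()}


table = {"id": list(range(10)), "square": [i * i for i in range(10)]}
print(sample_rows(table, 3, seed=1))
```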

get_value_type() classmethod

Return the value type for this sample module.

Source code in core/table/__init__.py
@classmethod
def get_value_type(cls) -> str:
    return "table"

SaveArrowTableConfig pydantic-model

compression: str pydantic-field

The compression to use when saving the table.

StoreArrowTable

store_value(self, value, base_path)

Save the value, and return the load config needed to load it again.

Source code in core/table/__init__.py
def store_value(self, value: Value, base_path: str) -> typing.Dict[str, typing.Any]:

    import pyarrow as pa
    from pyarrow import feather

    table: pa.Table = value.get_value_data()
    full_path: str = os.path.join(base_path, DEFAULT_SAVE_TABLE_FILE_NAME)

    if os.path.exists(full_path):
        raise KiaraProcessingException(
            f"Can't save table, file already exists: {full_path}"
        )

    os.makedirs(os.path.dirname(full_path), exist_ok=True)

    compression = self.get_config_value("compression")

    feather.write_feather(table, full_path, compression=compression)

    result = {
        "module_type": "table.load",
        "base_path_input_name": "base_path",
        "inputs": {
            "base_path": os.path.dirname(full_path),
            "rel_path": os.path.basename(full_path),
            "format": "feather",
        },
        "output_name": "table",
    }
    return result
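The load config returned by `store_value` stores the file's directory as `base_path` and its file name as `rel_path`, which are exactly the inputs LoadArrowTable joins back together. The sketch below reproduces that round trip with the standard library only: it writes placeholder bytes instead of calling `feather.write_feather`, uses a hypothetical file name in place of `DEFAULT_SAVE_TABLE_FILE_NAME` (whose value is not shown here), and raises `FileExistsError` where the original raises `KiaraProcessingException`.

```python
import os
import tempfile
from typing import Any, Dict

# Hypothetical file name; the real module uses DEFAULT_SAVE_TABLE_FILE_NAME.
TABLE_FILE_NAME = "table.feather"


def store_serialized_table(data: bytes, base_path: str) -> Dict[str, Any]:
    """Write bytes (standing in for a feather file) and return a load config."""
    full_path = os.path.join(base_path, TABLE_FILE_NAME)
    if os.path.exists(full_path):
        raise FileExistsError(f"Can't save table, file already exists: {full_path}")
    os.makedirs(os.path.dirname(full_path), exist_ok=True)
    with open(full_path, "wb") as f:
        f.write(data)
    return {
        "module_type": "table.load",
        "base_path_input_name": "base_path",
        "inputs": {
            "base_path": os.path.dirname(full_path),
            "rel_path": os.path.basename(full_path),
            "format": "feather",
        },
        "output_name": "table",
    }


with tempfile.TemporaryDirectory() as tmp:
    config = store_serialized_table(b"placeholder", os.path.join(tmp, "store"))
    inputs = config["inputs"]
    # Joining base_path and rel_path recovers the written file.
    with open(os.path.join(inputs["base_path"], inputs["rel_path"]), "rb") as f:
        print(f.read())  # b'placeholder'
```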

TableConversionModuleConfig pydantic-model

ignore_errors: bool pydantic-field

Whether to ignore conversion errors and omit the items that fail to convert.
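The effect of `ignore_errors` can be sketched as a per-item conversion loop: with the flag off, the first failure aborts the conversion; with it on, failed items are dropped, so the result can be shorter than the input. This is a hypothetical illustration, not the module's code; `convert_items` is an invented name, and which exception types count as conversion errors (here `ValueError` and `TypeError`) is an assumption.

```python
from typing import Any, Callable, List


def convert_items(items: List[Any], convert: Callable[[Any], Any], ignore_errors: bool = False) -> List[Any]:
    """Hypothetical sketch of an `ignore_errors` switch for item conversion."""
    result = []
    for item in items:
        try:
            result.append(convert(item))
        except (ValueError, TypeError):
            if not ignore_errors:
                raise
            # The failed item is omitted, so the result may be shorter than the input.
    return result


print(convert_items(["1", "x", "3"], int, ignore_errors=True))  # [1, 3]
```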

TableMetadataModule

Extract metadata from a table object.