
Top-level package for kiara_plugin.tabular.

KIARA_METADATA

find_data_types: Union[Type, Tuple, Callable]

find_model_classes: Union[Type, Tuple, Callable]

find_modules: Union[Type, Tuple, Callable]

find_pipelines: Union[Type, Tuple, Callable]

get_version()

Source code in tabular/__init__.py
def get_version():
    from pkg_resources import DistributionNotFound, get_distribution

    try:
        # Change here if project is renamed and does not equal the package name
        dist_name = __name__
        __version__ = get_distribution(dist_name).version
    except DistributionNotFound:

        try:
            version_file = os.path.join(os.path.dirname(__file__), "version.txt")

            if os.path.exists(version_file):
                with open(version_file, encoding="utf-8") as vf:
                    __version__ = vf.read()
            else:
                __version__ = "unknown"

        except (Exception):
            pass

        if __version__ is None:
            __version__ = "unknown"

    return __version__

Modules

data_types special

This module contains the value type classes that are used in the kiara_plugin.tabular package.

Modules

array
Classes
ArrayType (AnyType)

An array, in most cases used as a column within a table.

Internally, this type uses the KiaraArray wrapper class to manage array data. This wrapper class, in turn, uses an Apache Arrow Array to store the data in memory (and on disk).
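The following is an illustrative sketch (not part of the generated reference) of the kind of input this type accepts; it relies only on the KiaraArray.create_array method that parse_python_obj (shown below) delegates to:

# Illustrative sketch: a plain Python list is wrapped into a KiaraArray,
# which is backed by an Apache Arrow (chunked) array.
from kiara_plugin.tabular.models.array import KiaraArray

arr = KiaraArray.create_array(["a", "b", "c"])

print(len(arr))              # 3
print(arr.arrow_array.type)  # the underlying Arrow data type
print(arr.to_pylist())       # ['a', 'b', 'c']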

Source code in tabular/data_types/array.py
class ArrayType(AnyType[KiaraArray, DataTypeConfig]):
    """An array, in most cases used as a column within a table.

    Internally, this type uses the [KiaraArray][kiara_plugin.tabular.models.array.KiaraArray] wrapper class to manage array data. This wrapper class, in turn, uses an [Apache Arrow](https://arrow.apache.org) [Array](https://arrow.apache.org/docs/python/generated/pyarrow.Array.html#pyarrow.Array) to store the data in memory (and on disk).
    """

    _data_type_name = "array"

    @classmethod
    def python_class(cls) -> Type:
        return KiaraArray

    def parse_python_obj(self, data: Any) -> KiaraArray:

        return KiaraArray.create_array(data)

    def _validate(cls, value: Any) -> None:

        if not isinstance(value, (KiaraArray)):
            raise Exception(
                f"Invalid type '{type(value).__name__}', must be an instance of the 'KiaraArray' class."
            )

    def serialize(self, data: KiaraArray) -> SerializedData:

        import pyarrow as pa

        # TODO: make sure temp dir is in the same partition as file store
        temp_f = tempfile.mkdtemp()

        def cleanup():
            shutil.rmtree(temp_f, ignore_errors=True)

        atexit.register(cleanup)

        column: pa.Array = data.arrow_array
        file_name = os.path.join(temp_f, "array.arrow")

        store_array(array_obj=column, file_name=file_name, column_name="array")

        chunks = {"array.arrow": {"type": "file", "codec": "raw", "file": file_name}}

        serialized_data = {
            "data_type": self.data_type_name,
            "data_type_config": self.type_config.dict(),
            "data": chunks,
            "serialization_profile": "feather",
            "metadata": {
                "environment": {},
                "deserialize": {
                    "python_object": {
                        "module_type": "load.array",
                        "module_config": {
                            "value_type": "array",
                            "target_profile": "python_object",
                            "serialization_profile": "feather",
                        },
                    }
                },
            },
        }

        serialized = SerializationResult(**serialized_data)
        return serialized

    def pretty_print_as__terminal_renderable(
        self, value: Value, render_config: Mapping[str, Any]
    ) -> Any:

        max_rows = render_config.get(
            "max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
        )
        max_row_height = render_config.get(
            "max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
        )
        max_cell_length = render_config.get(
            "max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
        )

        half_lines: Union[int, None] = None
        if max_rows:
            half_lines = int(max_rows / 2)

        import pyarrow as pa

        array: pa.Array = value.data.arrow_array

        temp_table = pa.Table.from_arrays(arrays=[array], names=["array"])
        atw = ArrowTabularWrap(temp_table)
        result = atw.as_terminal_renderable(
            rows_head=half_lines,
            rows_tail=half_lines,
            max_row_height=max_row_height,
            max_cell_length=max_cell_length,
            show_table_header=False,
        )

        return result
Methods
parse_python_obj(self, data)

Parse a value into a supported python type.

This exists to make it easier to do trivial conversions (e.g. from a date string to a datetime object). If you choose to overwrite this method, make 100% sure that you don't change the meaning of the value, and try to avoid adding or removing information from the data (e.g. by changing the resolution of a date).

Parameters:

    data (Any): the value to parse (required)

Returns:

    KiaraArray: 'None' if no parsing was done and the original value should be used; otherwise, the parsed Python object.

Source code in tabular/data_types/array.py
def parse_python_obj(self, data: Any) -> KiaraArray:

    return KiaraArray.create_array(data)
pretty_print_as__terminal_renderable(self, value, render_config)
Source code in tabular/data_types/array.py
def pretty_print_as__terminal_renderable(
    self, value: Value, render_config: Mapping[str, Any]
) -> Any:

    max_rows = render_config.get(
        "max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
    )
    max_row_height = render_config.get(
        "max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
    )
    max_cell_length = render_config.get(
        "max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
    )

    half_lines: Union[int, None] = None
    if max_rows:
        half_lines = int(max_rows / 2)

    import pyarrow as pa

    array: pa.Array = value.data.arrow_array

    temp_table = pa.Table.from_arrays(arrays=[array], names=["array"])
    atw = ArrowTabularWrap(temp_table)
    result = atw.as_terminal_renderable(
        rows_head=half_lines,
        rows_tail=half_lines,
        max_row_height=max_row_height,
        max_cell_length=max_cell_length,
        show_table_header=False,
    )

    return result
python_class() classmethod
Source code in tabular/data_types/array.py
@classmethod
def python_class(cls) -> Type:
    return KiaraArray
serialize(self, data)
Source code in tabular/data_types/array.py
def serialize(self, data: KiaraArray) -> SerializedData:

    import pyarrow as pa

    # TODO: make sure temp dir is in the same partition as file store
    temp_f = tempfile.mkdtemp()

    def cleanup():
        shutil.rmtree(temp_f, ignore_errors=True)

    atexit.register(cleanup)

    column: pa.Array = data.arrow_array
    file_name = os.path.join(temp_f, "array.arrow")

    store_array(array_obj=column, file_name=file_name, column_name="array")

    chunks = {"array.arrow": {"type": "file", "codec": "raw", "file": file_name}}

    serialized_data = {
        "data_type": self.data_type_name,
        "data_type_config": self.type_config.dict(),
        "data": chunks,
        "serialization_profile": "feather",
        "metadata": {
            "environment": {},
            "deserialize": {
                "python_object": {
                    "module_type": "load.array",
                    "module_config": {
                        "value_type": "array",
                        "target_profile": "python_object",
                        "serialization_profile": "feather",
                    },
                }
            },
        },
    }

    serialized = SerializationResult(**serialized_data)
    return serialized
Functions
store_array(array_obj, file_name, column_name='array')

Utility method to store an array to a file.

Source code in tabular/data_types/array.py
def store_array(array_obj: "pa.Array", file_name: str, column_name: "str" = "array"):
    """Utility methdo to stora an array to a file."""

    import pyarrow as pa
    from pyarrow import ChunkedArray

    schema = pa.schema([pa.field(column_name, array_obj.type)])

    # TODO: support non-single chunk columns
    with pa.OSFile(file_name, "wb") as sink:
        with pa.ipc.new_file(sink, schema=schema) as writer:
            if isinstance(array_obj, ChunkedArray):
                for chunk in array_obj.chunks:
                    batch = pa.record_batch([chunk], schema=schema)
                    writer.write(batch)
            else:
                raise NotImplementedError()
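A minimal usage sketch for store_array (the temporary file path and module import path are illustrative); note that the current implementation only handles pyarrow ChunkedArray inputs:

import os
import tempfile

import pyarrow as pa

from kiara_plugin.tabular.data_types.array import store_array

# Illustrative sketch: write a chunked Arrow array to an Arrow IPC file ...
chunked = pa.chunked_array([pa.array([1, 2, 3])])
target = os.path.join(tempfile.mkdtemp(), "example.arrow")
store_array(array_obj=chunked, file_name=target, column_name="numbers")

# ... and read it back as a single-column table.
with pa.memory_map(target, "r") as source:
    table = pa.ipc.open_file(source).read_all()
print(table.column("numbers"))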
db
Classes
DatabaseType (AnyType)

A database, containing one or several tables.

This is backed by the KiaraDatabase class to manage the stored data.
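As an illustrative sketch (assuming only the KiaraDatabase API documented further down), an existing sqlite file can be wrapped directly; parse_python_obj (shown below) does the same for string or Path inputs:

import os
import sqlite3
import tempfile

from kiara_plugin.tabular.models.db import KiaraDatabase

# Illustrative sketch: create an (empty) sqlite file, then wrap it.
db_file = os.path.join(tempfile.mkdtemp(), "example.sqlite")
sqlite3.connect(db_file).close()

db = KiaraDatabase(db_file_path=db_file)
print(db.db_url)       # sqlite:///<path to the temporary file>
print(db.table_names)  # [] (no tables yet)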

Source code in tabular/data_types/db.py
class DatabaseType(AnyType[KiaraDatabase, DataTypeConfig]):
    """A database, containing one or several tables.

    This is backed by the [KiaraDatabase][kiara_plugin.tabular.models.db.KiaraDatabase] class to manage
    the stored data.
    """

    _data_type_name = "database"

    @classmethod
    def python_class(self) -> Type[KiaraDatabase]:
        return KiaraDatabase

    def parse_python_obj(self, data: Any) -> KiaraDatabase:

        if isinstance(data, Path):
            data = data.as_posix()

        if isinstance(data, str):
            if not os.path.exists(data):
                raise ValueError(
                    f"Can't create database from path '{data}': path does not exist."
                )

            return KiaraDatabase(db_file_path=data)

        return data

    def _validate(cls, value: Any) -> None:

        if not isinstance(value, (KiaraDatabase)):
            raise ValueError(
                f"Invalid type '{type(value).__name__}', must be an instance of the 'KiaraDatabase' class."
            )

    def serialize(self, data: KiaraDatabase) -> SerializedData:

        chunks = {
            "db.sqlite": {"type": "file", "codec": "raw", "file": data.db_file_path}
        }

        serialized_data = {
            "data_type": self.data_type_name,
            "data_type_config": self.type_config.dict(),
            "data": chunks,
            "serialization_profile": "feather",
            "metadata": {
                "environment": {},
                "deserialize": {
                    "python_object": {
                        "module_type": "load.database",
                        "module_config": {
                            "value_type": self.data_type_name,
                            "target_profile": "python_object",
                            "serialization_profile": "copy",
                        },
                    }
                },
            },
        }

        serialized = SerializationResult(**serialized_data)
        return serialized

    def pretty_print_as__terminal_renderable(
        self, value: Value, render_config: Mapping[str, Any]
    ) -> Any:

        max_rows = render_config.get(
            "max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
        )
        max_row_height = render_config.get(
            "max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
        )
        max_cell_length = render_config.get(
            "max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
        )

        half_lines: Union[int, None] = None
        if max_rows:
            half_lines = int(max_rows / 2)

        db: KiaraDatabase = value.data

        result: List[Any] = [""]
        for table_name in db.table_names:
            atw = SqliteTabularWrap(
                engine=db.get_sqlalchemy_engine(), table_name=table_name
            )
            pretty = atw.as_terminal_renderable(
                rows_head=half_lines,
                rows_tail=half_lines,
                max_row_height=max_row_height,
                max_cell_length=max_cell_length,
            )
            result.append(f"[b]Table[/b]: [i]{table_name}[/i]")
            result.append(pretty)

        return Group(*result)
Methods
parse_python_obj(self, data)

Parse a value into a supported python type.

This exists to make it easier to do trivial conversions (e.g. from a date string to a datetime object). If you choose to overwrite this method, make 100% sure that you don't change the meaning of the value, and try to avoid adding or removing information from the data (e.g. by changing the resolution of a date).

Parameters:

    data (Any): the value to parse (required)

Returns:

    KiaraDatabase: 'None' if no parsing was done and the original value should be used; otherwise, the parsed Python object.

Source code in tabular/data_types/db.py
def parse_python_obj(self, data: Any) -> KiaraDatabase:

    if isinstance(data, Path):
        data = data.as_posix()

    if isinstance(data, str):
        if not os.path.exists(data):
            raise ValueError(
                f"Can't create database from path '{data}': path does not exist."
            )

        return KiaraDatabase(db_file_path=data)

    return data
pretty_print_as__terminal_renderable(self, value, render_config)
Source code in tabular/data_types/db.py
def pretty_print_as__terminal_renderable(
    self, value: Value, render_config: Mapping[str, Any]
) -> Any:

    max_rows = render_config.get(
        "max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
    )
    max_row_height = render_config.get(
        "max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
    )
    max_cell_length = render_config.get(
        "max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
    )

    half_lines: Union[int, None] = None
    if max_rows:
        half_lines = int(max_rows / 2)

    db: KiaraDatabase = value.data

    result: List[Any] = [""]
    for table_name in db.table_names:
        atw = SqliteTabularWrap(
            engine=db.get_sqlalchemy_engine(), table_name=table_name
        )
        pretty = atw.as_terminal_renderable(
            rows_head=half_lines,
            rows_tail=half_lines,
            max_row_height=max_row_height,
            max_cell_length=max_cell_length,
        )
        result.append(f"[b]Table[/b]: [i]{table_name}[/i]")
        result.append(pretty)

    return Group(*result)
python_class() classmethod
Source code in tabular/data_types/db.py
@classmethod
def python_class(self) -> Type[KiaraDatabase]:
    return KiaraDatabase
serialize(self, data)
Source code in tabular/data_types/db.py
def serialize(self, data: KiaraDatabase) -> SerializedData:

    chunks = {
        "db.sqlite": {"type": "file", "codec": "raw", "file": data.db_file_path}
    }

    serialized_data = {
        "data_type": self.data_type_name,
        "data_type_config": self.type_config.dict(),
        "data": chunks,
        "serialization_profile": "feather",
        "metadata": {
            "environment": {},
            "deserialize": {
                "python_object": {
                    "module_type": "load.database",
                    "module_config": {
                        "value_type": self.data_type_name,
                        "target_profile": "python_object",
                        "serialization_profile": "copy",
                    },
                }
            },
        },
    }

    serialized = SerializationResult(**serialized_data)
    return serialized
SqliteTabularWrap (TabularWrap)
Source code in tabular/data_types/db.py
class SqliteTabularWrap(TabularWrap):
    def __init__(self, engine: "Engine", table_name: str):
        self._engine: Engine = engine
        self._table_name: str = table_name
        super().__init__()

    def retrieve_number_of_rows(self) -> int:

        from sqlalchemy import text

        with self._engine.connect() as con:
            result = con.execute(text(f"SELECT count(*) from {self._table_name}"))
            num_rows = result.fetchone()[0]

        return num_rows

    def retrieve_column_names(self) -> Iterable[str]:

        from sqlalchemy import inspect

        engine = self._engine
        inspector = inspect(engine)
        columns = inspector.get_columns(self._table_name)
        result = [column["name"] for column in columns]
        return result

    def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":

        from sqlalchemy import text

        query = f"SELECT * FROM {self._table_name}"
        if length:
            query = f"{query} LIMIT {length}"
        else:
            query = f"{query} LIMIT {self.num_rows}"
        if offset > 0:
            query = f"{query} OFFSET {offset}"
        with self._engine.connect() as con:
            result = con.execute(text(query))
            result_dict: Dict[str, List[Any]] = {}
            for cn in self.column_names:
                result_dict[cn] = []
            for r in result:
                for i, cn in enumerate(self.column_names):
                    result_dict[cn].append(r[i])

        return DictTabularWrap(result_dict)

    def to_pydict(self) -> Mapping:

        from sqlalchemy import text

        query = f"SELECT * FROM {self._table_name}"

        with self._engine.connect() as con:
            result = con.execute(text(query))
            result_dict: Dict[str, List[Any]] = {}
            for cn in self.column_names:
                result_dict[cn] = []
            for r in result:
                for i, cn in enumerate(self.column_names):
                    result_dict[cn].append(r[i])

        return result_dict
retrieve_column_names(self)
Source code in tabular/data_types/db.py
def retrieve_column_names(self) -> Iterable[str]:

    from sqlalchemy import inspect

    engine = self._engine
    inspector = inspect(engine)
    columns = inspector.get_columns(self._table_name)
    result = [column["name"] for column in columns]
    return result
retrieve_number_of_rows(self)
Source code in tabular/data_types/db.py
def retrieve_number_of_rows(self) -> int:

    from sqlalchemy import text

    with self._engine.connect() as con:
        result = con.execute(text(f"SELECT count(*) from {self._table_name}"))
        num_rows = result.fetchone()[0]

    return num_rows
slice(self, offset=0, length=None)
Source code in tabular/data_types/db.py
def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":

    from sqlalchemy import text

    query = f"SELECT * FROM {self._table_name}"
    if length:
        query = f"{query} LIMIT {length}"
    else:
        query = f"{query} LIMIT {self.num_rows}"
    if offset > 0:
        query = f"{query} OFFSET {offset}"
    with self._engine.connect() as con:
        result = con.execute(text(query))
        result_dict: Dict[str, List[Any]] = {}
        for cn in self.column_names:
            result_dict[cn] = []
        for r in result:
            for i, cn in enumerate(self.column_names):
                result_dict[cn].append(r[i])

    return DictTabularWrap(result_dict)
to_pydict(self)
Source code in tabular/data_types/db.py
def to_pydict(self) -> Mapping:

    from sqlalchemy import text

    query = f"SELECT * FROM {self._table_name}"

    with self._engine.connect() as con:
        result = con.execute(text(query))
        result_dict: Dict[str, List[Any]] = {}
        for cn in self.column_names:
            result_dict[cn] = []
        for r in result:
            for i, cn in enumerate(self.column_names):
                result_dict[cn].append(r[i])

    return result_dict
table
Classes
TableType (AnyType)

Tabular data (table, spreadsheet, data_frame, what have you).

The table data is organized in sets of columns (arrays of data of the same type), with each column having a string identifier.

kiara uses an instance of the KiaraTable class to manage the table data, which lets developers access it in different formats (Apache Arrow Table, Pandas dataframe, Python dict of lists, more to follow...).

Please consult the API doc of the KiaraTable class for more information about how to access and query the data:

- KiaraTable API doc: https://dharpa.org/kiara_plugin.tabular/latest/reference/kiara_plugin/tabular/models/__init__/#kiara_plugin.tabular.models.table.KiaraTable

Internally, the data is stored in Apache Feather format -- both in memory and on disk when saved, which enables some advanced usage patterns that reduce memory and compute overhead.
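As an illustrative sketch (assuming only the KiaraTable API documented further down), a dict of equal-length lists is one of the accepted inputs:

from kiara_plugin.tabular.models.table import KiaraTable

# Illustrative sketch: a dict of columns is wrapped into a KiaraTable,
# which is what parse_python_obj (shown below) does via KiaraTable.create_table.
table = KiaraTable.create_table({"id": [1, 2, 3], "name": ["a", "b", "c"]})

print(table.column_names)  # ['id', 'name']
print(table.num_rows)      # 3
df = table.to_pandas()     # a pandas DataFrame with the same data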

Source code in tabular/data_types/table.py
class TableType(AnyType[KiaraTable, DataTypeConfig]):
    """Tabular data (table, spreadsheet, data_frame, what have you).

    The table data is organized in sets of columns (arrays of data of the same type), with each column having a string identifier.

    *kiara* uses an instance of the [`KiaraTable`][kiara_plugin.tabular.models.table.KiaraTable]
    class to manage the table data, which lets developers access it in different formats ([Apache Arrow Table](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html), [Pandas dataframe](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html), Python dict of lists, more to follow...).

    Please consult the API doc of the `KiaraTable` class for more information about how to access and query the data:

    - [`KiaraTable` API doc](https://dharpa.org/kiara_plugin.tabular/latest/reference/kiara_plugin/tabular/models/__init__/#kiara_plugin.tabular.models.table.KiaraTable)

    Internally, the data is stored in [Apache Feather format](https://arrow.apache.org/docs/python/feather.html) -- both
    in memory and on disk when saved, which enables some advanced usage to preserve memory and compute overhead.
    """

    _data_type_name = "table"

    @classmethod
    def python_class(cls) -> Type:
        return KiaraTable

    def parse_python_obj(self, data: Any) -> KiaraTable:

        return KiaraTable.create_table(data)

    # def calculate_hash(self, data: KiaraTable) -> CID:
    #     hashes = []
    #     for column_name in data.arrow_table.column_names:
    #         hashes.append(column_name)
    #         column = data.arrow_table.column(column_name)
    #         for chunk in column.chunks:
    #             for buf in chunk.buffers():
    #                 if not buf:
    #                     continue
    #                 h = hash_from_buffer(memoryview(buf))
    #                 hashes.append(h)
    #     return compute_cid(hashes)
    #     return KIARA_HASH_FUNCTION(memoryview(data.arrow_array))

    # def calculate_size(self, data: KiaraTable) -> int:
    #     return len(data.arrow_table)

    def _validate(cls, value: Any) -> None:

        pass

        if not isinstance(value, KiaraTable):
            raise Exception(
                f"invalid type '{type(value).__name__}', must be 'KiaraTable'."
            )

    def serialize(self, data: KiaraTable) -> SerializedData:

        import pyarrow as pa

        chunk_map = {}

        # TODO: make sure temp dir is in the same partition as file store
        temp_f = tempfile.mkdtemp()

        def cleanup():
            shutil.rmtree(temp_f, ignore_errors=True)

        atexit.register(cleanup)

        for column_name in data.arrow_table.column_names:
            column: pa.Array = data.arrow_table.column(column_name)
            if column_name == "":
                file_name = os.path.join(temp_f, EMPTY_COLUMN_NAME_MARKER)
            else:
                file_name = os.path.join(temp_f, column_name)
            store_array(array_obj=column, file_name=file_name, column_name=column_name)
            chunk_map[column_name] = {"type": "file", "file": file_name, "codec": "raw"}

        serialized_data = {
            "data_type": self.data_type_name,
            "data_type_config": self.type_config.dict(),
            "data": chunk_map,
            "serialization_profile": "feather",
            "metadata": {
                "environment": {},
                "deserialize": {
                    "python_object": {
                        "module_type": "load.table",
                        "module_config": {
                            "value_type": "table",
                            "target_profile": "python_object",
                            "serialization_profile": "feather",
                        },
                    }
                },
            },
        }

        serialized = SerializationResult(**serialized_data)
        return serialized

    def pretty_print_as__terminal_renderable(
        self, value: "Value", render_config: Mapping[str, Any]
    ) -> Any:

        max_rows = render_config.get(
            "max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
        )
        max_row_height = render_config.get(
            "max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
        )
        max_cell_length = render_config.get(
            "max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
        )

        half_lines: Union[int, None] = None
        if max_rows:
            half_lines = int(max_rows / 2)

        atw = ArrowTabularWrap(value.data.arrow_table)
        result = atw.as_terminal_renderable(
            rows_head=half_lines,
            rows_tail=half_lines,
            max_row_height=max_row_height,
            max_cell_length=max_cell_length,
        )
        return result
Methods
parse_python_obj(self, data)

Parse a value into a supported python type.

This exists to make it easier to do trivial conversions (e.g. from a date string to a datetime object). If you choose to overwrite this method, make 100% sure that you don't change the meaning of the value, and try to avoid adding or removing information from the data (e.g. by changing the resolution of a date).

Parameters:

    data (Any): the value to parse (required)

Returns:

    KiaraTable: 'None' if no parsing was done and the original value should be used; otherwise, the parsed Python object.

Source code in tabular/data_types/table.py
def parse_python_obj(self, data: Any) -> KiaraTable:

    return KiaraTable.create_table(data)
pretty_print_as__terminal_renderable(self, value, render_config)
Source code in tabular/data_types/table.py
def pretty_print_as__terminal_renderable(
    self, value: "Value", render_config: Mapping[str, Any]
) -> Any:

    max_rows = render_config.get(
        "max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
    )
    max_row_height = render_config.get(
        "max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
    )
    max_cell_length = render_config.get(
        "max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
    )

    half_lines: Union[int, None] = None
    if max_rows:
        half_lines = int(max_rows / 2)

    atw = ArrowTabularWrap(value.data.arrow_table)
    result = atw.as_terminal_renderable(
        rows_head=half_lines,
        rows_tail=half_lines,
        max_row_height=max_row_height,
        max_cell_length=max_cell_length,
    )
    return result
python_class() classmethod
Source code in tabular/data_types/table.py
@classmethod
def python_class(cls) -> Type:
    return KiaraTable
serialize(self, data)
Source code in tabular/data_types/table.py
def serialize(self, data: KiaraTable) -> SerializedData:

    import pyarrow as pa

    chunk_map = {}

    # TODO: make sure temp dir is in the same partition as file store
    temp_f = tempfile.mkdtemp()

    def cleanup():
        shutil.rmtree(temp_f, ignore_errors=True)

    atexit.register(cleanup)

    for column_name in data.arrow_table.column_names:
        column: pa.Array = data.arrow_table.column(column_name)
        if column_name == "":
            file_name = os.path.join(temp_f, EMPTY_COLUMN_NAME_MARKER)
        else:
            file_name = os.path.join(temp_f, column_name)
        store_array(array_obj=column, file_name=file_name, column_name=column_name)
        chunk_map[column_name] = {"type": "file", "file": file_name, "codec": "raw"}

    serialized_data = {
        "data_type": self.data_type_name,
        "data_type_config": self.type_config.dict(),
        "data": chunk_map,
        "serialization_profile": "feather",
        "metadata": {
            "environment": {},
            "deserialize": {
                "python_object": {
                    "module_type": "load.table",
                    "module_config": {
                        "value_type": "table",
                        "target_profile": "python_object",
                        "serialization_profile": "feather",
                    },
                }
            },
        },
    }

    serialized = SerializationResult(**serialized_data)
    return serialized

defaults

Attributes

DEFAULT_TABULAR_DATA_CHUNK_SIZE
KIARA_PLUGIN_TABULAR_BASE_FOLDER

Marker to indicate the base folder for the kiara tabular plugin package.

KIARA_PLUGIN_TABULAR_RESOURCES_FOLDER

Default resources folder for this package.

RESERVED_SQL_KEYWORDS
SQLALCHEMY_SQLITE_TYPE_MAP: Dict[Type, Literal['NULL', 'INTEGER', 'REAL', 'TEXT', 'BLOB']]
SQLITE_DATA_TYPE: Tuple[Literal['NULL', 'INTEGER', 'REAL', 'TEXT', 'BLOB'], ...]
SQLITE_SQLALCHEMY_TYPE_MAP: Dict[Literal['NULL', 'INTEGER', 'REAL', 'TEXT', 'BLOB'], Type]
SqliteDataType
TEMPLATES_FOLDER

models special

This module contains the metadata (and other) models that are used in the kiara_plugin.tabular package.

Those models are convenience wrappers that make it easier for kiara to find, create, manage and version metadata -- and other types of models -- attached to data, as well as to kiara modules.

Metadata models must be a sub-class of kiara.metadata.MetadataModel. Other models usually sub-class a pydantic BaseModel or implement custom base classes.

Classes

ColumnSchema (BaseModel) pydantic-model

Describes properties of a single column of the 'table' data type.

Source code in tabular/models/__init__.py
class ColumnSchema(BaseModel):
    """Describes properties of a single column of the 'table' data type."""

    type_name: str = Field(
        description="The type name of the column (backend-specific)."
    )
    metadata: Dict[str, Any] = Field(
        description="Other metadata for the column.", default_factory=dict
    )
Attributes
metadata: Dict[str, Any] pydantic-field

Other metadata for the column.

type_name: str pydantic-field required

The type name of the column (backend-specific).

TableMetadata (KiaraModel) pydantic-model

Describes properties for the 'table' data type.

Source code in tabular/models/__init__.py
class TableMetadata(KiaraModel):
    """Describes properties for the 'table' data type."""

    column_names: List[str] = Field(description="The name of the columns of the table.")
    column_schema: Dict[str, ColumnSchema] = Field(
        description="The schema description of the table."
    )
    rows: int = Field(description="The number of rows the table contains.")
    size: Union[int, None] = Field(
        description="The tables size in bytes.", default=None
    )

    def _retrieve_data_to_hash(self) -> Any:

        return {
            "column_schemas": {k: v.dict() for k, v in self.column_schema.items()},
            "rows": self.rows,
            "size": self.size,
        }
Attributes
column_names: List[str] pydantic-field required

The names of the columns of the table.

column_schema: Dict[str, kiara_plugin.tabular.models.ColumnSchema] pydantic-field required

The schema description of the table.

rows: int pydantic-field required

The number of rows the table contains.

size: int pydantic-field

The table's size in bytes.
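A minimal, illustrative sketch of constructing these models by hand (the 'int64' and 'string' type names are just example values, since type_name is backend-specific):

from kiara_plugin.tabular.models import ColumnSchema, TableMetadata

md = TableMetadata(
    column_names=["id", "name"],
    column_schema={
        "id": ColumnSchema(type_name="int64"),
        "name": ColumnSchema(type_name="string"),
    },
    rows=3,
    size=None,
)
print(md.column_names)  # ['id', 'name']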

Modules

array
Classes
KiaraArray (KiaraModel) pydantic-model

A class to manage array-like data.

Internally, this uses an Apache Arrow Array to handle the data in memory and on disk.
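A short, illustrative sketch of wrapping an existing Arrow array and reading the data back:

import pyarrow as pa

from kiara_plugin.tabular.models.array import KiaraArray

# Illustrative sketch: wrap an existing Arrow array.
raw = pa.array([1, 2, 3, 4])
arr = KiaraArray.create_array(raw)

print(len(arr))           # 4
series = arr.to_pandas()  # a pandas Series with the same values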

Source code in tabular/models/array.py
class KiaraArray(KiaraModel):
    """A class to manage array-like data.

    Internally, this uses an [Apache Arrow Array](https://arrow.apache.org/docs/python/generated/pyarrow.Array.html#pyarrow.Array) to handle the data in memory and on disk.
    """

    # @classmethod
    # def create_in_temp_dir(cls, ):
    #
    #     temp_f = tempfile.mkdtemp()
    #     file_path = os.path.join(temp_f, "array.feather")
    #
    #     def cleanup():
    #         shutil.rmtree(file_path, ignore_errors=True)
    #
    #     atexit.register(cleanup)
    #
    #     array_obj = cls(feather_path=file_path)
    #     return array_obj

    @classmethod
    def create_array(cls, data: Any) -> "KiaraArray":

        if isinstance(data, KiaraArray):
            return data

        array_obj = None
        if isinstance(data, (pa.Array, pa.ChunkedArray)):
            array_obj = data
        elif isinstance(data, pa.Table):
            if len(data.columns) != 1:
                raise Exception(
                    f"Invalid type, only Arrow Arrays or single-column Tables allowed. This value is a table with {len(data.columns)} columns."
                )
            array_obj = data.column(0)
        else:
            try:
                array_obj = pa.array(data)
            except Exception:
                pass

        if array_obj is None:
            raise Exception(
                f"Can't create table, invalid source data type: {type(data)}."
            )

        obj = KiaraArray()
        if not isinstance(array_obj, pa.lib.ChunkedArray):
            array_obj = pa.chunked_array(array_obj)
        obj._array_obj = array_obj
        return obj

    data_path: Union[str, None] = Field(
        description="The path to the (feather) file backing this array.", default=None
    )

    _array_obj: pa.Array = PrivateAttr(default=None)

    def _retrieve_data_to_hash(self) -> Any:
        raise NotImplementedError()

    def __len__(self):
        return len(self.arrow_array)

    @property
    def arrow_array(self) -> pa.Array:

        if self._array_obj is not None:
            return self._array_obj

        if not self.data_path:
            raise Exception("Can't retrieve array data, object not initialized (yet).")

        with pa.memory_map(self.data_path, "r") as source:
            table: pa.Table = pa.ipc.open_file(source).read_all()

        if len(table.columns) != 1:
            raise Exception(
                f"Invalid serialized array data, only a single-column Table is allowed. This value is a table with {len(table.columns)} columns."
            )

        self._array_obj = table.column(0)
        return self._array_obj

    def to_pylist(self):
        return self.arrow_array.to_pylist()

    def to_pandas(self):
        return self.arrow_array.to_pandas()
Attributes
arrow_array: Array property readonly
data_path: str pydantic-field

The path to the (feather) file backing this array.

create_array(data) classmethod
Source code in tabular/models/array.py
@classmethod
def create_array(cls, data: Any) -> "KiaraArray":

    if isinstance(data, KiaraArray):
        return data

    array_obj = None
    if isinstance(data, (pa.Array, pa.ChunkedArray)):
        array_obj = data
    elif isinstance(data, pa.Table):
        if len(data.columns) != 1:
            raise Exception(
                f"Invalid type, only Arrow Arrays or single-column Tables allowed. This value is a table with {len(data.columns)} columns."
            )
        array_obj = data.column(0)
    else:
        try:
            array_obj = pa.array(data)
        except Exception:
            pass

    if array_obj is None:
        raise Exception(
            f"Can't create table, invalid source data type: {type(data)}."
        )

    obj = KiaraArray()
    if not isinstance(array_obj, pa.lib.ChunkedArray):
        array_obj = pa.chunked_array(array_obj)
    obj._array_obj = array_obj
    return obj
to_pandas(self)
Source code in tabular/models/array.py
def to_pandas(self):
    return self.arrow_array.to_pandas()
to_pylist(self)
Source code in tabular/models/array.py
def to_pylist(self):
    return self.arrow_array.to_pylist()
db
Classes
DatabaseMetadata (ValueMetadata) pydantic-model

Database and table properties.

Source code in tabular/models/db.py
class DatabaseMetadata(ValueMetadata):
    """Database and table properties."""

    _metadata_key = "database"

    @classmethod
    def retrieve_supported_data_types(cls) -> Iterable[str]:
        return ["database"]

    @classmethod
    def create_value_metadata(cls, value: Value) -> "DatabaseMetadata":

        database: KiaraDatabase = value.data

        insp = database.get_sqlalchemy_inspector()

        mds = {}

        for table_name in insp.get_table_names():

            with database.get_sqlalchemy_engine().connect() as con:
                result = con.execute(text(f"SELECT count(*) from {table_name}"))
                num_rows = result.fetchone()[0]

                try:
                    result = con.execute(
                        text(
                            f'SELECT SUM("pgsize") FROM "dbstat" WHERE name="{table_name}"'
                        )
                    )
                    size: Union[int, None] = result.fetchone()[0]
                except Exception:
                    size = None

            columns = {}
            for column in insp.get_columns(table_name=table_name):
                name = column["name"]
                _type = column["type"]
                type_name = SQLALCHEMY_SQLITE_TYPE_MAP[type(_type)]
                columns[name] = {
                    "type_name": type_name,
                    "metadata": {
                        "nullable": column["nullable"],
                        "primary_key": True if column["primary_key"] else False,
                    },
                }

            schema = {
                "column_names": list(columns.keys()),
                "column_schema": columns,
                "rows": num_rows,
                "size": size,
            }

            md = TableMetadata(**schema)
            mds[table_name] = md

        return DatabaseMetadata.construct(tables=mds)

    tables: Dict[str, TableMetadata] = Field(description="The table schema.")
Attributes
tables: Dict[str, kiara_plugin.tabular.models.TableMetadata] pydantic-field required

The table schema.

create_value_metadata(value) classmethod
Source code in tabular/models/db.py
@classmethod
def create_value_metadata(cls, value: Value) -> "DatabaseMetadata":

    database: KiaraDatabase = value.data

    insp = database.get_sqlalchemy_inspector()

    mds = {}

    for table_name in insp.get_table_names():

        with database.get_sqlalchemy_engine().connect() as con:
            result = con.execute(text(f"SELECT count(*) from {table_name}"))
            num_rows = result.fetchone()[0]

            try:
                result = con.execute(
                    text(
                        f'SELECT SUM("pgsize") FROM "dbstat" WHERE name="{table_name}"'
                    )
                )
                size: Union[int, None] = result.fetchone()[0]
            except Exception:
                size = None

        columns = {}
        for column in insp.get_columns(table_name=table_name):
            name = column["name"]
            _type = column["type"]
            type_name = SQLALCHEMY_SQLITE_TYPE_MAP[type(_type)]
            columns[name] = {
                "type_name": type_name,
                "metadata": {
                    "nullable": column["nullable"],
                    "primary_key": True if column["primary_key"] else False,
                },
            }

        schema = {
            "column_names": list(columns.keys()),
            "column_schema": columns,
            "rows": num_rows,
            "size": size,
        }

        md = TableMetadata(**schema)
        mds[table_name] = md

    return DatabaseMetadata.construct(tables=mds)
retrieve_supported_data_types() classmethod
Source code in tabular/models/db.py
@classmethod
def retrieve_supported_data_types(cls) -> Iterable[str]:
    return ["database"]
KiaraDatabase (KiaraModel) pydantic-model

A wrapper class to manage a sqlite database.
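A minimal, illustrative sketch using the create_in_temp_dir helper documented below (the SQL statement is just an example):

from kiara_plugin.tabular.models.db import KiaraDatabase

# Illustrative sketch: create a throw-away sqlite database with one table.
db = KiaraDatabase.create_in_temp_dir(
    init_statement="CREATE TABLE example (id INTEGER PRIMARY KEY, name TEXT)"
)

print(db.db_file_path)  # path to the temporary sqlite file
print(db.table_names)   # ['example']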

Source code in tabular/models/db.py
class KiaraDatabase(KiaraModel):
    """A wrapper class to manage a sqlite database."""

    @classmethod
    def create_in_temp_dir(
        cls,
        init_statement: Union[None, str, "TextClause"] = None,
        init_data: Union[Mapping[str, Any], None] = None,
    ):

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")

        def cleanup():
            shutil.rmtree(db_path, ignore_errors=True)

        atexit.register(cleanup)

        db = cls(db_file_path=db_path)
        db.create_if_not_exists()

        if init_statement:
            db._unlock_db()
            db.execute_sql(statement=init_statement, data=init_data, invalidate=True)
            db._lock_db()

        return db

    db_file_path: str = Field(description="The path to the sqlite database file.")

    _cached_engine = PrivateAttr(default=None)
    _cached_inspector = PrivateAttr(default=None)
    _table_names = PrivateAttr(default=None)
    _tables: Dict[str, Table] = PrivateAttr(default_factory=dict)
    _metadata_obj: Union[MetaData, None] = PrivateAttr(default=None)
    # _table_schemas: Optional[Dict[str, SqliteTableSchema]] = PrivateAttr(default=None)
    # _file_hash: Optional[str] = PrivateAttr(default=None)
    _file_cid: Union[CID, None] = PrivateAttr(default=None)
    _lock: bool = PrivateAttr(default=True)
    _immutable: bool = PrivateAttr(default=None)

    def _retrieve_id(self) -> str:
        return str(self.file_cid)

    def _retrieve_data_to_hash(self) -> Any:
        return self.file_cid

    @validator("db_file_path", allow_reuse=True)
    def ensure_absolute_path(cls, path: str):

        path = os.path.abspath(path)
        if not os.path.exists(os.path.dirname(path)):
            raise ValueError(f"Parent folder for database file does not exist: {path}")
        return path

    @property
    def db_url(self) -> str:
        return f"sqlite:///{self.db_file_path}"

    @property
    def file_cid(self) -> CID:

        if self._file_cid is not None:
            return self._file_cid

        self._file_cid = compute_cid_from_file(file=self.db_file_path, codec="raw")
        return self._file_cid

    def get_sqlalchemy_engine(self) -> "Engine":

        if self._cached_engine is not None:
            return self._cached_engine

        def _pragma_on_connect(dbapi_con, con_record):
            dbapi_con.execute("PRAGMA query_only = ON")

        self._cached_engine = create_engine(self.db_url, future=True)

        if self._lock:
            event.listen(self._cached_engine, "connect", _pragma_on_connect)

        return self._cached_engine

    def _lock_db(self):
        self._lock = True
        self._invalidate()

    def _unlock_db(self):
        if self._immutable:
            raise Exception("Can't unlock db, it's immutable.")
        self._lock = False
        self._invalidate()

    def create_if_not_exists(self):

        from sqlalchemy_utils import create_database, database_exists

        if not database_exists(self.db_url):
            create_database(self.db_url)

    def execute_sql(
        self,
        statement: Union[str, "TextClause"],
        data: Union[Mapping[str, Any], None] = None,
        invalidate: bool = False,
    ):
        """Execute an sql script.

        Arguments:
          statement: the sql statement
          data: (optional) data, to be bound to the statement
          invalidate: whether to invalidate cached values within this object
        """

        if isinstance(statement, str):
            statement = text(statement)

        if data:
            statement.bindparams(**data)

        with self.get_sqlalchemy_engine().connect() as con:
            con.execute(statement)

        if invalidate:
            self._invalidate()

    def _invalidate(self):
        self._cached_engine = None
        self._cached_inspector = None
        self._table_names = None
        # self._file_hash = None
        self._metadata_obj = None
        self._tables.clear()

    def _invalidate_other(self):
        pass

    def get_sqlalchemy_metadata(self) -> MetaData:
        """Return the sqlalchemy Metadtaa object for the underlying database.

        This is used internally, you typically don't need to access this attribute.

        """

        if self._metadata_obj is None:
            self._metadata_obj = MetaData()
        return self._metadata_obj

    def copy_database_file(self, target: str):

        os.makedirs(os.path.dirname(target))

        shutil.copy2(self.db_file_path, target)

        new_db = KiaraDatabase(db_file_path=target)
        # if self._file_hash:
        #     new_db._file_hash = self._file_hash
        return new_db

    def get_sqlalchemy_inspector(self) -> Inspector:

        if self._cached_inspector is not None:
            return self._cached_inspector

        self._cached_inspector = inspect(self.get_sqlalchemy_engine())
        return self._cached_inspector

    @property
    def table_names(self) -> Iterable[str]:
        if self._table_names is not None:
            return self._table_names

        self._table_names = self.get_sqlalchemy_inspector().get_table_names()
        return self._table_names

    def get_sqlalchemy_table(self, table_name: str) -> Table:
        """Return the sqlalchemy edges table instance for this network datab."""

        if table_name in self._tables.keys():
            return self._tables[table_name]

        table = Table(
            table_name,
            self.get_sqlalchemy_metadata(),
            autoload_with=self.get_sqlalchemy_engine(),
        )
        self._tables[table_name] = table
        return table
Attributes
db_file_path: str pydantic-field required

The path to the sqlite database file.

db_url: str property readonly
file_cid: CID property readonly
table_names: Iterable[str] property readonly
Methods
copy_database_file(self, target)
Source code in tabular/models/db.py
def copy_database_file(self, target: str):

    os.makedirs(os.path.dirname(target))

    shutil.copy2(self.db_file_path, target)

    new_db = KiaraDatabase(db_file_path=target)
    # if self._file_hash:
    #     new_db._file_hash = self._file_hash
    return new_db
create_if_not_exists(self)
Source code in tabular/models/db.py
def create_if_not_exists(self):

    from sqlalchemy_utils import create_database, database_exists

    if not database_exists(self.db_url):
        create_database(self.db_url)
create_in_temp_dir(init_statement=None, init_data=None) classmethod
Source code in tabular/models/db.py
@classmethod
def create_in_temp_dir(
    cls,
    init_statement: Union[None, str, "TextClause"] = None,
    init_data: Union[Mapping[str, Any], None] = None,
):

    temp_f = tempfile.mkdtemp()
    db_path = os.path.join(temp_f, "db.sqlite")

    def cleanup():
        shutil.rmtree(db_path, ignore_errors=True)

    atexit.register(cleanup)

    db = cls(db_file_path=db_path)
    db.create_if_not_exists()

    if init_statement:
        db._unlock_db()
        db.execute_sql(statement=init_statement, data=init_data, invalidate=True)
        db._lock_db()

    return db
ensure_absolute_path(path) classmethod
Source code in tabular/models/db.py
@validator("db_file_path", allow_reuse=True)
def ensure_absolute_path(cls, path: str):

    path = os.path.abspath(path)
    if not os.path.exists(os.path.dirname(path)):
        raise ValueError(f"Parent folder for database file does not exist: {path}")
    return path
execute_sql(self, statement, data=None, invalidate=False)

Execute an sql script.

Parameters:

    statement (Union[str, TextClause]): the sql statement (required)
    data (Optional[Mapping[str, Any]]): (optional) data, to be bound to the statement (default: None)
    invalidate (bool): whether to invalidate cached values within this object (default: False)
Source code in tabular/models/db.py
def execute_sql(
    self,
    statement: Union[str, "TextClause"],
    data: Union[Mapping[str, Any], None] = None,
    invalidate: bool = False,
):
    """Execute an sql script.

    Arguments:
      statement: the sql statement
      data: (optional) data, to be bound to the statement
      invalidate: whether to invalidate cached values within this object
    """

    if isinstance(statement, str):
        statement = text(statement)

    if data:
        statement.bindparams(**data)

    with self.get_sqlalchemy_engine().connect() as con:
        con.execute(statement)

    if invalidate:
        self._invalidate()
get_sqlalchemy_engine(self)
Source code in tabular/models/db.py
def get_sqlalchemy_engine(self) -> "Engine":

    if self._cached_engine is not None:
        return self._cached_engine

    def _pragma_on_connect(dbapi_con, con_record):
        dbapi_con.execute("PRAGMA query_only = ON")

    self._cached_engine = create_engine(self.db_url, future=True)

    if self._lock:
        event.listen(self._cached_engine, "connect", _pragma_on_connect)

    return self._cached_engine
get_sqlalchemy_inspector(self)
Source code in tabular/models/db.py
def get_sqlalchemy_inspector(self) -> Inspector:

    if self._cached_inspector is not None:
        return self._cached_inspector

    self._cached_inspector = inspect(self.get_sqlalchemy_engine())
    return self._cached_inspector
get_sqlalchemy_metadata(self)

Return the sqlalchemy Metadata object for the underlying database.

This is used internally, you typically don't need to access this attribute.

Source code in tabular/models/db.py
def get_sqlalchemy_metadata(self) -> MetaData:
    """Return the sqlalchemy Metadtaa object for the underlying database.

    This is used internally, you typically don't need to access this attribute.

    """

    if self._metadata_obj is None:
        self._metadata_obj = MetaData()
    return self._metadata_obj
get_sqlalchemy_table(self, table_name)

Return the sqlalchemy table instance for the specified table of this database.

Source code in tabular/models/db.py
def get_sqlalchemy_table(self, table_name: str) -> Table:
    """Return the sqlalchemy edges table instance for this network datab."""

    if table_name in self._tables.keys():
        return self._tables[table_name]

    table = Table(
        table_name,
        self.get_sqlalchemy_metadata(),
        autoload_with=self.get_sqlalchemy_engine(),
    )
    self._tables[table_name] = table
    return table
SqliteTableSchema (BaseModel) pydantic-model
Source code in tabular/models/db.py
class SqliteTableSchema(BaseModel):

    columns: Dict[str, SqliteDataType] = Field(
        description="The table columns and their attributes."
    )
    index_columns: List[str] = Field(
        description="The columns to index", default_factory=list
    )
    nullable_columns: List[str] = Field(
        description="The columns that are nullable.", default_factory=list
    )
    unique_columns: List[str] = Field(
        description="The columns that should be marked 'UNIQUE'.", default_factory=list
    )
    primary_key: Union[str, None] = Field(
        description="The primary key for this table.", default=None
    )

    def create_table_metadata(
        self,
        table_name: str,
    ) -> Tuple[MetaData, Table]:
        """Create an sql script to initialize a table.

        Arguments:
            column_attrs: a map with the column name as key, and column details ('type', 'extra_column_info', 'create_index') as values
        """

        table_columns = []
        for column_name, data_type in self.columns.items():
            column_obj = Column(
                column_name,
                SQLITE_SQLALCHEMY_TYPE_MAP[data_type],
                nullable=column_name in self.nullable_columns,
                primary_key=column_name == self.primary_key,
                index=column_name in self.index_columns,
                unique=column_name in self.unique_columns,
            )
            table_columns.append(column_obj)

        meta = MetaData()
        table = Table(table_name, meta, *table_columns)
        return meta, table

    def create_table(self, table_name: str, engine: Engine) -> Table:

        meta, table = self.create_table_metadata(table_name=table_name)
        meta.create_all(engine)
        return table
Attributes
columns: Dict[str, Literal['NULL', 'INTEGER', 'REAL', 'TEXT', 'BLOB']] pydantic-field required

The table columns and their attributes.

index_columns: List[str] pydantic-field

The columns to index

nullable_columns: List[str] pydantic-field

The columns that are nullable.

primary_key: str pydantic-field

The primary key for this table.

unique_columns: List[str] pydantic-field

The columns that should be marked 'UNIQUE'.

Methods
create_table(self, table_name, engine)
Source code in tabular/models/db.py
def create_table(self, table_name: str, engine: Engine) -> Table:

    meta, table = self.create_table_metadata(table_name=table_name)
    meta.create_all(engine)
    return table
create_table_metadata(self, table_name)

Create the sqlalchemy metadata and table objects for a table that follows this schema.

Parameters:

    table_name (str): the name of the table to create (required)
Source code in tabular/models/db.py
def create_table_metadata(
    self,
    table_name: str,
) -> Tuple[MetaData, Table]:
    """Create an sql script to initialize a table.

    Arguments:
        column_attrs: a map with the column name as key, and column details ('type', 'extra_column_info', 'create_index') as values
    """

    table_columns = []
    for column_name, data_type in self.columns.items():
        column_obj = Column(
            column_name,
            SQLITE_SQLALCHEMY_TYPE_MAP[data_type],
            nullable=column_name in self.nullable_columns,
            primary_key=column_name == self.primary_key,
            index=column_name in self.index_columns,
            unique=column_name in self.unique_columns,
        )
        table_columns.append(column_obj)

    meta = MetaData()
    table = Table(table_name, meta, *table_columns)
    return meta, table
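A minimal usage sketch for this schema model (the in-memory sqlite engine and the column layout are just example values):

from sqlalchemy import create_engine

from kiara_plugin.tabular.models.db import SqliteTableSchema

# Illustrative sketch: describe a table and create it in a sqlite database.
schema = SqliteTableSchema(
    columns={"id": "INTEGER", "name": "TEXT"},
    primary_key="id",
    index_columns=["name"],
)
engine = create_engine("sqlite://")  # in-memory database, for illustration
table = schema.create_table(table_name="items", engine=engine)
print(table.columns.keys())  # ['id', 'name']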
table
Classes
KiaraTable (KiaraModel) pydantic-model

A wrapper class to manage tabular data in a memory efficient way.
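A short, illustrative sketch of wrapping an existing Arrow table and reading the data back:

import pyarrow as pa

from kiara_plugin.tabular.models.table import KiaraTable

# Illustrative sketch: wrap a pyarrow Table.
arrow_table = pa.table({"id": [1, 2], "label": ["x", "y"]})
table = KiaraTable.create_table(arrow_table)

print(table.num_rows)     # 2
print(table.to_pydict())  # {'id': [1, 2], 'label': ['x', 'y']}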

Source code in tabular/models/table.py
class KiaraTable(KiaraModel):
    """A wrapper class to manage tabular data in a memory efficient way."""

    @classmethod
    def create_table(cls, data: Any) -> "KiaraTable":
        """Create a `KiaraTable` instance from an Apache Arrow Table, or dict of lists."""

        table_obj = None
        if isinstance(data, KiaraTable):
            return data

        if isinstance(data, (pa.Table)):
            table_obj = data
        else:
            try:
                table_obj = pa.table(data)
            except Exception:
                pass

        if table_obj is None:
            raise Exception(
                f"Can't create table, invalid source data type: {type(data)}."
            )

        obj = KiaraTable()
        obj._table_obj = table_obj
        return obj

    data_path: Union[None, str] = Field(
        description="The path to the (feather) file backing this array.", default=None
    )
    """The path where the table object is store (for internal or read-only use)."""
    _table_obj: pa.Table = PrivateAttr(default=None)

    def _retrieve_data_to_hash(self) -> Any:
        raise NotImplementedError()

    @property
    def arrow_table(self) -> pa.Table:
        """Return the data as an Apache Arrow Table instance."""

        if self._table_obj is not None:
            return self._table_obj

        if not self.data_path:
            raise Exception("Can't retrieve table data, object not initialized (yet).")

        with pa.memory_map(self.data_path, "r") as source:
            table: pa.Table = pa.ipc.open_file(source).read_all()

        self._table_obj = table
        return self._table_obj

    @property
    def column_names(self) -> Iterable[str]:
        """Retrieve the names of all the columns of this table."""
        return self.arrow_table.column_names

    @property
    def num_rows(self) -> int:
        """Return the number of rows in this table."""
        return self.arrow_table.num_rows

    def to_pydict(self):
        """Convert and return the table data as a dictionary of lists.

        This will load all data into memory, so you might or might not want to do that.
        """
        return self.arrow_table.to_pydict()

    def to_pylist(self):
        """Convert and return the table data as a list of rows/dictionaries.

        This will load all data into memory, so you might or might not want to do that.
        """

        return self.arrow_table.to_pylist()

    def to_pandas(self):
        """Convert and return the table data to a Pandas dataframe.

        This will load all data into memory, so you might or might not want to do that.
        """
        return self.arrow_table.to_pandas()
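A short, hypothetical usage sketch (column data invented for the example, and assuming the import path mirrors tabular/models/table.py shown above): create a KiaraTable from a plain dict of lists and read it back in the supported formats.

from kiara_plugin.tabular.models.table import KiaraTable

# Build a table from a dict of lists (pyarrow does the type inference).
table = KiaraTable.create_table(
    {"city": ["Berlin", "Ghent"], "population": [3_700_000, 260_000]}
)

print(table.column_names)  # ['city', 'population']
print(table.num_rows)      # 2
df = table.to_pandas()     # loads all data into memory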
Attributes
arrow_table: Table property readonly

Return the data as an Apache Arrow Table instance.

column_names: Iterable[str] property readonly

Retrieve the names of all the columns of this table.

data_path: str pydantic-field

The path to the (feather) file backing this table.

num_rows: int property readonly

Return the number of rows in this table.

Methods
create_table(data) classmethod

Create a KiaraTable instance from an Apache Arrow Table, or dict of lists.

Source code in tabular/models/table.py
@classmethod
def create_table(cls, data: Any) -> "KiaraTable":
    """Create a `KiaraTable` instance from an Apache Arrow Table, or dict of lists."""

    table_obj = None
    if isinstance(data, KiaraTable):
        return data

    if isinstance(data, (pa.Table)):
        table_obj = data
    else:
        try:
            table_obj = pa.table(data)
        except Exception:
            pass

    if table_obj is None:
        raise Exception(
            f"Can't create table, invalid source data type: {type(data)}."
        )

    obj = KiaraTable()
    obj._table_obj = table_obj
    return obj
to_pandas(self)

Convert and return the table data to a Pandas dataframe.

This will load all data into memory, so you might or might not want to do that.

Source code in tabular/models/table.py
def to_pandas(self):
    """Convert and return the table data to a Pandas dataframe.

    This will load all data into memory, so you might or might not want to do that.
    """
    return self.arrow_table.to_pandas()
to_pydict(self)

Convert and return the table data as a dictionary of lists.

This will load all data into memory, so you might or might not want to do that.

Source code in tabular/models/table.py
def to_pydict(self):
    """Convert and return the table data as a dictionary of lists.

    This will load all data into memory, so you might or might not want to do that.
    """
    return self.arrow_table.to_pydict()
to_pylist(self)

Convert and return the table data as a list of rows/dictionaries.

This will load all data into memory, so you might or might not want to do that.

Source code in tabular/models/table.py
def to_pylist(self):
    """Convert and return the table data as a list of rows/dictionaries.

    This will load all data into memory, so you might or might not want to do that.
    """

    return self.arrow_table.to_pylist()
KiaraTableMetadata (ValueMetadata) pydantic-model

Metadata about a table (column schema, number of rows, size).

Source code in tabular/models/table.py
class KiaraTableMetadata(ValueMetadata):
    """File stats."""

    _metadata_key = "table"

    @classmethod
    def retrieve_supported_data_types(cls) -> Iterable[str]:
        return ["table"]

    @classmethod
    def create_value_metadata(cls, value: "Value") -> "KiaraTableMetadata":

        kiara_table: KiaraTable = value.data

        table: pa.Table = kiara_table.arrow_table

        table_schema = {}
        for name in table.schema.names:
            field = table.schema.field(name)
            md = field.metadata
            _type = field.type
            if not md:
                md = {
                    "arrow_type_id": _type.id,
                }
            _d = {
                "type_name": str(_type),
                "metadata": md,
            }
            table_schema[name] = _d

        schema = {
            "column_names": table.column_names,
            "column_schema": table_schema,
            "rows": table.num_rows,
            "size": table.nbytes,
        }

        md = TableMetadata.construct(**schema)
        return KiaraTableMetadata.construct(table=md)

    table: TableMetadata = Field(description="The table schema.")
Attributes
table: TableMetadata pydantic-field required

The table schema.

create_value_metadata(value) classmethod
Source code in tabular/models/table.py
@classmethod
def create_value_metadata(cls, value: "Value") -> "KiaraTableMetadata":

    kiara_table: KiaraTable = value.data

    table: pa.Table = kiara_table.arrow_table

    table_schema = {}
    for name in table.schema.names:
        field = table.schema.field(name)
        md = field.metadata
        _type = field.type
        if not md:
            md = {
                "arrow_type_id": _type.id,
            }
        _d = {
            "type_name": str(_type),
            "metadata": md,
        }
        table_schema[name] = _d

    schema = {
        "column_names": table.column_names,
        "column_schema": table_schema,
        "rows": table.num_rows,
        "size": table.nbytes,
    }

    md = TableMetadata.construct(**schema)
    return KiaraTableMetadata.construct(table=md)
retrieve_supported_data_types() classmethod
Source code in tabular/models/table.py
@classmethod
def retrieve_supported_data_types(cls) -> Iterable[str]:
    return ["table"]

modules special

Modules

array special
FORCE_NON_NULL_DOC
MAX_INDEX_DOC
MIN_INDEX_DOC
REMOVE_TOKENS_DOC
Classes
DeserializeArrayModule (DeserializeValueModule)

Deserialize array data.

Source code in tabular/modules/array/__init__.py
class DeserializeArrayModule(DeserializeValueModule):
    """Deserialize array data."""

    _module_type_name = "load.array"

    @classmethod
    def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
        return {"python_object": KiaraArray}

    @classmethod
    def retrieve_serialized_value_type(cls) -> str:
        return "array"

    @classmethod
    def retrieve_supported_serialization_profile(cls) -> str:
        return "feather"

    def to__python_object(self, data: SerializedData, **config: Any):

        assert "array.arrow" in data.get_keys() and len(list(data.get_keys())) == 1

        chunks = data.get_serialized_data("array.arrow")

        # TODO: support multiple chunks
        assert chunks.get_number_of_chunks() == 1
        files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
        assert len(files) == 1

        array_file = files[0]

        array = KiaraArray(data_path=array_file)
        return array
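For context, the 'feather' serialization profile this module expects boils down to a single-column Arrow IPC file. Below is a minimal, self-contained sketch of that round trip with plain pyarrow; the file name and values are assumptions for the example, not taken from the module.

import pyarrow as pa

# Write a single-column table to an Arrow IPC ("feather"-style) file ...
table = pa.table({"array": pa.array([1, 2, 3])})
with pa.OSFile("array.arrow", "wb") as sink:
    with pa.ipc.new_file(sink, table.schema) as writer:
        writer.write_table(table)

# ... and memory-map it back, which is what the deserializer does internally.
with pa.memory_map("array.arrow", "r") as source:
    loaded = pa.ipc.open_file(source).read_all()
print(loaded.column("array"))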
retrieve_serialized_value_type() classmethod
Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_serialized_value_type(cls) -> str:
    return "array"
retrieve_supported_serialization_profile() classmethod
Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
    return "feather"
retrieve_supported_target_profiles() classmethod
Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
    return {"python_object": KiaraArray}
to__python_object(self, data, **config)
Source code in tabular/modules/array/__init__.py
def to__python_object(self, data: SerializedData, **config: Any):

    assert "array.arrow" in data.get_keys() and len(list(data.get_keys())) == 1

    chunks = data.get_serialized_data("array.arrow")

    # TODO: support multiple chunks
    assert chunks.get_number_of_chunks() == 1
    files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
    assert len(files) == 1

    array_file = files[0]

    array = KiaraArray(data_path=array_file)
    return array
ExtractDateConfig (KiaraInputsConfig) pydantic-model
Source code in tabular/modules/array/__init__.py
class ExtractDateConfig(KiaraInputsConfig):

    force_non_null: bool = Field(description=FORCE_NON_NULL_DOC, default=True)
    min_index: Union[None, int] = Field(
        description=MIN_INDEX_DOC,
        default=None,
    )
    max_index: Union[None, int] = Field(description=MAX_INDEX_DOC, default=None)
    remove_tokens: List[str] = Field(
        description=REMOVE_TOKENS_DOC, default_factory=list
    )
Attributes
force_non_null: bool pydantic-field

If set to 'True', raise an error if any of the strings in the array can't be parsed.

max_index: int pydantic-field

The maximum index until which to parse the string(s).

min_index: int pydantic-field

The minimum index from where to start parsing the string(s).

remove_tokens: List[str] pydantic-field

A list of tokens/characters to replace with a single white-space before parsing the input.

ExtractDateModule (AutoInputsKiaraModule)

Create an array of date objects from an array of strings.

This module is very simplistic at the moment; more functionality and options will be added in the future.

At its core, this module uses the standard parser from the dateutil package to parse strings into dates. As this parser can't handle complex strings, the input strings can be pre-processed in the following ways:

  • 'cut' non-relevant parts of the string (using 'min_index' & 'max_index' input/config options)
  • remove matching tokens from the string, and replace them with a single whitespace (using the 'remove_tokens' option)

By default, if an input string can't be parsed this module will raise an exception. This can be prevented by setting this module's 'force_non_null' config option or input to 'False', in which case un-parsable strings will appear as 'NULL' values in the resulting array.

Source code in tabular/modules/array/__init__.py
class ExtractDateModule(AutoInputsKiaraModule):
    """Create an array of date objects from an array of strings.

    This module is very simplistic at the moment; more functionality and options will be added in the future.

    At its core, this module uses the standard parser from the
    [dateutil](https://github.com/dateutil/dateutil) package to parse strings into dates. As this parser can't handle
     complex strings, the input strings can be pre-processed in the following ways:

    - 'cut' non-relevant parts of the string (using 'min_index' & 'max_index' input/config options)
    - remove matching tokens from the string, and replace them with a single whitespace (using the 'remove_tokens' option)

    By default, if an input string can't be parsed this module will raise an exception. This can be prevented by
    setting this module's 'force_non_null' config option or input to 'False', in which case un-parsable strings
    will appear as 'NULL' values in the resulting array.
    """

    _module_type_name = "parse.date_array"
    _config_cls = ExtractDateConfig

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:

        inputs = {"array": {"type": "array", "doc": "The input array."}}
        return inputs

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:

        return {
            "date_array": {
                "type": "array",
                "doc": "The resulting array with items of a date data type.",
            }
        }

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog):

        import polars as pl
        import pyarrow as pa
        from dateutil import parser

        force_non_null: bool = self.get_data_for_field(
            field_name="force_non_null", inputs=inputs
        )
        min_pos: Union[None, int] = self.get_data_for_field(
            field_name="min_index", inputs=inputs
        )
        if min_pos is None:
            min_pos = 0
        max_pos: Union[None, int] = self.get_data_for_field(
            field_name="max_index", inputs=inputs
        )
        remove_tokens: Iterable[str] = self.get_data_for_field(
            field_name="remove_tokens", inputs=inputs
        )

        def parse_date(_text: str):

            text = _text
            if min_pos:
                try:
                    text = text[min_pos:]  # type: ignore
                except Exception:
                    return None
            if max_pos:
                try:
                    text = text[0 : max_pos - min_pos]  # type: ignore  # noqa
                except Exception:
                    pass

            if remove_tokens:
                for t in remove_tokens:
                    text = text.replace(t, " ")

            try:
                d_obj = parser.parse(text, fuzzy=True)
            except Exception as e:
                if force_non_null:
                    raise KiaraProcessingException(e)
                return None

            if d_obj is None:
                if force_non_null:
                    raise KiaraProcessingException(
                        f"Can't parse date from string: {text}"
                    )
                return None

            return d_obj

        value = inputs.get_value_obj("array")
        array: KiaraArray = value.data

        series = pl.Series(name="tokens", values=array.arrow_array)
        job_log.add_log(f"start parsing date for {len(array)} items")
        result = series.apply(parse_date)
        job_log.add_log(f"finished parsing date for {len(array)} items")
        result_array = result.to_arrow()

        # TODO: remove this cast once the array data type can handle non-chunked arrays
        chunked = pa.chunked_array(result_array)
        outputs.set_values(date_array=chunked)
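To make the pre-processing steps described above concrete, here is a small standalone sketch of the same cut / remove-tokens / dateutil flow outside of kiara. The input string, indices and tokens are invented for the example.

from dateutil import parser

remove_tokens = ["[", "]"]
min_index, max_index = 0, 12

def parse_date(text: str):
    text = text[min_index:max_index]      # 'cut' the relevant part of the string
    for token in remove_tokens:           # replace tokens with a single whitespace
        text = text.replace(token, " ")
    try:
        return parser.parse(text, fuzzy=True)
    except (ValueError, OverflowError):
        return None                       # behaviour when 'force_non_null' is False

print(parse_date("[1918-05-23] morning edition"))  # 1918-05-23 00:00:00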
Classes
_config_cls (KiaraInputsConfig) private pydantic-model
Source code in tabular/modules/array/__init__.py
class ExtractDateConfig(KiaraInputsConfig):

    force_non_null: bool = Field(description=FORCE_NON_NULL_DOC, default=True)
    min_index: Union[None, int] = Field(
        description=MIN_INDEX_DOC,
        default=None,
    )
    max_index: Union[None, int] = Field(description=MAX_INDEX_DOC, default=None)
    remove_tokens: List[str] = Field(
        description=REMOVE_TOKENS_DOC, default_factory=list
    )
Attributes
force_non_null: bool pydantic-field

If set to 'True', raise an error if any of the strings in the array can't be parsed.

max_index: int pydantic-field

The maximum index until which to parse the string(s).

min_index: int pydantic-field

The minimum index from where to start parsing the string(s).

remove_tokens: List[str] pydantic-field

A list of tokens/characters to replace with a single white-space before parsing the input.

Methods
create_inputs_schema(self)

Return the schema for this types' inputs.

Source code in tabular/modules/array/__init__.py
def create_inputs_schema(
    self,
) -> ValueMapSchema:

    inputs = {"array": {"type": "array", "doc": "The input array."}}
    return inputs
create_outputs_schema(self)

Return the schema for this types' outputs.

Source code in tabular/modules/array/__init__.py
def create_outputs_schema(
    self,
) -> ValueMapSchema:

    return {
        "date_array": {
            "type": "array",
            "doc": "The resulting array with items of a date data type.",
        }
    }
process(self, inputs, outputs, job_log)
Source code in tabular/modules/array/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog):

    import polars as pl
    import pyarrow as pa
    from dateutil import parser

    force_non_null: bool = self.get_data_for_field(
        field_name="force_non_null", inputs=inputs
    )
    min_pos: Union[None, int] = self.get_data_for_field(
        field_name="min_index", inputs=inputs
    )
    if min_pos is None:
        min_pos = 0
    max_pos: Union[None, int] = self.get_data_for_field(
        field_name="max_index", inputs=inputs
    )
    remove_tokens: Iterable[str] = self.get_data_for_field(
        field_name="remove_tokens", inputs=inputs
    )

    def parse_date(_text: str):

        text = _text
        if min_pos:
            try:
                text = text[min_pos:]  # type: ignore
            except Exception:
                return None
        if max_pos:
            try:
                text = text[0 : max_pos - min_pos]  # type: ignore  # noqa
            except Exception:
                pass

        if remove_tokens:
            for t in remove_tokens:
                text = text.replace(t, " ")

        try:
            d_obj = parser.parse(text, fuzzy=True)
        except Exception as e:
            if force_non_null:
                raise KiaraProcessingException(e)
            return None

        if d_obj is None:
            if force_non_null:
                raise KiaraProcessingException(
                    f"Can't parse date from string: {text}"
                )
            return None

        return d_obj

    value = inputs.get_value_obj("array")
    array: KiaraArray = value.data

    series = pl.Series(name="tokens", values=array.arrow_array)
    job_log.add_log(f"start parsing date for {len(array)} items")
    result = series.apply(parse_date)
    job_log.add_log(f"finished parsing date for {len(array)} items")
    result_array = result.to_arrow()

    # TODO: remove this cast once the array data type can handle non-chunked arrays
    chunked = pa.chunked_array(result_array)
    outputs.set_values(date_array=chunked)
db special
Classes
CreateDatabaseModule (CreateFromModule)
Source code in tabular/modules/db/__init__.py
class CreateDatabaseModule(CreateFromModule):

    _module_type_name = "create.database"
    _config_cls = CreateDatabaseModuleConfig

    def create__database__from__csv_file(self, source_value: Value) -> Any:
        """Create a database from a csv_file value."""

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")

        def cleanup():
            shutil.rmtree(db_path, ignore_errors=True)

        atexit.register(cleanup)

        file_item: FileModel = source_value.data
        table_name = file_item.file_name_without_extension

        table_name = table_name.replace("-", "_")
        table_name = table_name.replace(".", "_")

        try:
            create_sqlite_table_from_tabular_file(
                target_db_file=db_path, file_item=file_item, table_name=table_name
            )
        except Exception as e:
            if self.get_config_value("ignore_errors") is True:
                log_message("ignore.import_file", file=file_item.path, reason=str(e))
            else:
                raise KiaraProcessingException(e)

        include_raw_content_in_file_info: bool = self.get_config_value(
            "include_source_metadata"
        )
        if include_raw_content_in_file_info:
            db = KiaraDatabase(db_file_path=db_path)
            db.create_if_not_exists()
            include_content: bool = self.get_config_value("include_source_file_content")
            db._unlock_db()
            included_files = {file_item.file_name: file_item}
            file_bundle = FileBundle.create_from_file_models(
                files=included_files, bundle_name=file_item.file_name
            )
            insert_db_table_from_file_bundle(
                database=db,
                file_bundle=file_bundle,
                table_name="source_files_metadata",
                include_content=include_content,
            )
            db._lock_db()

        return db_path

    def create__database__from__csv_file_bundle(self, source_value: Value) -> Any:
        """Create a database from a csv_file_bundle value.

        Unless 'merge_into_single_table' is set to 'True', each csv file will create one table
        in the resulting database. If this option is set, only a single table with all the values of all
        csv files will be created. For this to work, all csv files should follow the same schema.
        """

        merge_into_single_table = self.get_config_value("merge_into_single_table")
        if merge_into_single_table:
            raise NotImplementedError("Not supported (yet).")

        include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
            "include_source_metadata"
        )

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")

        def cleanup():
            shutil.rmtree(db_path, ignore_errors=True)

        atexit.register(cleanup)

        db = KiaraDatabase(db_file_path=db_path)
        db.create_if_not_exists()

        # TODO: check whether/how to add indexes

        bundle: FileBundle = source_value.data
        table_names: List[str] = []
        for rel_path in sorted(bundle.included_files.keys()):

            file_item = bundle.included_files[rel_path]
            table_name = find_free_id(
                stem=file_item.file_name_without_extension, current_ids=table_names
            )
            try:
                table_names.append(table_name)
                create_sqlite_table_from_tabular_file(
                    target_db_file=db_path, file_item=file_item, table_name=table_name
                )
            except Exception as e:
                if self.get_config_value("ignore_errors") is True:
                    log_message("ignore.import_file", file=rel_path, reason=str(e))
                    continue
                raise KiaraProcessingException(e)

        if include_raw_content_in_file_info in [None, True]:
            include_content: bool = self.get_config_value("include_source_file_content")
            db._unlock_db()
            insert_db_table_from_file_bundle(
                database=db,
                file_bundle=source_value.data,
                table_name="source_files_metadata",
                include_content=include_content,
            )
            db._lock_db()

        return db_path

    def create_optional_inputs(
        self, source_type: str, target_type
    ) -> Union[Mapping[str, Mapping[str, Any]], None]:

        if target_type == "database" and source_type == "table":

            return {
                "table_name": {
                    "type": "string",
                    "doc": "The name of the table in the new database.",
                    "default": "imported_table",
                }
            }
        else:
            return None

    def create__database__from__table(
        self, source_value: Value, optional: ValueMap
    ) -> Any:
        """Create a database value from a table."""

        table_name = optional.get_value_data("table_name")
        if not table_name:
            table_name = "imported_table"

        table: KiaraTable = source_value.data
        arrow_table = table.arrow_table

        column_map = None
        index_columns = None

        sqlite_schema = create_sqlite_schema_data_from_arrow_table(
            table=arrow_table, index_columns=index_columns, column_map=column_map
        )

        db = KiaraDatabase.create_in_temp_dir()
        db._unlock_db()
        engine = db.get_sqlalchemy_engine()

        _table = sqlite_schema.create_table(table_name=table_name, engine=engine)

        with engine.connect() as conn:

            for batch in arrow_table.to_batches(
                max_chunksize=DEFAULT_TABULAR_DATA_CHUNK_SIZE
            ):
                conn.execute(insert(_table), batch.to_pylist())
                conn.commit()

        db._lock_db()
        return db
Classes
_config_cls (CreateFromModuleConfig) private pydantic-model
Source code in tabular/modules/db/__init__.py
class CreateDatabaseModuleConfig(CreateFromModuleConfig):

    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )
    merge_into_single_table: bool = Field(
        description="Whether to merge all csv files into a single table.", default=False
    )
    include_source_metadata: Union[bool, None] = Field(
        description="Whether to include a table with metadata about the source files.",
        default=None,
    )
    include_source_file_content: bool = Field(
        description="When including source metadata, whether to also include the original raw (string) content.",
        default=False,
    )
Attributes
ignore_errors: bool pydantic-field

Whether to ignore convert errors and omit the failed items.

include_source_file_content: bool pydantic-field

When including source metadata, whether to also include the original raw (string) content.

include_source_metadata: bool pydantic-field

Whether to include a table with metadata about the source files.

merge_into_single_table: bool pydantic-field

Whether to merge all csv files into a single table.

Methods
create__database__from__csv_file(self, source_value)

Create a database from a csv_file value.

Source code in tabular/modules/db/__init__.py
def create__database__from__csv_file(self, source_value: Value) -> Any:
    """Create a database from a csv_file value."""

    temp_f = tempfile.mkdtemp()
    db_path = os.path.join(temp_f, "db.sqlite")

    def cleanup():
        shutil.rmtree(db_path, ignore_errors=True)

    atexit.register(cleanup)

    file_item: FileModel = source_value.data
    table_name = file_item.file_name_without_extension

    table_name = table_name.replace("-", "_")
    table_name = table_name.replace(".", "_")

    try:
        create_sqlite_table_from_tabular_file(
            target_db_file=db_path, file_item=file_item, table_name=table_name
        )
    except Exception as e:
        if self.get_config_value("ignore_errors") is True:
            log_message("ignore.import_file", file=file_item.path, reason=str(e))
        else:
            raise KiaraProcessingException(e)

    include_raw_content_in_file_info: bool = self.get_config_value(
        "include_source_metadata"
    )
    if include_raw_content_in_file_info:
        db = KiaraDatabase(db_file_path=db_path)
        db.create_if_not_exists()
        include_content: bool = self.get_config_value("include_source_file_content")
        db._unlock_db()
        included_files = {file_item.file_name: file_item}
        file_bundle = FileBundle.create_from_file_models(
            files=included_files, bundle_name=file_item.file_name
        )
        insert_db_table_from_file_bundle(
            database=db,
            file_bundle=file_bundle,
            table_name="source_files_metadata",
            include_content=include_content,
        )
        db._lock_db()

    return db_path
create__database__from__csv_file_bundle(self, source_value)

Create a database from a csv_file_bundle value.

Unless 'merge_into_single_table' is set to 'True', each csv file will create one table in the resulting database. If this option is set, only a single table with all the values of all csv files will be created. For this to work, all csv files should follow the same schema.

Source code in tabular/modules/db/__init__.py
def create__database__from__csv_file_bundle(self, source_value: Value) -> Any:
    """Create a database from a csv_file_bundle value.

    Unless 'merge_into_single_table' is set to 'True', each csv file will create one table
    in the resulting database. If this option is set, only a single table with all the values of all
    csv files will be created. For this to work, all csv files should follow the same schema.
    """

    merge_into_single_table = self.get_config_value("merge_into_single_table")
    if merge_into_single_table:
        raise NotImplementedError("Not supported (yet).")

    include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
        "include_source_metadata"
    )

    temp_f = tempfile.mkdtemp()
    db_path = os.path.join(temp_f, "db.sqlite")

    def cleanup():
        shutil.rmtree(db_path, ignore_errors=True)

    atexit.register(cleanup)

    db = KiaraDatabase(db_file_path=db_path)
    db.create_if_not_exists()

    # TODO: check whether/how to add indexes

    bundle: FileBundle = source_value.data
    table_names: List[str] = []
    for rel_path in sorted(bundle.included_files.keys()):

        file_item = bundle.included_files[rel_path]
        table_name = find_free_id(
            stem=file_item.file_name_without_extension, current_ids=table_names
        )
        try:
            table_names.append(table_name)
            create_sqlite_table_from_tabular_file(
                target_db_file=db_path, file_item=file_item, table_name=table_name
            )
        except Exception as e:
            if self.get_config_value("ignore_errors") is True:
                log_message("ignore.import_file", file=rel_path, reason=str(e))
                continue
            raise KiaraProcessingException(e)

    if include_raw_content_in_file_info in [None, True]:
        include_content: bool = self.get_config_value("include_source_file_content")
        db._unlock_db()
        insert_db_table_from_file_bundle(
            database=db,
            file_bundle=source_value.data,
            table_name="source_files_metadata",
            include_content=include_content,
        )
        db._lock_db()

    return db_path
create__database__from__table(self, source_value, optional)

Create a database value from a table.

Source code in tabular/modules/db/__init__.py
def create__database__from__table(
    self, source_value: Value, optional: ValueMap
) -> Any:
    """Create a database value from a table."""

    table_name = optional.get_value_data("table_name")
    if not table_name:
        table_name = "imported_table"

    table: KiaraTable = source_value.data
    arrow_table = table.arrow_table

    column_map = None
    index_columns = None

    sqlite_schema = create_sqlite_schema_data_from_arrow_table(
        table=arrow_table, index_columns=index_columns, column_map=column_map
    )

    db = KiaraDatabase.create_in_temp_dir()
    db._unlock_db()
    engine = db.get_sqlalchemy_engine()

    _table = sqlite_schema.create_table(table_name=table_name, engine=engine)

    with engine.connect() as conn:

        for batch in arrow_table.to_batches(
            max_chunksize=DEFAULT_TABULAR_DATA_CHUNK_SIZE
        ):
            conn.execute(insert(_table), batch.to_pylist())
            conn.commit()

    db._lock_db()
    return db
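Stripped of the kiara wrappers, the batched insert at the heart of this conversion looks roughly like the sketch below. The table layout, data and chunk size are assumptions for the example, and SQLAlchemy 2.x-style connections (with Connection.commit) are assumed.

import pyarrow as pa
from sqlalchemy import Column, Integer, MetaData, Table, Text, create_engine, insert

arrow_table = pa.table({"id": [1, 2, 3], "city": ["Berlin", "Ghent", "Siena"]})

engine = create_engine("sqlite:///:memory:")
meta = MetaData()
target = Table("imported_table", meta, Column("id", Integer), Column("city", Text))
meta.create_all(engine)

with engine.connect() as conn:
    # insert the Arrow table batch by batch, each batch as a list of row dicts
    for batch in arrow_table.to_batches(max_chunksize=2):
        conn.execute(insert(target), batch.to_pylist())
        conn.commit()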
create_optional_inputs(self, source_type, target_type)
Source code in tabular/modules/db/__init__.py
def create_optional_inputs(
    self, source_type: str, target_type
) -> Union[Mapping[str, Mapping[str, Any]], None]:

    if target_type == "database" and source_type == "table":

        return {
            "table_name": {
                "type": "string",
                "doc": "The name of the table in the new database.",
                "default": "imported_table",
            }
        }
    else:
        return None
CreateDatabaseModuleConfig (CreateFromModuleConfig) pydantic-model
Source code in tabular/modules/db/__init__.py
class CreateDatabaseModuleConfig(CreateFromModuleConfig):

    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )
    merge_into_single_table: bool = Field(
        description="Whether to merge all csv files into a single table.", default=False
    )
    include_source_metadata: Union[bool, None] = Field(
        description="Whether to include a table with metadata about the source files.",
        default=None,
    )
    include_source_file_content: bool = Field(
        description="When including source metadata, whether to also include the original raw (string) content.",
        default=False,
    )
Attributes
ignore_errors: bool pydantic-field

Whether to ignore convert errors and omit the failed items.

include_source_file_content: bool pydantic-field

When including source metadata, whether to also include the original raw (string) content.

include_source_metadata: bool pydantic-field

Whether to include a table with metadata about the source files.

merge_into_single_table: bool pydantic-field

Whether to merge all csv files into a single table.

LoadDatabaseFromDiskModule (DeserializeValueModule)
Source code in tabular/modules/db/__init__.py
class LoadDatabaseFromDiskModule(DeserializeValueModule):

    _module_type_name = "load.database"

    @classmethod
    def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
        return {"python_object": KiaraDatabase}

    @classmethod
    def retrieve_serialized_value_type(cls) -> str:
        return "database"

    @classmethod
    def retrieve_supported_serialization_profile(cls) -> str:
        return "copy"

    def to__python_object(self, data: SerializedData, **config: Any):

        assert "db.sqlite" in data.get_keys() and len(list(data.get_keys())) == 1

        chunks = data.get_serialized_data("db.sqlite")

        # TODO: support multiple chunks
        assert chunks.get_number_of_chunks() == 1
        files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
        assert len(files) == 1

        db_file = files[0]

        db = KiaraDatabase(db_file_path=db_file)
        return db
retrieve_serialized_value_type() classmethod
Source code in tabular/modules/db/__init__.py
@classmethod
def retrieve_serialized_value_type(cls) -> str:
    return "database"
retrieve_supported_serialization_profile() classmethod
Source code in tabular/modules/db/__init__.py
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
    return "copy"
retrieve_supported_target_profiles() classmethod
Source code in tabular/modules/db/__init__.py
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
    return {"python_object": KiaraDatabase}
to__python_object(self, data, **config)
Source code in tabular/modules/db/__init__.py
def to__python_object(self, data: SerializedData, **config: Any):

    assert "db.sqlite" in data.get_keys() and len(list(data.get_keys())) == 1

    chunks = data.get_serialized_data("db.sqlite")

    # TODO: support multiple chunks
    assert chunks.get_number_of_chunks() == 1
    files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
    assert len(files) == 1

    db_file = files[0]

    db = KiaraDatabase(db_file_path=db_file)
    return db
QueryDatabaseConfig (KiaraModuleConfig) pydantic-model
Source code in tabular/modules/db/__init__.py
class QueryDatabaseConfig(KiaraModuleConfig):

    query: Union[str, None] = Field(description="The query.", default=None)
Attributes
query: str pydantic-field

The query.

QueryDatabaseModule (KiaraModule)

Execute an SQL query against a (SQLite) database.

Source code in tabular/modules/db/__init__.py
class QueryDatabaseModule(KiaraModule):
    """Execute a sql query against a (sqlite) database."""

    _config_cls = QueryDatabaseConfig
    _module_type_name = "query.database"

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:

        result: Dict[str, Dict[str, Any]] = {
            "database": {"type": "database", "doc": "The database to query."}
        }

        if not self.get_config_value("query"):
            result["query"] = {"type": "string", "doc": "The query to execute."}

        return result

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:

        return {"query_result": {"type": "table", "doc": "The query result."}}

    def process(self, inputs: ValueMap, outputs: ValueMap):

        import pyarrow as pa

        database: KiaraDatabase = inputs.get_value_data("database")
        query = self.get_config_value("query")
        if query is None:
            query = inputs.get_value_data("query")

        # TODO: make this memory efficient

        result_columns: Dict[str, List[Any]] = {}
        with database.get_sqlalchemy_engine().connect() as con:
            result = con.execute(text(query))
            for r in result:
                for k, v in dict(r).items():
                    result_columns.setdefault(k, []).append(v)

        table = pa.Table.from_pydict(result_columns)
        outputs.set_value("query_result", table)
Classes
_config_cls (KiaraModuleConfig) private pydantic-model
Source code in tabular/modules/db/__init__.py
class QueryDatabaseConfig(KiaraModuleConfig):

    query: Union[str, None] = Field(description="The query.", default=None)
Attributes
query: str pydantic-field

The query.

Methods
create_inputs_schema(self)

Return the schema for this types' inputs.

Source code in tabular/modules/db/__init__.py
def create_inputs_schema(
    self,
) -> ValueMapSchema:

    result: Dict[str, Dict[str, Any]] = {
        "database": {"type": "database", "doc": "The database to query."}
    }

    if not self.get_config_value("query"):
        result["query"] = {"type": "string", "doc": "The query to execute."}

    return result
create_outputs_schema(self)

Return the schema for this types' outputs.

Source code in tabular/modules/db/__init__.py
def create_outputs_schema(
    self,
) -> ValueMapSchema:

    return {"query_result": {"type": "table", "doc": "The query result."}}
process(self, inputs, outputs)
Source code in tabular/modules/db/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap):

    import pyarrow as pa

    database: KiaraDatabase = inputs.get_value_data("database")
    query = self.get_config_value("query")
    if query is None:
        query = inputs.get_value_data("query")

    # TODO: make this memory efficient

    result_columns: Dict[str, List[Any]] = {}
    with database.get_sqlalchemy_engine().connect() as con:
        result = con.execute(text(query))
        for r in result:
            for k, v in dict(r).items():
                result_columns.setdefault(k, []).append(v)

    table = pa.Table.from_pydict(result_columns)
    outputs.set_value("query_result", table)
RenderDatabaseModule (RenderDatabaseModuleBase)
Source code in tabular/modules/db/__init__.py
class RenderDatabaseModule(RenderDatabaseModuleBase):
    _module_type_name = "render.database"

    def render__database__as__string(
        self, value: Value, render_config: Mapping[str, Any]
    ):

        input_number_of_rows = render_config.get("number_of_rows", 20)
        input_row_offset = render_config.get("row_offset", 0)

        table_name = render_config.get("table_name", None)

        wrap, data_related_scenes = self.preprocess_database(
            value=value,
            table_name=table_name,
            input_number_of_rows=input_number_of_rows,
            input_row_offset=input_row_offset,
        )
        pretty = wrap.as_string(max_row_height=1)

        return RenderValueResult(
            value_id=value.value_id,
            rendered=pretty,
            related_scenes=data_related_scenes,
            render_config=render_config,
            render_manifest=self.manifest.manifest_hash,
        )

    def render__database__as__terminal_renderable(
        self, value: Value, render_config: Mapping[str, Any]
    ):

        input_number_of_rows = render_config.get("number_of_rows", 20)
        input_row_offset = render_config.get("row_offset", 0)

        table_name = render_config.get("table_name", None)

        wrap, data_related_scenes = self.preprocess_database(
            value=value,
            table_name=table_name,
            input_number_of_rows=input_number_of_rows,
            input_row_offset=input_row_offset,
        )
        pretty = wrap.as_terminal_renderable(max_row_height=1)

        return RenderValueResult(
            value_id=value.value_id,
            render_config=render_config,
            rendered=pretty,
            related_scenes=data_related_scenes,
            render_manifest=self.manifest.manifest_hash,
        )
render__database__as__string(self, value, render_config)
Source code in tabular/modules/db/__init__.py
def render__database__as__string(
    self, value: Value, render_config: Mapping[str, Any]
):

    input_number_of_rows = render_config.get("number_of_rows", 20)
    input_row_offset = render_config.get("row_offset", 0)

    table_name = render_config.get("table_name", None)

    wrap, data_related_scenes = self.preprocess_database(
        value=value,
        table_name=table_name,
        input_number_of_rows=input_number_of_rows,
        input_row_offset=input_row_offset,
    )
    pretty = wrap.as_string(max_row_height=1)

    return RenderValueResult(
        value_id=value.value_id,
        rendered=pretty,
        related_scenes=data_related_scenes,
        render_config=render_config,
        render_manifest=self.manifest.manifest_hash,
    )
render__database__as__terminal_renderable(self, value, render_config)
Source code in tabular/modules/db/__init__.py
def render__database__as__terminal_renderable(
    self, value: Value, render_config: Mapping[str, Any]
):

    input_number_of_rows = render_config.get("number_of_rows", 20)
    input_row_offset = render_config.get("row_offset", 0)

    table_name = render_config.get("table_name", None)

    wrap, data_related_scenes = self.preprocess_database(
        value=value,
        table_name=table_name,
        input_number_of_rows=input_number_of_rows,
        input_row_offset=input_row_offset,
    )
    pretty = wrap.as_terminal_renderable(max_row_height=1)

    return RenderValueResult(
        value_id=value.value_id,
        render_config=render_config,
        rendered=pretty,
        related_scenes=data_related_scenes,
        render_manifest=self.manifest.manifest_hash,
    )
RenderDatabaseModuleBase (RenderValueModule)
Source code in tabular/modules/db/__init__.py
class RenderDatabaseModuleBase(RenderValueModule):

    _module_type_name: str = None  # type: ignore

    def preprocess_database(
        self,
        value: Value,
        table_name: Union[str, None],
        input_number_of_rows: int,
        input_row_offset: int,
    ):

        database: KiaraDatabase = value.data
        table_names = database.table_names

        if not table_name:
            table_name = list(table_names)[0]

        if table_name not in table_names:
            raise Exception(
                f"Invalid table name: {table_name}. Available: {', '.join(table_names)}"
            )

        related_scenes_tables: Dict[str, Union[RenderScene, None]] = {
            t: RenderScene.construct(
                title=t,
                description=f"Display the '{t}' table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={"table_name": t},
            )
            for t in database.table_names
        }

        query = f"""SELECT * FROM {table_name} LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""
        result: Dict[str, List[Any]] = {}
        # TODO: this could be written much more efficiently
        with database.get_sqlalchemy_engine().connect() as con:
            num_rows_result = con.execute(text(f"SELECT count(*) from {table_name}"))
            table_num_rows = num_rows_result.fetchone()[0]
            rs = con.execute(text(query))
            for r in rs:
                for k, v in dict(r).items():
                    result.setdefault(k, []).append(v)

        wrap = DictTabularWrap(data=result)

        row_offset = table_num_rows - input_number_of_rows
        related_scenes: Dict[str, Union[RenderScene, None]] = {}
        if row_offset > 0:

            if input_row_offset > 0:
                related_scenes["first"] = RenderScene.construct(
                    title="first",
                    description=f"Display the first {input_number_of_rows} rows of this table.",
                    manifest_hash=self.manifest.manifest_hash,
                    render_config={
                        "row_offset": 0,
                        "number_of_rows": input_number_of_rows,
                        "table_name": table_name,
                    },
                )

                p_offset = input_row_offset - input_number_of_rows
                if p_offset < 0:
                    p_offset = 0
                previous = {
                    "row_offset": p_offset,
                    "number_of_rows": input_number_of_rows,
                    "table_name": table_name,
                }
                related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous)  # type: ignore
            else:
                related_scenes["first"] = None
                related_scenes["previous"] = None

            n_offset = input_row_offset + input_number_of_rows
            if n_offset < table_num_rows:
                next = {
                    "row_offset": n_offset,
                    "number_of_rows": input_number_of_rows,
                    "table_name": table_name,
                }
                related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next)  # type: ignore
            else:
                related_scenes["next"] = None

            last_page = int(table_num_rows / input_number_of_rows)
            current_start = last_page * input_number_of_rows
            if (input_row_offset + input_number_of_rows) > table_num_rows:
                related_scenes["last"] = None
            else:
                related_scenes["last"] = RenderScene.construct(
                    title="last",
                    description="Display the final rows of this table.",
                    manifest_hash=self.manifest.manifest_hash,
                    render_config={
                        "row_offset": current_start,  # type: ignore
                        "number_of_rows": input_number_of_rows,  # type: ignore
                        "table_name": table_name,
                    },
                )
        related_scenes_tables[table_name].disabled = True  # type: ignore
        related_scenes_tables[table_name].related_scenes = related_scenes  # type: ignore
        return wrap, related_scenes_tables
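The 'first'/'previous'/'next'/'last' scenes above reduce to simple offset arithmetic; a hypothetical standalone sketch follows (row counts and page size are invented for the example).

table_num_rows = 95      # total rows in the table
number_of_rows = 20      # rows per rendered page
row_offset = 40          # current page starts at row 40

previous_offset = max(row_offset - number_of_rows, 0)              # -> 20
next_offset = row_offset + number_of_rows                          # -> 60 (only valid while < table_num_rows)
last_offset = (table_num_rows // number_of_rows) * number_of_rows  # -> 80

print(previous_offset, next_offset, last_offset)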
preprocess_database(self, value, table_name, input_number_of_rows, input_row_offset)
Source code in tabular/modules/db/__init__.py
def preprocess_database(
    self,
    value: Value,
    table_name: Union[str, None],
    input_number_of_rows: int,
    input_row_offset: int,
):

    database: KiaraDatabase = value.data
    table_names = database.table_names

    if not table_name:
        table_name = list(table_names)[0]

    if table_name not in table_names:
        raise Exception(
            f"Invalid table name: {table_name}. Available: {', '.join(table_names)}"
        )

    related_scenes_tables: Dict[str, Union[RenderScene, None]] = {
        t: RenderScene.construct(
            title=t,
            description=f"Display the '{t}' table.",
            manifest_hash=self.manifest.manifest_hash,
            render_config={"table_name": t},
        )
        for t in database.table_names
    }

    query = f"""SELECT * FROM {table_name} LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""
    result: Dict[str, List[Any]] = {}
    # TODO: this could be written much more efficiently
    with database.get_sqlalchemy_engine().connect() as con:
        num_rows_result = con.execute(text(f"SELECT count(*) from {table_name}"))
        table_num_rows = num_rows_result.fetchone()[0]
        rs = con.execute(text(query))
        for r in rs:
            for k, v in dict(r).items():
                result.setdefault(k, []).append(v)

    wrap = DictTabularWrap(data=result)

    row_offset = table_num_rows - input_number_of_rows
    related_scenes: Dict[str, Union[RenderScene, None]] = {}
    if row_offset > 0:

        if input_row_offset > 0:
            related_scenes["first"] = RenderScene.construct(
                title="first",
                description=f"Display the first {input_number_of_rows} rows of this table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={
                    "row_offset": 0,
                    "number_of_rows": input_number_of_rows,
                    "table_name": table_name,
                },
            )

            p_offset = input_row_offset - input_number_of_rows
            if p_offset < 0:
                p_offset = 0
            previous = {
                "row_offset": p_offset,
                "number_of_rows": input_number_of_rows,
                "table_name": table_name,
            }
            related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous)  # type: ignore
        else:
            related_scenes["first"] = None
            related_scenes["previous"] = None

        n_offset = input_row_offset + input_number_of_rows
        if n_offset < table_num_rows:
            next = {
                "row_offset": n_offset,
                "number_of_rows": input_number_of_rows,
                "table_name": table_name,
            }
            related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next)  # type: ignore
        else:
            related_scenes["next"] = None

        last_page = int(table_num_rows / input_number_of_rows)
        current_start = last_page * input_number_of_rows
        if (input_row_offset + input_number_of_rows) > table_num_rows:
            related_scenes["last"] = None
        else:
            related_scenes["last"] = RenderScene.construct(
                title="last",
                description="Display the final rows of this table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={
                    "row_offset": current_start,  # type: ignore
                    "number_of_rows": input_number_of_rows,  # type: ignore
                    "table_name": table_name,
                },
            )
    related_scenes_tables[table_name].disabled = True  # type: ignore
    related_scenes_tables[table_name].related_scenes = related_scenes  # type: ignore
    return wrap, related_scenes_tables
table special
EMPTY_COLUMN_NAME_MARKER
Classes
CreateTableModule (CreateFromModule)
Source code in tabular/modules/table/__init__.py
class CreateTableModule(CreateFromModule):

    _module_type_name = "create.table"
    _config_cls = CreateTableModuleConfig

    def create__table__from__csv_file(self, source_value: Value) -> Any:
        """Create a table from a csv_file value."""

        from pyarrow import csv

        input_file: FileModel = source_value.data
        imported_data = csv.read_csv(input_file.path)

        # import pandas as pd
        # df = pd.read_csv(input_file.path)
        # imported_data = pa.Table.from_pandas(df)

        return KiaraTable.create_table(imported_data)

    def create__table__from__text_file_bundle(self, source_value: Value) -> Any:
        """Create a table value from a text file_bundle.

        The resulting table will have (at a minimum) the following columns:
        - id: an auto-assigned index
        - rel_path: the relative path of the file (from the provided base path)
        - content: the text file content
        """

        import pyarrow as pa

        bundle: FileBundle = source_value.data

        columns = FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS

        ignore_errors = self.get_config_value("ignore_errors")
        file_dict = bundle.read_text_file_contents(ignore_errors=ignore_errors)

        # TODO: use chunks to save on memory
        tabular: Dict[str, List[Any]] = {}
        for column in columns:
            for index, rel_path in enumerate(sorted(file_dict.keys())):

                if column == "content":
                    _value: Any = file_dict[rel_path]
                elif column == "id":
                    _value = index
                elif column == "rel_path":
                    _value = rel_path
                else:
                    file_model = bundle.included_files[rel_path]
                    _value = getattr(file_model, column)

                tabular.setdefault(column, []).append(_value)

        table = pa.Table.from_pydict(tabular)
        return KiaraTable.create_table(table)
Classes
_config_cls (CreateFromModuleConfig) private pydantic-model
Source code in tabular/modules/table/__init__.py
class CreateTableModuleConfig(CreateFromModuleConfig):

    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )
Attributes
ignore_errors: bool pydantic-field

Whether to ignore convert errors and omit the failed items.

Methods
create__table__from__csv_file(self, source_value)

Create a table from a csv_file value.

Source code in tabular/modules/table/__init__.py
def create__table__from__csv_file(self, source_value: Value) -> Any:
    """Create a table from a csv_file value."""

    from pyarrow import csv

    input_file: FileModel = source_value.data
    imported_data = csv.read_csv(input_file.path)

    # import pandas as pd
    # df = pd.read_csv(input_file.path)
    # imported_data = pa.Table.from_pandas(df)

    return KiaraTable.create_table(imported_data)
create__table__from__text_file_bundle(self, source_value)

Create a table value from a text file_bundle.

The resulting table will have (at a minimum) the following columns:

  • id: an auto-assigned index
  • rel_path: the relative path of the file (from the provided base path)
  • content: the text file content

Source code in tabular/modules/table/__init__.py
def create__table__from__text_file_bundle(self, source_value: Value) -> Any:
    """Create a table value from a text file_bundle.

    The resulting table will have (at a minimum) the following columns:
    - id: an auto-assigned index
    - rel_path: the relative path of the file (from the provided base path)
    - content: the text file content
    """

    import pyarrow as pa

    bundle: FileBundle = source_value.data

    columns = FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS

    ignore_errors = self.get_config_value("ignore_errors")
    file_dict = bundle.read_text_file_contents(ignore_errors=ignore_errors)

    # TODO: use chunks to save on memory
    tabular: Dict[str, List[Any]] = {}
    for column in columns:
        for index, rel_path in enumerate(sorted(file_dict.keys())):

            if column == "content":
                _value: Any = file_dict[rel_path]
            elif column == "id":
                _value = index
            elif column == "rel_path":
                _value = rel_path
            else:
                file_model = bundle.included_files[rel_path]
                _value = getattr(file_model, column)

            tabular.setdefault(column, []).append(_value)

    table = pa.Table.from_pydict(tabular)
    return KiaraTable.create_table(table)
CreateTableModuleConfig (CreateFromModuleConfig) pydantic-model
Source code in tabular/modules/table/__init__.py
class CreateTableModuleConfig(CreateFromModuleConfig):

    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )
Attributes
ignore_errors: bool pydantic-field

Whether to ignore convert errors and omit the failed items.

CutColumnModule (KiaraModule)

Cut off one column from a table, returning an array.

Source code in tabular/modules/table/__init__.py
class CutColumnModule(KiaraModule):
    """Cut off one column from a table, returning an array."""

    _module_type_name = "table.cut_column"

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:

        inputs: Mapping[str, Any] = {
            "table": {"type": "table", "doc": "A table."},
            "column_name": {
                "type": "string",
                "doc": "The name of the column to extract.",
            },
        }
        return inputs

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:

        outputs: Mapping[str, Any] = {"array": {"type": "array", "doc": "The column."}}
        return outputs

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        import pyarrow as pa

        column_name: str = inputs.get_value_data("column_name")

        table_value: Value = inputs.get_value_obj("table")
        table_metadata: KiaraTableMetadata = table_value.get_property_data(
            "metadata.table"
        )

        available = table_metadata.table.column_names

        if column_name not in available:
            raise KiaraProcessingException(
                f"Invalid column name '{column_name}'. Available column names: {', '.join(available)}"
            )

        table: pa.Table = table_value.data.arrow_table
        column = table.column(column_name)

        outputs.set_value("array", column)
Methods
create_inputs_schema(self)

Return the schema for this types' inputs.

Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
    self,
) -> ValueMapSchema:

    inputs: Mapping[str, Any] = {
        "table": {"type": "table", "doc": "A table."},
        "column_name": {
            "type": "string",
            "doc": "The name of the column to extract.",
        },
    }
    return inputs
create_outputs_schema(self)

Return the schema for this type's outputs.

Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
    self,
) -> ValueMapSchema:

    outputs: Mapping[str, Any] = {"array": {"type": "array", "doc": "The column."}}
    return outputs
process(self, inputs, outputs)
Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

    import pyarrow as pa

    column_name: str = inputs.get_value_data("column_name")

    table_value: Value = inputs.get_value_obj("table")
    table_metadata: KiaraTableMetadata = table_value.get_property_data(
        "metadata.table"
    )

    available = table_metadata.table.column_names

    if column_name not in available:
        raise KiaraProcessingException(
            f"Invalid column name '{column_name}'. Available column names: {', '.join(available)}"
        )

    table: pa.Table = table_value.data.arrow_table
    column = table.column(column_name)

    outputs.set_value("array", column)
DeserializeTableModule (DeserializeValueModule)
Source code in tabular/modules/table/__init__.py
class DeserializeTableModule(DeserializeValueModule):

    _module_type_name = "load.table"

    @classmethod
    def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
        return {"python_object": KiaraTable}

    @classmethod
    def retrieve_serialized_value_type(cls) -> str:
        return "table"

    @classmethod
    def retrieve_supported_serialization_profile(cls) -> str:
        return "feather"

    def to__python_object(self, data: SerializedData, **config: Any):

        import pyarrow as pa

        columns = {}

        for column_name in data.get_keys():

            chunks = data.get_serialized_data(column_name)

            # TODO: support multiple chunks
            assert chunks.get_number_of_chunks() == 1
            files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
            assert len(files) == 1

            file = files[0]
            with pa.memory_map(file, "r") as column_chunk:
                loaded_arrays: pa.Table = pa.ipc.open_file(column_chunk).read_all()
                column = loaded_arrays.column(column_name)
                if column_name == EMPTY_COLUMN_NAME_MARKER:
                    columns[""] = column
                else:
                    columns[column_name] = column

        arrow_table = pa.table(columns)

        table = KiaraTable.create_table(arrow_table)
        return table
retrieve_serialized_value_type() classmethod
Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_serialized_value_type(cls) -> str:
    return "table"
retrieve_supported_serialization_profile() classmethod
Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
    return "feather"
retrieve_supported_target_profiles() classmethod
Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
    return {"python_object": KiaraTable}
to__python_object(self, data, **config)
Source code in tabular/modules/table/__init__.py
def to__python_object(self, data: SerializedData, **config: Any):

    import pyarrow as pa

    columns = {}

    for column_name in data.get_keys():

        chunks = data.get_serialized_data(column_name)

        # TODO: support multiple chunks
        assert chunks.get_number_of_chunks() == 1
        files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
        assert len(files) == 1

        file = files[0]
        with pa.memory_map(file, "r") as column_chunk:
            loaded_arrays: pa.Table = pa.ipc.open_file(column_chunk).read_all()
            column = loaded_arrays.column(column_name)
            if column_name == EMPTY_COLUMN_NAME_MARKER:
                columns[""] = column
            else:
                columns[column_name] = column

    arrow_table = pa.table(columns)

    table = KiaraTable.create_table(arrow_table)
    return table
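
The deserializer memory-maps one Arrow IPC file per column and reads it back with pa.ipc.open_file. A small, self-contained round-trip sketch of that pattern (the file name and column name are made up, and the temporary file handling is simplified):

import pyarrow as pa

# write a single-column table to an Arrow IPC file, roughly what the serializer stores per column
column_table = pa.table({"my_column": [1, 2, 3]})
with pa.OSFile("my_column.arrow", "wb") as sink:
    with pa.ipc.new_file(sink, column_table.schema) as writer:
        writer.write_table(column_table)

# read it back via a memory map, as the deserializer does
with pa.memory_map("my_column.arrow", "r") as source:
    loaded = pa.ipc.open_file(source).read_all()

print(loaded.column("my_column"))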
ExportTableModule (DataExportModule)

Export table data items.

Source code in tabular/modules/table/__init__.py
class ExportTableModule(DataExportModule):
    """Export table data items."""

    _module_type_name = "export.table"

    def export__table__as__csv_file(self, value: KiaraTable, base_path: str, name: str):
        """Export a table as csv file."""

        import pyarrow.csv as csv

        target_path = os.path.join(base_path, f"{name}.csv")

        csv.write_csv(value.arrow_table, target_path)

        return {"files": target_path}

    # def export__table__as__sqlite_db(
    #     self, value: KiaraTable, base_path: str, name: str
    # ):
    #
    #     target_path = os.path.abspath(os.path.join(base_path, f"{name}.sqlite"))
    #
    #     raise NotImplementedError()
    #     # shutil.copy2(value.db_file_path, target_path)
    #
    #     return {"files": target_path}
Methods
export__table__as__csv_file(self, value, base_path, name)

Export a table as csv file.

Source code in tabular/modules/table/__init__.py
def export__table__as__csv_file(self, value: KiaraTable, base_path: str, name: str):
    """Export a table as csv file."""

    import pyarrow.csv as csv

    target_path = os.path.join(base_path, f"{name}.csv")

    csv.write_csv(value.arrow_table, target_path)

    return {"files": target_path}
MergeTableConfig (KiaraModuleConfig) pydantic-model
Source code in tabular/modules/table/__init__.py
class MergeTableConfig(KiaraModuleConfig):

    inputs_schema: Dict[str, ValueSchema] = Field(
        description="A dict describing the inputs for this merge process."
    )
    column_map: Dict[str, str] = Field(
        description="A map describing the column names of the resulting table: the desired column name as key, the source field reference as value.",
        default_factory=dict,
    )
Attributes
column_map: Dict[str, str] pydantic-field

A map describing the column names of the resulting table: the desired column name as key, the source field reference as value.

inputs_schema: Dict[str, kiara.models.values.value_schema.ValueSchema] pydantic-field required

A dict describing the inputs for this merge process.

MergeTableModule (KiaraModule)

Create a table from other tables and/or arrays.

This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary number of tables/arrays; all tables to be merged must be specified in the module configuration.

Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the desired column name as key, and a field name in the following format as value:

- '[inputs_schema key]' for inputs of type 'array'
- '[inputs_schema_key].orig_column_name' for inputs of type 'table'
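
As an illustration only (the field names are made up, not defaults), a configuration for this module that merges one table input and one array input could look like this:

merge_config = {
    "inputs_schema": {
        "left": {"type": "table", "doc": "A table with 'id' and 'label' columns."},
        "extra": {"type": "array", "doc": "An extra column to append."},
    },
    "column_map": {
        "id": "left.id",          # '[inputs_schema key].orig_column_name' for table inputs
        "label": "left.label",
        "extra_values": "extra",  # '[inputs_schema key]' for array inputs
    },
}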

Source code in tabular/modules/table/__init__.py
class MergeTableModule(KiaraModule):
    """Create a table from other tables and/or arrays.

    This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary
    number of tables/arrays, all tables to be merged must be specified in the module configuration.

    Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the
    desired column name as key, and a field-name in the following format as value:
    - '[inputs_schema key]' for inputs of type 'array'
    - '[inputs_schema_key].orig_column_name' for inputs of type 'table'
    """

    _module_type_name = "table.merge"
    _config_cls = MergeTableConfig

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:

        input_schema_dict = self.get_config_value("inputs_schema")
        return input_schema_dict

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:

        outputs = {
            "table": {
                "type": "table",
                "doc": "The merged table, including all source tables and columns.",
            }
        }
        return outputs

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:

        import pyarrow as pa

        inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
        column_map: Dict[str, str] = self.get_config_value("column_map")

        sources = {}
        for field_name in inputs_schema.keys():
            sources[field_name] = inputs.get_value_data(field_name)

        len_dict = {}
        arrays = {}

        column_map_final = dict(column_map)

        for source_key, table_or_array in sources.items():

            if isinstance(table_or_array, KiaraTable):
                rows = table_or_array.num_rows
                for name in table_or_array.column_names:
                    array_name = f"{source_key}.{name}"
                    if column_map and array_name not in column_map.values():
                        job_log.add_log(
                            f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
                        )
                        continue

                    column = table_or_array.arrow_table.column(name)
                    arrays[array_name] = column
                    if not column_map:
                        if name in column_map_final:
                            raise Exception(
                                f"Can't merge table, duplicate column name: {name}."
                            )
                        column_map_final[name] = array_name

            elif isinstance(table_or_array, KiaraArray):

                if column_map and source_key not in column_map.values():
                    job_log.add_log(
                        f"Ignoring array '{source_key}': not listed in column_map."
                    )
                    continue

                rows = len(table_or_array)
                arrays[source_key] = table_or_array.arrow_array

                if not column_map:
                    if source_key in column_map_final.keys():
                        raise Exception(
                            f"Can't merge table, duplicate column name: {source_key}."
                        )
                    column_map_final[source_key] = source_key

            else:
                raise KiaraProcessingException(
                    f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
                )

            len_dict[source_key] = rows

        all_rows = None
        for source_key, rows in len_dict.items():
            if all_rows is None:
                all_rows = rows
            else:
                if all_rows != rows:
                    all_rows = None
                    break

        if all_rows is None:
            len_str = ""
            for name, rows in len_dict.items():
                len_str = f" {name} ({rows})"

            raise KiaraProcessingException(
                f"Can't merge table, sources have different lengths: {len_str}"
            )

        column_names = []
        columns = []
        for column_name, ref in column_map_final.items():
            column_names.append(column_name)
            column = arrays[ref]
            columns.append(column)

        table = pa.Table.from_arrays(arrays=columns, names=column_names)

        outputs.set_value("table", table)
Classes
_config_cls (KiaraModuleConfig) private pydantic-model
Source code in tabular/modules/table/__init__.py
class MergeTableConfig(KiaraModuleConfig):

    inputs_schema: Dict[str, ValueSchema] = Field(
        description="A dict describing the inputs for this merge process."
    )
    column_map: Dict[str, str] = Field(
        description="A map describing the column names of the resulting table: the desired column name as key, the source field reference as value.",
        default_factory=dict,
    )
Attributes
column_map: Dict[str, str] pydantic-field

A map describing the column names of the resulting table: the desired column name as key, the source field reference as value.

inputs_schema: Dict[str, kiara.models.values.value_schema.ValueSchema] pydantic-field required

A dict describing the inputs for this merge process.

Methods
create_inputs_schema(self)

Return the schema for this type's inputs.

Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
    self,
) -> ValueMapSchema:

    input_schema_dict = self.get_config_value("inputs_schema")
    return input_schema_dict
create_outputs_schema(self)

Return the schema for this type's outputs.

Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
    self,
) -> ValueMapSchema:

    outputs = {
        "table": {
            "type": "table",
            "doc": "The merged table, including all source tables and columns.",
        }
    }
    return outputs
process(self, inputs, outputs, job_log)
Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:

    import pyarrow as pa

    inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
    column_map: Dict[str, str] = self.get_config_value("column_map")

    sources = {}
    for field_name in inputs_schema.keys():
        sources[field_name] = inputs.get_value_data(field_name)

    len_dict = {}
    arrays = {}

    column_map_final = dict(column_map)

    for source_key, table_or_array in sources.items():

        if isinstance(table_or_array, KiaraTable):
            rows = table_or_array.num_rows
            for name in table_or_array.column_names:
                array_name = f"{source_key}.{name}"
                if column_map and array_name not in column_map.values():
                    job_log.add_log(
                        f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
                    )
                    continue

                column = table_or_array.arrow_table.column(name)
                arrays[array_name] = column
                if not column_map:
                    if name in column_map_final:
                        raise Exception(
                            f"Can't merge table, duplicate column name: {name}."
                        )
                    column_map_final[name] = array_name

        elif isinstance(table_or_array, KiaraArray):

            if column_map and source_key not in column_map.values():
                job_log.add_log(
                    f"Ignoring array '{source_key}': not listed in column_map."
                )
                continue

            rows = len(table_or_array)
            arrays[source_key] = table_or_array.arrow_array

            if not column_map:
                if source_key in column_map_final.keys():
                    raise Exception(
                        f"Can't merge table, duplicate column name: {source_key}."
                    )
                column_map_final[source_key] = source_key

        else:
            raise KiaraProcessingException(
                f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
            )

        len_dict[source_key] = rows

    all_rows = None
    for source_key, rows in len_dict.items():
        if all_rows is None:
            all_rows = rows
        else:
            if all_rows != rows:
                all_rows = None
                break

    if all_rows is None:
        len_str = ""
        for name, rows in len_dict.items():
            len_str = f" {name} ({rows})"

        raise KiaraProcessingException(
            f"Can't merge table, sources have different lengths: {len_str}"
        )

    column_names = []
    columns = []
    for column_name, ref in column_map_final.items():
        column_names.append(column_name)
        column = arrays[ref]
        columns.append(column)

    table = pa.Table.from_arrays(arrays=columns, names=column_names)

    outputs.set_value("table", table)
QueryTableSQL (KiaraModule)

Execute a sql query against an (Arrow) table.

The default relation name for the sql query is 'data', but can be modified by the 'relation_name' config option/input.

If the 'query' module config option is not set, users can provide their own query; otherwise, the pre-set one will be used.
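
The module delegates the query to duckdb. The sketch below mirrors its internal calls against a small in-memory Arrow table; the table contents and the query itself are made up:

import duckdb
import pyarrow as pa

arrow_table = pa.table({"year": [1860, 1861, 1862], "num": [5, 7, 3]})

# register the Arrow table under the relation name 'data' and query it,
# mirroring what the module does internally
rel = duckdb.arrow(arrow_table)
result = rel.query("data", "SELECT year, num FROM data WHERE num > 4")

print(result.arrow())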

Source code in tabular/modules/table/__init__.py
class QueryTableSQL(KiaraModule):
    """Execute a sql query against an (Arrow) table.

    The default relation name for the sql query is 'data', but can be modified by the 'relation_name' config option/input.

    If the 'query' module config option is not set, users can provide their own query, otherwise the pre-set
    one will be used.
    """

    _module_type_name = "query.table"
    _config_cls = QueryTableSQLModuleConfig

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:

        inputs = {
            "table": {
                "type": "table",
                "doc": "The table to query",
            }
        }

        if self.get_config_value("query") is None:
            inputs["query"] = {"type": "string", "doc": "The query."}
            inputs["relation_name"] = {
                "type": "string",
                "doc": "The name the table is referred to in the sql query.",
                "default": "data",
            }

        return inputs

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:

        return {"query_result": {"type": "table", "doc": "The query result."}}

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

        import duckdb

        if self.get_config_value("query") is None:
            _query: str = inputs.get_value_data("query")
            _relation_name: str = inputs.get_value_data("relation_name")
        else:
            _query = self.get_config_value("query")
            _relation_name = self.get_config_value("relation_name")

        if _relation_name.upper() in RESERVED_SQL_KEYWORDS:
            raise KiaraProcessingException(
                f"Invalid relation name '{_relation_name}': this is a reserved sql keyword, please select a different name."
            )

        _table: KiaraTable = inputs.get_value_data("table")
        rel_from_arrow = duckdb.arrow(_table.arrow_table)
        result: duckdb.DuckDBPyRelation = rel_from_arrow.query(_relation_name, _query)

        outputs.set_value("query_result", result.arrow())
Classes
_config_cls (KiaraModuleConfig) private pydantic-model
Source code in tabular/modules/table/__init__.py
class QueryTableSQLModuleConfig(KiaraModuleConfig):

    query: Union[str, None] = Field(
        description="The query to execute. If not specified, the user will be able to provide their own.",
        default=None,
    )
    relation_name: Union[str, None] = Field(
        description="The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.",
        default="data",
    )
Attributes
query: str pydantic-field

The query to execute. If not specified, the user will be able to provide their own.

relation_name: str pydantic-field

The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.

Methods
create_inputs_schema(self)

Return the schema for this type's inputs.

Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
    self,
) -> ValueMapSchema:

    inputs = {
        "table": {
            "type": "table",
            "doc": "The table to query",
        }
    }

    if self.get_config_value("query") is None:
        inputs["query"] = {"type": "string", "doc": "The query."}
        inputs["relation_name"] = {
            "type": "string",
            "doc": "The name the table is referred to in the sql query.",
            "default": "data",
        }

    return inputs
create_outputs_schema(self)

Return the schema for this type's outputs.

Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
    self,
) -> ValueMapSchema:

    return {"query_result": {"type": "table", "doc": "The query result."}}
process(self, inputs, outputs)
Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:

    import duckdb

    if self.get_config_value("query") is None:
        _query: str = inputs.get_value_data("query")
        _relation_name: str = inputs.get_value_data("relation_name")
    else:
        _query = self.get_config_value("query")
        _relation_name = self.get_config_value("relation_name")

    if _relation_name.upper() in RESERVED_SQL_KEYWORDS:
        raise KiaraProcessingException(
            f"Invalid relation name '{_relation_name}': this is a reserved sql keyword, please select a different name."
        )

    _table: KiaraTable = inputs.get_value_data("table")
    rel_from_arrow = duckdb.arrow(_table.arrow_table)
    result: duckdb.DuckDBPyRelation = rel_from_arrow.query(_relation_name, _query)

    outputs.set_value("query_result", result.arrow())
QueryTableSQLModuleConfig (KiaraModuleConfig) pydantic-model
Source code in tabular/modules/table/__init__.py
class QueryTableSQLModuleConfig(KiaraModuleConfig):

    query: Union[str, None] = Field(
        description="The query to execute. If not specified, the user will be able to provide their own.",
        default=None,
    )
    relation_name: Union[str, None] = Field(
        description="The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.",
        default="data",
    )
Attributes
query: str pydantic-field

The query to execute. If not specified, the user will be able to provide their own.

relation_name: str pydantic-field

The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.

RenderTableModule (RenderTableModuleBase)
Source code in tabular/modules/table/__init__.py
class RenderTableModule(RenderTableModuleBase):
    _module_type_name = "render.table"

    def render__table__as__string(self, value: Value, render_config: Mapping[str, Any]):

        input_number_of_rows = render_config.get("number_of_rows", 20)
        input_row_offset = render_config.get("row_offset", 0)

        wrap, data_related_scenes = self.preprocess_table(
            value=value,
            input_number_of_rows=input_number_of_rows,
            input_row_offset=input_row_offset,
        )
        pretty = wrap.as_string(max_row_height=1)

        return RenderValueResult(
            value_id=value.value_id,
            render_config=render_config,
            render_manifest=self.manifest.manifest_hash,
            rendered=pretty,
            related_scenes=data_related_scenes,
        )

    def render__table__as__terminal_renderable(
        self, value: Value, render_config: Mapping[str, Any]
    ):

        input_number_of_rows = render_config.get("number_of_rows", 20)
        input_row_offset = render_config.get("row_offset", 0)

        wrap, data_related_scenes = self.preprocess_table(
            value=value,
            input_number_of_rows=input_number_of_rows,
            input_row_offset=input_row_offset,
        )
        pretty = wrap.as_terminal_renderable(max_row_height=1)

        return RenderValueResult(
            value_id=value.value_id,
            render_config=render_config,
            render_manifest=self.manifest.manifest_hash,
            rendered=pretty,
            related_scenes=data_related_scenes,
        )
render__table__as__string(self, value, render_config)
Source code in tabular/modules/table/__init__.py
def render__table__as__string(self, value: Value, render_config: Mapping[str, Any]):

    input_number_of_rows = render_config.get("number_of_rows", 20)
    input_row_offset = render_config.get("row_offset", 0)

    wrap, data_related_scenes = self.preprocess_table(
        value=value,
        input_number_of_rows=input_number_of_rows,
        input_row_offset=input_row_offset,
    )
    pretty = wrap.as_string(max_row_height=1)

    return RenderValueResult(
        value_id=value.value_id,
        render_config=render_config,
        render_manifest=self.manifest.manifest_hash,
        rendered=pretty,
        related_scenes=data_related_scenes,
    )
render__table__as__terminal_renderable(self, value, render_config)
Source code in tabular/modules/table/__init__.py
def render__table__as__terminal_renderable(
    self, value: Value, render_config: Mapping[str, Any]
):

    input_number_of_rows = render_config.get("number_of_rows", 20)
    input_row_offset = render_config.get("row_offset", 0)

    wrap, data_related_scenes = self.preprocess_table(
        value=value,
        input_number_of_rows=input_number_of_rows,
        input_row_offset=input_row_offset,
    )
    pretty = wrap.as_terminal_renderable(max_row_height=1)

    return RenderValueResult(
        value_id=value.value_id,
        render_config=render_config,
        render_manifest=self.manifest.manifest_hash,
        rendered=pretty,
        related_scenes=data_related_scenes,
    )
RenderTableModuleBase (RenderValueModule)
Source code in tabular/modules/table/__init__.py
class RenderTableModuleBase(RenderValueModule):

    _module_type_name: str = None  # type: ignore

    def preprocess_table(
        self, value: Value, input_number_of_rows: int, input_row_offset: int
    ):

        import duckdb
        import pyarrow as pa

        if value.data_type_name == "array":
            array: KiaraArray = value.data
            arrow_table = pa.table(data=[array.arrow_array], names=["array"])
            column_names: Iterable[str] = ["array"]
        else:
            table: KiaraTable = value.data
            arrow_table = table.arrow_table
            column_names = table.column_names

        columns = [f'"{x}"' if not x.startswith('"') else x for x in column_names]

        query = f"""SELECT {', '.join(columns)} FROM data LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""

        rel_from_arrow = duckdb.arrow(arrow_table)
        query_result: duckdb.DuckDBPyRelation = rel_from_arrow.query("data", query)

        result_table = query_result.arrow()
        wrap = ArrowTabularWrap(table=result_table)

        related_scenes: Dict[str, Union[None, RenderScene]] = {}

        row_offset = arrow_table.num_rows - input_number_of_rows

        if row_offset > 0:

            if input_row_offset > 0:
                related_scenes["first"] = RenderScene.construct(
                    title="first",
                    description=f"Display the first {input_number_of_rows} rows of this table.",
                    manifest_hash=self.manifest.manifest_hash,
                    render_config={
                        "row_offset": 0,
                        "number_of_rows": input_number_of_rows,
                    },
                )

                p_offset = input_row_offset - input_number_of_rows
                if p_offset < 0:
                    p_offset = 0
                previous = {
                    "row_offset": p_offset,
                    "number_of_rows": input_number_of_rows,
                }
                related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous)  # type: ignore
            else:
                related_scenes["first"] = None
                related_scenes["previous"] = None

            n_offset = input_row_offset + input_number_of_rows
            if n_offset < arrow_table.num_rows:
                next = {"row_offset": n_offset, "number_of_rows": input_number_of_rows}
                related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next)  # type: ignore
            else:
                related_scenes["next"] = None

            last_page = int(arrow_table.num_rows / input_number_of_rows)
            current_start = last_page * input_number_of_rows
            if (input_row_offset + input_number_of_rows) > arrow_table.num_rows:
                related_scenes["last"] = None
            else:
                related_scenes["last"] = RenderScene.construct(
                    title="last",
                    description="Display the final rows of this table.",
                    manifest_hash=self.manifest.manifest_hash,
                    render_config={
                        "row_offset": current_start,  # type: ignore
                        "number_of_rows": input_number_of_rows,  # type: ignore
                    },
                )
        else:
            related_scenes["first"] = None
            related_scenes["previous"] = None
            related_scenes["next"] = None
            related_scenes["last"] = None

        return wrap, related_scenes
preprocess_table(self, value, input_number_of_rows, input_row_offset)
Source code in tabular/modules/table/__init__.py
def preprocess_table(
    self, value: Value, input_number_of_rows: int, input_row_offset: int
):

    import duckdb
    import pyarrow as pa

    if value.data_type_name == "array":
        array: KiaraArray = value.data
        arrow_table = pa.table(data=[array.arrow_array], names=["array"])
        column_names: Iterable[str] = ["array"]
    else:
        table: KiaraTable = value.data
        arrow_table = table.arrow_table
        column_names = table.column_names

    columns = [f'"{x}"' if not x.startswith('"') else x for x in column_names]

    query = f"""SELECT {', '.join(columns)} FROM data LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""

    rel_from_arrow = duckdb.arrow(arrow_table)
    query_result: duckdb.DuckDBPyRelation = rel_from_arrow.query("data", query)

    result_table = query_result.arrow()
    wrap = ArrowTabularWrap(table=result_table)

    related_scenes: Dict[str, Union[None, RenderScene]] = {}

    row_offset = arrow_table.num_rows - input_number_of_rows

    if row_offset > 0:

        if input_row_offset > 0:
            related_scenes["first"] = RenderScene.construct(
                title="first",
                description=f"Display the first {input_number_of_rows} rows of this table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={
                    "row_offset": 0,
                    "number_of_rows": input_number_of_rows,
                },
            )

            p_offset = input_row_offset - input_number_of_rows
            if p_offset < 0:
                p_offset = 0
            previous = {
                "row_offset": p_offset,
                "number_of_rows": input_number_of_rows,
            }
            related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous)  # type: ignore
        else:
            related_scenes["first"] = None
            related_scenes["previous"] = None

        n_offset = input_row_offset + input_number_of_rows
        if n_offset < arrow_table.num_rows:
            next = {"row_offset": n_offset, "number_of_rows": input_number_of_rows}
            related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next)  # type: ignore
        else:
            related_scenes["next"] = None

        last_page = int(arrow_table.num_rows / input_number_of_rows)
        current_start = last_page * input_number_of_rows
        if (input_row_offset + input_number_of_rows) > arrow_table.num_rows:
            related_scenes["last"] = None
        else:
            related_scenes["last"] = RenderScene.construct(
                title="last",
                description="Display the final rows of this table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={
                    "row_offset": current_start,  # type: ignore
                    "number_of_rows": input_number_of_rows,  # type: ignore
                },
            )
    else:
        related_scenes["first"] = None
        related_scenes["previous"] = None
        related_scenes["next"] = None
        related_scenes["last"] = None

    return wrap, related_scenes
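
The paging logic above reduces to a LIMIT/OFFSET query over the Arrow table, with the 'first'/'previous'/'next'/'last' scenes just shifting the offset. A reduced sketch of the query part (the table and page size are made up):

import duckdb
import pyarrow as pa

arrow_table = pa.table({"value": list(range(100))})

number_of_rows, row_offset = 20, 40  # the third 20-row page

rel = duckdb.arrow(arrow_table)
page = rel.query(
    "data", f'SELECT "value" FROM data LIMIT {number_of_rows} OFFSET {row_offset}'
)
print(page.arrow().num_rows)  # 20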
filters
TableFiltersModule (FilterModule)
Source code in tabular/modules/table/filters.py
class TableFiltersModule(FilterModule):

    _module_type_name = "table.filters"

    @classmethod
    def retrieve_supported_type(cls) -> Union[Dict[str, Any], str]:

        return "table"

    def create_filter_inputs(self, filter_name: str) -> Union[None, ValueMapSchema]:

        if filter_name in ["select_columns", "drop_columns"]:

            return {
                "columns": {
                    "type": "list",
                    "doc": "The name of the columns to include.",
                    "optional": True,
                },
                "ignore_invalid_column_names": {
                    "type": "boolean",
                    "doc": "Whether to ignore invalid column names.",
                    "default": True,
                },
            }

        return None

    def filter__select_columns(self, value: Value, filter_inputs: Mapping[str, Any]):

        import pyarrow as pa

        ignore_invalid = filter_inputs["ignore_invalid_column_names"]
        column_names = filter_inputs["columns"]

        if not column_names:
            return value

        table: KiaraTable = value.data
        arrow_table = table.arrow_table
        _column_names = []
        _columns = []

        for column_name in column_names:
            if column_name not in arrow_table.column_names:
                if ignore_invalid:
                    continue
                else:
                    raise KiaraProcessingException(
                        f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                    )

            column = arrow_table.column(column_name)
            _column_names.append(column_name)
            _columns.append(column)

        return pa.table(data=_columns, names=_column_names)

    def filter__drop_columns(self, value: Value, filter_inputs: Mapping[str, Any]):

        import pyarrow as pa

        ignore_invalid = filter_inputs["ignore_invalid_column_names"]
        column_names_to_ignore = filter_inputs["columns"]

        if not column_names_to_ignore:
            return value

        table: KiaraTable = value.data
        arrow_table = table.arrow_table

        for column_name in column_names_to_ignore:
            if column_name not in arrow_table.column_names:
                if ignore_invalid:
                    continue
                else:
                    raise KiaraProcessingException(
                        f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                    )

        _column_names = []
        _columns = []
        for column_name in arrow_table.column_names:

            if column_name in column_names_to_ignore:
                continue

            column = arrow_table.column(column_name)
            _column_names.append(column_name)
            _columns.append(column)

        return pa.table(data=_columns, names=_column_names)

    def filter__select_rows(self, value: Value, filter_inputs: Mapping[str, Any]):

        pass
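
For reference, the same column selection can be expressed directly with pyarrow; a minimal sketch with a made-up table:

import pyarrow as pa

table = pa.table({"a": [1, 2], "b": [3, 4], "c": [5, 6]})

# select_columns: keep only the requested columns
selected = table.select(["a", "c"])

# drop_columns: keep everything except the listed columns
to_drop = {"b"}
remaining = table.select([c for c in table.column_names if c not in to_drop])

print(selected.column_names, remaining.column_names)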
create_filter_inputs(self, filter_name)
Source code in tabular/modules/table/filters.py
def create_filter_inputs(self, filter_name: str) -> Union[None, ValueMapSchema]:

    if filter_name in ["select_columns", "drop_columns"]:

        return {
            "columns": {
                "type": "list",
                "doc": "The name of the columns to include.",
                "optional": True,
            },
            "ignore_invalid_column_names": {
                "type": "boolean",
                "doc": "Whether to ignore invalid column names.",
                "default": True,
            },
        }

    return None
filter__drop_columns(self, value, filter_inputs)
Source code in tabular/modules/table/filters.py
def filter__drop_columns(self, value: Value, filter_inputs: Mapping[str, Any]):

    import pyarrow as pa

    ignore_invalid = filter_inputs["ignore_invalid_column_names"]
    column_names_to_ignore = filter_inputs["columns"]

    if not column_names_to_ignore:
        return value

    table: KiaraTable = value.data
    arrow_table = table.arrow_table

    for column_name in column_names_to_ignore:
        if column_name not in arrow_table.column_names:
            if ignore_invalid:
                continue
            else:
                raise KiaraProcessingException(
                    f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                )

    _column_names = []
    _columns = []
    for column_name in arrow_table.column_names:

        if column_name in column_names_to_ignore:
            continue

        column = arrow_table.column(column_name)
        _column_names.append(column_name)
        _columns.append(column)

    return pa.table(data=_columns, names=_column_names)
filter__select_columns(self, value, filter_inputs)
Source code in tabular/modules/table/filters.py
def filter__select_columns(self, value: Value, filter_inputs: Mapping[str, Any]):

    import pyarrow as pa

    ignore_invalid = filter_inputs["ignore_invalid_column_names"]
    column_names = filter_inputs["columns"]

    if not column_names:
        return value

    table: KiaraTable = value.data
    arrow_table = table.arrow_table
    _column_names = []
    _columns = []

    for column_name in column_names:
        if column_name not in arrow_table.column_names:
            if ignore_invalid:
                continue
            else:
                raise KiaraProcessingException(
                    f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                )

        column = arrow_table.column(column_name)
        _column_names.append(column_name)
        _columns.append(column)

    return pa.table(data=_columns, names=_column_names)
filter__select_rows(self, value, filter_inputs)
Source code in tabular/modules/table/filters.py
def filter__select_rows(self, value: Value, filter_inputs: Mapping[str, Any]):

    pass
retrieve_supported_type() classmethod
Source code in tabular/modules/table/filters.py
@classmethod
def retrieve_supported_type(cls) -> Union[Dict[str, Any], str]:

    return "table"

pipelines special

Default (empty) module that is used as a base path for pipelines contained in this package.

utils

Functions

convert_arrow_column_types_to_sqlite(table)
Source code in tabular/utils.py
def convert_arrow_column_types_to_sqlite(
    table: "pa.Table",
) -> Dict[str, SqliteDataType]:

    result: Dict[str, SqliteDataType] = {}
    for column_name in table.column_names:
        field = table.field(column_name)
        sqlite_type = convert_arrow_type_to_sqlite(str(field.type))
        result[column_name] = sqlite_type

    return result
convert_arrow_type_to_sqlite(data_type)
Source code in tabular/utils.py
def convert_arrow_type_to_sqlite(data_type: str) -> SqliteDataType:

    if data_type.startswith("int") or data_type.startswith("uint"):
        return "INTEGER"

    if (
        data_type.startswith("float")
        or data_type.startswith("decimal")
        or data_type.startswith("double")
    ):
        return "REAL"

    if data_type.startswith("time") or data_type.startswith("date"):
        return "TEXT"

    if data_type == "bool":
        return "INTEGER"

    if data_type in ["string", "utf8", "large_string", "large_utf8"]:
        return "TEXT"

    if data_type in ["binary", "large_binary"]:
        return "BLOB"

    raise Exception(f"Can't convert to sqlite type: {data_type}")
create_sqlite_schema_data_from_arrow_table(table, column_map=None, index_columns=None, nullable_columns=None, unique_columns=None, primary_key=None)

Create a sql schema statement from an Arrow table object.

Parameters:

table (pa.Table, required): the Arrow table object

column_map (Optional[Mapping[str, str]], default: None): a map that contains column names that should be changed in the new table

index_columns (Optional[Iterable[str]], default: None): a list of column names (after mapping) to create module_indexes for

nullable_columns (Optional[Iterable[str]], default: None): a list of column names (after mapping) that are allowed to contain null values

unique_columns (Optional[Iterable[str]], default: None): a list of column names (after mapping) that must contain unique values

primary_key (Optional[str], default: None): the name of the column (after mapping) to use as primary key
Source code in tabular/utils.py
def create_sqlite_schema_data_from_arrow_table(
    table: "pa.Table",
    column_map: Union[Mapping[str, str], None] = None,
    index_columns: Union[Iterable[str], None] = None,
    nullable_columns: Union[Iterable[str], None] = None,
    unique_columns: Union[Iterable[str], None] = None,
    primary_key: Union[str, None] = None,
) -> SqliteTableSchema:
    """Create a sql schema statement from an Arrow table object.

    Arguments:
        table: the Arrow table object
        column_map: a map that contains column names that should be changed in the new table
        index_columns: a list of column names (after mapping) to create module_indexes for
        extra_column_info: a list of extra schema instructions per column name (after mapping)
    """

    columns = convert_arrow_column_types_to_sqlite(table=table)

    if column_map is None:
        column_map = {}

    temp: Dict[str, SqliteDataType] = {}

    if index_columns is None:
        index_columns = []

    if nullable_columns is None:
        nullable_columns = []

    if unique_columns is None:
        unique_columns = []

    for cn, sqlite_data_type in columns.items():
        if cn in column_map.keys():
            new_key = column_map[cn]
            index_columns = [
                x if x not in column_map.keys() else column_map[x]
                for x in index_columns
            ]
            unique_columns = [
                x if x not in column_map.keys() else column_map[x]
                for x in unique_columns
            ]
            nullable_columns = [
                x if x not in column_map.keys() else column_map[x]
                for x in nullable_columns
            ]
        else:
            new_key = cn

        temp[new_key] = sqlite_data_type

    columns = temp
    if not columns:
        raise Exception("Resulting table schema has no columns.")
    else:
        for ic in index_columns:
            if ic not in columns.keys():
                raise Exception(
                    f"Can't create schema, requested index column name not available: {ic}"
                )

    schema = SqliteTableSchema(
        columns=columns,
        index_columns=index_columns,
        nullable_columns=nullable_columns,
        unique_columns=unique_columns,
        primary_key=primary_key,
    )
    return schema
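
A hedged usage sketch, again assuming the kiara_plugin.tabular.utils import path; the table and column choices are made up:

import pyarrow as pa

from kiara_plugin.tabular.utils import create_sqlite_schema_data_from_arrow_table

table = pa.table({"id": [1, 2], "name": ["a", "b"]})

schema = create_sqlite_schema_data_from_arrow_table(
    table,
    column_map={"name": "label"},  # rename 'name' to 'label' in the sqlite schema
    index_columns=["label"],       # index the renamed column
    primary_key="id",
)
print(schema.columns)  # e.g. {'id': 'INTEGER', 'label': 'TEXT'}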
create_sqlite_table_from_tabular_file(target_db_file, file_item, table_name=None, is_csv=True, is_tsv=False, is_nl=False, primary_key_column_names=None, flatten_nested_json_objects=False, csv_delimiter=None, quotechar=None, sniff=True, no_headers=False, encoding='utf-8', batch_size=100, detect_types=True)
Source code in tabular/utils.py
def create_sqlite_table_from_tabular_file(
    target_db_file: str,
    file_item: FileModel,
    table_name: Union[str, None] = None,
    is_csv: bool = True,
    is_tsv: bool = False,
    is_nl: bool = False,
    primary_key_column_names: Union[Iterable[str], None] = None,
    flatten_nested_json_objects: bool = False,
    csv_delimiter: Union[str, None] = None,
    quotechar: Union[str, None] = None,
    sniff: bool = True,
    no_headers: bool = False,
    encoding: str = "utf-8",
    batch_size: int = 100,
    detect_types: bool = True,
):

    if not table_name:
        table_name = file_item.file_name_without_extension

    f = open(file_item.path, "rb")

    try:
        insert_upsert_implementation(
            path=target_db_file,
            table=table_name,
            file=f,
            pk=primary_key_column_names,
            flatten=flatten_nested_json_objects,
            nl=is_nl,
            csv=is_csv,
            tsv=is_tsv,
            lines=False,
            text=False,
            convert=None,
            imports=None,
            delimiter=csv_delimiter,
            quotechar=quotechar,
            sniff=sniff,
            no_headers=no_headers,
            encoding=encoding,
            batch_size=batch_size,
            alter=False,
            upsert=False,
            ignore=False,
            replace=False,
            truncate=False,
            not_null=None,
            default=None,
            detect_types=detect_types,
            analyze=False,
            load_extension=None,
            silent=True,
            bulk_sql=None,
        )
    except Exception as e:
        log_exception(e)
    finally:
        f.close()
insert_db_table_from_file_bundle(database, file_bundle, table_name='file_items', include_content=True)
Source code in tabular/utils.py
def insert_db_table_from_file_bundle(
    database: KiaraDatabase,
    file_bundle: FileBundle,
    table_name: str = "file_items",
    include_content: bool = True,
):

    # TODO: check if table with that name exists

    from sqlalchemy import Column, Integer, MetaData, String, Table, Text, insert
    from sqlalchemy.engine import Engine

    # if db_file_path is None:
    #     temp_f = tempfile.mkdtemp()
    #     db_file_path = os.path.join(temp_f, "db.sqlite")
    #
    #     def cleanup():
    #         shutil.rmtree(db_file_path, ignore_errors=True)
    #
    #     atexit.register(cleanup)

    metadata_obj = MetaData()

    file_items = Table(
        table_name,
        metadata_obj,
        Column("id", Integer, primary_key=True),
        Column("size", Integer(), nullable=False),
        Column("mime_type", String(length=64), nullable=False),
        Column("rel_path", String(), nullable=False),
        Column("file_name", String(), nullable=False),
        Column("content", Text(), nullable=not include_content),
    )

    engine: Engine = database.get_sqlalchemy_engine()
    metadata_obj.create_all(engine)

    with engine.connect() as con:

        # TODO: commit in batches for better performance

        for index, rel_path in enumerate(sorted(file_bundle.included_files.keys())):
            f: FileModel = file_bundle.included_files[rel_path]
            if include_content:
                content: Union[str, None] = f.read_text()  # type: ignore
            else:
                content = None

            _values = {
                "id": index,
                "size": f.size,
                "mime_type": f.mime_type,
                "rel_path": rel_path,
                "file_name": f.file_name,
                "content": content,
            }

            stmt = insert(file_items).values(**_values)
            con.execute(stmt)
        con.commit()