Skip to content

tables

Attributes

Classes

Functions

attach_metadata(table: pa.Table, *, table_metadata: Union[Dict[str, KiaraModel], None] = None, column_metadata: Union[Dict[str, Dict[str, KiaraModel]], None] = None, overwrite_existing: bool = True) -> pa.Table

Attach metadata and column_metadata to a table.

Parameters:

Name Type Description Default
table_metadata Union[Dict[str, KiaraModel], None]

the (overall) metadata to attach to the table (format: <metadata_key> = <metadata_value>)

None
column_metadata Union[Dict[str, Dict[str, KiaraModel]], None]

the column metadata to attach to the table (format: <column_name>.<metadata_key> = <metadata_value>)

None
overwrite_existing bool

if True, existing keys will be overwritten, otherwise they will be kept and the new values will be ignored

True
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/utils/tables.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def attach_metadata(
    table: pa.Table,
    *,
    table_metadata: Union[Dict[str, "KiaraModel"], None] = None,
    column_metadata: Union[Dict[str, Dict[str, "KiaraModel"]], None] = None,
    overwrite_existing: bool = True
) -> pa.Table:
    """Attach metadata and column_metadata to a table.

    Metadata that is already present on a column is kept and merged with the
    new entries, and schema-level metadata of the input table is carried over
    to the result.

    Arguments:
        table: the table to attach the metadata to (a new table is returned, built from the same columns)
        table_metadata: the (overall) metadata to attach to the table (format: <metadata_key> = <metadata_value>). Not implemented yet: raises 'NotImplementedError' when provided together with 'column_metadata', and is ignored when 'column_metadata' is empty.
        column_metadata: the column metadata to attach to the table (format: <column_name>.<metadata_key> = <metadata_value>)
        overwrite_existing: if True, existing keys will be overwritten, otherwise they will be kept and the new values will be ignored

    Returns:
        a new table with the same columns and the updated schema

    Raises:
        NotImplementedError: if both 'table_metadata' and 'column_metadata' are provided
    """

    if not column_metadata:
        # NOTE: 'table_metadata' is not supported yet and is ignored on this path.
        return pa.table(table.columns, schema=table.schema)

    if table_metadata:
        raise NotImplementedError()

    schema = table.schema
    new_fields = []
    # Walk the fields by position so duplicate column names are handled too.
    for idx in range(len(schema.names)):
        field = schema.field(idx)

        models = column_metadata.get(field.name, None)
        if not models:
            new_fields.append(field)
            continue

        # 'with_metadata' replaces the metadata of a field, so start from a copy
        # of the existing entries to avoid losing them.
        merged = dict(field.metadata) if field.metadata else {}
        for key, model in models.items():
            # pyarrow exposes metadata keys as bytes, normalize before comparing
            raw_key = key.encode("utf-8") if isinstance(key, str) else key
            if not overwrite_existing and raw_key in merged:
                continue
            merged[raw_key] = model.as_json_with_schema(incl_model_id=True)

        new_fields.append(field.with_metadata(merged))

    # Pass the schema-level metadata along, 'pa.schema' would drop it otherwise.
    new_schema = pa.schema(new_fields, metadata=schema.metadata)
    return pa.table(table.columns, schema=new_schema)

extract_column_metadata(table: pa.Table) -> Dict[str, Dict[str, KiaraModel]]

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/utils/tables.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def extract_column_metadata(table: pa.Table) -> Dict[str, Dict[str, "KiaraModel"]]:
    """Re-create the metadata models stored in the field metadata of a table.

    Arguments:
        table: the table whose schema fields are inspected

    Returns:
        a dict with one entry per column name; each value maps the metadata key
        (as string) to the model instance the model registry created from the
        stored json data. Columns without metadata map to an empty dict.
    """

    from kiara.registries.models import ModelRegistry

    model_registry = ModelRegistry.instance()

    schema = table.schema
    result: Dict[str, Dict[str, "KiaraModel"]] = {}
    for idx, column_name in enumerate(schema.names):
        field = schema.field(idx)
        assert field.name == column_name

        column_metadata: Dict[str, "KiaraModel"] = {}
        for key, model_data in (field.metadata or {}).items():
            # pyarrow hands metadata keys back as bytes, decode them so the
            # result matches the declared 'str' key type
            str_key = key.decode("utf-8") if isinstance(key, bytes) else key
            column_metadata[str_key] = model_registry.create_instance_from_json(
                model_data
            )
        result[column_name] = column_metadata

    return result

create_database_from_tables(tables: KiaraTables) -> KiaraDatabase

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/utils/tables.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def create_database_from_tables(tables: "KiaraTables") -> "KiaraDatabase":
    """Create a database that contains one database table per item in 'tables.tables'.

    The database is obtained from 'KiaraDatabase.create_in_temp_dir()', unlocked
    while the data is written, and locked again before it is returned.

    Arguments:
        tables: the tables object; each of its tables is read via its 'arrow_table' attribute

    Returns:
        the database the rows were inserted into
    """

    from sqlalchemy import insert

    from kiara_plugin.tabular.models.db import KiaraDatabase

    database = KiaraDatabase.create_in_temp_dir()
    database._unlock_db()
    engine = database.get_sqlalchemy_engine()

    for name, kiara_table in tables.tables.items():
        arrow_data = kiara_table.arrow_table

        # every column that holds at least one null is reported to the schema helper
        columns_with_nulls = [
            col_name
            for col_name in arrow_data.column_names
            if arrow_data.column(col_name).null_count > 0
        ]

        schema_data = create_sqlite_schema_data_from_arrow_table(
            table=arrow_data,
            index_columns=None,
            column_map=None,
            nullable_columns=columns_with_nulls,
        )
        sql_table = schema_data.create_table(table_name=name, engine=engine)

        with engine.connect() as connection:
            # insert the rows batch by batch, committing after each one
            for record_batch in arrow_data.to_batches(
                max_chunksize=DEFAULT_TABULAR_DATA_CHUNK_SIZE
            ):
                rows = record_batch.to_pylist()
                connection.execute(insert(sql_table), rows)
                connection.commit()

    database._lock_db()
    return database