utils

Functions

convert_arrow_column_types_to_sqlite(table)

Source code in tabular/utils.py
def convert_arrow_column_types_to_sqlite(
    table: "pa.Table",
) -> Dict[str, SqliteDataType]:

    result: Dict[str, SqliteDataType] = {}
    for column_name in table.column_names:
        field = table.field(column_name)
        sqlite_type = convert_arrow_type_to_sqlite(str(field.type))
        result[column_name] = sqlite_type

    return result

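This maps every column of an Arrow table to a matching SQLite storage class. Example usage (a minimal sketch; assumes pyarrow is installed and the function is imported from this module, whose import path may vary by install):

import pyarrow as pa

table = pa.table(
    {
        "id": pa.array([1, 2, 3], type=pa.int64()),
        "name": pa.array(["a", "b", "c"], type=pa.string()),
        "score": pa.array([0.5, 1.5, 2.5], type=pa.float64()),
    }
)

# prints: {'id': 'INTEGER', 'name': 'TEXT', 'score': 'REAL'}
print(convert_arrow_column_types_to_sqlite(table))
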
convert_arrow_type_to_sqlite(data_type)

Source code in tabular/utils.py
def convert_arrow_type_to_sqlite(data_type: str) -> SqliteDataType:

    if data_type.startswith("int") or data_type.startswith("uint"):
        return "INTEGER"

    if (
        data_type.startswith("float")
        or data_type.startswith("decimal")
        or data_type.startswith("double")
    ):
        return "REAL"

    if data_type.startswith("time") or data_type.startswith("date"):
        return "TEXT"

    if data_type == "bool":
        return "INTEGER"

    if data_type in ["string", "utf8", "large_string", "large_utf8"]:
        return "TEXT"

    if data_type in ["binary", "large_binary"]:
        return "BLOB"

    raise Exception(f"Can't convert to sqlite type: {data_type}")

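The function maps the string representation of an Arrow data type to one of SQLite's storage classes. A few illustrative calls:

convert_arrow_type_to_sqlite("int64")          # "INTEGER"
convert_arrow_type_to_sqlite("uint8")          # "INTEGER"
convert_arrow_type_to_sqlite("double")         # "REAL"
convert_arrow_type_to_sqlite("timestamp[ms]")  # "TEXT"
convert_arrow_type_to_sqlite("bool")           # "INTEGER"
convert_arrow_type_to_sqlite("large_binary")   # "BLOB"
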
create_sqlite_schema_data_from_arrow_table(table, column_map=None, index_columns=None, nullable_columns=None, unique_columns=None, primary_key=None)

Create a SQL schema statement from an Arrow table object.

Parameters:

table (pa.Table, required): the Arrow table object
column_map (Optional[Mapping[str, str]], default None): a map of column names that should be renamed in the new table
index_columns (Optional[Iterable[str]], default None): a list of column names (after mapping) to create indexes for
nullable_columns (Optional[Iterable[str]], default None): a list of column names (after mapping) that are allowed to contain null values
unique_columns (Optional[Iterable[str]], default None): a list of column names (after mapping) whose values must be unique
primary_key (Optional[str], default None): the name of the column (after mapping) to use as the primary key
Source code in tabular/utils.py
def create_sqlite_schema_data_from_arrow_table(
    table: "pa.Table",
    column_map: Union[Mapping[str, str], None] = None,
    index_columns: Union[Iterable[str], None] = None,
    nullable_columns: Union[Iterable[str], None] = None,
    unique_columns: Union[Iterable[str], None] = None,
    primary_key: Union[str, None] = None,
) -> SqliteTableSchema:
    """Create a sql schema statement from an Arrow table object.

    Arguments:
        table: the Arrow table object
        column_map: a map that contains column names that should be changed in the new table
        index_columns: a list of column names (after mapping) to create module_indexes for
        extra_column_info: a list of extra schema instructions per column name (after mapping)
    """

    columns = convert_arrow_column_types_to_sqlite(table=table)

    if column_map is None:
        column_map = {}

    temp: Dict[str, SqliteDataType] = {}

    if index_columns is None:
        index_columns = []

    if nullable_columns is None:
        nullable_columns = []

    if unique_columns is None:
        unique_columns = []

    for cn, sqlite_data_type in columns.items():
        if cn in column_map.keys():
            new_key = column_map[cn]
            index_columns = [
                x if x not in column_map.keys() else column_map[x]
                for x in index_columns
            ]
            unique_columns = [
                x if x not in column_map.keys() else column_map[x]
                for x in unique_columns
            ]
            nullable_columns = [
                x if x not in column_map.keys() else column_map[x]
                for x in nullable_columns
            ]
        else:
            new_key = cn

        temp[new_key] = sqlite_data_type

    columns = temp
    if not columns:
        raise Exception("Resulting table schema has no columns.")
    else:
        for ic in index_columns:
            if ic not in columns.keys():
                raise Exception(
                    f"Can't create schema, requested index column name not available: {ic}"
                )

    schema = SqliteTableSchema(
        columns=columns,
        index_columns=index_columns,
        nullable_columns=nullable_columns,
        unique_columns=unique_columns,
        primary_key=primary_key,
    )
    return schema

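Example usage (a minimal sketch; the Arrow column types are inferred from the Python lists, and the column names are illustrative):

import pyarrow as pa

table = pa.table({"id": [1, 2, 3], "label": ["a", "b", "c"]})

schema = create_sqlite_schema_data_from_arrow_table(
    table,
    column_map={"label": "name"},  # rename "label" to "name" in the new table
    index_columns=["name"],        # refers to the mapped column name
    primary_key="id",
)
# schema.columns == {"id": "INTEGER", "name": "TEXT"}
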
create_sqlite_table_from_tabular_file(target_db_file, file_item, table_name=None, is_csv=True, is_tsv=False, is_nl=False, primary_key_column_names=None, flatten_nested_json_objects=False, csv_delimiter=None, quotechar=None, sniff=True, no_headers=False, encoding='utf-8', batch_size=100, detect_types=True)

Source code in tabular/utils.py
def create_sqlite_table_from_tabular_file(
    target_db_file: str,
    file_item: FileModel,
    table_name: Union[str, None] = None,
    is_csv: bool = True,
    is_tsv: bool = False,
    is_nl: bool = False,
    primary_key_column_names: Union[Iterable[str], None] = None,
    flatten_nested_json_objects: bool = False,
    csv_delimiter: Union[str, None] = None,
    quotechar: Union[str, None] = None,
    sniff: bool = True,
    no_headers: bool = False,
    encoding: str = "utf-8",
    batch_size: int = 100,
    detect_types: bool = True,
):

    if not table_name:
        table_name = file_item.file_name_without_extension

    f = open(file_item.path, "rb")

    try:
        insert_upsert_implementation(
            path=target_db_file,
            table=table_name,
            file=f,
            pk=primary_key_column_names,
            flatten=flatten_nested_json_objects,
            nl=is_nl,
            csv=is_csv,
            tsv=is_tsv,
            lines=False,
            text=False,
            convert=None,
            imports=None,
            delimiter=csv_delimiter,
            quotechar=quotechar,
            sniff=sniff,
            no_headers=no_headers,
            encoding=encoding,
            batch_size=batch_size,
            alter=False,
            upsert=False,
            ignore=False,
            replace=False,
            truncate=False,
            not_null=None,
            default=None,
            detect_types=detect_types,
            analyze=False,
            load_extension=None,
            silent=True,
            bulk_sql=None,
        )
    except Exception as e:
        log_exception(e)
    finally:
        f.close()

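This imports a tabular file into a table of a SQLite database file. Example usage (a sketch; how a FileModel instance is obtained depends on the kiara version in use, so the FileModel.load_file call below is an assumption):

# hypothetical factory call to wrap a local CSV file in a FileModel
file_item = FileModel.load_file("data/journals.csv")

create_sqlite_table_from_tabular_file(
    target_db_file="/tmp/journals.sqlite",
    file_item=file_item,
    table_name="journals",
    is_csv=True,
)
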
insert_db_table_from_file_bundle(database, file_bundle, table_name='file_items', include_content=True)

Source code in tabular/utils.py
def insert_db_table_from_file_bundle(
    database: KiaraDatabase,
    file_bundle: FileBundle,
    table_name: str = "file_items",
    include_content: bool = True,
):

    # TODO: check if table with that name exists

    from sqlalchemy import Column, Integer, MetaData, String, Table, Text, insert
    from sqlalchemy.engine import Engine

    metadata_obj = MetaData()

    file_items = Table(
        table_name,
        metadata_obj,
        Column("id", Integer, primary_key=True),
        Column("size", Integer(), nullable=False),
        Column("mime_type", String(length=64), nullable=False),
        Column("rel_path", String(), nullable=False),
        Column("file_name", String(), nullable=False),
        Column("content", Text(), nullable=not include_content),
    )

    engine: Engine = database.get_sqlalchemy_engine()
    metadata_obj.create_all(engine)

    with engine.connect() as con:

        # TODO: commit in batches for better performance

        for index, rel_path in enumerate(sorted(file_bundle.included_files.keys())):
            f: FileModel = file_bundle.included_files[rel_path]
            if include_content:
                content: Union[str, None] = f.read_text()  # type: ignore
            else:
                content = None

            _values = {
                "id": index,
                "size": f.size,
                "mime_type": f.mime_type,
                "rel_path": rel_path,
                "file_name": f.file_name,
                "content": content,
            }

            stmt = insert(file_items).values(**_values)
            con.execute(stmt)
        con.commit()
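
This writes one row per file in the bundle into the given table. Example usage (a sketch; the KiaraDatabase and FileBundle factory calls are assumptions and may differ between kiara versions):

# hypothetical factory calls
database = KiaraDatabase.create_in_temp_dir()
file_bundle = FileBundle.import_folder("data/texts")

insert_db_table_from_file_bundle(
    database=database,
    file_bundle=file_bundle,
    table_name="file_items",
    include_content=True,
)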