Skip to content

db

Classes

DatabaseMetadata (ValueMetadata) pydantic-model

Database and table properties.

Source code in tabular/models/db.py
class DatabaseMetadata(ValueMetadata):
    """Database and table properties."""

    _metadata_key = "database"

    @classmethod
    def retrieve_supported_data_types(cls) -> Iterable[str]:
        return ["database"]

    @classmethod
    def create_value_metadata(cls, value: Value) -> "DatabaseMetadata":

        database: KiaraDatabase = value.data

        insp = database.get_sqlalchemy_inspector()

        mds = {}

        for table_name in insp.get_table_names():

            with database.get_sqlalchemy_engine().connect() as con:
                result = con.execute(text(f"SELECT count(*) from {table_name}"))
                num_rows = result.fetchone()[0]

                try:
                    result = con.execute(
                        text(
                            f'SELECT SUM("pgsize") FROM "dbstat" WHERE name="{table_name}"'
                        )
                    )
                    size: Union[int, None] = result.fetchone()[0]
                except Exception:
                    size = None

            columns = {}
            for column in insp.get_columns(table_name=table_name):
                name = column["name"]
                _type = column["type"]
                type_name = SQLALCHEMY_SQLITE_TYPE_MAP[type(_type)]
                columns[name] = {
                    "type_name": type_name,
                    "metadata": {
                        "nullable": column["nullable"],
                        "primary_key": True if column["primary_key"] else False,
                    },
                }

            schema = {
                "column_names": list(columns.keys()),
                "column_schema": columns,
                "rows": num_rows,
                "size": size,
            }

            md = TableMetadata(**schema)
            mds[table_name] = md

        return DatabaseMetadata.construct(tables=mds)

    tables: Dict[str, TableMetadata] = Field(description="The table schema.")

Attributes

tables: Dict[str, kiara_plugin.tabular.models.TableMetadata] pydantic-field required

The table schema.

create_value_metadata(value) classmethod
Source code in tabular/models/db.py
@classmethod
def create_value_metadata(cls, value: Value) -> "DatabaseMetadata":

    database: KiaraDatabase = value.data

    insp = database.get_sqlalchemy_inspector()

    mds = {}

    for table_name in insp.get_table_names():

        with database.get_sqlalchemy_engine().connect() as con:
            result = con.execute(text(f"SELECT count(*) from {table_name}"))
            num_rows = result.fetchone()[0]

            try:
                result = con.execute(
                    text(
                        f'SELECT SUM("pgsize") FROM "dbstat" WHERE name="{table_name}"'
                    )
                )
                size: Union[int, None] = result.fetchone()[0]
            except Exception:
                size = None

        columns = {}
        for column in insp.get_columns(table_name=table_name):
            name = column["name"]
            _type = column["type"]
            type_name = SQLALCHEMY_SQLITE_TYPE_MAP[type(_type)]
            columns[name] = {
                "type_name": type_name,
                "metadata": {
                    "nullable": column["nullable"],
                    "primary_key": True if column["primary_key"] else False,
                },
            }

        schema = {
            "column_names": list(columns.keys()),
            "column_schema": columns,
            "rows": num_rows,
            "size": size,
        }

        md = TableMetadata(**schema)
        mds[table_name] = md

    return DatabaseMetadata.construct(tables=mds)
retrieve_supported_data_types() classmethod
Source code in tabular/models/db.py
@classmethod
def retrieve_supported_data_types(cls) -> Iterable[str]:
    return ["database"]

KiaraDatabase (KiaraModel) pydantic-model

A wrapper class to manage a sqlite database.

Source code in tabular/models/db.py
class KiaraDatabase(KiaraModel):
    """A wrapper class to manage a sqlite database."""

    @classmethod
    def create_in_temp_dir(
        cls,
        init_statement: Union[None, str, "TextClause"] = None,
        init_data: Union[Mapping[str, Any], None] = None,
    ):

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")

        def cleanup():
            shutil.rmtree(db_path, ignore_errors=True)

        atexit.register(cleanup)

        db = cls(db_file_path=db_path)
        db.create_if_not_exists()

        if init_statement:
            db._unlock_db()
            db.execute_sql(statement=init_statement, data=init_data, invalidate=True)
            db._lock_db()

        return db

    db_file_path: str = Field(description="The path to the sqlite database file.")

    _cached_engine = PrivateAttr(default=None)
    _cached_inspector = PrivateAttr(default=None)
    _table_names = PrivateAttr(default=None)
    _tables: Dict[str, Table] = PrivateAttr(default_factory=dict)
    _metadata_obj: Union[MetaData, None] = PrivateAttr(default=None)
    # _table_schemas: Optional[Dict[str, SqliteTableSchema]] = PrivateAttr(default=None)
    # _file_hash: Optional[str] = PrivateAttr(default=None)
    _file_cid: Union[CID, None] = PrivateAttr(default=None)
    _lock: bool = PrivateAttr(default=True)
    _immutable: bool = PrivateAttr(default=None)

    def _retrieve_id(self) -> str:
        return str(self.file_cid)

    def _retrieve_data_to_hash(self) -> Any:
        return self.file_cid

    @validator("db_file_path", allow_reuse=True)
    def ensure_absolute_path(cls, path: str):

        path = os.path.abspath(path)
        if not os.path.exists(os.path.dirname(path)):
            raise ValueError(f"Parent folder for database file does not exist: {path}")
        return path

    @property
    def db_url(self) -> str:
        return f"sqlite:///{self.db_file_path}"

    @property
    def file_cid(self) -> CID:

        if self._file_cid is not None:
            return self._file_cid

        self._file_cid = compute_cid_from_file(file=self.db_file_path, codec="raw")
        return self._file_cid

    def get_sqlalchemy_engine(self) -> "Engine":

        if self._cached_engine is not None:
            return self._cached_engine

        def _pragma_on_connect(dbapi_con, con_record):
            dbapi_con.execute("PRAGMA query_only = ON")

        self._cached_engine = create_engine(self.db_url, future=True)

        if self._lock:
            event.listen(self._cached_engine, "connect", _pragma_on_connect)

        return self._cached_engine

    def _lock_db(self):
        self._lock = True
        self._invalidate()

    def _unlock_db(self):
        if self._immutable:
            raise Exception("Can't unlock db, it's immutable.")
        self._lock = False
        self._invalidate()

    def create_if_not_exists(self):

        from sqlalchemy_utils import create_database, database_exists

        if not database_exists(self.db_url):
            create_database(self.db_url)

    def execute_sql(
        self,
        statement: Union[str, "TextClause"],
        data: Union[Mapping[str, Any], None] = None,
        invalidate: bool = False,
    ):
        """Execute an sql script.

        Arguments:
          statement: the sql statement
          data: (optional) data, to be bound to the statement
          invalidate: whether to invalidate cached values within this object
        """

        if isinstance(statement, str):
            statement = text(statement)

        if data:
            statement.bindparams(**data)

        with self.get_sqlalchemy_engine().connect() as con:
            con.execute(statement)

        if invalidate:
            self._invalidate()

    def _invalidate(self):
        self._cached_engine = None
        self._cached_inspector = None
        self._table_names = None
        # self._file_hash = None
        self._metadata_obj = None
        self._tables.clear()

    def _invalidate_other(self):
        pass

    def get_sqlalchemy_metadata(self) -> MetaData:
        """Return the sqlalchemy Metadtaa object for the underlying database.

        This is used internally, you typically don't need to access this attribute.

        """

        if self._metadata_obj is None:
            self._metadata_obj = MetaData()
        return self._metadata_obj

    def copy_database_file(self, target: str):

        os.makedirs(os.path.dirname(target))

        shutil.copy2(self.db_file_path, target)

        new_db = KiaraDatabase(db_file_path=target)
        # if self._file_hash:
        #     new_db._file_hash = self._file_hash
        return new_db

    def get_sqlalchemy_inspector(self) -> Inspector:

        if self._cached_inspector is not None:
            return self._cached_inspector

        self._cached_inspector = inspect(self.get_sqlalchemy_engine())
        return self._cached_inspector

    @property
    def table_names(self) -> Iterable[str]:
        if self._table_names is not None:
            return self._table_names

        self._table_names = self.get_sqlalchemy_inspector().get_table_names()
        return self._table_names

    def get_sqlalchemy_table(self, table_name: str) -> Table:
        """Return the sqlalchemy edges table instance for this network datab."""

        if table_name in self._tables.keys():
            return self._tables[table_name]

        table = Table(
            table_name,
            self.get_sqlalchemy_metadata(),
            autoload_with=self.get_sqlalchemy_engine(),
        )
        self._tables[table_name] = table
        return table

Attributes

db_file_path: str pydantic-field required

The path to the sqlite database file.

db_url: str property readonly
file_cid: CID property readonly
table_names: Iterable[str] property readonly

Methods

copy_database_file(self, target)
Source code in tabular/models/db.py
def copy_database_file(self, target: str):

    os.makedirs(os.path.dirname(target))

    shutil.copy2(self.db_file_path, target)

    new_db = KiaraDatabase(db_file_path=target)
    # if self._file_hash:
    #     new_db._file_hash = self._file_hash
    return new_db
create_if_not_exists(self)
Source code in tabular/models/db.py
def create_if_not_exists(self):

    from sqlalchemy_utils import create_database, database_exists

    if not database_exists(self.db_url):
        create_database(self.db_url)
create_in_temp_dir(init_statement=None, init_data=None) classmethod
Source code in tabular/models/db.py
@classmethod
def create_in_temp_dir(
    cls,
    init_statement: Union[None, str, "TextClause"] = None,
    init_data: Union[Mapping[str, Any], None] = None,
):

    temp_f = tempfile.mkdtemp()
    db_path = os.path.join(temp_f, "db.sqlite")

    def cleanup():
        shutil.rmtree(db_path, ignore_errors=True)

    atexit.register(cleanup)

    db = cls(db_file_path=db_path)
    db.create_if_not_exists()

    if init_statement:
        db._unlock_db()
        db.execute_sql(statement=init_statement, data=init_data, invalidate=True)
        db._lock_db()

    return db
ensure_absolute_path(path) classmethod
Source code in tabular/models/db.py
@validator("db_file_path", allow_reuse=True)
def ensure_absolute_path(cls, path: str):

    path = os.path.abspath(path)
    if not os.path.exists(os.path.dirname(path)):
        raise ValueError(f"Parent folder for database file does not exist: {path}")
    return path
execute_sql(self, statement, data=None, invalidate=False)

Execute an sql script.

Parameters:

Name Type Description Default
statement Union[str, TextClause]

the sql statement

required
data Optional[Mapping[str, Any]]

(optional) data, to be bound to the statement

None
invalidate bool

whether to invalidate cached values within this object

False
Source code in tabular/models/db.py
def execute_sql(
    self,
    statement: Union[str, "TextClause"],
    data: Union[Mapping[str, Any], None] = None,
    invalidate: bool = False,
):
    """Execute an sql script.

    Arguments:
      statement: the sql statement
      data: (optional) data, to be bound to the statement
      invalidate: whether to invalidate cached values within this object
    """

    if isinstance(statement, str):
        statement = text(statement)

    if data:
        statement.bindparams(**data)

    with self.get_sqlalchemy_engine().connect() as con:
        con.execute(statement)

    if invalidate:
        self._invalidate()
get_sqlalchemy_engine(self)
Source code in tabular/models/db.py
def get_sqlalchemy_engine(self) -> "Engine":

    if self._cached_engine is not None:
        return self._cached_engine

    def _pragma_on_connect(dbapi_con, con_record):
        dbapi_con.execute("PRAGMA query_only = ON")

    self._cached_engine = create_engine(self.db_url, future=True)

    if self._lock:
        event.listen(self._cached_engine, "connect", _pragma_on_connect)

    return self._cached_engine
get_sqlalchemy_inspector(self)
Source code in tabular/models/db.py
def get_sqlalchemy_inspector(self) -> Inspector:

    if self._cached_inspector is not None:
        return self._cached_inspector

    self._cached_inspector = inspect(self.get_sqlalchemy_engine())
    return self._cached_inspector
get_sqlalchemy_metadata(self)

Return the sqlalchemy Metadtaa object for the underlying database.

This is used internally, you typically don't need to access this attribute.

Source code in tabular/models/db.py
def get_sqlalchemy_metadata(self) -> MetaData:
    """Return the sqlalchemy Metadtaa object for the underlying database.

    This is used internally, you typically don't need to access this attribute.

    """

    if self._metadata_obj is None:
        self._metadata_obj = MetaData()
    return self._metadata_obj
get_sqlalchemy_table(self, table_name)

Return the sqlalchemy edges table instance for this network datab.

Source code in tabular/models/db.py
def get_sqlalchemy_table(self, table_name: str) -> Table:
    """Return the sqlalchemy edges table instance for this network datab."""

    if table_name in self._tables.keys():
        return self._tables[table_name]

    table = Table(
        table_name,
        self.get_sqlalchemy_metadata(),
        autoload_with=self.get_sqlalchemy_engine(),
    )
    self._tables[table_name] = table
    return table

SqliteTableSchema (BaseModel) pydantic-model

Source code in tabular/models/db.py
class SqliteTableSchema(BaseModel):

    columns: Dict[str, SqliteDataType] = Field(
        description="The table columns and their attributes."
    )
    index_columns: List[str] = Field(
        description="The columns to index", default_factory=list
    )
    nullable_columns: List[str] = Field(
        description="The columns that are nullable.", default_factory=list
    )
    unique_columns: List[str] = Field(
        description="The columns that should be marked 'UNIQUE'.", default_factory=list
    )
    primary_key: Union[str, None] = Field(
        description="The primary key for this table.", default=None
    )

    def create_table_metadata(
        self,
        table_name: str,
    ) -> Tuple[MetaData, Table]:
        """Create an sql script to initialize a table.

        Arguments:
            column_attrs: a map with the column name as key, and column details ('type', 'extra_column_info', 'create_index') as values
        """

        table_columns = []
        for column_name, data_type in self.columns.items():
            column_obj = Column(
                column_name,
                SQLITE_SQLALCHEMY_TYPE_MAP[data_type],
                nullable=column_name in self.nullable_columns,
                primary_key=column_name == self.primary_key,
                index=column_name in self.index_columns,
                unique=column_name in self.unique_columns,
            )
            table_columns.append(column_obj)

        meta = MetaData()
        table = Table(table_name, meta, *table_columns)
        return meta, table

    def create_table(self, table_name: str, engine: Engine) -> Table:

        meta, table = self.create_table_metadata(table_name=table_name)
        meta.create_all(engine)
        return table

Attributes

columns: Dict[str, Literal['NULL', 'INTEGER', 'REAL', 'TEXT', 'BLOB']] pydantic-field required

The table columns and their attributes.

index_columns: List[str] pydantic-field

The columns to index

nullable_columns: List[str] pydantic-field

The columns that are nullable.

primary_key: str pydantic-field

The primary key for this table.

unique_columns: List[str] pydantic-field

The columns that should be marked 'UNIQUE'.

Methods

create_table(self, table_name, engine)
Source code in tabular/models/db.py
def create_table(self, table_name: str, engine: Engine) -> Table:

    meta, table = self.create_table_metadata(table_name=table_name)
    meta.create_all(engine)
    return table
create_table_metadata(self, table_name)

Create an sql script to initialize a table.

Parameters:

Name Type Description Default
column_attrs

a map with the column name as key, and column details ('type', 'extra_column_info', 'create_index') as values

required
Source code in tabular/models/db.py
def create_table_metadata(
    self,
    table_name: str,
) -> Tuple[MetaData, Table]:
    """Create an sql script to initialize a table.

    Arguments:
        column_attrs: a map with the column name as key, and column details ('type', 'extra_column_info', 'create_index') as values
    """

    table_columns = []
    for column_name, data_type in self.columns.items():
        column_obj = Column(
            column_name,
            SQLITE_SQLALCHEMY_TYPE_MAP[data_type],
            nullable=column_name in self.nullable_columns,
            primary_key=column_name == self.primary_key,
            index=column_name in self.index_columns,
            unique=column_name in self.unique_columns,
        )
        table_columns.append(column_obj)

    meta = MetaData()
    table = Table(table_name, meta, *table_columns)
    return meta, table