Skip to content

db

Attributes

KDBC = TypeVar('KDBC', bound='KiaraDatabase') module-attribute

Classes

SqliteTableSchema

Bases: BaseModel

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class SqliteTableSchema(BaseModel):

    columns: Dict[str, SqliteDataType] = Field(
        description="The table columns and their attributes."
    )
    index_columns: List[str] = Field(
        description="The columns to index", default_factory=list
    )
    nullable_columns: List[str] = Field(
        description="The columns that are nullable.", default_factory=list
    )
    unique_columns: List[str] = Field(
        description="The columns that should be marked 'UNIQUE'.", default_factory=list
    )
    primary_key: Union[str, None] = Field(
        description="The primary key for this table.", default=None
    )

    def create_table_metadata(
        self,
        table_name: str,
    ) -> Tuple[MetaData, Table]:
        """Create an sql script to initialize a table.

        Arguments:
            column_attrs: a map with the column name as key, and column details ('type', 'extra_column_info', 'create_index') as values
        """

        table_columns = []
        for column_name, data_type in self.columns.items():
            column_obj = Column(
                column_name,
                SQLITE_SQLALCHEMY_TYPE_MAP[data_type],
                nullable=column_name in self.nullable_columns,
                primary_key=column_name == self.primary_key,
                index=column_name in self.index_columns,
                unique=column_name in self.unique_columns,
            )
            table_columns.append(column_obj)

        meta = MetaData()
        table = Table(table_name, meta, *table_columns)
        return meta, table

    def create_table(self, table_name: str, engine: Engine) -> Table:

        meta, table = self.create_table_metadata(table_name=table_name)
        meta.create_all(engine)
        return table

Attributes

columns: Dict[str, SqliteDataType] = Field(description='The table columns and their attributes.') class-attribute instance-attribute
index_columns: List[str] = Field(description='The columns to index', default_factory=list) class-attribute instance-attribute
nullable_columns: List[str] = Field(description='The columns that are nullable.', default_factory=list) class-attribute instance-attribute
unique_columns: List[str] = Field(description="The columns that should be marked 'UNIQUE'.", default_factory=list) class-attribute instance-attribute
primary_key: Union[str, None] = Field(description='The primary key for this table.', default=None) class-attribute instance-attribute

Functions

create_table_metadata(table_name: str) -> Tuple[MetaData, Table]

Create an sql script to initialize a table.

Parameters:

Name Type Description Default
column_attrs

a map with the column name as key, and column details ('type', 'extra_column_info', 'create_index') as values

required
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def create_table_metadata(
    self,
    table_name: str,
) -> Tuple[MetaData, Table]:
    """Create an sql script to initialize a table.

    Arguments:
        column_attrs: a map with the column name as key, and column details ('type', 'extra_column_info', 'create_index') as values
    """

    table_columns = []
    for column_name, data_type in self.columns.items():
        column_obj = Column(
            column_name,
            SQLITE_SQLALCHEMY_TYPE_MAP[data_type],
            nullable=column_name in self.nullable_columns,
            primary_key=column_name == self.primary_key,
            index=column_name in self.index_columns,
            unique=column_name in self.unique_columns,
        )
        table_columns.append(column_obj)

    meta = MetaData()
    table = Table(table_name, meta, *table_columns)
    return meta, table
create_table(table_name: str, engine: Engine) -> Table
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
89
90
91
92
93
def create_table(self, table_name: str, engine: Engine) -> Table:

    meta, table = self.create_table_metadata(table_name=table_name)
    meta.create_all(engine)
    return table

KiaraDatabase

Bases: KiaraModel

A wrapper class to manage a sqlite database.

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
class KiaraDatabase(KiaraModel):
    """A wrapper class to manage a sqlite database."""

    @classmethod
    def create_in_temp_dir(
        cls: Type[KDBC],
        init_statement: Union[None, str, "TextClause"] = None,
        init_data: Union[Mapping[str, Any], None] = None,
    ) -> KDBC:

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")

        def cleanup():
            shutil.rmtree(db_path, ignore_errors=True)

        atexit.register(cleanup)

        db = cls(db_file_path=db_path)
        db.create_if_not_exists()

        if init_statement:
            db._unlock_db()
            db.execute_sql(statement=init_statement, data=init_data, invalidate=True)
            db._lock_db()

        return db

    db_file_path: str = Field(description="The path to the sqlite database file.")

    _cached_engine = PrivateAttr(default=None)
    _cached_inspector = PrivateAttr(default=None)
    _table_names = PrivateAttr(default=None)
    _tables: Dict[str, Table] = PrivateAttr(default_factory=dict)
    _metadata_obj: Union[MetaData, None] = PrivateAttr(default=None)
    # _table_schemas: Optional[Dict[str, SqliteTableSchema]] = PrivateAttr(default=None)
    # _file_hash: Optional[str] = PrivateAttr(default=None)
    _file_cid: Union[CID, None] = PrivateAttr(default=None)
    _lock: bool = PrivateAttr(default=True)
    _immutable: bool = PrivateAttr(default=None)

    def _retrieve_id(self) -> str:
        return str(self.file_cid)

    def _retrieve_data_to_hash(self) -> Any:
        return self.file_cid

    @validator("db_file_path", allow_reuse=True)
    def ensure_absolute_path(cls, path: str):

        path = os.path.abspath(path)
        if not os.path.exists(os.path.dirname(path)):
            raise ValueError(f"Parent folder for database file does not exist: {path}")
        return path

    @property
    def db_url(self) -> str:
        return f"sqlite:///{self.db_file_path}"

    @property
    def file_cid(self) -> CID:

        if self._file_cid is not None:
            return self._file_cid

        self._file_cid = compute_cid_from_file(file=self.db_file_path, codec="raw")
        return self._file_cid

    def get_sqlalchemy_engine(self) -> "Engine":

        if self._cached_engine is not None:
            return self._cached_engine

        def _pragma_on_connect(dbapi_con, con_record):
            dbapi_con.execute("PRAGMA query_only = ON")

        self._cached_engine = create_engine(self.db_url, future=True)

        if self._lock:
            event.listen(self._cached_engine, "connect", _pragma_on_connect)

        return self._cached_engine

    def _lock_db(self):
        self._lock = True
        self._invalidate()

    def _unlock_db(self):
        if self._immutable:
            raise Exception("Can't unlock db, it's immutable.")
        self._lock = False
        self._invalidate()

    def create_if_not_exists(self):

        from sqlalchemy_utils import create_database, database_exists

        if not database_exists(self.db_url):
            create_database(self.db_url)

    def execute_sql(
        self,
        statement: Union[str, "TextClause"],
        data: Union[Mapping[str, Any], None] = None,
        invalidate: bool = False,
    ):
        """Execute an sql script.

        Arguments:
          statement: the sql statement
          data: (optional) data, to be bound to the statement
          invalidate: whether to invalidate cached values within this object
        """

        if isinstance(statement, str):
            statement = text(statement)

        if data:
            statement = statement.bindparams(**data)

        with self.get_sqlalchemy_engine().connect() as con:
            result = con.execute(statement)

        if invalidate:
            self._invalidate()

        return result

    def _invalidate(self):
        self._cached_engine = None
        self._cached_inspector = None
        self._table_names = None
        # self._file_hash = None
        self._metadata_obj = None
        self._tables.clear()

    def _invalidate_other(self):
        pass

    def get_sqlalchemy_metadata(self) -> MetaData:
        """Return the sqlalchemy Metadtaa object for the underlying database.

        This is used internally, you typically don't need to access this attribute.

        """

        if self._metadata_obj is None:
            self._metadata_obj = MetaData()
        return self._metadata_obj

    def copy_database_file(self, target: str):

        os.makedirs(os.path.dirname(target))

        shutil.copy2(self.db_file_path, target)

        new_db = KiaraDatabase(db_file_path=target)
        # if self._file_hash:
        #     new_db._file_hash = self._file_hash
        return new_db

    def get_sqlalchemy_inspector(self) -> Inspector:

        if self._cached_inspector is not None:
            return self._cached_inspector

        self._cached_inspector = inspect(self.get_sqlalchemy_engine())
        return self._cached_inspector

    @property
    def table_names(self) -> Iterable[str]:
        if self._table_names is not None:
            return self._table_names

        self._table_names = self.get_sqlalchemy_inspector().get_table_names()
        return self._table_names

    def get_sqlalchemy_table(self, table_name: str) -> Table:
        """Return the sqlalchemy edges table instance for this network datab."""

        if table_name in self._tables.keys():
            return self._tables[table_name]

        table = Table(
            table_name,
            self.get_sqlalchemy_metadata(),
            autoload_with=self.get_sqlalchemy_engine(),
        )
        self._tables[table_name] = table
        return table

    def get_table_as_pandas_df(self, table_name: str) -> "pd.DataFrame":

        import pandas as pd

        query = text(f'SELECT * FROM "{table_name}"')
        with self.get_sqlalchemy_engine().connect() as con:
            df = pd.read_sql(query, con)  # noqa

        return df

    def create_metadata(self) -> "DatabaseMetadata":

        insp = self.get_sqlalchemy_inspector()

        mds = {}

        for table_name in insp.get_table_names():

            with self.get_sqlalchemy_engine().connect() as con:
                query = f'SELECT count(*) from "{table_name}"'
                result = con.execute(text(query))
                num_rows = result.fetchone()[0]

                try:
                    result = con.execute(
                        text(
                            f'SELECT SUM("pgsize") FROM "dbstat" WHERE name="{table_name}"'
                        )
                    )
                    size: Union[int, None] = result.fetchone()[0]
                except Exception:
                    size = None

            columns = {}
            backend_properties: Dict[str, Any] = {"column_details": {}}
            for column in insp.get_columns(table_name=table_name):
                name = column["name"]
                _type = column["type"]
                type_name = SQLALCHEMY_SQLITE_TYPE_MAP[type(_type)]
                backend_properties["column_details"][name] = {
                    "nullable": column["nullable"],
                    "primary_key": True if column["primary_key"] else False,
                }
                columns[name] = {
                    "type_name": type_name,
                }

            backend = StorageBackend(name="sqlite", properties=backend_properties)

            schema = {
                "column_names": list(columns.keys()),
                "column_schema": columns,
                "backend": backend,
                "rows": num_rows,
                "size": size,
            }

            md = TableMetadata(**schema)
            mds[table_name] = md

        return DatabaseMetadata.construct(tables=mds)

Attributes

db_file_path: str = Field(description='The path to the sqlite database file.') class-attribute instance-attribute
db_url: str property
file_cid: CID property
table_names: Iterable[str] property

Functions

create_in_temp_dir(init_statement: Union[None, str, TextClause] = None, init_data: Union[Mapping[str, Any], None] = None) -> KDBC classmethod
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@classmethod
def create_in_temp_dir(
    cls: Type[KDBC],
    init_statement: Union[None, str, "TextClause"] = None,
    init_data: Union[Mapping[str, Any], None] = None,
) -> KDBC:

    temp_f = tempfile.mkdtemp()
    db_path = os.path.join(temp_f, "db.sqlite")

    def cleanup():
        shutil.rmtree(db_path, ignore_errors=True)

    atexit.register(cleanup)

    db = cls(db_file_path=db_path)
    db.create_if_not_exists()

    if init_statement:
        db._unlock_db()
        db.execute_sql(statement=init_statement, data=init_data, invalidate=True)
        db._lock_db()

    return db
ensure_absolute_path(path: str)
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
143
144
145
146
147
148
149
@validator("db_file_path", allow_reuse=True)
def ensure_absolute_path(cls, path: str):

    path = os.path.abspath(path)
    if not os.path.exists(os.path.dirname(path)):
        raise ValueError(f"Parent folder for database file does not exist: {path}")
    return path
get_sqlalchemy_engine() -> Engine
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def get_sqlalchemy_engine(self) -> "Engine":

    if self._cached_engine is not None:
        return self._cached_engine

    def _pragma_on_connect(dbapi_con, con_record):
        dbapi_con.execute("PRAGMA query_only = ON")

    self._cached_engine = create_engine(self.db_url, future=True)

    if self._lock:
        event.listen(self._cached_engine, "connect", _pragma_on_connect)

    return self._cached_engine
create_if_not_exists()
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
189
190
191
192
193
194
def create_if_not_exists(self):

    from sqlalchemy_utils import create_database, database_exists

    if not database_exists(self.db_url):
        create_database(self.db_url)
execute_sql(statement: Union[str, TextClause], data: Union[Mapping[str, Any], None] = None, invalidate: bool = False)

Execute an sql script.

Parameters:

Name Type Description Default
statement Union[str, TextClause]

the sql statement

required
data Union[Mapping[str, Any], None]

(optional) data, to be bound to the statement

None
invalidate bool

whether to invalidate cached values within this object

False
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def execute_sql(
    self,
    statement: Union[str, "TextClause"],
    data: Union[Mapping[str, Any], None] = None,
    invalidate: bool = False,
):
    """Execute an sql script.

    Arguments:
      statement: the sql statement
      data: (optional) data, to be bound to the statement
      invalidate: whether to invalidate cached values within this object
    """

    if isinstance(statement, str):
        statement = text(statement)

    if data:
        statement = statement.bindparams(**data)

    with self.get_sqlalchemy_engine().connect() as con:
        result = con.execute(statement)

    if invalidate:
        self._invalidate()

    return result
get_sqlalchemy_metadata() -> MetaData

Return the sqlalchemy Metadtaa object for the underlying database.

This is used internally, you typically don't need to access this attribute.

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
235
236
237
238
239
240
241
242
243
244
def get_sqlalchemy_metadata(self) -> MetaData:
    """Return the sqlalchemy Metadtaa object for the underlying database.

    This is used internally, you typically don't need to access this attribute.

    """

    if self._metadata_obj is None:
        self._metadata_obj = MetaData()
    return self._metadata_obj
copy_database_file(target: str)
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
246
247
248
249
250
251
252
253
254
255
def copy_database_file(self, target: str):

    os.makedirs(os.path.dirname(target))

    shutil.copy2(self.db_file_path, target)

    new_db = KiaraDatabase(db_file_path=target)
    # if self._file_hash:
    #     new_db._file_hash = self._file_hash
    return new_db
get_sqlalchemy_inspector() -> Inspector
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
257
258
259
260
261
262
263
def get_sqlalchemy_inspector(self) -> Inspector:

    if self._cached_inspector is not None:
        return self._cached_inspector

    self._cached_inspector = inspect(self.get_sqlalchemy_engine())
    return self._cached_inspector
get_sqlalchemy_table(table_name: str) -> Table

Return the sqlalchemy edges table instance for this network datab.

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
273
274
275
276
277
278
279
280
281
282
283
284
285
def get_sqlalchemy_table(self, table_name: str) -> Table:
    """Return the sqlalchemy edges table instance for this network datab."""

    if table_name in self._tables.keys():
        return self._tables[table_name]

    table = Table(
        table_name,
        self.get_sqlalchemy_metadata(),
        autoload_with=self.get_sqlalchemy_engine(),
    )
    self._tables[table_name] = table
    return table
get_table_as_pandas_df(table_name: str) -> pd.DataFrame
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
287
288
289
290
291
292
293
294
295
def get_table_as_pandas_df(self, table_name: str) -> "pd.DataFrame":

    import pandas as pd

    query = text(f'SELECT * FROM "{table_name}"')
    with self.get_sqlalchemy_engine().connect() as con:
        df = pd.read_sql(query, con)  # noqa

    return df
create_metadata() -> DatabaseMetadata
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def create_metadata(self) -> "DatabaseMetadata":

    insp = self.get_sqlalchemy_inspector()

    mds = {}

    for table_name in insp.get_table_names():

        with self.get_sqlalchemy_engine().connect() as con:
            query = f'SELECT count(*) from "{table_name}"'
            result = con.execute(text(query))
            num_rows = result.fetchone()[0]

            try:
                result = con.execute(
                    text(
                        f'SELECT SUM("pgsize") FROM "dbstat" WHERE name="{table_name}"'
                    )
                )
                size: Union[int, None] = result.fetchone()[0]
            except Exception:
                size = None

        columns = {}
        backend_properties: Dict[str, Any] = {"column_details": {}}
        for column in insp.get_columns(table_name=table_name):
            name = column["name"]
            _type = column["type"]
            type_name = SQLALCHEMY_SQLITE_TYPE_MAP[type(_type)]
            backend_properties["column_details"][name] = {
                "nullable": column["nullable"],
                "primary_key": True if column["primary_key"] else False,
            }
            columns[name] = {
                "type_name": type_name,
            }

        backend = StorageBackend(name="sqlite", properties=backend_properties)

        schema = {
            "column_names": list(columns.keys()),
            "column_schema": columns,
            "backend": backend,
            "rows": num_rows,
            "size": size,
        }

        md = TableMetadata(**schema)
        mds[table_name] = md

    return DatabaseMetadata.construct(tables=mds)

DatabaseMetadata

Bases: ValueMetadata

Database and table properties.

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
class DatabaseMetadata(ValueMetadata):
    """Database and table properties."""

    _metadata_key = "database"

    @classmethod
    def retrieve_supported_data_types(cls) -> Iterable[str]:
        return ["database"]

    @classmethod
    def create_value_metadata(cls, value: Value) -> "DatabaseMetadata":

        database: KiaraDatabase = value.data
        return database.create_metadata()

    tables: Dict[str, TableMetadata] = Field(description="The table schema.")

Attributes

tables: Dict[str, TableMetadata] = Field(description='The table schema.') class-attribute instance-attribute

Functions

retrieve_supported_data_types() -> Iterable[str] classmethod
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
355
356
357
@classmethod
def retrieve_supported_data_types(cls) -> Iterable[str]:
    return ["database"]
create_value_metadata(value: Value) -> DatabaseMetadata classmethod
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/db.py
359
360
361
362
363
@classmethod
def create_value_metadata(cls, value: Value) -> "DatabaseMetadata":

    database: KiaraDatabase = value.data
    return database.create_metadata()