Skip to content

db

Attributes

Classes

CreateDatabaseModuleConfig

Bases: CreateFromModuleConfig

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class CreateDatabaseModuleConfig(CreateFromModuleConfig):
    """Configuration for the 'create.database' module.

    Controls error handling during csv import and which source-file
    metadata gets stored alongside the imported data.
    """

    ignore_errors: bool = Field(
        description="Whether to ignore convert errors and omit the failed items.",
        default=False,
    )
    merge_into_single_table: bool = Field(
        description="Whether to merge all csv files into a single table.", default=False
    )
    # 'None' means: not explicitly set by the user (auto behavior)
    include_source_metadata: Union[bool, None] = Field(
        description="Whether to include a table with metadata about the source files.",
        default=None,
    )
    # only relevant if source metadata is included at all
    include_source_file_content: bool = Field(
        description="When including source metadata, whether to also include the original raw (string) content.",
        default=False,
    )

Attributes

ignore_errors: bool = Field(description='Whether to ignore convert errors and omit the failed items.', default=False) class-attribute instance-attribute
merge_into_single_table: bool = Field(description='Whether to merge all csv files into a single table.', default=False) class-attribute instance-attribute
include_source_metadata: Union[bool, None] = Field(description='Whether to include a table with metadata about the source files.', default=None) class-attribute instance-attribute
include_source_file_content: bool = Field(description='When including source metadata, whether to also include the original raw (string) content.', default=False) class-attribute instance-attribute

CreateDatabaseModule

Bases: CreateFromModule

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
class CreateDatabaseModule(CreateFromModule):
    """Create a (sqlite) database value from files, file bundles or tables."""

    _module_type_name = "create.database"
    _config_cls = CreateDatabaseModuleConfig

    def create__database__from__file(
        self, source_value: Value, optional: ValueMap
    ) -> Any:
        """Create a database from a file.

        Currently, only csv files are supported.

        Returns the path to the newly created sqlite db file.
        """
        import csv as py_csv

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")

        def cleanup():
            # BUG FIX: remove the containing temp directory, not the db file
            # path -- rmtree on a file path fails (silently, with
            # ignore_errors=True) and leaked the directory.
            shutil.rmtree(temp_f, ignore_errors=True)

        atexit.register(cleanup)

        file_item: KiaraFile = source_value.data
        if not file_item.file_name.endswith(".csv"):
            raise KiaraProcessingException(
                "Only csv files are supported (at the moment)."
            )

        # derive a valid sqlite table name from the file name
        table_name = file_item.file_name_without_extension
        table_name = table_name.replace("-", "_")
        table_name = table_name.replace(".", "_")

        has_header = optional.get_value_data("first_row_is_header")
        if has_header is None:
            # not specified by the user: try to auto-detect, falling back to
            # 'True' if sniffing fails
            try:
                has_header = True
                with open(source_value.data.path, "rt") as csvfile:
                    sniffer = py_csv.Sniffer()
                    has_header = sniffer.has_header(csvfile.read(2048))
                    csvfile.seek(0)
            except Exception as e:
                # TODO: add this to the process log
                log_message(
                    "csv_sniffer.error",
                    file=source_value.data.path,
                    error=str(e),
                    details="assuming csv file has header",
                )

        try:
            create_sqlite_table_from_tabular_file(
                target_db_file=db_path,
                file_item=file_item,
                table_name=table_name,
                no_headers=not has_header,
            )
        except Exception as e:
            # BUG FIX: the condition previously read
            # "... is True or True", which ignored import errors
            # unconditionally, regardless of the 'ignore_errors' config.
            if self.get_config_value("ignore_errors") is True:
                log_message("ignore.import_file", file=file_item.path, reason=str(e))
            else:
                raise KiaraProcessingException(e)

        # config value can be 'None' (auto), so the annotation must allow it
        include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
            "include_source_metadata"
        )
        # 'None' (the default) counts as 'include', consistent with
        # 'create__database__from__file_bundle' below
        if include_raw_content_in_file_info in (None, True):
            db = KiaraDatabase(db_file_path=db_path)
            db.create_if_not_exists()
            include_content: bool = self.get_config_value("include_source_file_content")
            db._unlock_db()
            included_files = {file_item.file_name: file_item}
            file_bundle = KiaraFileBundle.create_from_file_models(
                files=included_files, bundle_name=file_item.file_name
            )
            insert_db_table_from_file_bundle(
                database=db,
                file_bundle=file_bundle,
                table_name="source_files_metadata",
                include_content=include_content,
            )
            db._lock_db()

        return db_path

    def create__database__from__file_bundle(
        self, source_value: Value, job_log: JobLog
    ) -> Any:
        """Create a database from a file_bundle value.

        Currently, only csv files are supported, files in the source file_bundle that have different extensions will be ignored.

        Unless 'merge_into_single_table' is set to 'True' in the module configuration, each csv file will create one table
        in the resulting database. If this option is set, only a single table with all the values of all
        csv files will be created. For this to work, all csv files should follow the same schema.

        """

        merge_into_single_table = self.get_config_value("merge_into_single_table")
        if merge_into_single_table:
            raise NotImplementedError("Not supported (yet).")

        include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
            "include_source_metadata"
        )

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "db.sqlite")

        def cleanup():
            # BUG FIX: remove the containing temp directory, not the db file path
            shutil.rmtree(temp_f, ignore_errors=True)

        atexit.register(cleanup)

        db = KiaraDatabase(db_file_path=db_path)
        db.create_if_not_exists()

        # TODO: check whether/how to add indexes

        bundle: KiaraFileBundle = source_value.data

        table_names: List[str] = []
        included_files: Dict[str, bool] = {}
        errors: Dict[str, Union[None, str]] = {}
        for rel_path in sorted(bundle.included_files.keys()):

            if not rel_path.endswith(".csv"):
                job_log.add_log(
                    f"Ignoring file (not csv): {rel_path}", log_level=logging.INFO
                )
                included_files[rel_path] = False
                errors[rel_path] = "Not a csv file."
                continue

            file_item = bundle.included_files[rel_path]
            # pick a table name not already taken by a previously imported file
            table_name = find_free_id(
                stem=file_item.file_name_without_extension, current_ids=table_names
            )
            try:
                table_names.append(table_name)
                create_sqlite_table_from_tabular_file(
                    target_db_file=db_path, file_item=file_item, table_name=table_name
                )
                included_files[rel_path] = True
            except Exception as e:
                included_files[rel_path] = False
                errors[rel_path] = KiaraException.get_root_details(e)

                # BUG FIX: the condition previously read
                # "... is True or True", which ignored errors unconditionally.
                if self.get_config_value("ignore_errors") is True:
                    log_message("ignore.import_file", file=rel_path, reason=str(e))
                    continue

                raise KiaraProcessingException(e)

        # 'None' (the default) counts as 'include'
        if include_raw_content_in_file_info in [None, True]:
            include_content: bool = self.get_config_value("include_source_file_content")
            db._unlock_db()

            insert_db_table_from_file_bundle(
                database=db,
                file_bundle=source_value.data,
                table_name="source_files_metadata",
                include_content=include_content,
                included_files=included_files,
                errors=errors,
            )
            db._lock_db()

        return db_path

    def create_optional_inputs(
        self, source_type: str, target_type: str
    ) -> Union[Mapping[str, Mapping[str, Any]], None]:
        """Declare extra, type-dependent inputs for the conversion.

        'file' sources get a 'first_row_is_header' flag, 'table' -> 'database'
        conversions get a 'table_name' input.
        """

        inputs = {}
        if source_type == "file":
            inputs["first_row_is_header"] = {
                "type": "boolean",
                "optional": True,
                "doc": "Whether the first row of the file is a header row. If not provided, kiara will try to auto-determine.",
            }

        if target_type == "database" and source_type == "table":

            inputs["table_name"] = {
                "type": "string",
                "doc": "The name of the table in the new database.",
                "default": "imported_table",
            }

        return inputs

    def create__database__from__tables(
        self, source_value: Value, optional: ValueMap
    ) -> Any:
        """Create a database value from a list of tables (one db table per input table)."""

        from kiara_plugin.tabular.utils.tables import create_database_from_tables

        tables: KiaraTables = source_value.data
        db = create_database_from_tables(tables=tables)

        return db

    def create__database__from__table(
        self, source_value: Value, optional: ValueMap
    ) -> Any:
        """Create a database value from a table."""

        # table name: user-provided via optional input, otherwise the default
        table_name = optional.get_value_data("table_name")
        if not table_name:
            table_name = DEFAULT_TABLE_NAME

        table: KiaraTable = source_value.data
        arrow_table = table.arrow_table

        # no column re-mapping and no indexes for now
        column_map = None
        index_columns = None

        sqlite_schema = create_sqlite_schema_data_from_arrow_table(
            table=arrow_table, index_columns=index_columns, column_map=column_map
        )

        db = KiaraDatabase.create_in_temp_dir()
        # presumably the db file is locked/read-only by default; unlock while
        # filling it -- see the matching _lock_db() below
        db._unlock_db()
        engine = db.get_sqlalchemy_engine()

        _table = sqlite_schema.create_table(table_name=table_name, engine=engine)

        with engine.connect() as conn:

            # insert in chunks to bound memory usage
            for batch in arrow_table.to_batches(
                max_chunksize=DEFAULT_TABULAR_DATA_CHUNK_SIZE
            ):
                conn.execute(insert(_table), batch.to_pylist())
                conn.commit()

        db._lock_db()
        return db

Attributes

_config_cls = CreateDatabaseModuleConfig class-attribute instance-attribute

Functions

create__database__from__file(source_value: Value, optional: ValueMap) -> Any

Create a database from a file.

Currently, only csv files are supported.

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def create__database__from__file(
    self, source_value: Value, optional: ValueMap
) -> Any:
    """Create a database from a file.

    Currently, only csv files are supported.

    Returns the path to the newly created sqlite db file.
    """
    import csv as py_csv

    temp_f = tempfile.mkdtemp()
    db_path = os.path.join(temp_f, "db.sqlite")

    def cleanup():
        # BUG FIX: remove the containing temp directory, not the db file
        # path -- rmtree on a file path fails (silently, with
        # ignore_errors=True) and leaked the directory.
        shutil.rmtree(temp_f, ignore_errors=True)

    atexit.register(cleanup)

    file_item: KiaraFile = source_value.data
    if not file_item.file_name.endswith(".csv"):
        raise KiaraProcessingException(
            "Only csv files are supported (at the moment)."
        )

    # derive a valid sqlite table name from the file name
    table_name = file_item.file_name_without_extension
    table_name = table_name.replace("-", "_")
    table_name = table_name.replace(".", "_")

    has_header = optional.get_value_data("first_row_is_header")
    if has_header is None:
        # not specified by the user: try to auto-detect, falling back to
        # 'True' if sniffing fails
        try:
            has_header = True
            with open(source_value.data.path, "rt") as csvfile:
                sniffer = py_csv.Sniffer()
                has_header = sniffer.has_header(csvfile.read(2048))
                csvfile.seek(0)
        except Exception as e:
            # TODO: add this to the process log
            log_message(
                "csv_sniffer.error",
                file=source_value.data.path,
                error=str(e),
                details="assuming csv file has header",
            )

    try:
        create_sqlite_table_from_tabular_file(
            target_db_file=db_path,
            file_item=file_item,
            table_name=table_name,
            no_headers=not has_header,
        )
    except Exception as e:
        # BUG FIX: the condition previously read "... is True or True",
        # which ignored import errors unconditionally.
        if self.get_config_value("ignore_errors") is True:
            log_message("ignore.import_file", file=file_item.path, reason=str(e))
        else:
            raise KiaraProcessingException(e)

    # config value can be 'None' (auto), so the annotation must allow it
    include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
        "include_source_metadata"
    )
    # 'None' (the default) counts as 'include', consistent with
    # 'create__database__from__file_bundle'
    if include_raw_content_in_file_info in (None, True):
        db = KiaraDatabase(db_file_path=db_path)
        db.create_if_not_exists()
        include_content: bool = self.get_config_value("include_source_file_content")
        db._unlock_db()
        included_files = {file_item.file_name: file_item}
        file_bundle = KiaraFileBundle.create_from_file_models(
            files=included_files, bundle_name=file_item.file_name
        )
        insert_db_table_from_file_bundle(
            database=db,
            file_bundle=file_bundle,
            table_name="source_files_metadata",
            include_content=include_content,
        )
        db._lock_db()

    return db_path
create__database__from__file_bundle(source_value: Value, job_log: JobLog) -> Any

Create a database from a file_bundle value.

Currently, only csv files are supported, files in the source file_bundle that have different extensions will be ignored.

Unless 'merge_into_single_table' is set to 'True' in the module configuration, each csv file will create one table in the resulting database. If this option is set, only a single table with all the values of all csv files will be created. For this to work, all csv files should follow the same schema.

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
def create__database__from__file_bundle(
    self, source_value: Value, job_log: JobLog
) -> Any:
    """Create a database from a file_bundle value.

    Currently, only csv files are supported, files in the source file_bundle that have different extensions will be ignored.

    Unless 'merge_into_single_table' is set to 'True' in the module configuration, each csv file will create one table
    in the resulting database. If this option is set, only a single table with all the values of all
    csv files will be created. For this to work, all csv files should follow the same schema.

    """

    merge_into_single_table = self.get_config_value("merge_into_single_table")
    if merge_into_single_table:
        raise NotImplementedError("Not supported (yet).")

    include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
        "include_source_metadata"
    )

    temp_f = tempfile.mkdtemp()
    db_path = os.path.join(temp_f, "db.sqlite")

    def cleanup():
        # BUG FIX: remove the containing temp directory, not the db file
        # path -- rmtree on a file path fails (silently, with
        # ignore_errors=True) and leaked the directory.
        shutil.rmtree(temp_f, ignore_errors=True)

    atexit.register(cleanup)

    db = KiaraDatabase(db_file_path=db_path)
    db.create_if_not_exists()

    # TODO: check whether/how to add indexes

    bundle: KiaraFileBundle = source_value.data

    table_names: List[str] = []
    included_files: Dict[str, bool] = {}
    errors: Dict[str, Union[None, str]] = {}
    for rel_path in sorted(bundle.included_files.keys()):

        if not rel_path.endswith(".csv"):
            job_log.add_log(
                f"Ignoring file (not csv): {rel_path}", log_level=logging.INFO
            )
            included_files[rel_path] = False
            errors[rel_path] = "Not a csv file."
            continue

        file_item = bundle.included_files[rel_path]
        # pick a table name not already taken by a previously imported file
        table_name = find_free_id(
            stem=file_item.file_name_without_extension, current_ids=table_names
        )
        try:
            table_names.append(table_name)
            create_sqlite_table_from_tabular_file(
                target_db_file=db_path, file_item=file_item, table_name=table_name
            )
            included_files[rel_path] = True
        except Exception as e:
            included_files[rel_path] = False
            errors[rel_path] = KiaraException.get_root_details(e)

            # BUG FIX: the condition previously read "... is True or True",
            # which ignored errors unconditionally.
            if self.get_config_value("ignore_errors") is True:
                log_message("ignore.import_file", file=rel_path, reason=str(e))
                continue

            raise KiaraProcessingException(e)

    # 'None' (the default) counts as 'include'
    if include_raw_content_in_file_info in [None, True]:
        include_content: bool = self.get_config_value("include_source_file_content")
        db._unlock_db()

        insert_db_table_from_file_bundle(
            database=db,
            file_bundle=source_value.data,
            table_name="source_files_metadata",
            include_content=include_content,
            included_files=included_files,
            errors=errors,
        )
        db._lock_db()

    return db_path
create_optional_inputs(source_type: str, target_type: str) -> Union[Mapping[str, Mapping[str, Any]], None]
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
def create_optional_inputs(
    self, source_type: str, target_type: str
) -> Union[Mapping[str, Mapping[str, Any]], None]:
    """Declare extra, type-dependent inputs for the conversion.

    'file' sources get a 'first_row_is_header' flag, 'table' -> 'database'
    conversions get a 'table_name' input. Returns an (possibly empty)
    mapping of input field name to field schema.
    """

    inputs: Dict[str, Mapping[str, Any]] = {}
    if source_type == "file":
        inputs["first_row_is_header"] = {
            "type": "boolean",
            "optional": True,
            "doc": "Whether the first row of the file is a header row. If not provided, kiara will try to auto-determine.",
        }

    if target_type == "database" and source_type == "table":

        inputs["table_name"] = {
            "type": "string",
            "doc": "The name of the table in the new database.",
            "default": "imported_table",
        }

    return inputs
create__database__from__tables(source_value: Value, optional: ValueMap) -> Any

Create a database value from a list of tables.

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
255
256
257
258
259
260
261
262
263
264
265
def create__database__from__tables(
    self, source_value: Value, optional: ValueMap
) -> Any:
    """Build a database value out of a collection of tables."""

    from kiara_plugin.tabular.utils.tables import create_database_from_tables

    source_tables: KiaraTables = source_value.data
    return create_database_from_tables(tables=source_tables)
create__database__from__table(source_value: Value, optional: ValueMap) -> Any

Create a database value from a table.

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def create__database__from__table(
    self, source_value: Value, optional: ValueMap
) -> Any:
    """Create a database value from a table.

    The table is written into a single sqlite table, named via the
    'table_name' optional input (falling back to the default table name).
    """

    # table name: user-provided via optional input, otherwise the default
    table_name = optional.get_value_data("table_name")
    if not table_name:
        table_name = DEFAULT_TABLE_NAME

    table: KiaraTable = source_value.data
    arrow_table = table.arrow_table

    # no column re-mapping and no indexes for now
    column_map = None
    index_columns = None

    sqlite_schema = create_sqlite_schema_data_from_arrow_table(
        table=arrow_table, index_columns=index_columns, column_map=column_map
    )

    db = KiaraDatabase.create_in_temp_dir()
    # presumably the db file is locked/read-only by default; unlock while
    # filling it -- see the matching _lock_db() below
    db._unlock_db()
    engine = db.get_sqlalchemy_engine()

    _table = sqlite_schema.create_table(table_name=table_name, engine=engine)

    with engine.connect() as conn:

        # insert in chunks to bound memory usage
        for batch in arrow_table.to_batches(
            max_chunksize=DEFAULT_TABULAR_DATA_CHUNK_SIZE
        ):
            conn.execute(insert(_table), batch.to_pylist())
            conn.commit()

    db._lock_db()
    return db

LoadDatabaseFromDiskModule

Bases: DeserializeValueModule

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
class LoadDatabaseFromDiskModule(DeserializeValueModule):
    """Deserialize a stored ('copy'-profile) sqlite db file back into a KiaraDatabase object."""

    _module_type_name = "load.database"

    @classmethod
    def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
        """Return the python types this module can deserialize into."""
        return {"python_object": KiaraDatabase}

    @classmethod
    def retrieve_serialized_value_type(cls) -> str:
        """Return the kiara value type this deserializer handles."""
        return "database"

    @classmethod
    def retrieve_supported_serialization_profile(cls) -> str:
        """Return the serialization profile this module expects ('copy': the raw db file)."""
        return "copy"

    def to__python_object(self, data: SerializedData, **config: Any):
        """Materialize the serialized 'db.sqlite' chunk as a KiaraDatabase.

        Expects exactly one serialized key ('db.sqlite') containing exactly
        one chunk (the sqlite file itself).
        """

        assert "db.sqlite" in data.get_keys() and len(list(data.get_keys())) == 1

        chunks = data.get_serialized_data("db.sqlite")

        # TODO: support multiple chunks
        assert chunks.get_number_of_chunks() == 1
        # symlink_ok: a symlink to the stored chunk is fine, no copy needed
        files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
        assert len(files) == 1

        db_file = files[0]

        db = KiaraDatabase(db_file_path=db_file)
        return db

Functions

retrieve_supported_target_profiles() -> Mapping[str, Type] classmethod
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
308
309
310
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
    """Return the python types this module can deserialize into."""
    return {"python_object": KiaraDatabase}
retrieve_serialized_value_type() -> str classmethod
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
312
313
314
@classmethod
def retrieve_serialized_value_type(cls) -> str:
    """Return the kiara value type this deserializer handles."""
    return "database"
retrieve_supported_serialization_profile() -> str classmethod
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
316
317
318
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
    """Return the serialization profile this module expects ('copy')."""
    return "copy"
to__python_object(data: SerializedData, **config: Any)
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def to__python_object(self, data: SerializedData, **config: Any):
    """Materialize the serialized 'db.sqlite' chunk as a KiaraDatabase.

    Expects exactly one serialized key ('db.sqlite') containing exactly
    one chunk (the sqlite file itself).
    """

    assert "db.sqlite" in data.get_keys() and len(list(data.get_keys())) == 1

    chunks = data.get_serialized_data("db.sqlite")

    # TODO: support multiple chunks
    assert chunks.get_number_of_chunks() == 1
    # symlink_ok: a symlink to the stored chunk is fine, no copy needed
    files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
    assert len(files) == 1

    db_file = files[0]

    db = KiaraDatabase(db_file_path=db_file)
    return db

QueryDatabaseConfig

Bases: KiaraModuleConfig

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
337
338
339
class QueryDatabaseConfig(KiaraModuleConfig):
    """Configuration for the 'query.database' module."""

    # if set, the module does not expose a 'query' input (see create_inputs_schema)
    query: Union[str, None] = Field(description="The query.", default=None)

Attributes

query: Union[str, None] = Field(description='The query.', default=None) class-attribute instance-attribute

QueryDatabaseModule

Bases: KiaraModule

Execute a sql query against a (sqlite) database.

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
class QueryDatabaseModule(KiaraModule):
    """Execute a sql query against a (sqlite) database."""

    _config_cls = QueryDatabaseConfig
    _module_type_name = "query.database"

    def create_inputs_schema(
        self,
    ) -> ValueMapSchema:
        """Declare inputs: the database, plus a 'query' input unless one is fixed in the config."""

        result: Dict[str, Dict[str, Any]] = {
            "database": {"type": "database", "doc": "The database to query."}
        }

        # only expose a 'query' input when no query was pre-configured
        if not self.get_config_value("query"):
            result["query"] = {"type": "string", "doc": "The query to execute."}

        return result

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:
        """Declare the single output: the query result as a table."""

        return {"query_result": {"type": "table", "doc": "The query result."}}

    def process(self, inputs: ValueMap, outputs: ValueMap):
        """Run the (configured or user-supplied) query and publish the result as an arrow table."""

        import pyarrow as pa

        database: KiaraDatabase = inputs.get_value_data("database")
        # a query fixed in the module config takes precedence over the input
        query = self.get_config_value("query")
        if query is None:
            query = inputs.get_value_data("query")

        # TODO: make this memory efficient

        # collect the result column-wise for pa.Table.from_pydict
        result_columns: Dict[str, List[Any]] = {}
        with database.get_sqlalchemy_engine().connect() as con:
            result = con.execute(text(query))
            # NOTE(review): dict(r) works on legacy SQLAlchemy row proxies;
            # SQLAlchemy 1.4+/2.0 Row objects need 'dict(r._mapping)' --
            # confirm the pinned SQLAlchemy version.
            for r in result:
                for k, v in dict(r).items():
                    result_columns.setdefault(k, []).append(v)

        table = pa.Table.from_pydict(result_columns)
        outputs.set_value("query_result", table)

Attributes

_config_cls = QueryDatabaseConfig class-attribute instance-attribute

Functions

create_inputs_schema() -> ValueMapSchema
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
348
349
350
351
352
353
354
355
356
357
358
359
def create_inputs_schema(
    self,
) -> ValueMapSchema:
    """Declare inputs: the database, plus a 'query' input unless one is fixed in the config."""

    schema: Dict[str, Dict[str, Any]] = {}
    schema["database"] = {"type": "database", "doc": "The database to query."}

    preconfigured_query = self.get_config_value("query")
    if not preconfigured_query:
        schema["query"] = {"type": "string", "doc": "The query to execute."}

    return schema
create_outputs_schema() -> ValueMapSchema
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
361
362
363
364
365
def create_outputs_schema(
    self,
) -> ValueMapSchema:
    """Declare the single output: the query result as a table."""

    schema = {"query_result": {"type": "table", "doc": "The query result."}}
    return schema
process(inputs: ValueMap, outputs: ValueMap)
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
def process(self, inputs: ValueMap, outputs: ValueMap):
    """Run the (configured or user-supplied) query and publish the result as an arrow table."""

    import pyarrow as pa

    database: KiaraDatabase = inputs.get_value_data("database")
    # a query fixed in the module config takes precedence over the input
    query = self.get_config_value("query")
    if query is None:
        query = inputs.get_value_data("query")

    # TODO: make this memory efficient

    # collect the result column-wise for pa.Table.from_pydict
    result_columns: Dict[str, List[Any]] = {}
    with database.get_sqlalchemy_engine().connect() as con:
        result = con.execute(text(query))
        # NOTE(review): dict(r) works on legacy SQLAlchemy row proxies;
        # SQLAlchemy 1.4+/2.0 Row objects need 'dict(r._mapping)' --
        # confirm the pinned SQLAlchemy version.
        for r in result:
            for k, v in dict(r).items():
                result_columns.setdefault(k, []).append(v)

    table = pa.Table.from_pydict(result_columns)
    outputs.set_value("query_result", table)

RenderDatabaseModuleBase

Bases: RenderValueModule

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
class RenderDatabaseModuleBase(RenderValueModule):
    """Shared base for modules that render a 'database' value as a paged table view."""

    # Concrete subclasses provide the kiara module type name.
    _module_type_name: str = None  # type: ignore

    def preprocess_database(
        self,
        value: Value,
        table_name: Union[str, None],
        input_number_of_rows: int,
        input_row_offset: int,
    ):
        """Fetch one page of rows from one table and build navigation scenes.

        Returns a 2-tuple: a DictTabularWrap holding the fetched page (column
        name -> list of values), and a dict of one RenderScene per table, where
        the entry for the currently shown table is disabled and carries the
        first/previous/next/last pagination scenes.
        """

        database: KiaraDatabase = value.data
        table_names = database.table_names

        # No table requested: default to the first table of the database.
        if not table_name:
            table_name = list(table_names)[0]

        if table_name not in table_names:
            raise Exception(
                f"Invalid table name: {table_name}. Available: {', '.join(table_names)}"
            )

        # One scene per table, so the renderer can offer switching between tables.
        related_scenes_tables: Dict[str, Union[RenderScene, None]] = {
            t: RenderScene.construct(
                title=t,
                description=f"Display the '{t}' table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={"table_name": t},
            )
            for t in database.table_names
        }

        # table_name was validated against the database's own table list above,
        # so interpolating it into the statement is safe here.
        query = f"""SELECT * FROM {table_name} LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""
        result: Dict[str, List[Any]] = {}
        # TODO: this could be written much more efficient
        with database.get_sqlalchemy_engine().connect() as con:
            num_rows_result = con.execute(text(f"SELECT count(*) from {table_name}"))
            table_num_rows = num_rows_result.fetchone()[0]
            rs = con.execute(text(query))
            # Pivot the row stream into per-column lists.
            # NOTE(review): dict(row) works on SQLAlchemy 1.x Row objects; 2.x
            # requires row._mapping -- confirm the pinned sqlalchemy version.
            for r in rs:
                for k, v in dict(r).items():
                    result.setdefault(k, []).append(v)

        wrap = DictTabularWrap(data=result)

        # Pagination scenes are only built when the table is larger than one page.
        row_offset = table_num_rows - input_number_of_rows
        related_scenes: Dict[str, Union[RenderScene, None]] = {}
        if row_offset > 0:

            if input_row_offset > 0:
                related_scenes["first"] = RenderScene.construct(
                    title="first",
                    description=f"Display the first {input_number_of_rows} rows of this table.",
                    manifest_hash=self.manifest.manifest_hash,
                    render_config={
                        "row_offset": 0,
                        "number_of_rows": input_number_of_rows,
                        "table_name": table_name,
                    },
                )

                # Step back one page, clamped at the start of the table.
                p_offset = input_row_offset - input_number_of_rows
                if p_offset < 0:
                    p_offset = 0
                previous = {
                    "row_offset": p_offset,
                    "number_of_rows": input_number_of_rows,
                    "table_name": table_name,
                }
                related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous)  # type: ignore
            else:
                # Already on the first page: backwards navigation is disabled.
                related_scenes["first"] = None
                related_scenes["previous"] = None

            # Forward navigation only exists while another page remains.
            n_offset = input_row_offset + input_number_of_rows
            if n_offset < table_num_rows:
                next = {
                    "row_offset": n_offset,
                    "number_of_rows": input_number_of_rows,
                    "table_name": table_name,
                }
                related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next)  # type: ignore
            else:
                related_scenes["next"] = None

            # NOTE(review): when table_num_rows is an exact multiple of
            # input_number_of_rows, current_start equals table_num_rows and the
            # "last" scene would render an empty page -- confirm intended.
            last_page = int(table_num_rows / input_number_of_rows)
            current_start = last_page * input_number_of_rows
            if (input_row_offset + input_number_of_rows) > table_num_rows:
                related_scenes["last"] = None
            else:
                related_scenes["last"] = RenderScene.construct(
                    title="last",
                    description="Display the final rows of this table.",
                    manifest_hash=self.manifest.manifest_hash,
                    render_config={
                        "row_offset": current_start,  # type: ignore
                        "number_of_rows": input_number_of_rows,  # type: ignore
                        "table_name": table_name,
                    },
                )
        # Mark the scene of the table currently shown and attach its pagination.
        related_scenes_tables[table_name].disabled = True  # type: ignore
        related_scenes_tables[table_name].related_scenes = related_scenes  # type: ignore
        return wrap, related_scenes_tables

Functions

preprocess_database(value: Value, table_name: Union[str, None], input_number_of_rows: int, input_row_offset: int)
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
def preprocess_database(
    self,
    value: Value,
    table_name: Union[str, None],
    input_number_of_rows: int,
    input_row_offset: int,
):
    """Fetch one page of rows from one table and build navigation scenes.

    Returns a 2-tuple: a DictTabularWrap with the fetched page, and a dict of
    one RenderScene per table where the current table's entry is disabled and
    carries the first/previous/next/last pagination scenes.
    """

    database: KiaraDatabase = value.data
    table_names = database.table_names

    # Default to the first table when none is requested.
    if not table_name:
        table_name = list(table_names)[0]

    if table_name not in table_names:
        raise Exception(
            f"Invalid table name: {table_name}. Available: {', '.join(table_names)}"
        )

    # One scene per table, so the renderer can offer switching between tables.
    related_scenes_tables: Dict[str, Union[RenderScene, None]] = {
        t: RenderScene.construct(
            title=t,
            description=f"Display the '{t}' table.",
            manifest_hash=self.manifest.manifest_hash,
            render_config={"table_name": t},
        )
        for t in database.table_names
    }

    # table_name is validated against the database's own table list above,
    # so interpolating it into the SQL is safe here.
    query = f"""SELECT * FROM {table_name} LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""
    result: Dict[str, List[Any]] = {}
    # TODO: this could be written much more efficient
    with database.get_sqlalchemy_engine().connect() as con:
        num_rows_result = con.execute(text(f"SELECT count(*) from {table_name}"))
        table_num_rows = num_rows_result.fetchone()[0]
        rs = con.execute(text(query))
        # Pivot the row stream into per-column lists.
        # NOTE(review): dict(row) works on SQLAlchemy 1.x Rows; 2.x needs
        # row._mapping -- confirm the pinned sqlalchemy version.
        for r in rs:
            for k, v in dict(r).items():
                result.setdefault(k, []).append(v)

    wrap = DictTabularWrap(data=result)

    # Pagination scenes are only built when the table is larger than one page.
    row_offset = table_num_rows - input_number_of_rows
    related_scenes: Dict[str, Union[RenderScene, None]] = {}
    if row_offset > 0:

        if input_row_offset > 0:
            related_scenes["first"] = RenderScene.construct(
                title="first",
                description=f"Display the first {input_number_of_rows} rows of this table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={
                    "row_offset": 0,
                    "number_of_rows": input_number_of_rows,
                    "table_name": table_name,
                },
            )

            # Step back one page, clamped at the start of the table.
            p_offset = input_row_offset - input_number_of_rows
            if p_offset < 0:
                p_offset = 0
            previous = {
                "row_offset": p_offset,
                "number_of_rows": input_number_of_rows,
                "table_name": table_name,
            }
            related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous)  # type: ignore
        else:
            # Already on the first page: backwards navigation is disabled.
            related_scenes["first"] = None
            related_scenes["previous"] = None

        # Forward navigation only exists while another page remains.
        n_offset = input_row_offset + input_number_of_rows
        if n_offset < table_num_rows:
            next = {
                "row_offset": n_offset,
                "number_of_rows": input_number_of_rows,
                "table_name": table_name,
            }
            related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next)  # type: ignore
        else:
            related_scenes["next"] = None

        # NOTE(review): if table_num_rows is an exact multiple of
        # input_number_of_rows, current_start equals table_num_rows and the
        # "last" scene would render an empty page -- confirm intended.
        last_page = int(table_num_rows / input_number_of_rows)
        current_start = last_page * input_number_of_rows
        if (input_row_offset + input_number_of_rows) > table_num_rows:
            related_scenes["last"] = None
        else:
            related_scenes["last"] = RenderScene.construct(
                title="last",
                description="Display the final rows of this table.",
                manifest_hash=self.manifest.manifest_hash,
                render_config={
                    "row_offset": current_start,  # type: ignore
                    "number_of_rows": input_number_of_rows,  # type: ignore
                    "table_name": table_name,
                },
            )
    # Mark the scene of the table currently shown and attach its pagination.
    related_scenes_tables[table_name].disabled = True  # type: ignore
    related_scenes_tables[table_name].related_scenes = related_scenes  # type: ignore
    return wrap, related_scenes_tables

RenderDatabaseModule

Bases: RenderDatabaseModuleBase

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
class RenderDatabaseModule(RenderDatabaseModuleBase):
    """Render a database value as a string or terminal renderable, one table page at a time."""

    _module_type_name = "render.database"

    def render__database__as__string(
        self, value: Value, render_config: Mapping[str, Any]
    ):
        """Render one page of a database table as a plain string."""

        number_of_rows = render_config.get("number_of_rows", 20)
        row_offset = render_config.get("row_offset", 0)
        requested_table = render_config.get("table_name", None)

        wrap, table_scenes = self.preprocess_database(
            value=value,
            table_name=requested_table,
            input_number_of_rows=number_of_rows,
            input_row_offset=row_offset,
        )

        return RenderValueResult(
            value_id=value.value_id,
            rendered=wrap.as_string(max_row_height=1),
            related_scenes=table_scenes,
            render_config=render_config,
            render_manifest=self.manifest.manifest_hash,
        )

    def render__database__as__terminal_renderable(
        self, value: Value, render_config: Mapping[str, Any]
    ):
        """Render one page of a database table as a rich terminal renderable."""

        number_of_rows = render_config.get("number_of_rows", 20)
        row_offset = render_config.get("row_offset", 0)
        requested_table = render_config.get("table_name", None)

        wrap, table_scenes = self.preprocess_database(
            value=value,
            table_name=requested_table,
            input_number_of_rows=number_of_rows,
            input_row_offset=row_offset,
        )

        return RenderValueResult(
            value_id=value.value_id,
            render_config=render_config,
            rendered=wrap.as_terminal_renderable(max_row_height=1),
            related_scenes=table_scenes,
            render_manifest=self.manifest.manifest_hash,
        )

Functions

render__database__as__string(value: Value, render_config: Mapping[str, Any])
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
def render__database__as__string(
    self, value: Value, render_config: Mapping[str, Any]
):
    """Render one page of a database table as a plain string."""

    number_of_rows = render_config.get("number_of_rows", 20)
    row_offset = render_config.get("row_offset", 0)
    requested_table = render_config.get("table_name", None)

    wrap, table_scenes = self.preprocess_database(
        value=value,
        table_name=requested_table,
        input_number_of_rows=number_of_rows,
        input_row_offset=row_offset,
    )

    return RenderValueResult(
        value_id=value.value_id,
        rendered=wrap.as_string(max_row_height=1),
        related_scenes=table_scenes,
        render_config=render_config,
        render_manifest=self.manifest.manifest_hash,
    )
render__database__as__terminal_renderable(value: Value, render_config: Mapping[str, Any])
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
def render__database__as__terminal_renderable(
    self, value: Value, render_config: Mapping[str, Any]
):
    """Render one page of a database table as a rich terminal renderable."""

    number_of_rows = render_config.get("number_of_rows", 20)
    row_offset = render_config.get("row_offset", 0)
    requested_table = render_config.get("table_name", None)

    wrap, table_scenes = self.preprocess_database(
        value=value,
        table_name=requested_table,
        input_number_of_rows=number_of_rows,
        input_row_offset=row_offset,
    )

    return RenderValueResult(
        value_id=value.value_id,
        render_config=render_config,
        rendered=wrap.as_terminal_renderable(max_row_height=1),
        related_scenes=table_scenes,
        render_manifest=self.manifest.manifest_hash,
    )

ExportNetworkDataModule

Bases: DataExportModule

Export database values.

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
class ExportNetworkDataModule(DataExportModule):
    """Export database values."""

    _module_type_name = "export.database"

    def export__database__as__sqlite_db(
        self, value: KiaraDatabase, base_path: str, name: str
    ):
        """Export the database as a single sqlite database file.

        Returns a dict with a 'files' key pointing at the copied file path.
        """

        target_path = os.path.abspath(os.path.join(base_path, f"{name}.sqlite"))
        # Ensure the target folder exists (consistent with the csv exporter below).
        os.makedirs(os.path.dirname(target_path), exist_ok=True)
        shutil.copy2(value.db_file_path, target_path)

        return {"files": target_path}

    def export__database__as__sql_dump(
        self, value: KiaraDatabase, base_path: str, name: str
    ):
        """Export the database as a plain-text sql dump file."""

        import sqlite_utils

        db = sqlite_utils.Database(value.db_file_path)
        target_path = Path(os.path.join(base_path, f"{name}.sql"))
        target_path.parent.mkdir(parents=True, exist_ok=True)
        with target_path.open("wt") as f:
            for line in db.conn.iterdump():
                f.write(line + "\n")

        return {"files": target_path.as_posix()}

    def export__database__as__csv_files(
        self, value: KiaraDatabase, base_path: str, name: str
    ):
        """Export the database as csv files, one file per table."""

        import sqlite3

        files = []

        # One shared connection for all tables; the previous version opened a
        # new connection per table and never closed any of them.
        con = sqlite3.connect(value.db_file_path)
        try:
            for table_name in value.table_names:
                target_path = os.path.join(base_path, f"{name}__{table_name}.csv")
                os.makedirs(os.path.dirname(target_path), exist_ok=True)

                # 'with' closes the file even if a query fails; newline="" is
                # required by the csv module to avoid doubled line endings.
                with open(target_path, "wt", newline="") as outfile:
                    outcsv = csv.writer(outfile)
                    # table_name comes from the database's own table list.
                    cursor = con.execute(f"select * from {table_name}")
                    # header row: column titles
                    outcsv.writerow(x[0] for x in cursor.description)
                    outcsv.writerows(cursor.fetchall())

                files.append(target_path)
        finally:
            con.close()

        return {"files": files}

Functions

export__database__as__sqlite_db(value: KiaraDatabase, base_path: str, name: str)

Export the database as a sqlite database file.

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
554
555
556
557
558
559
560
561
562
def export__database__as__sqlite_db(
    self, value: KiaraDatabase, base_path: str, name: str
):
    """Copy the backing sqlite file of this database to the export location."""

    destination = os.path.join(base_path, f"{name}.sqlite")
    destination = os.path.abspath(destination)
    shutil.copy2(value.db_file_path, destination)

    return {"files": destination}
export__database__as__sql_dump(value: KiaraDatabase, base_path: str, name: str)

Export the database as a sql dump file.

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
564
565
566
567
568
569
570
571
572
573
574
575
576
577
def export__database__as__sql_dump(
    self, value: KiaraDatabase, base_path: str, name: str
):
    """Write the database out as a plain-text sql dump file."""

    import sqlite_utils

    db = sqlite_utils.Database(value.db_file_path)
    dump_file = Path(os.path.join(base_path, f"{name}.sql"))
    with dump_file.open("wt") as fh:
        fh.writelines(line + "\n" for line in db.conn.iterdump())

    return {"files": dump_file.as_posix()}
export__database__as__csv_files(value: KiaraDatabase, base_path: str, name: str)

Export the database as csv files (one file per table).

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/db/__init__.py
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
def export__database__as__csv_files(
    self, value: KiaraDatabase, base_path: str, name: str
):
    """Export the database as csv files, one file per table."""

    import sqlite3

    files = []

    # One shared connection for all tables; the previous version opened a new
    # connection per table and never closed any of them.
    con = sqlite3.connect(value.db_file_path)
    try:
        for table_name in value.table_names:
            target_path = os.path.join(base_path, f"{name}__{table_name}.csv")
            os.makedirs(os.path.dirname(target_path), exist_ok=True)

            # 'with' closes the file even if a query fails; newline="" is
            # required by the csv module to avoid doubled line endings.
            with open(target_path, "wt", newline="") as outfile:
                outcsv = csv.writer(outfile)
                # table_name comes from the database's own table list.
                cursor = con.execute(f"select * from {table_name}")
                # header row: column titles
                outcsv.writerow(x[0] for x in cursor.description)
                outcsv.writerows(cursor.fetchall())

            files.append(target_path)
    finally:
        con.close()

    return {"files": files}

Functions