Skip to content

query

QueryDatabaseSQLModuleConfig (ModuleTypeConfigSchema) pydantic-model

Source code in core/database/query.py
class QueryDatabaseSQLModuleConfig(ModuleTypeConfigSchema):
    # Configuration schema for the sql query module.

    # Optional, pre-set query; defaults to 'None' (no query configured).
    query: typing.Optional[str] = Field(
        default=None,
        description="The query to execute. If not specified, the user will be able to provide their own.",
    )

query: Optional[str] pydantic-field

The query to execute. If not specified, the user will be able to provide their own.

QueryTableSQL (KiaraModule)

Execute a sql query against a database, and return the result as an (Arrow) table.

Source code in core/database/query.py
class QueryTableSQL(KiaraModule):
    """Execute a sql query against a database, and return the result as an (Arrow) table."""

    _module_type_name = "sql"
    _config_cls = QueryDatabaseSQLModuleConfig

    def create_input_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Return the input schema of this module.

        The 'database' input is always part of the schema. The 'query' input is
        only added if no query was set in the module configuration.
        """

        inputs = {
            "database": {
                "type": "database",
                "doc": "The database to query",
            }
        }

        if self.get_config_value("query") is None:
            inputs["query"] = {"type": "string", "doc": "The query."}

        return inputs

    def create_output_schema(
        self,
    ) -> typing.Mapping[
        str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
    ]:
        """Return the output schema of this module: a single 'query_result' table."""

        return {"query_result": {"type": "table", "doc": "The query result."}}

    def process(self, inputs: ValueSet, outputs: ValueSet) -> None:
        """Run the query against the 'database' input and store the result.

        The query is taken from the module configuration if it is set there,
        otherwise from the 'query' input. The result is read into a pandas
        data frame, converted to an Arrow table, and set as the
        'query_result' output.
        """

        import pandas as pd
        import pyarrow as pa

        # a query from the module config takes precedence over the user input
        _query: typing.Optional[str] = self.get_config_value("query")
        if _query is None:
            _query = inputs.get_value_data("query")

        _database: KiaraDatabase = inputs.get_value_data("database")

        # can't re-use the default engine, because pandas does not support having the 'future' flag set to 'True'
        engine = create_engine(_database.db_url)
        try:
            df = pd.read_sql(_query, con=engine)
        finally:
            # this engine only exists for this one call, so its connection
            # pool must be released here, whether or not the query succeeded
            engine.dispose()

        table = pa.Table.from_pandas(df)

        outputs.set_value("query_result", table)

create_input_schema(self)

Abstract method to implement by child classes, returns a description of the input schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[input_field_name]": { "type": "[value_type]", "doc*": "[a description of this input]", "optional*": [boolean whether this input is optional or required (defaults to 'False')] }, "[other_input_field_name]": { "type": ... }, ... }

Source code in core/database/query.py
def create_input_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Describe the inputs of this module.

    A 'database' input is always present. A 'query' input is only added when
    the module configuration does not already contain a query.
    """

    schema = {"database": {"type": "database", "doc": "The database to query"}}

    # a pre-configured query means no query input is needed
    if self.get_config_value("query") is not None:
        return schema

    schema["query"] = {"type": "string", "doc": "The query."}
    return schema

create_output_schema(self)

Abstract method to implement by child classes, returns a description of the output schema of this module.

If returning a dictionary of dictionaries, the format of the return value is as follows (items with '*' are optional):

{ "[output_field_name]": { "type": "[value_type]", "doc*": "[a description of this output]" }, "[other_output_field_name]": { "type": ... }, ... }

Source code in core/database/query.py
def create_output_schema(
    self,
) -> typing.Mapping[
    str, typing.Union[ValueSchema, typing.Mapping[str, typing.Any]]
]:
    """Describe the single output of this module: the 'query_result' table."""

    result_field = {"type": "table", "doc": "The query result."}
    return {"query_result": result_field}