Skip to content

filters

Classes

TableFiltersModule

Bases: FilterModule

Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/table/filters.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
class TableFiltersModule(FilterModule):
    """Filters for values of type ``table``.

    Defines three filter implementations: ``select_columns``,
    ``drop_columns`` and ``select_rows``.
    NOTE(review): presumably the ``FilterModule`` base dispatches to the
    ``filter__<name>`` methods by name -- confirm against the base class.
    """

    _module_type_name = "table.filters"

    @classmethod
    def retrieve_supported_type(cls) -> Union[Dict[str, Any], str]:
        """Return the name of the value type these filters apply to."""
        return "table"

    def create_filter_inputs(self, filter_name: str) -> Union[None, ValueMapSchema]:
        """Return the extra input schema for the filter called *filter_name*.

        Args:
            filter_name: one of ``select_columns``, ``drop_columns`` or
                ``select_rows``.

        Returns:
            A mapping of input field name to field schema, or ``None`` when
            *filter_name* is not one of the names above.
        """
        if filter_name in ["select_columns", "drop_columns"]:
            # Both filters share one schema; only the description of the
            # 'columns' input differs (include vs. drop).
            if filter_name == "select_columns":
                columns_doc = "The name of the columns to include."
            else:
                columns_doc = "The name of the columns to drop."

            return {
                "columns": {
                    "type": "list",
                    "doc": columns_doc,
                    "optional": True,
                },
                "ignore_invalid_column_names": {
                    "type": "boolean",
                    "doc": "Whether to ignore invalid column names.",
                    "default": True,
                },
            }
        elif filter_name == "select_rows":
            return {
                "match": {
                    "type": "string",
                    "doc": "The string token to match.",
                    "optional": True,
                },
                "case_insensitive": {
                    "type": "boolean",
                    "doc": "Whether to ignore case.",
                    "default": True,
                },
            }

        return None

    def filter__select_columns(self, value: Value, filter_inputs: Mapping[str, Any]):
        """Keep only the requested columns, in the order they were requested.

        Args:
            value: the value whose ``data`` attribute holds the table.
            filter_inputs: must contain ``columns`` and
                ``ignore_invalid_column_names``.

        Returns:
            *value* unchanged when ``columns`` is empty, otherwise a new
            arrow table holding the selected columns.

        Raises:
            KiaraProcessingException: if a requested column does not exist
                and ``ignore_invalid_column_names`` is false.
        """
        import pyarrow as pa

        ignore_invalid = filter_inputs["ignore_invalid_column_names"]
        column_names = filter_inputs["columns"]

        if not column_names:
            return value

        table: KiaraTable = value.data
        arrow_table = table.arrow_table
        _column_names = []
        _columns = []

        for column_name in column_names:
            if column_name not in arrow_table.column_names:
                if ignore_invalid:
                    continue
                raise KiaraProcessingException(
                    f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                )

            _column_names.append(column_name)
            _columns.append(arrow_table.column(column_name))

        return pa.table(data=_columns, names=_column_names)

    def filter__drop_columns(self, value: Value, filter_inputs: Mapping[str, Any]):
        """Remove the listed columns and keep all others in table order.

        Args:
            value: the value whose ``data`` attribute holds the table.
            filter_inputs: must contain ``columns`` and
                ``ignore_invalid_column_names``.

        Returns:
            *value* unchanged when ``columns`` is empty, otherwise a new
            arrow table without the listed columns.

        Raises:
            KiaraProcessingException: if a listed column does not exist and
                ``ignore_invalid_column_names`` is false.
        """
        import pyarrow as pa

        ignore_invalid = filter_inputs["ignore_invalid_column_names"]
        column_names_to_ignore = filter_inputs["columns"]

        if not column_names_to_ignore:
            return value

        table: KiaraTable = value.data
        arrow_table = table.arrow_table

        # Validation is only needed when unknown names are an error.
        if not ignore_invalid:
            for column_name in column_names_to_ignore:
                if column_name not in arrow_table.column_names:
                    raise KiaraProcessingException(
                        f"Can't drop column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                    )

        _column_names = []
        _columns = []
        for column_name in arrow_table.column_names:
            if column_name in column_names_to_ignore:
                continue

            _column_names.append(column_name)
            _columns.append(arrow_table.column(column_name))

        return pa.table(data=_columns, names=_column_names)

    def filter__select_rows(self, value: Value, filter_inputs: Mapping[str, Any]):
        """Keep the rows in which at least one column matches a regex.

        Every column is tested with duckdb's ``regexp_matches`` and the
        per-column tests are joined with ``OR``.

        Args:
            value: the value whose ``data`` attribute holds the table.
            filter_inputs: may contain ``match`` (the pattern) and
                ``case_insensitive`` (defaults to ``True``).

        Returns:
            *value* unchanged when ``match`` is empty or missing, otherwise
            the result of ``.arrow()`` on the filtered relation.
        """
        match = filter_inputs.get("match", None)
        if not match:
            return value

        case_insensitive = filter_inputs.get("case_insensitive", True)

        import duckdb

        _table: KiaraTable = value.data
        rel_from_arrow = duckdb.arrow(_table.arrow_table)

        # The filter is assembled as SQL text, so the pattern must be a valid
        # string literal and every column name a valid quoted identifier.
        # Doubling the quote character is the SQL escape for both.
        pattern_literal = "'" + str(match).replace("'", "''") + "'"
        quoted_columns = [
            '"' + str(c).replace('"', '""') + '"' for c in rel_from_arrow.columns
        ]

        if case_insensitive:
            # NOTE(review): lower-casing the pattern also lower-cases regex
            # escapes (e.g. '\S' becomes '\s'); kept for compatibility.
            query_tokens = [
                f"regexp_matches(LOWER({c}), LOWER({pattern_literal}))"
                for c in quoted_columns
            ]
        else:
            query_tokens = [
                f"regexp_matches({c}, {pattern_literal})" for c in quoted_columns
            ]
        query = " OR ".join(query_tokens)

        result = rel_from_arrow.filter(query)
        return result.arrow()

Functions

retrieve_supported_type() -> Union[Dict[str, Any], str] classmethod
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/table/filters.py
15
16
17
18
@classmethod
def retrieve_supported_type(cls) -> Union[Dict[str, Any], str]:
    """Return the name of the value type these filters apply to."""
    supported_type = "table"
    return supported_type
create_filter_inputs(filter_name: str) -> Union[None, ValueMapSchema]
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/table/filters.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def create_filter_inputs(self, filter_name: str) -> Union[None, ValueMapSchema]:
    """Return the extra input schema for the filter called *filter_name*.

    Args:
        filter_name: one of ``select_columns``, ``drop_columns`` or
            ``select_rows``.

    Returns:
        A mapping of input field name to field schema, or ``None`` when
        *filter_name* is not one of the names above.
    """
    if filter_name in ["select_columns", "drop_columns"]:
        # Both filters share one schema; only the description of the
        # 'columns' input differs (include vs. drop).
        if filter_name == "select_columns":
            columns_doc = "The name of the columns to include."
        else:
            columns_doc = "The name of the columns to drop."

        return {
            "columns": {
                "type": "list",
                "doc": columns_doc,
                "optional": True,
            },
            "ignore_invalid_column_names": {
                "type": "boolean",
                "doc": "Whether to ignore invalid column names.",
                "default": True,
            },
        }
    elif filter_name == "select_rows":
        return {
            "match": {
                "type": "string",
                "doc": "The string token to match.",
                "optional": True,
            },
            "case_insensitive": {
                "type": "boolean",
                "doc": "Whether to ignore case.",
                "default": True,
            },
        }

    return None
filter__select_columns(value: Value, filter_inputs: Mapping[str, Any])
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/table/filters.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def filter__select_columns(self, value: Value, filter_inputs: Mapping[str, Any]):
    """Keep only the requested columns, in the order they were requested.

    Returns *value* unchanged when no columns are requested. A requested
    column that does not exist is skipped when
    ``ignore_invalid_column_names`` is set, and raises
    ``KiaraProcessingException`` otherwise.
    """
    import pyarrow as pa

    skip_unknown = filter_inputs["ignore_invalid_column_names"]
    requested = filter_inputs["columns"]

    if not requested:
        return value

    kiara_table: KiaraTable = value.data
    source = kiara_table.arrow_table

    picked_names = []
    picked_columns = []
    for name in requested:
        if name in source.column_names:
            picked_columns.append(source.column(name))
            picked_names.append(name)
        elif not skip_unknown:
            raise KiaraProcessingException(
                f"Can't select column '{name}' from table: column name not available. Available columns: {', '.join(source.column_names)}."
            )

    return pa.table(data=picked_columns, names=picked_names)
filter__drop_columns(value: Value, filter_inputs: Mapping[str, Any])
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/table/filters.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def filter__drop_columns(self, value: Value, filter_inputs: Mapping[str, Any]):
    """Remove the listed columns and keep all others in table order.

    Args:
        value: the value whose ``data`` attribute holds the table.
        filter_inputs: must contain ``columns`` and
            ``ignore_invalid_column_names``.

    Returns:
        *value* unchanged when ``columns`` is empty, otherwise a new arrow
        table without the listed columns.

    Raises:
        KiaraProcessingException: if a listed column does not exist and
            ``ignore_invalid_column_names`` is false.
    """
    import pyarrow as pa

    ignore_invalid = filter_inputs["ignore_invalid_column_names"]
    column_names_to_ignore = filter_inputs["columns"]

    if not column_names_to_ignore:
        return value

    table: KiaraTable = value.data
    arrow_table = table.arrow_table

    # Validation is only needed when unknown names are an error.
    if not ignore_invalid:
        for column_name in column_names_to_ignore:
            if column_name not in arrow_table.column_names:
                raise KiaraProcessingException(
                    f"Can't drop column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                )

    _column_names = []
    _columns = []
    for column_name in arrow_table.column_names:
        if column_name in column_names_to_ignore:
            continue

        _column_names.append(column_name)
        _columns.append(arrow_table.column(column_name))

    return pa.table(data=_columns, names=_column_names)
filter__select_rows(value: Value, filter_inputs: Mapping[str, Any])
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/modules/table/filters.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def filter__select_rows(self, value: Value, filter_inputs: Mapping[str, Any]):
    """Keep the rows in which at least one column matches a regex.

    Every column is tested with duckdb's ``regexp_matches`` and the
    per-column tests are joined with ``OR``.

    Args:
        value: the value whose ``data`` attribute holds the table.
        filter_inputs: may contain ``match`` (the pattern) and
            ``case_insensitive`` (defaults to ``True``).

    Returns:
        *value* unchanged when ``match`` is empty or missing, otherwise the
        result of ``.arrow()`` on the filtered relation.
    """
    match = filter_inputs.get("match", None)
    if not match:
        return value

    case_insensitive = filter_inputs.get("case_insensitive", True)

    import duckdb

    _table: KiaraTable = value.data
    rel_from_arrow = duckdb.arrow(_table.arrow_table)

    # The filter is assembled as SQL text, so the pattern must be a valid
    # string literal and every column name a valid quoted identifier.
    # Doubling the quote character is the SQL escape for both.
    pattern_literal = "'" + str(match).replace("'", "''") + "'"
    quoted_columns = [
        '"' + str(c).replace('"', '""') + '"' for c in rel_from_arrow.columns
    ]

    if case_insensitive:
        # NOTE(review): lower-casing the pattern also lower-cases regex
        # escapes (e.g. '\S' becomes '\s'); kept for compatibility.
        query_tokens = [
            f"regexp_matches(LOWER({c}), LOWER({pattern_literal}))"
            for c in quoted_columns
        ]
    else:
        query_tokens = [
            f"regexp_matches({c}, {pattern_literal})" for c in quoted_columns
        ]
    query = " OR ".join(query_tokens)

    result = rel_from_arrow.filter(query)
    return result.arrow()