11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class TableFiltersModule(FilterModule):
    """Filters that operate on values of the ``table`` data type.

    The ``filter__<name>`` methods implement the filters whose inputs are
    described by :meth:`create_filter_inputs` (dispatch by name presumably
    happens in the ``FilterModule`` base class -- confirm there).
    """

    _module_type_name = "table.filters"

    @classmethod
    def retrieve_supported_type(cls) -> Union[Dict[str, Any], str]:
        """Return the data type these filters apply to (``"table"``)."""
        return "table"

    def create_filter_inputs(self, filter_name: str) -> Union[None, ValueMapSchema]:
        """Return the input schema for the filter named ``filter_name``.

        Returns:
            A mapping of input name to input description for the known filters
            (``select_columns``, ``drop_columns``, ``select_rows``), or ``None``
            for any other filter name.
        """
        if filter_name in ("select_columns", "drop_columns"):
            # The two column filters share a schema; only the description of
            # ``columns`` differs (include vs. drop).
            if filter_name == "select_columns":
                columns_doc = "The name of the columns to include."
            else:
                columns_doc = "The name of the columns to drop."
            return {
                "columns": {
                    "type": "list",
                    "doc": columns_doc,
                    "optional": True,
                },
                "ignore_invalid_column_names": {
                    "type": "boolean",
                    "doc": "Whether to ignore invalid column names.",
                    "default": True,
                },
            }
        if filter_name == "select_rows":
            return {
                "match": {
                    "type": "string",
                    "doc": "The string token to match.",
                    "optional": True,
                },
                "case_insensitive": {
                    "type": "boolean",
                    "doc": "Whether to ignore case.",
                    "default": True,
                },
            }
        return None

    def filter__select_columns(self, value: Value, filter_inputs: Mapping[str, Any]):
        """Return a table with only the requested columns, in the requested order.

        Args:
            value: The value wrapping the input table (``value.data`` is a
                ``KiaraTable``).
            filter_inputs: ``columns`` (names to keep) and
                ``ignore_invalid_column_names`` (skip unknown names instead of
                failing; defaults to ``True`` as in the input schema).

        Returns:
            ``value`` unchanged if no columns were requested, otherwise a new
            ``pyarrow.Table`` containing the selected columns.

        Raises:
            KiaraProcessingException: If a requested column does not exist and
                ``ignore_invalid_column_names`` is false.
        """
        import pyarrow as pa

        ignore_invalid = filter_inputs.get("ignore_invalid_column_names", True)
        column_names = filter_inputs.get("columns", None)
        if not column_names:
            # NOTE(review): this returns the Value itself while the normal path
            # returns a pyarrow Table -- presumably the caller accepts both.
            return value

        table: KiaraTable = value.data
        arrow_table = table.arrow_table
        available = set(arrow_table.column_names)

        selected_names = []
        for column_name in column_names:
            if column_name not in available:
                if ignore_invalid:
                    continue
                raise KiaraProcessingException(
                    f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                )
            selected_names.append(column_name)

        return pa.table(
            data=[arrow_table.column(name) for name in selected_names],
            names=selected_names,
        )

    def filter__drop_columns(self, value: Value, filter_inputs: Mapping[str, Any]):
        """Return a table without the listed columns, keeping the original order.

        Args:
            value: The value wrapping the input table (``value.data`` is a
                ``KiaraTable``).
            filter_inputs: ``columns`` (names to drop) and
                ``ignore_invalid_column_names`` (ignore unknown names instead of
                failing; defaults to ``True`` as in the input schema).

        Returns:
            ``value`` unchanged if no columns were given, otherwise a new
            ``pyarrow.Table`` with the remaining columns.

        Raises:
            KiaraProcessingException: If a listed column does not exist and
                ``ignore_invalid_column_names`` is false.
        """
        import pyarrow as pa

        ignore_invalid = filter_inputs.get("ignore_invalid_column_names", True)
        column_names_to_drop = filter_inputs.get("columns", None)
        if not column_names_to_drop:
            # NOTE(review): returns the Value itself, unlike the Table returned
            # below -- presumably the caller accepts both.
            return value

        table: KiaraTable = value.data
        arrow_table = table.arrow_table

        if not ignore_invalid:
            available = set(arrow_table.column_names)
            for column_name in column_names_to_drop:
                if column_name not in available:
                    raise KiaraProcessingException(
                        f"Can't drop column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
                    )

        to_drop = set(column_names_to_drop)
        kept_names = []
        kept_columns = []
        # Pair names with columns positionally so tables that contain duplicate
        # column names don't trip over name-based lookup.
        for column_name, column in zip(arrow_table.column_names, arrow_table.columns):
            if column_name in to_drop:
                continue
            kept_names.append(column_name)
            kept_columns.append(column)

        return pa.table(data=kept_columns, names=kept_names)

    def filter__select_rows(self, value: Value, filter_inputs: Mapping[str, Any]):
        """Return only the rows where at least one column matches a regex.

        Every column is cast to text and tested with DuckDB's
        ``regexp_matches`` (which checks whether the string contains a match);
        the per-column tests are OR-ed together.

        Args:
            value: The value wrapping the input table (``value.data`` is a
                ``KiaraTable``).
            filter_inputs: ``match`` (the regular expression) and
                ``case_insensitive`` (defaults to ``True``).

        Returns:
            ``value`` unchanged if ``match`` is empty, otherwise a
            ``pyarrow.Table`` with the matching rows.
        """
        match = filter_inputs.get("match", None)
        if not match:
            return value

        case_insensitive = filter_inputs.get("case_insensitive", True)

        import duckdb

        _table: KiaraTable = value.data
        rel_from_arrow = duckdb.arrow(_table.arrow_table)

        # The pattern comes from the user and is embedded as a SQL string
        # literal: double any single quotes so it cannot end the literal early.
        pattern_literal = "'" + match.replace("'", "''") + "'"
        # Let the regex engine do the case folding ('i' option) rather than
        # LOWER()-ing the pattern, which would corrupt escapes like '\D' or '\W'.
        options_arg = ", 'i'" if case_insensitive else ""

        query_tokens = []
        for column_name in rel_from_arrow.columns:
            # Quote the identifier so names with spaces, punctuation or SQL
            # keywords work; cast so non-text columns are matched on their
            # string representation instead of failing to bind.
            identifier = '"' + column_name.replace('"', '""') + '"'
            query_tokens.append(
                f"regexp_matches(CAST({identifier} AS VARCHAR), {pattern_literal}{options_arg})"
            )

        query = " OR ".join(query_tokens)
        result = rel_from_arrow.filter(query)
        return result.arrow()
|