20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class TablesType(AnyType[KiaraTables, DataTypeConfig]):
    """Multiple tabular data sets.

    The data is organized in sets of tables (which are sets of columns), each
    table having a string identifier.

    This is similar to the 'database' data type, the main difference being that
    'database' is backed by sqlite, whereas 'tables' is backed by Apache
    Feather/Arrow. There is no hard rule when it's better to use which, but in
    general, if you need to access the datasets on a row-basis, 'database' is
    the better fit, for more column-based analytical queries, 'tables' is
    better.
    """

    _data_type_name = "tables"

    @classmethod
    def python_class(cls) -> Type:
        """Return the Python class instances of this data type are backed by."""
        return KiaraTables

    def parse_python_obj(self, data: Any) -> KiaraTables:
        """Coerce an arbitrary supported Python object into a `KiaraTables` instance."""
        return KiaraTables.create_tables(data)

    def _validate(cls, value: Any) -> None:
        """Raise if `value` is not a `KiaraTables` instance.

        NOTE(review): signature uses `cls` without a decorator — kept as-is,
        the base class appears to invoke it that way; confirm before changing.
        """
        if not isinstance(value, KiaraTables):
            raise Exception(
                f"invalid type '{type(value).__name__}', must be 'KiaraTables'."
            )

    def serialize(self, data: KiaraTables) -> Union[None, str, "SerializedData"]:
        """Serialize all tables to per-column Feather/Arrow files.

        Each column is written to its own file in a temporary directory, and the
        chunk map keys are '<table_id><TABLE_COLUMN_SPLIT_MARKER><column_name>'
        so the table/column structure can be reconstructed on deserialization.

        Raises:
            Exception: if a table id is empty or contains the split marker, or
                if a column name is empty.
        """
        import pyarrow as pa

        # Validate all table ids up front, before any files are written, so an
        # invalid id can't leave a half-serialized result behind.
        for table_id, table in data.tables.items():
            if not table_id:
                raise Exception("table id must not be empty.")
            if TABLE_COLUMN_SPLIT_MARKER in table_id:
                # BUGFIX: the original f-string was missing the closing quote
                # around the marker, producing a malformed error message.
                raise Exception(
                    f"table id must not contain '{TABLE_COLUMN_SPLIT_MARKER}'."
                )

        # The temp dir must outlive this call (the chunk map references files in
        # it), so it is cleaned up at interpreter exit rather than here.
        temp_f = tempfile.mkdtemp()

        def cleanup():
            shutil.rmtree(temp_f, ignore_errors=True)

        atexit.register(cleanup)

        chunk_map = {}
        for table_id, table in data.tables.items():
            arrow_table = table.arrow_table
            for column_name in arrow_table.column_names:
                if not column_name:
                    raise Exception(
                        f"column name for table '{table_id}' is empty. This is not allowed."
                    )
                # NOTE(review): pyarrow's Table.column() returns a ChunkedArray;
                # annotation kept from the original — confirm store_array accepts it.
                column: pa.Array = arrow_table.column(column_name)
                file_name = os.path.join(temp_f, column_name)
                store_array(
                    array_obj=column, file_name=file_name, column_name=column_name
                )
                chunk_map[f"{table_id}{TABLE_COLUMN_SPLIT_MARKER}{column_name}"] = {
                    "type": "file",
                    "file": file_name,
                    "codec": "raw",
                }

        serialized_data = {
            "data_type": self.data_type_name,
            "data_type_config": self.type_config.dict(),
            "data": chunk_map,
            "serialization_profile": "feather",
            "metadata": {
                "environment": {},
                "deserialize": {
                    "python_object": {
                        "module_type": "load.tables",
                        "module_config": {
                            "value_type": "tables",
                            "target_profile": "python_object",
                            "serialization_profile": "feather",
                        },
                    }
                },
            },
        }

        serialized = SerializationResult(**serialized_data)
        return serialized

    def pretty_print_as__terminal_renderable(
        self, value: Value, render_config: Mapping[str, Any]
    ) -> Any:
        """Render all tables of the value as a rich terminal renderable.

        Args:
            value: the value holding the `KiaraTables` data.
            render_config: may override 'max_no_rows', 'max_row_height' and
                'max_cell_length'; defaults come from DEFAULT_PRETTY_PRINT_CONFIG.

        Returns:
            A rich `Group` with a heading and a rendered table per data set.
        """
        max_rows = render_config.get(
            "max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
        )
        max_row_height = render_config.get(
            "max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
        )
        max_cell_length = render_config.get(
            "max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
        )

        # Show half the row budget at the top and half at the bottom of each
        # table; None means "no limit" and is passed through unchanged.
        half_lines: Union[int, None] = None
        if max_rows:
            half_lines = int(max_rows / 2)

        tables: KiaraTables = value.data

        result: List[Any] = [""]
        for table_name in tables.table_names:
            atw = ArrowTabularWrap(tables.get_table(table_name).arrow_table)
            pretty = atw.as_terminal_renderable(
                rows_head=half_lines,
                rows_tail=half_lines,
                max_row_height=max_row_height,
                max_cell_length=max_cell_length,
            )
            result.append(f"[b]Table[/b]: [i]{table_name}[/i]")
            result.append(pretty)

        return Group(*result)
|