Bases: KiaraModel
A wrapper class, containing multiple tables.
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/tables.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 | class KiaraTables(KiaraModel):
"""A wrapper class, containing multiple tables."""
@classmethod
def create_tables(cls, data: Any) -> "KiaraTables":
if isinstance(data, KiaraTables):
return data
table_obj: Union[None, Dict[str, KiaraTable]] = None
if isinstance(data, Mapping):
temp = {}
for k, v in data.items():
temp[k] = KiaraTable.create_table(v)
table_obj = temp
elif isinstance(data, (pa.Table)):
table_obj = {DEFAULT_TABLE_NAME: data}
if table_obj is None:
raise Exception(
f"Can't create tables, invalid source data type: {type(data)}."
)
obj = cls(tables=table_obj)
return obj
tables: Dict[str, KiaraTable] = Field(
description="A dictionary of tables, with the table names as keys."
)
@property
def table_names(self) -> List[str]:
return list(self.tables.keys())
def _retrieve_data_to_hash(self) -> Any:
raise NotImplementedError()
def get_table(self, table_name: str) -> KiaraTable:
if table_name not in self.tables:
raise KiaraException(
f"Table '{table_name}' not found. Available: {', '.join(self.tables.keys())}"
)
return self.tables[table_name]
|
Attributes
tables: Dict[str, KiaraTable] = Field(description='A dictionary of tables, with the table names as keys.')
class-attribute
instance-attribute
table_names: List[str]
property
Functions
create_tables(data: Any) -> KiaraTables
classmethod
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/tables.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43 | @classmethod
def create_tables(cls, data: Any) -> "KiaraTables":
if isinstance(data, KiaraTables):
return data
table_obj: Union[None, Dict[str, KiaraTable]] = None
if isinstance(data, Mapping):
temp = {}
for k, v in data.items():
temp[k] = KiaraTable.create_table(v)
table_obj = temp
elif isinstance(data, (pa.Table)):
table_obj = {DEFAULT_TABLE_NAME: data}
if table_obj is None:
raise Exception(
f"Can't create tables, invalid source data type: {type(data)}."
)
obj = cls(tables=table_obj)
return obj
|
get_table(table_name: str) -> KiaraTable
Source code in /opt/hostedtoolcache/Python/3.11.4/x64/lib/python3.11/site-packages/kiara_plugin/tabular/models/tables.py
| def get_table(self, table_name: str) -> KiaraTable:
if table_name not in self.tables:
raise KiaraException(
f"Table '{table_name}' not found. Available: {', '.join(self.tables.keys())}"
)
return self.tables[table_name]
|