Top-level package for kiara_plugin.tabular.
KIARA_METADATA
¶
find_data_types: Union[Type, Tuple, Callable]
¶
find_model_classes: Union[Type, Tuple, Callable]
¶
find_modules: Union[Type, Tuple, Callable]
¶
find_pipelines: Union[Type, Tuple, Callable]
¶
get_version()
¶
Source code in tabular/__init__.py
def get_version():
    from pkg_resources import DistributionNotFound, get_distribution

    try:
        # Change here if project is renamed and does not equal the package name
        dist_name = __name__
        __version__ = get_distribution(dist_name).version
    except DistributionNotFound:
        try:
            version_file = os.path.join(os.path.dirname(__file__), "version.txt")
            if os.path.exists(version_file):
                with open(version_file, encoding="utf-8") as vf:
                    __version__ = vf.read()
            else:
                __version__ = "unknown"
        except (Exception):
            pass

        if __version__ is None:
            __version__ = "unknown"

    return __version__
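For illustration, a minimal usage sketch (the function is exported from the top-level package):

```python
# Minimal sketch: query the installed version of the plugin; the function
# falls back to a bundled "version.txt" or the string "unknown".
from kiara_plugin.tabular import get_version

print(get_version())
```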
Modules¶
data_types
special
¶
This module contains the value type classes that are used in the kiara_plugin.tabular
package.
Modules¶
array
¶
Classes¶
ArrayType (AnyType)
¶An array, in most cases used as a column within a table.
Internally, this type uses the KiaraArray wrapper class to manage array data. This wrapper class, in turn, uses an Apache Arrow Array to store the data in memory (and on disk).
Source code in tabular/data_types/array.py
class ArrayType(AnyType[KiaraArray, DataTypeConfig]):
"""An array, in most cases used as a column within a table.
Internally, this type uses the [KiaraArray][kiara_plugin.tabular.models.array.KiaraArray] wrapper class to manage array data. This wrapper class, in turn, uses an [Apache Arrow](https://arrow.apache.org) [Array](https://arrow.apache.org/docs/python/generated/pyarrow.Array.html#pyarrow.Array) to store the data in memory (and on disk).
"""
_data_type_name = "array"
@classmethod
def python_class(cls) -> Type:
return KiaraArray
def parse_python_obj(self, data: Any) -> KiaraArray:
return KiaraArray.create_array(data)
def _validate(cls, value: Any) -> None:
if not isinstance(value, (KiaraArray)):
raise Exception(
f"Invalid type '{type(value).__name__}', must be an instance of the 'KiaraArray' class."
)
def serialize(self, data: KiaraArray) -> SerializedData:
import pyarrow as pa
# TODO: make sure temp dir is in the same partition as file store
temp_f = tempfile.mkdtemp()
def cleanup():
shutil.rmtree(temp_f, ignore_errors=True)
atexit.register(cleanup)
column: pa.Array = data.arrow_array
file_name = os.path.join(temp_f, "array.arrow")
store_array(array_obj=column, file_name=file_name, column_name="array")
chunks = {"array.arrow": {"type": "file", "codec": "raw", "file": file_name}}
serialized_data = {
"data_type": self.data_type_name,
"data_type_config": self.type_config.dict(),
"data": chunks,
"serialization_profile": "feather",
"metadata": {
"environment": {},
"deserialize": {
"python_object": {
"module_type": "load.array",
"module_config": {
"value_type": "array",
"target_profile": "python_object",
"serialization_profile": "feather",
},
}
},
},
}
serialized = SerializationResult(**serialized_data)
return serialized
def pretty_print_as__terminal_renderable(
self, value: Value, render_config: Mapping[str, Any]
) -> Any:
max_rows = render_config.get(
"max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
)
max_row_height = render_config.get(
"max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
)
max_cell_length = render_config.get(
"max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
)
half_lines: Union[int, None] = None
if max_rows:
half_lines = int(max_rows / 2)
import pyarrow as pa
array: pa.Array = value.data.arrow_array
temp_table = pa.Table.from_arrays(arrays=[array], names=["array"])
atw = ArrowTabularWrap(temp_table)
result = atw.as_terminal_renderable(
rows_head=half_lines,
rows_tail=half_lines,
max_row_height=max_row_height,
max_cell_length=max_cell_length,
show_table_header=False,
)
return result
parse_python_obj(self, data)
¶Parse a value into a supported python type.
This exists to make it easier to do trivial conversions (e.g. from a date string to a datetime object). If you choose to overwrite this method, make 100% sure that you don't change the meaning of the value, and try to avoid adding or removing information from the data (e.g. by changing the resolution of a date).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| v | | the value | required |

Returns:

| Type | Description |
|---|---|
| KiaraArray | 'None', if no parsing was done and the original value should be used, otherwise return the parsed Python object |
Source code in tabular/data_types/array.py
def parse_python_obj(self, data: Any) -> KiaraArray:
return KiaraArray.create_array(data)
pretty_print_as__terminal_renderable(self, value, render_config)
¶Source code in tabular/data_types/array.py
def pretty_print_as__terminal_renderable(
self, value: Value, render_config: Mapping[str, Any]
) -> Any:
max_rows = render_config.get(
"max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
)
max_row_height = render_config.get(
"max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
)
max_cell_length = render_config.get(
"max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
)
half_lines: Union[int, None] = None
if max_rows:
half_lines = int(max_rows / 2)
import pyarrow as pa
array: pa.Array = value.data.arrow_array
temp_table = pa.Table.from_arrays(arrays=[array], names=["array"])
atw = ArrowTabularWrap(temp_table)
result = atw.as_terminal_renderable(
rows_head=half_lines,
rows_tail=half_lines,
max_row_height=max_row_height,
max_cell_length=max_cell_length,
show_table_header=False,
)
return result
python_class()
classmethod
¶Source code in tabular/data_types/array.py
@classmethod
def python_class(cls) -> Type:
return KiaraArray
serialize(self, data)
¶Source code in tabular/data_types/array.py
def serialize(self, data: KiaraArray) -> SerializedData:
import pyarrow as pa
# TODO: make sure temp dir is in the same partition as file store
temp_f = tempfile.mkdtemp()
def cleanup():
shutil.rmtree(temp_f, ignore_errors=True)
atexit.register(cleanup)
column: pa.Array = data.arrow_array
file_name = os.path.join(temp_f, "array.arrow")
store_array(array_obj=column, file_name=file_name, column_name="array")
chunks = {"array.arrow": {"type": "file", "codec": "raw", "file": file_name}}
serialized_data = {
"data_type": self.data_type_name,
"data_type_config": self.type_config.dict(),
"data": chunks,
"serialization_profile": "feather",
"metadata": {
"environment": {},
"deserialize": {
"python_object": {
"module_type": "load.array",
"module_config": {
"value_type": "array",
"target_profile": "python_object",
"serialization_profile": "feather",
},
}
},
},
}
serialized = SerializationResult(**serialized_data)
return serialized
Functions¶
store_array(array_obj, file_name, column_name='array')
¶Utility method to store an array to a file.
Source code in tabular/data_types/array.py
def store_array(array_obj: "pa.Array", file_name: str, column_name: "str" = "array"):
    """Utility method to store an array to a file."""

    import pyarrow as pa
    from pyarrow import ChunkedArray

    schema = pa.schema([pa.field(column_name, array_obj.type)])

    # TODO: support non-single chunk columns
    with pa.OSFile(file_name, "wb") as sink:
        with pa.ipc.new_file(sink, schema=schema) as writer:
            if isinstance(array_obj, ChunkedArray):
                for chunk in array_obj.chunks:
                    batch = pa.record_batch([chunk], schema=schema)
                    writer.write(batch)
            else:
                raise NotImplementedError()
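As a rough usage sketch (the file path and column name below are made up for the example), one could persist a chunked Arrow array like this; note that the current implementation only accepts ChunkedArray inputs:

```python
import pyarrow as pa

from kiara_plugin.tabular.data_types.array import store_array

# store_array expects a ChunkedArray; wrap a plain Array in a single chunk first
tokens = pa.chunked_array([pa.array(["a", "b", "c"])])

# writes a single-column Arrow IPC file, as used to back the 'array' data type
store_array(array_obj=tokens, file_name="/tmp/tokens.arrow", column_name="tokens")
```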
db
¶
Classes¶
DatabaseType (AnyType)
¶A database, containing one or several tables.
This is backed by the KiaraDatabase class to manage the stored data.
Source code in tabular/data_types/db.py
class DatabaseType(AnyType[KiaraDatabase, DataTypeConfig]):
"""A database, containing one or several tables.
This is backed by the [KiaraDatabase][kiara_plugin.tabular.models.db.KiaraDatabase] class to manage
the stored data.
"""
_data_type_name = "database"
@classmethod
def python_class(self) -> Type[KiaraDatabase]:
return KiaraDatabase
def parse_python_obj(self, data: Any) -> KiaraDatabase:
if isinstance(data, Path):
data = data.as_posix()
if isinstance(data, str):
if not os.path.exists(data):
raise ValueError(
f"Can't create database from path '{data}': path does not exist."
)
return KiaraDatabase(db_file_path=data)
return data
def _validate(cls, value: Any) -> None:
if not isinstance(value, (KiaraDatabase)):
raise ValueError(
f"Invalid type '{type(value).__name__}', must be an instance of the 'KiaraDatabase' class."
)
def serialize(self, data: KiaraDatabase) -> SerializedData:
chunks = {
"db.sqlite": {"type": "file", "codec": "raw", "file": data.db_file_path}
}
serialized_data = {
"data_type": self.data_type_name,
"data_type_config": self.type_config.dict(),
"data": chunks,
"serialization_profile": "feather",
"metadata": {
"environment": {},
"deserialize": {
"python_object": {
"module_type": "load.database",
"module_config": {
"value_type": self.data_type_name,
"target_profile": "python_object",
"serialization_profile": "copy",
},
}
},
},
}
serialized = SerializationResult(**serialized_data)
return serialized
def pretty_print_as__terminal_renderable(
self, value: Value, render_config: Mapping[str, Any]
) -> Any:
max_rows = render_config.get(
"max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
)
max_row_height = render_config.get(
"max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
)
max_cell_length = render_config.get(
"max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
)
half_lines: Union[int, None] = None
if max_rows:
half_lines = int(max_rows / 2)
db: KiaraDatabase = value.data
result: List[Any] = [""]
for table_name in db.table_names:
atw = SqliteTabularWrap(
engine=db.get_sqlalchemy_engine(), table_name=table_name
)
pretty = atw.as_terminal_renderable(
rows_head=half_lines,
rows_tail=half_lines,
max_row_height=max_row_height,
max_cell_length=max_cell_length,
)
result.append(f"[b]Table[/b]: [i]{table_name}[/i]")
result.append(pretty)
return Group(*result)
parse_python_obj(self, data)
¶Parse a value into a supported python type.
This exists to make it easier to do trivial conversions (e.g. from a date string to a datetime object). If you choose to overwrite this method, make 100% sure that you don't change the meaning of the value, and try to avoid adding or removing information from the data (e.g. by changing the resolution of a date).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| v | | the value | required |

Returns:

| Type | Description |
|---|---|
| KiaraDatabase | 'None', if no parsing was done and the original value should be used, otherwise return the parsed Python object |
Source code in tabular/data_types/db.py
def parse_python_obj(self, data: Any) -> KiaraDatabase:
if isinstance(data, Path):
data = data.as_posix()
if isinstance(data, str):
if not os.path.exists(data):
raise ValueError(
f"Can't create database from path '{data}': path does not exist."
)
return KiaraDatabase(db_file_path=data)
return data
pretty_print_as__terminal_renderable(self, value, render_config)
¶Source code in tabular/data_types/db.py
def pretty_print_as__terminal_renderable(
self, value: Value, render_config: Mapping[str, Any]
) -> Any:
max_rows = render_config.get(
"max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
)
max_row_height = render_config.get(
"max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
)
max_cell_length = render_config.get(
"max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
)
half_lines: Union[int, None] = None
if max_rows:
half_lines = int(max_rows / 2)
db: KiaraDatabase = value.data
result: List[Any] = [""]
for table_name in db.table_names:
atw = SqliteTabularWrap(
engine=db.get_sqlalchemy_engine(), table_name=table_name
)
pretty = atw.as_terminal_renderable(
rows_head=half_lines,
rows_tail=half_lines,
max_row_height=max_row_height,
max_cell_length=max_cell_length,
)
result.append(f"[b]Table[/b]: [i]{table_name}[/i]")
result.append(pretty)
return Group(*result)
python_class()
classmethod
¶Source code in tabular/data_types/db.py
@classmethod
def python_class(self) -> Type[KiaraDatabase]:
return KiaraDatabase
serialize(self, data)
¶Source code in tabular/data_types/db.py
def serialize(self, data: KiaraDatabase) -> SerializedData:
chunks = {
"db.sqlite": {"type": "file", "codec": "raw", "file": data.db_file_path}
}
serialized_data = {
"data_type": self.data_type_name,
"data_type_config": self.type_config.dict(),
"data": chunks,
"serialization_profile": "feather",
"metadata": {
"environment": {},
"deserialize": {
"python_object": {
"module_type": "load.database",
"module_config": {
"value_type": self.data_type_name,
"target_profile": "python_object",
"serialization_profile": "copy",
},
}
},
},
}
serialized = SerializationResult(**serialized_data)
return serialized
SqliteTabularWrap (TabularWrap)
¶Source code in tabular/data_types/db.py
class SqliteTabularWrap(TabularWrap):
def __init__(self, engine: "Engine", table_name: str):
self._engine: Engine = engine
self._table_name: str = table_name
super().__init__()
def retrieve_number_of_rows(self) -> int:
from sqlalchemy import text
with self._engine.connect() as con:
result = con.execute(text(f"SELECT count(*) from {self._table_name}"))
num_rows = result.fetchone()[0]
return num_rows
def retrieve_column_names(self) -> Iterable[str]:
from sqlalchemy import inspect
engine = self._engine
inspector = inspect(engine)
columns = inspector.get_columns(self._table_name)
result = [column["name"] for column in columns]
return result
def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":
from sqlalchemy import text
query = f"SELECT * FROM {self._table_name}"
if length:
query = f"{query} LIMIT {length}"
else:
query = f"{query} LIMIT {self.num_rows}"
if offset > 0:
query = f"{query} OFFSET {offset}"
with self._engine.connect() as con:
result = con.execute(text(query))
result_dict: Dict[str, List[Any]] = {}
for cn in self.column_names:
result_dict[cn] = []
for r in result:
for i, cn in enumerate(self.column_names):
result_dict[cn].append(r[i])
return DictTabularWrap(result_dict)
def to_pydict(self) -> Mapping:
from sqlalchemy import text
query = f"SELECT * FROM {self._table_name}"
with self._engine.connect() as con:
result = con.execute(text(query))
result_dict: Dict[str, List[Any]] = {}
for cn in self.column_names:
result_dict[cn] = []
for r in result:
for i, cn in enumerate(self.column_names):
result_dict[cn].append(r[i])
return result_dict
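A hypothetical usage sketch (assuming `db` is a KiaraDatabase instance and 'journals' is one of its tables):

```python
from kiara_plugin.tabular.data_types.db import SqliteTabularWrap

# wrap a single table of the database for inspection / pretty-printing
wrap = SqliteTabularWrap(engine=db.get_sqlalchemy_engine(), table_name="journals")

print(wrap.retrieve_number_of_rows())      # total number of rows in the table
print(list(wrap.retrieve_column_names()))  # column names from the sqlite schema

head = wrap.slice(offset=0, length=10)     # a DictTabularWrap with the first 10 rows
```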
retrieve_column_names(self)
¶Source code in tabular/data_types/db.py
def retrieve_column_names(self) -> Iterable[str]:
from sqlalchemy import inspect
engine = self._engine
inspector = inspect(engine)
columns = inspector.get_columns(self._table_name)
result = [column["name"] for column in columns]
return result
retrieve_number_of_rows(self)
¶Source code in tabular/data_types/db.py
def retrieve_number_of_rows(self) -> int:
from sqlalchemy import text
with self._engine.connect() as con:
result = con.execute(text(f"SELECT count(*) from {self._table_name}"))
num_rows = result.fetchone()[0]
return num_rows
slice(self, offset=0, length=None)
¶Source code in tabular/data_types/db.py
def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":
from sqlalchemy import text
query = f"SELECT * FROM {self._table_name}"
if length:
query = f"{query} LIMIT {length}"
else:
query = f"{query} LIMIT {self.num_rows}"
if offset > 0:
query = f"{query} OFFSET {offset}"
with self._engine.connect() as con:
result = con.execute(text(query))
result_dict: Dict[str, List[Any]] = {}
for cn in self.column_names:
result_dict[cn] = []
for r in result:
for i, cn in enumerate(self.column_names):
result_dict[cn].append(r[i])
return DictTabularWrap(result_dict)
to_pydict(self)
¶Source code in tabular/data_types/db.py
def to_pydict(self) -> Mapping:
from sqlalchemy import text
query = f"SELECT * FROM {self._table_name}"
with self._engine.connect() as con:
result = con.execute(text(query))
result_dict: Dict[str, List[Any]] = {}
for cn in self.column_names:
result_dict[cn] = []
for r in result:
for i, cn in enumerate(self.column_names):
result_dict[cn].append(r[i])
return result_dict
table
¶
Classes¶
TableType (AnyType)
¶Tabular data (table, spreadsheet, data_frame, what have you).
The table data is organized in sets of columns (arrays of data of the same type), with each column having a string identifier.
kiara uses an instance of the KiaraTable class to manage the table data, which lets developers access it in different formats (Apache Arrow Table, Pandas dataframe, Python dict of lists, more to follow...).
Please consult the API doc of the KiaraTable class for more information about how to access and query the data: https://dharpa.org/kiara_plugin.tabular/latest/reference/kiara_plugin/tabular/models/__init__/#kiara_plugin.tabular.models.table.KiaraTable
Internally, the data is stored in Apache Feather format -- both in memory and on disk when saved, which enables some advanced usage to preserve memory and compute overhead.
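For illustration, a minimal sketch of the kind of Python data the 'table' type can parse (via KiaraTable.create_table); the column names are made up:

```python
from kiara_plugin.tabular.models.table import KiaraTable

# a dict of equal-length lists; a pyarrow.Table would be accepted as well
data = {
    "city": ["Berlin", "Paris", "Rome"],
    "population": [3_700_000, 2_100_000, 2_800_000],
}

table = KiaraTable.create_table(data)
print(table.num_rows)            # -> 3
print(table.arrow_table.schema)  # the underlying Apache Arrow schema
```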
Source code in tabular/data_types/table.py
class TableType(AnyType[KiaraTable, DataTypeConfig]):
"""Tabular data (table, spreadsheet, data_frame, what have you).
The table data is organized in sets of columns (arrays of data of the same type), with each column having a string identifier.
*kiara* uses an instance of the [`KiaraTable`][kiara_plugin.tabular.models.table.KiaraTable]
class to manage the table data, which lets developers access it in different formats ([Apache Arrow Table](https://arrow.apache.org/docs/python/generated/pyarrow.Table.html), [Pandas dataframe](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html), Python dict of lists, more to follow...).
Please consult the API doc of the `KiaraTable` class for more information about how to access and query the data:
- [`KiaraTable` API doc](https://dharpa.org/kiara_plugin.tabular/latest/reference/kiara_plugin/tabular/models/__init__/#kiara_plugin.tabular.models.table.KiaraTable)
Internally, the data is stored in [Apache Feather format](https://arrow.apache.org/docs/python/feather.html) -- both
in memory and on disk when saved, which enables some advanced usage to preserve memory and compute overhead.
"""
_data_type_name = "table"
@classmethod
def python_class(cls) -> Type:
return KiaraTable
def parse_python_obj(self, data: Any) -> KiaraTable:
return KiaraTable.create_table(data)
# def calculate_hash(self, data: KiaraTable) -> CID:
# hashes = []
# for column_name in data.arrow_table.column_names:
# hashes.append(column_name)
# column = data.arrow_table.column(column_name)
# for chunk in column.chunks:
# for buf in chunk.buffers():
# if not buf:
# continue
# h = hash_from_buffer(memoryview(buf))
# hashes.append(h)
# return compute_cid(hashes)
# return KIARA_HASH_FUNCTION(memoryview(data.arrow_array))
# def calculate_size(self, data: KiaraTable) -> int:
# return len(data.arrow_table)
def _validate(cls, value: Any) -> None:
pass
if not isinstance(value, KiaraTable):
raise Exception(
f"invalid type '{type(value).__name__}', must be 'KiaraTable'."
)
def serialize(self, data: KiaraTable) -> SerializedData:
import pyarrow as pa
chunk_map = {}
# TODO: make sure temp dir is in the same partition as file store
temp_f = tempfile.mkdtemp()
def cleanup():
shutil.rmtree(temp_f, ignore_errors=True)
atexit.register(cleanup)
for column_name in data.arrow_table.column_names:
column: pa.Array = data.arrow_table.column(column_name)
if column_name == "":
file_name = os.path.join(temp_f, EMPTY_COLUMN_NAME_MARKER)
else:
file_name = os.path.join(temp_f, column_name)
store_array(array_obj=column, file_name=file_name, column_name=column_name)
chunk_map[column_name] = {"type": "file", "file": file_name, "codec": "raw"}
serialized_data = {
"data_type": self.data_type_name,
"data_type_config": self.type_config.dict(),
"data": chunk_map,
"serialization_profile": "feather",
"metadata": {
"environment": {},
"deserialize": {
"python_object": {
"module_type": "load.table",
"module_config": {
"value_type": "table",
"target_profile": "python_object",
"serialization_profile": "feather",
},
}
},
},
}
serialized = SerializationResult(**serialized_data)
return serialized
def pretty_print_as__terminal_renderable(
self, value: "Value", render_config: Mapping[str, Any]
) -> Any:
max_rows = render_config.get(
"max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
)
max_row_height = render_config.get(
"max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
)
max_cell_length = render_config.get(
"max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
)
half_lines: Union[int, None] = None
if max_rows:
half_lines = int(max_rows / 2)
atw = ArrowTabularWrap(value.data.arrow_table)
result = atw.as_terminal_renderable(
rows_head=half_lines,
rows_tail=half_lines,
max_row_height=max_row_height,
max_cell_length=max_cell_length,
)
return result
parse_python_obj(self, data)
¶Parse a value into a supported python type.
This exists to make it easier to do trivial conversions (e.g. from a date string to a datetime object). If you choose to overwrite this method, make 100% sure that you don't change the meaning of the value, and try to avoid adding or removing information from the data (e.g. by changing the resolution of a date).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| v | | the value | required |

Returns:

| Type | Description |
|---|---|
| KiaraTable | 'None', if no parsing was done and the original value should be used, otherwise return the parsed Python object |
Source code in tabular/data_types/table.py
def parse_python_obj(self, data: Any) -> KiaraTable:
return KiaraTable.create_table(data)
pretty_print_as__terminal_renderable(self, value, render_config)
¶Source code in tabular/data_types/table.py
def pretty_print_as__terminal_renderable(
self, value: "Value", render_config: Mapping[str, Any]
) -> Any:
max_rows = render_config.get(
"max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
)
max_row_height = render_config.get(
"max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
)
max_cell_length = render_config.get(
"max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
)
half_lines: Union[int, None] = None
if max_rows:
half_lines = int(max_rows / 2)
atw = ArrowTabularWrap(value.data.arrow_table)
result = atw.as_terminal_renderable(
rows_head=half_lines,
rows_tail=half_lines,
max_row_height=max_row_height,
max_cell_length=max_cell_length,
)
return result
python_class()
classmethod
¶Source code in tabular/data_types/table.py
@classmethod
def python_class(cls) -> Type:
return KiaraTable
serialize(self, data)
¶Source code in tabular/data_types/table.py
def serialize(self, data: KiaraTable) -> SerializedData:
import pyarrow as pa
chunk_map = {}
# TODO: make sure temp dir is in the same partition as file store
temp_f = tempfile.mkdtemp()
def cleanup():
shutil.rmtree(temp_f, ignore_errors=True)
atexit.register(cleanup)
for column_name in data.arrow_table.column_names:
column: pa.Array = data.arrow_table.column(column_name)
if column_name == "":
file_name = os.path.join(temp_f, EMPTY_COLUMN_NAME_MARKER)
else:
file_name = os.path.join(temp_f, column_name)
store_array(array_obj=column, file_name=file_name, column_name=column_name)
chunk_map[column_name] = {"type": "file", "file": file_name, "codec": "raw"}
serialized_data = {
"data_type": self.data_type_name,
"data_type_config": self.type_config.dict(),
"data": chunk_map,
"serialization_profile": "feather",
"metadata": {
"environment": {},
"deserialize": {
"python_object": {
"module_type": "load.table",
"module_config": {
"value_type": "table",
"target_profile": "python_object",
"serialization_profile": "feather",
},
}
},
},
}
serialized = SerializationResult(**serialized_data)
return serialized
defaults
¶
Attributes¶
DEFAULT_TABULAR_DATA_CHUNK_SIZE
¶
KIARA_PLUGIN_TABULAR_BASE_FOLDER
¶
Marker to indicate the base folder for the kiara tabular module package.
KIARA_PLUGIN_TABULAR_RESOURCES_FOLDER
¶
Default resources folder for this package.
RESERVED_SQL_KEYWORDS
¶
SQLALCHEMY_SQLITE_TYPE_MAP: Dict[Type, Literal['NULL', 'INTEGER', 'REAL', 'TEXT', 'BLOB']]
¶
SQLITE_DATA_TYPE: Tuple[Literal['NULL', 'INTEGER', 'REAL', 'TEXT', 'BLOB'], ...]
¶
SQLITE_SQLALCHEMY_TYPE_MAP: Dict[Literal['NULL', 'INTEGER', 'REAL', 'TEXT', 'BLOB'], Type]
¶
SqliteDataType
¶
TEMPLATES_FOLDER
¶
models
special
¶
This module contains the metadata (and other) models that are used in the kiara_plugin.tabular
package.
These models are convenience wrappers that make it easier for kiara to find, create, manage and version metadata -- but also other types of models -- that are attached to data, as well as to kiara modules.
Metadata models must be a sub-class of [kiara.metadata.MetadataModel][]. Other models usually sub-class a pydantic BaseModel or implement custom base classes.
Classes¶
ColumnSchema (BaseModel)
pydantic-model
¶
Describes properties of a single column of the 'table' data type.
Source code in tabular/models/__init__.py
class ColumnSchema(BaseModel):
"""Describes properties of a single column of the 'table' data type."""
type_name: str = Field(
description="The type name of the column (backend-specific)."
)
metadata: Dict[str, Any] = Field(
description="Other metadata for the column.", default_factory=dict
)
TableMetadata (KiaraModel)
pydantic-model
¶
Describes properties for the 'table' data type.
Source code in tabular/models/__init__.py
class TableMetadata(KiaraModel):
"""Describes properties for the 'table' data type."""
column_names: List[str] = Field(description="The name of the columns of the table.")
column_schema: Dict[str, ColumnSchema] = Field(
description="The schema description of the table."
)
rows: int = Field(description="The number of rows the table contains.")
size: Union[int, None] = Field(
description="The tables size in bytes.", default=None
)
def _retrieve_data_to_hash(self) -> Any:
return {
"column_schemas": {k: v.dict() for k, v in self.column_schema.items()},
"rows": self.rows,
"size": self.size,
}
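A hand-constructed sketch of these models (in practice they are created by kiara itself, e.g. from a table or database value); the column names and type names below are illustrative:

```python
from kiara_plugin.tabular.models import ColumnSchema, TableMetadata

columns = {
    "id": ColumnSchema(type_name="int64", metadata={"nullable": False}),
    "title": ColumnSchema(type_name="string"),
}

md = TableMetadata(
    column_names=list(columns),
    column_schema=columns,
    rows=2,
    size=None,  # size in bytes is optional
)
print(md.column_names)  # -> ['id', 'title']
```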
Attributes¶
column_names: List[str]
pydantic-field
required
¶The name of the columns of the table.
column_schema: Dict[str, kiara_plugin.tabular.models.ColumnSchema]
pydantic-field
required
¶The schema description of the table.
rows: int
pydantic-field
required
¶The number of rows the table contains.
size: int
pydantic-field
¶The tables size in bytes.
Modules¶
array
¶
Classes¶
KiaraArray (KiaraModel)
pydantic-model
¶A class to manage array-like data.
Internally, this uses an Apache Arrow Array to handle the data in memory and on disk.
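A minimal usage sketch, assuming a plain Python list as input:

```python
from kiara_plugin.tabular.models.array import KiaraArray

# also accepts pyarrow Arrays/ChunkedArrays, or single-column pyarrow Tables
arr = KiaraArray.create_array(["a", "b", "c"])

print(len(arr))              # -> 3
print(arr.to_pylist())       # -> ['a', 'b', 'c']
print(arr.arrow_array.type)  # the underlying Arrow data type
```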
Source code in tabular/models/array.py
class KiaraArray(KiaraModel):
"""A class to manage array-like data.
Internally, this uses an [Apache Arrow Array](https://arrow.apache.org/docs/python/generated/pyarrow.Array.html#pyarrow.Array) to handle the data in memory and on disk.
"""
# @classmethod
# def create_in_temp_dir(cls, ):
#
# temp_f = tempfile.mkdtemp()
# file_path = os.path.join(temp_f, "array.feather")
#
# def cleanup():
# shutil.rmtree(file_path, ignore_errors=True)
#
# atexit.register(cleanup)
#
# array_obj = cls(feather_path=file_path)
# return array_obj
@classmethod
def create_array(cls, data: Any) -> "KiaraArray":
if isinstance(data, KiaraArray):
return data
array_obj = None
if isinstance(data, (pa.Array, pa.ChunkedArray)):
array_obj = data
elif isinstance(data, pa.Table):
if len(data.columns) != 1:
raise Exception(
f"Invalid type, only Arrow Arrays or single-column Tables allowed. This value is a table with {len(data.columns)} columns."
)
array_obj = data.column(0)
else:
try:
array_obj = pa.array(data)
except Exception:
pass
if array_obj is None:
raise Exception(
f"Can't create table, invalid source data type: {type(data)}."
)
obj = KiaraArray()
if not isinstance(array_obj, pa.lib.ChunkedArray):
array_obj = pa.chunked_array(array_obj)
obj._array_obj = array_obj
return obj
data_path: Union[str, None] = Field(
description="The path to the (feather) file backing this array.", default=None
)
_array_obj: pa.Array = PrivateAttr(default=None)
def _retrieve_data_to_hash(self) -> Any:
raise NotImplementedError()
def __len__(self):
return len(self.arrow_array)
@property
def arrow_array(self) -> pa.Array:
if self._array_obj is not None:
return self._array_obj
if not self.data_path:
raise Exception("Can't retrieve array data, object not initialized (yet).")
with pa.memory_map(self.data_path, "r") as source:
table: pa.Table = pa.ipc.open_file(source).read_all()
if len(table.columns) != 1:
raise Exception(
f"Invalid serialized array data, only a single-column Table is allowed. This value is a table with {len(table.columns)} columns."
)
self._array_obj = table.column(0)
return self._array_obj
def to_pylist(self):
return self.arrow_array.to_pylist()
def to_pandas(self):
return self.arrow_array.to_pandas()
arrow_array: Array
property
readonly
¶data_path: str
pydantic-field
¶The path to the (feather) file backing this array.
create_array(data)
classmethod
¶Source code in tabular/models/array.py
@classmethod
def create_array(cls, data: Any) -> "KiaraArray":
if isinstance(data, KiaraArray):
return data
array_obj = None
if isinstance(data, (pa.Array, pa.ChunkedArray)):
array_obj = data
elif isinstance(data, pa.Table):
if len(data.columns) != 1:
raise Exception(
f"Invalid type, only Arrow Arrays or single-column Tables allowed. This value is a table with {len(data.columns)} columns."
)
array_obj = data.column(0)
else:
try:
array_obj = pa.array(data)
except Exception:
pass
if array_obj is None:
raise Exception(
f"Can't create table, invalid source data type: {type(data)}."
)
obj = KiaraArray()
if not isinstance(array_obj, pa.lib.ChunkedArray):
array_obj = pa.chunked_array(array_obj)
obj._array_obj = array_obj
return obj
to_pandas(self)
¶Source code in tabular/models/array.py
def to_pandas(self):
return self.arrow_array.to_pandas()
to_pylist(self)
¶Source code in tabular/models/array.py
def to_pylist(self):
return self.arrow_array.to_pylist()
db
¶
Classes¶
DatabaseMetadata (ValueMetadata)
pydantic-model
¶Database and table properties.
Source code in tabular/models/db.py
class DatabaseMetadata(ValueMetadata):
"""Database and table properties."""
_metadata_key = "database"
@classmethod
def retrieve_supported_data_types(cls) -> Iterable[str]:
return ["database"]
@classmethod
def create_value_metadata(cls, value: Value) -> "DatabaseMetadata":
database: KiaraDatabase = value.data
insp = database.get_sqlalchemy_inspector()
mds = {}
for table_name in insp.get_table_names():
with database.get_sqlalchemy_engine().connect() as con:
result = con.execute(text(f"SELECT count(*) from {table_name}"))
num_rows = result.fetchone()[0]
try:
result = con.execute(
text(
f'SELECT SUM("pgsize") FROM "dbstat" WHERE name="{table_name}"'
)
)
size: Union[int, None] = result.fetchone()[0]
except Exception:
size = None
columns = {}
for column in insp.get_columns(table_name=table_name):
name = column["name"]
_type = column["type"]
type_name = SQLALCHEMY_SQLITE_TYPE_MAP[type(_type)]
columns[name] = {
"type_name": type_name,
"metadata": {
"nullable": column["nullable"],
"primary_key": True if column["primary_key"] else False,
},
}
schema = {
"column_names": list(columns.keys()),
"column_schema": columns,
"rows": num_rows,
"size": size,
}
md = TableMetadata(**schema)
mds[table_name] = md
return DatabaseMetadata.construct(tables=mds)
tables: Dict[str, TableMetadata] = Field(description="The table schema.")
tables: Dict[str, kiara_plugin.tabular.models.TableMetadata]
pydantic-field
required
¶The table schema.
create_value_metadata(value)
classmethod
¶Source code in tabular/models/db.py
@classmethod
def create_value_metadata(cls, value: Value) -> "DatabaseMetadata":
database: KiaraDatabase = value.data
insp = database.get_sqlalchemy_inspector()
mds = {}
for table_name in insp.get_table_names():
with database.get_sqlalchemy_engine().connect() as con:
result = con.execute(text(f"SELECT count(*) from {table_name}"))
num_rows = result.fetchone()[0]
try:
result = con.execute(
text(
f'SELECT SUM("pgsize") FROM "dbstat" WHERE name="{table_name}"'
)
)
size: Union[int, None] = result.fetchone()[0]
except Exception:
size = None
columns = {}
for column in insp.get_columns(table_name=table_name):
name = column["name"]
_type = column["type"]
type_name = SQLALCHEMY_SQLITE_TYPE_MAP[type(_type)]
columns[name] = {
"type_name": type_name,
"metadata": {
"nullable": column["nullable"],
"primary_key": True if column["primary_key"] else False,
},
}
schema = {
"column_names": list(columns.keys()),
"column_schema": columns,
"rows": num_rows,
"size": size,
}
md = TableMetadata(**schema)
mds[table_name] = md
return DatabaseMetadata.construct(tables=mds)
retrieve_supported_data_types()
classmethod
¶Source code in tabular/models/db.py
@classmethod
def retrieve_supported_data_types(cls) -> Iterable[str]:
return ["database"]
KiaraDatabase (KiaraModel)
pydantic-model
¶A wrapper class to manage a sqlite database.
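An illustrative sketch (the table and column names are made up; note that the database is locked read-only by default, so engine connections are only meant for queries):

```python
from sqlalchemy import text

from kiara_plugin.tabular.models.db import KiaraDatabase

# create a throw-away sqlite db in a temp dir and initialize one table
db = KiaraDatabase.create_in_temp_dir(
    init_statement="CREATE TABLE journals (id INTEGER PRIMARY KEY, title TEXT)"
)

print(db.table_names)  # e.g. ['journals']

# connections issue 'PRAGMA query_only = ON' while the db is locked
with db.get_sqlalchemy_engine().connect() as con:
    count = con.execute(text("SELECT count(*) FROM journals")).fetchone()[0]
    print(count)
```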
Source code in tabular/models/db.py
class KiaraDatabase(KiaraModel):
"""A wrapper class to manage a sqlite database."""
@classmethod
def create_in_temp_dir(
cls,
init_statement: Union[None, str, "TextClause"] = None,
init_data: Union[Mapping[str, Any], None] = None,
):
temp_f = tempfile.mkdtemp()
db_path = os.path.join(temp_f, "db.sqlite")
def cleanup():
shutil.rmtree(db_path, ignore_errors=True)
atexit.register(cleanup)
db = cls(db_file_path=db_path)
db.create_if_not_exists()
if init_statement:
db._unlock_db()
db.execute_sql(statement=init_statement, data=init_data, invalidate=True)
db._lock_db()
return db
db_file_path: str = Field(description="The path to the sqlite database file.")
_cached_engine = PrivateAttr(default=None)
_cached_inspector = PrivateAttr(default=None)
_table_names = PrivateAttr(default=None)
_tables: Dict[str, Table] = PrivateAttr(default_factory=dict)
_metadata_obj: Union[MetaData, None] = PrivateAttr(default=None)
# _table_schemas: Optional[Dict[str, SqliteTableSchema]] = PrivateAttr(default=None)
# _file_hash: Optional[str] = PrivateAttr(default=None)
_file_cid: Union[CID, None] = PrivateAttr(default=None)
_lock: bool = PrivateAttr(default=True)
_immutable: bool = PrivateAttr(default=None)
def _retrieve_id(self) -> str:
return str(self.file_cid)
def _retrieve_data_to_hash(self) -> Any:
return self.file_cid
@validator("db_file_path", allow_reuse=True)
def ensure_absolute_path(cls, path: str):
path = os.path.abspath(path)
if not os.path.exists(os.path.dirname(path)):
raise ValueError(f"Parent folder for database file does not exist: {path}")
return path
@property
def db_url(self) -> str:
return f"sqlite:///{self.db_file_path}"
@property
def file_cid(self) -> CID:
if self._file_cid is not None:
return self._file_cid
self._file_cid = compute_cid_from_file(file=self.db_file_path, codec="raw")
return self._file_cid
def get_sqlalchemy_engine(self) -> "Engine":
if self._cached_engine is not None:
return self._cached_engine
def _pragma_on_connect(dbapi_con, con_record):
dbapi_con.execute("PRAGMA query_only = ON")
self._cached_engine = create_engine(self.db_url, future=True)
if self._lock:
event.listen(self._cached_engine, "connect", _pragma_on_connect)
return self._cached_engine
def _lock_db(self):
self._lock = True
self._invalidate()
def _unlock_db(self):
if self._immutable:
raise Exception("Can't unlock db, it's immutable.")
self._lock = False
self._invalidate()
def create_if_not_exists(self):
from sqlalchemy_utils import create_database, database_exists
if not database_exists(self.db_url):
create_database(self.db_url)
def execute_sql(
self,
statement: Union[str, "TextClause"],
data: Union[Mapping[str, Any], None] = None,
invalidate: bool = False,
):
"""Execute an sql script.
Arguments:
statement: the sql statement
data: (optional) data, to be bound to the statement
invalidate: whether to invalidate cached values within this object
"""
if isinstance(statement, str):
statement = text(statement)
if data:
statement.bindparams(**data)
with self.get_sqlalchemy_engine().connect() as con:
con.execute(statement)
if invalidate:
self._invalidate()
def _invalidate(self):
self._cached_engine = None
self._cached_inspector = None
self._table_names = None
# self._file_hash = None
self._metadata_obj = None
self._tables.clear()
def _invalidate_other(self):
pass
def get_sqlalchemy_metadata(self) -> MetaData:
"""Return the sqlalchemy Metadtaa object for the underlying database.
This is used internally, you typically don't need to access this attribute.
"""
if self._metadata_obj is None:
self._metadata_obj = MetaData()
return self._metadata_obj
def copy_database_file(self, target: str):
os.makedirs(os.path.dirname(target))
shutil.copy2(self.db_file_path, target)
new_db = KiaraDatabase(db_file_path=target)
# if self._file_hash:
# new_db._file_hash = self._file_hash
return new_db
def get_sqlalchemy_inspector(self) -> Inspector:
if self._cached_inspector is not None:
return self._cached_inspector
self._cached_inspector = inspect(self.get_sqlalchemy_engine())
return self._cached_inspector
@property
def table_names(self) -> Iterable[str]:
if self._table_names is not None:
return self._table_names
self._table_names = self.get_sqlalchemy_inspector().get_table_names()
return self._table_names
def get_sqlalchemy_table(self, table_name: str) -> Table:
"""Return the sqlalchemy edges table instance for this network datab."""
if table_name in self._tables.keys():
return self._tables[table_name]
table = Table(
table_name,
self.get_sqlalchemy_metadata(),
autoload_with=self.get_sqlalchemy_engine(),
)
self._tables[table_name] = table
return table
db_file_path: str
pydantic-field
required
¶The path to the sqlite database file.
db_url: str
property
readonly
¶file_cid: CID
property
readonly
¶table_names: Iterable[str]
property
readonly
¶copy_database_file(self, target)
¶Source code in tabular/models/db.py
def copy_database_file(self, target: str):
os.makedirs(os.path.dirname(target))
shutil.copy2(self.db_file_path, target)
new_db = KiaraDatabase(db_file_path=target)
# if self._file_hash:
# new_db._file_hash = self._file_hash
return new_db
create_if_not_exists(self)
¶Source code in tabular/models/db.py
def create_if_not_exists(self):
from sqlalchemy_utils import create_database, database_exists
if not database_exists(self.db_url):
create_database(self.db_url)
create_in_temp_dir(init_statement=None, init_data=None)
classmethod
¶Source code in tabular/models/db.py
@classmethod
def create_in_temp_dir(
cls,
init_statement: Union[None, str, "TextClause"] = None,
init_data: Union[Mapping[str, Any], None] = None,
):
temp_f = tempfile.mkdtemp()
db_path = os.path.join(temp_f, "db.sqlite")
def cleanup():
shutil.rmtree(db_path, ignore_errors=True)
atexit.register(cleanup)
db = cls(db_file_path=db_path)
db.create_if_not_exists()
if init_statement:
db._unlock_db()
db.execute_sql(statement=init_statement, data=init_data, invalidate=True)
db._lock_db()
return db
ensure_absolute_path(path)
classmethod
¶Source code in tabular/models/db.py
@validator("db_file_path", allow_reuse=True)
def ensure_absolute_path(cls, path: str):
path = os.path.abspath(path)
if not os.path.exists(os.path.dirname(path)):
raise ValueError(f"Parent folder for database file does not exist: {path}")
return path
execute_sql(self, statement, data=None, invalidate=False)
¶Execute an sql script.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| statement | Union[str, TextClause] | the sql statement | required |
| data | Optional[Mapping[str, Any]] | (optional) data, to be bound to the statement | None |
| invalidate | bool | whether to invalidate cached values within this object | False |
Source code in tabular/models/db.py
def execute_sql(
self,
statement: Union[str, "TextClause"],
data: Union[Mapping[str, Any], None] = None,
invalidate: bool = False,
):
"""Execute an sql script.
Arguments:
statement: the sql statement
data: (optional) data, to be bound to the statement
invalidate: whether to invalidate cached values within this object
"""
if isinstance(statement, str):
statement = text(statement)
if data:
statement.bindparams(**data)
with self.get_sqlalchemy_engine().connect() as con:
con.execute(statement)
if invalidate:
self._invalidate()
get_sqlalchemy_engine(self)
¶Source code in tabular/models/db.py
def get_sqlalchemy_engine(self) -> "Engine":
if self._cached_engine is not None:
return self._cached_engine
def _pragma_on_connect(dbapi_con, con_record):
dbapi_con.execute("PRAGMA query_only = ON")
self._cached_engine = create_engine(self.db_url, future=True)
if self._lock:
event.listen(self._cached_engine, "connect", _pragma_on_connect)
return self._cached_engine
get_sqlalchemy_inspector(self)
¶Source code in tabular/models/db.py
def get_sqlalchemy_inspector(self) -> Inspector:
if self._cached_inspector is not None:
return self._cached_inspector
self._cached_inspector = inspect(self.get_sqlalchemy_engine())
return self._cached_inspector
get_sqlalchemy_metadata(self)
¶Return the sqlalchemy MetaData object for the underlying database.
This is used internally, you typically don't need to access this attribute.
Source code in tabular/models/db.py
def get_sqlalchemy_metadata(self) -> MetaData:
"""Return the sqlalchemy Metadtaa object for the underlying database.
This is used internally, you typically don't need to access this attribute.
"""
if self._metadata_obj is None:
self._metadata_obj = MetaData()
return self._metadata_obj
get_sqlalchemy_table(self, table_name)
¶Return the sqlalchemy table instance for the specified table of this database.
Source code in tabular/models/db.py
def get_sqlalchemy_table(self, table_name: str) -> Table:
"""Return the sqlalchemy edges table instance for this network datab."""
if table_name in self._tables.keys():
return self._tables[table_name]
table = Table(
table_name,
self.get_sqlalchemy_metadata(),
autoload_with=self.get_sqlalchemy_engine(),
)
self._tables[table_name] = table
return table
SqliteTableSchema (BaseModel)
pydantic-model
¶Source code in tabular/models/db.py
class SqliteTableSchema(BaseModel):
columns: Dict[str, SqliteDataType] = Field(
description="The table columns and their attributes."
)
index_columns: List[str] = Field(
description="The columns to index", default_factory=list
)
nullable_columns: List[str] = Field(
description="The columns that are nullable.", default_factory=list
)
unique_columns: List[str] = Field(
description="The columns that should be marked 'UNIQUE'.", default_factory=list
)
primary_key: Union[str, None] = Field(
description="The primary key for this table.", default=None
)
def create_table_metadata(
self,
table_name: str,
) -> Tuple[MetaData, Table]:
"""Create an sql script to initialize a table.
Arguments:
column_attrs: a map with the column name as key, and column details ('type', 'extra_column_info', 'create_index') as values
"""
table_columns = []
for column_name, data_type in self.columns.items():
column_obj = Column(
column_name,
SQLITE_SQLALCHEMY_TYPE_MAP[data_type],
nullable=column_name in self.nullable_columns,
primary_key=column_name == self.primary_key,
index=column_name in self.index_columns,
unique=column_name in self.unique_columns,
)
table_columns.append(column_obj)
meta = MetaData()
table = Table(table_name, meta, *table_columns)
return meta, table
def create_table(self, table_name: str, engine: Engine) -> Table:
meta, table = self.create_table_metadata(table_name=table_name)
meta.create_all(engine)
return table
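A rough sketch of defining a table schema and creating it on a plain sqlalchemy engine (the in-memory engine and the table/column names are purely illustrative):

```python
from sqlalchemy import create_engine

from kiara_plugin.tabular.models.db import SqliteTableSchema

schema = SqliteTableSchema(
    columns={"id": "INTEGER", "title": "TEXT"},
    primary_key="id",
    nullable_columns=["title"],
    index_columns=["title"],
)

engine = create_engine("sqlite:///:memory:")
table = schema.create_table(table_name="journals", engine=engine)
print(table.columns.keys())  # -> ['id', 'title']
```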
columns: Dict[str, Literal['NULL', 'INTEGER', 'REAL', 'TEXT', 'BLOB']]
pydantic-field
required
¶The table columns and their attributes.
index_columns: List[str]
pydantic-field
¶The columns to index
nullable_columns: List[str]
pydantic-field
¶The columns that are nullable.
primary_key: str
pydantic-field
¶The primary key for this table.
unique_columns: List[str]
pydantic-field
¶The columns that should be marked 'UNIQUE'.
create_table(self, table_name, engine)
¶Source code in tabular/models/db.py
def create_table(self, table_name: str, engine: Engine) -> Table:
meta, table = self.create_table_metadata(table_name=table_name)
meta.create_all(engine)
return table
create_table_metadata(self, table_name)
¶Create an sql script to initialize a table.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| column_attrs | | a map with the column name as key, and column details ('type', 'extra_column_info', 'create_index') as values | required |
Source code in tabular/models/db.py
def create_table_metadata(
self,
table_name: str,
) -> Tuple[MetaData, Table]:
"""Create an sql script to initialize a table.
Arguments:
column_attrs: a map with the column name as key, and column details ('type', 'extra_column_info', 'create_index') as values
"""
table_columns = []
for column_name, data_type in self.columns.items():
column_obj = Column(
column_name,
SQLITE_SQLALCHEMY_TYPE_MAP[data_type],
nullable=column_name in self.nullable_columns,
primary_key=column_name == self.primary_key,
index=column_name in self.index_columns,
unique=column_name in self.unique_columns,
)
table_columns.append(column_obj)
meta = MetaData()
table = Table(table_name, meta, *table_columns)
return meta, table
table
¶
Classes¶
KiaraTable (KiaraModel)
pydantic-model
¶A wrapper class to manage tabular data in a memory efficient way.
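A short access sketch, assuming `value` is a kiara value of type 'table' (so `value.data` is a KiaraTable):

```python
table = value.data  # a KiaraTable instance

print(list(table.column_names))  # column names of the underlying Arrow table
print(table.num_rows)            # number of rows

as_dict = table.to_pydict()      # dict of column name -> list of values
df = table.to_pandas()           # loads everything into a pandas DataFrame
```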
Source code in tabular/models/table.py
class KiaraTable(KiaraModel):
"""A wrapper class to manage tabular data in a memory efficient way."""
@classmethod
def create_table(cls, data: Any) -> "KiaraTable":
"""Create a `KiaraTable` instance from an Apache Arrow Table, or dict of lists."""
table_obj = None
if isinstance(data, KiaraTable):
return data
if isinstance(data, (pa.Table)):
table_obj = data
else:
try:
table_obj = pa.table(data)
except Exception:
pass
if table_obj is None:
raise Exception(
f"Can't create table, invalid source data type: {type(data)}."
)
obj = KiaraTable()
obj._table_obj = table_obj
return obj
data_path: Union[None, str] = Field(
description="The path to the (feather) file backing this array.", default=None
)
"""The path where the table object is store (for internal or read-only use)."""
_table_obj: pa.Table = PrivateAttr(default=None)
def _retrieve_data_to_hash(self) -> Any:
raise NotImplementedError()
@property
def arrow_table(self) -> pa.Table:
"""Return the data as an Apache Arrow Table instance."""
if self._table_obj is not None:
return self._table_obj
if not self.data_path:
raise Exception("Can't retrieve table data, object not initialized (yet).")
with pa.memory_map(self.data_path, "r") as source:
table: pa.Table = pa.ipc.open_file(source).read_all()
self._table_obj = table
return self._table_obj
@property
def column_names(self) -> Iterable[str]:
"""Retrieve the names of all the columns of this table."""
return self.arrow_table.column_names
@property
def num_rows(self) -> int:
"""Return the number of rows in this table."""
return self.arrow_table.num_rows
def to_pydict(self):
"""Convert and return the table data as a dictionary of lists.
This will load all data into memory, so you might or might not want to do that.
"""
return self.arrow_table.to_pydict()
def to_pylist(self):
"""Convert and return the table data as a list of rows/dictionaries.
This will load all data into memory, so you might or might not want to do that.
"""
return self.arrow_table.to_pylist()
def to_pandas(self):
"""Convert and return the table data to a Pandas dataframe.
This will load all data into memory, so you might or might not want to do that.
"""
return self.arrow_table.to_pandas()
arrow_table: Table
property
readonly
¶Return the data as an Apache Arrow Table instance.
column_names: Iterable[str]
property
readonly
¶Retrieve the names of all the columns of this table.
data_path: str
pydantic-field
¶The path to the (feather) file backing this array.
num_rows: int
property
readonly
¶Return the number of rows in this table.
create_table(data)
classmethod
¶Create a KiaraTable
instance from an Apache Arrow Table, or dict of lists.
Source code in tabular/models/table.py
@classmethod
def create_table(cls, data: Any) -> "KiaraTable":
"""Create a `KiaraTable` instance from an Apache Arrow Table, or dict of lists."""
table_obj = None
if isinstance(data, KiaraTable):
return data
if isinstance(data, (pa.Table)):
table_obj = data
else:
try:
table_obj = pa.table(data)
except Exception:
pass
if table_obj is None:
raise Exception(
f"Can't create table, invalid source data type: {type(data)}."
)
obj = KiaraTable()
obj._table_obj = table_obj
return obj
to_pandas(self)
¶Convert and return the table data to a Pandas dataframe.
This will load all data into memory, so you might or might not want to do that.
Source code in tabular/models/table.py
def to_pandas(self):
"""Convert and return the table data to a Pandas dataframe.
This will load all data into memory, so you might or might not want to do that.
"""
return self.arrow_table.to_pandas()
to_pydict(self)
¶Convert and return the table data as a dictionary of lists.
This will load all data into memory, so you might or might not want to do that.
Source code in tabular/models/table.py
def to_pydict(self):
"""Convert and return the table data as a dictionary of lists.
This will load all data into memory, so you might or might not want to do that.
"""
return self.arrow_table.to_pydict()
to_pylist(self)
¶Convert and return the table data as a list of rows/dictionaries.
This will load all data into memory, so you might or might not want to do that.
Source code in tabular/models/table.py
def to_pylist(self):
"""Convert and return the table data as a list of rows/dictionaries.
This will load all data into memory, so you might or might not want to do that.
"""
return self.arrow_table.to_pylist()
KiaraTableMetadata (ValueMetadata)
pydantic-model
¶File stats.
Source code in tabular/models/table.py
class KiaraTableMetadata(ValueMetadata):
"""File stats."""
_metadata_key = "table"
@classmethod
def retrieve_supported_data_types(cls) -> Iterable[str]:
return ["table"]
@classmethod
def create_value_metadata(cls, value: "Value") -> "KiaraTableMetadata":
kiara_table: KiaraTable = value.data
table: pa.Table = kiara_table.arrow_table
table_schema = {}
for name in table.schema.names:
field = table.schema.field(name)
md = field.metadata
_type = field.type
if not md:
md = {
"arrow_type_id": _type.id,
}
_d = {
"type_name": str(_type),
"metadata": md,
}
table_schema[name] = _d
schema = {
"column_names": table.column_names,
"column_schema": table_schema,
"rows": table.num_rows,
"size": table.nbytes,
}
md = TableMetadata.construct(**schema)
return KiaraTableMetadata.construct(table=md)
table: TableMetadata = Field(description="The table schema.")
table: TableMetadata
pydantic-field
required
¶The table schema.
create_value_metadata(value)
classmethod
¶Source code in tabular/models/table.py
@classmethod
def create_value_metadata(cls, value: "Value") -> "KiaraTableMetadata":
kiara_table: KiaraTable = value.data
table: pa.Table = kiara_table.arrow_table
table_schema = {}
for name in table.schema.names:
field = table.schema.field(name)
md = field.metadata
_type = field.type
if not md:
md = {
"arrow_type_id": _type.id,
}
_d = {
"type_name": str(_type),
"metadata": md,
}
table_schema[name] = _d
schema = {
"column_names": table.column_names,
"column_schema": table_schema,
"rows": table.num_rows,
"size": table.nbytes,
}
md = TableMetadata.construct(**schema)
return KiaraTableMetadata.construct(table=md)
retrieve_supported_data_types()
classmethod
¶Source code in tabular/models/table.py
@classmethod
def retrieve_supported_data_types(cls) -> Iterable[str]:
return ["table"]
modules
special
¶
Modules¶
array
special
¶
FORCE_NON_NULL_DOC
¶MAX_INDEX_DOC
¶MIN_INDEX_DOC
¶REMOVE_TOKENS_DOC
¶Classes¶
DeserializeArrayModule (DeserializeValueModule)
¶Deserialize array data.
Source code in tabular/modules/array/__init__.py
class DeserializeArrayModule(DeserializeValueModule):
"""Deserialize array data."""
_module_type_name = "load.array"
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
return {"python_object": KiaraArray}
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "array"
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
return "feather"
def to__python_object(self, data: SerializedData, **config: Any):
assert "array.arrow" in data.get_keys() and len(list(data.get_keys())) == 1
chunks = data.get_serialized_data("array.arrow")
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1
array_file = files[0]
array = KiaraArray(data_path=array_file)
return array
retrieve_serialized_value_type()
classmethod
¶Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "array"
retrieve_supported_serialization_profile()
classmethod
¶Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
return "feather"
retrieve_supported_target_profiles()
classmethod
¶Source code in tabular/modules/array/__init__.py
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
return {"python_object": KiaraArray}
to__python_object(self, data, **config)
¶Source code in tabular/modules/array/__init__.py
def to__python_object(self, data: SerializedData, **config: Any):
assert "array.arrow" in data.get_keys() and len(list(data.get_keys())) == 1
chunks = data.get_serialized_data("array.arrow")
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1
array_file = files[0]
array = KiaraArray(data_path=array_file)
return array
ExtractDateConfig (KiaraInputsConfig)
pydantic-model
¶Source code in tabular/modules/array/__init__.py
class ExtractDateConfig(KiaraInputsConfig):
force_non_null: bool = Field(description=FORCE_NON_NULL_DOC, default=True)
min_index: Union[None, int] = Field(
description=MIN_INDEX_DOC,
default=None,
)
max_index: Union[None, int] = Field(description=MAX_INDEX_DOC, default=None)
remove_tokens: List[str] = Field(
description=REMOVE_TOKENS_DOC, default_factory=list
)
force_non_null: bool
pydantic-field
¶If set to 'True', raise an error if any of the strings in the array can't be parsed.
max_index: int
pydantic-field
¶The maximum index until which to parse the string(s).
min_index: int
pydantic-field
¶The minimum index from where to start parsing the string(s).
remove_tokens: List[str]
pydantic-field
¶A list of tokens/characters to replace with a single white-space before parsing the input.
ExtractDateModule (AutoInputsKiaraModule)
¶Create an array of date objects from an array of strings.
This module is very simplistic at the moment, more functionality and options will be added in the future.
At its core, this module uses the standard parser from the dateutil package to parse strings into dates. As this parser can't handle complex strings, the input strings can be pre-processed in the following ways:
- 'cut' non-relevant parts of the string (using 'min_index' & 'max_index' input/config options)
- remove matching tokens from the string, and replace them with a single whitespace (using the 'remove_tokens' option)
By default, if an input string can't be parsed this module will raise an exception. This can be prevented by setting this module's 'force_non_null' config option or input to 'False', in which case un-parsable strings will appear as 'NULL' value in the resulting array.
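To make the pre-processing options concrete, here is a standalone sketch of the per-item logic described above (slice by 'min_index'/'max_index', replace 'remove_tokens' with whitespace, then fuzzy-parse with dateutil); the example string and indices are made up, and this is not the module API itself:

```python
from dateutil import parser

def parse_date(text: str, min_index: int = 0, max_index=None, remove_tokens=()):
    if min_index:
        text = text[min_index:]
    if max_index:
        text = text[: max_index - min_index]
    for token in remove_tokens:
        text = text.replace(token, " ")
    # fuzzy parsing ignores surrounding non-date tokens where possible
    return parser.parse(text, fuzzy=True)

print(parse_date("sn85066408_1901-06-14", min_index=11))  # -> 1901-06-14 00:00:00
```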
Source code in tabular/modules/array/__init__.py
class ExtractDateModule(AutoInputsKiaraModule):
"""Create an array of date objects from an array of strings.
This module is very simplistic at the moment, more functionality and options will be added in the future.
At its core, this module uses the standard parser from the
[dateutil](https://github.com/dateutil/dateutil) package to parse strings into dates. As this parser can't handle
complex strings, the input strings can be pre-processed in the following ways:
- 'cut' non-relevant parts of the string (using 'min_index' & 'max_index' input/config options)
- remove matching tokens from the string, and replace them with a single whitespace (using the 'remove_tokens' option)
By default, if an input string can't be parsed this module will raise an exception. This can be prevented by
setting this module's 'force_non_null' config option or input to 'False', in which case un-parsable strings
will appear as 'NULL' value in the resulting array.
"""
_module_type_name = "parse.date_array"
_config_cls = ExtractDateConfig
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs = {"array": {"type": "array", "doc": "The input array."}}
return inputs
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {
"date_array": {
"type": "array",
"doc": "The resulting array with items of a date data type.",
}
}
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog):
import polars as pl
import pyarrow as pa
from dateutil import parser
force_non_null: bool = self.get_data_for_field(
field_name="force_non_null", inputs=inputs
)
min_pos: Union[None, int] = self.get_data_for_field(
field_name="min_index", inputs=inputs
)
if min_pos is None:
min_pos = 0
max_pos: Union[None, int] = self.get_data_for_field(
field_name="max_index", inputs=inputs
)
remove_tokens: Iterable[str] = self.get_data_for_field(
field_name="remove_tokens", inputs=inputs
)
def parse_date(_text: str):
text = _text
if min_pos:
try:
text = text[min_pos:] # type: ignore
except Exception:
return None
if max_pos:
try:
text = text[0 : max_pos - min_pos] # type: ignore # noqa
except Exception:
pass
if remove_tokens:
for t in remove_tokens:
text = text.replace(t, " ")
try:
d_obj = parser.parse(text, fuzzy=True)
except Exception as e:
if force_non_null:
raise KiaraProcessingException(e)
return None
if d_obj is None:
if force_non_null:
raise KiaraProcessingException(
f"Can't parse date from string: {text}"
)
return None
return d_obj
value = inputs.get_value_obj("array")
array: KiaraArray = value.data
series = pl.Series(name="tokens", values=array.arrow_array)
job_log.add_log(f"start parsing date for {len(array)} items")
result = series.apply(parse_date)
job_log.add_log(f"finished parsing date for {len(array)} items")
result_array = result.to_arrow()
# TODO: remove this cast once the array data type can handle non-chunked arrays
chunked = pa.chunked_array(result_array)
outputs.set_values(date_array=chunked)
_config_cls (KiaraInputsConfig)
private
pydantic-model
¶Source code in tabular/modules/array/__init__.py
class ExtractDateConfig(KiaraInputsConfig):
force_non_null: bool = Field(description=FORCE_NON_NULL_DOC, default=True)
min_index: Union[None, int] = Field(
description=MIN_INDEX_DOC,
default=None,
)
max_index: Union[None, int] = Field(description=MAX_INDEX_DOC, default=None)
remove_tokens: List[str] = Field(
description=REMOVE_TOKENS_DOC, default_factory=list
)
force_non_null: bool
pydantic-field
¶If set to 'True', raise an error if any of the strings in the array can't be parsed.
max_index: int
pydantic-field
¶The maximum index until which to parse the string(s).
min_index: int
pydantic-field
¶The minimum index from where to start parsing the string(s).
remove_tokens: List[str]
pydantic-field
¶A list of tokens/characters to replace with a single white-space before parsing the input.
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in tabular/modules/array/__init__.py
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs = {"array": {"type": "array", "doc": "The input array."}}
return inputs
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in tabular/modules/array/__init__.py
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {
"date_array": {
"type": "array",
"doc": "The resulting array with items of a date data type.",
}
}
process(self, inputs, outputs, job_log)
¶Source code in tabular/modules/array/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog):
import polars as pl
import pyarrow as pa
from dateutil import parser
force_non_null: bool = self.get_data_for_field(
field_name="force_non_null", inputs=inputs
)
min_pos: Union[None, int] = self.get_data_for_field(
field_name="min_index", inputs=inputs
)
if min_pos is None:
min_pos = 0
max_pos: Union[None, int] = self.get_data_for_field(
field_name="max_index", inputs=inputs
)
remove_tokens: Iterable[str] = self.get_data_for_field(
field_name="remove_tokens", inputs=inputs
)
def parse_date(_text: str):
text = _text
if min_pos:
try:
text = text[min_pos:] # type: ignore
except Exception:
return None
if max_pos:
try:
text = text[0 : max_pos - min_pos] # type: ignore # noqa
except Exception:
pass
if remove_tokens:
for t in remove_tokens:
text = text.replace(t, " ")
try:
d_obj = parser.parse(text, fuzzy=True)
except Exception as e:
if force_non_null:
raise KiaraProcessingException(e)
return None
if d_obj is None:
if force_non_null:
raise KiaraProcessingException(
f"Can't parse date from string: {text}"
)
return None
return d_obj
value = inputs.get_value_obj("array")
array: KiaraArray = value.data
series = pl.Series(name="tokens", values=array.arrow_array)
job_log.add_log(f"start parsing date for {len(array)} items")
result = series.apply(parse_date)
job_log.add_log(f"finished parsing date for {len(array)} items")
result_array = result.to_arrow()
# TODO: remove this cast once the array data type can handle non-chunked arrays
chunked = pa.chunked_array(result_array)
outputs.set_values(date_array=chunked)
db
special
¶
Classes¶
CreateDatabaseModule (CreateFromModule)
¶Source code in tabular/modules/db/__init__.py
class CreateDatabaseModule(CreateFromModule):
_module_type_name = "create.database"
_config_cls = CreateDatabaseModuleConfig
def create__database__from__csv_file(self, source_value: Value) -> Any:
"""Create a database from a csv_file value."""
temp_f = tempfile.mkdtemp()
db_path = os.path.join(temp_f, "db.sqlite")
def cleanup():
shutil.rmtree(db_path, ignore_errors=True)
atexit.register(cleanup)
file_item: FileModel = source_value.data
table_name = file_item.file_name_without_extension
table_name = table_name.replace("-", "_")
table_name = table_name.replace(".", "_")
try:
create_sqlite_table_from_tabular_file(
target_db_file=db_path, file_item=file_item, table_name=table_name
)
except Exception as e:
if self.get_config_value("ignore_errors") is True or True:
log_message("ignore.import_file", file=file_item.path, reason=str(e))
else:
raise KiaraProcessingException(e)
include_raw_content_in_file_info: bool = self.get_config_value(
"include_source_metadata"
)
if include_raw_content_in_file_info:
db = KiaraDatabase(db_file_path=db_path)
db.create_if_not_exists()
include_content: bool = self.get_config_value("include_source_file_content")
db._unlock_db()
included_files = {file_item.file_name: file_item}
file_bundle = FileBundle.create_from_file_models(
files=included_files, bundle_name=file_item.file_name
)
insert_db_table_from_file_bundle(
database=db,
file_bundle=file_bundle,
table_name="source_files_metadata",
include_content=include_content,
)
db._lock_db()
return db_path
def create__database__from__csv_file_bundle(self, source_value: Value) -> Any:
"""Create a database from a csv_file_bundle value.
Unless 'merge_into_single_table' is set to 'True', each csv file will create one table
in the resulting database. If this option is set, only a single table with all the values of all
csv files will be created. For this to work, all csv files should follow the same schema.
"""
merge_into_single_table = self.get_config_value("merge_into_single_table")
if merge_into_single_table:
raise NotImplementedError("Not supported (yet).")
include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
"include_source_metadata"
)
temp_f = tempfile.mkdtemp()
db_path = os.path.join(temp_f, "db.sqlite")
def cleanup():
shutil.rmtree(db_path, ignore_errors=True)
atexit.register(cleanup)
db = KiaraDatabase(db_file_path=db_path)
db.create_if_not_exists()
# TODO: check whether/how to add indexes
bundle: FileBundle = source_value.data
table_names: List[str] = []
for rel_path in sorted(bundle.included_files.keys()):
file_item = bundle.included_files[rel_path]
table_name = find_free_id(
stem=file_item.file_name_without_extension, current_ids=table_names
)
try:
table_names.append(table_name)
create_sqlite_table_from_tabular_file(
target_db_file=db_path, file_item=file_item, table_name=table_name
)
except Exception as e:
if self.get_config_value("ignore_errors") is True or True:
log_message("ignore.import_file", file=rel_path, reason=str(e))
continue
raise KiaraProcessingException(e)
if include_raw_content_in_file_info in [None, True]:
include_content: bool = self.get_config_value("include_source_file_content")
db._unlock_db()
insert_db_table_from_file_bundle(
database=db,
file_bundle=source_value.data,
table_name="source_files_metadata",
include_content=include_content,
)
db._lock_db()
return db_path
def create_optional_inputs(
self, source_type: str, target_type
) -> Union[Mapping[str, Mapping[str, Any]], None]:
if target_type == "database" and source_type == "table":
return {
"table_name": {
"type": "string",
"doc": "The name of the table in the new database.",
"default": "imported_table",
}
}
else:
return None
def create__database__from__table(
self, source_value: Value, optional: ValueMap
) -> Any:
"""Create a database value from a table."""
table_name = optional.get_value_data("table_name")
if not table_name:
table_name = "imported_table"
table: KiaraTable = source_value.data
arrow_table = table.arrow_table
column_map = None
index_columns = None
sqlite_schema = create_sqlite_schema_data_from_arrow_table(
table=arrow_table, index_columns=index_columns, column_map=column_map
)
db = KiaraDatabase.create_in_temp_dir()
db._unlock_db()
engine = db.get_sqlalchemy_engine()
_table = sqlite_schema.create_table(table_name=table_name, engine=engine)
with engine.connect() as conn:
for batch in arrow_table.to_batches(
max_chunksize=DEFAULT_TABULAR_DATA_CHUNK_SIZE
):
conn.execute(insert(_table), batch.to_pylist())
conn.commit()
db._lock_db()
return db
_config_cls (CreateFromModuleConfig)
private
pydantic-model
¶Source code in tabular/modules/db/__init__.py
class CreateDatabaseModuleConfig(CreateFromModuleConfig):
ignore_errors: bool = Field(
description="Whether to ignore convert errors and omit the failed items.",
default=False,
)
merge_into_single_table: bool = Field(
description="Whether to merge all csv files into a single table.", default=False
)
include_source_metadata: Union[bool, None] = Field(
description="Whether to include a table with metadata about the source files.",
default=None,
)
include_source_file_content: bool = Field(
description="When including source metadata, whether to also include the original raw (string) content.",
default=False,
)
ignore_errors: bool
pydantic-field
¶Whether to ignore convert errors and omit the failed items.
include_source_file_content: bool
pydantic-field
¶When including source metadata, whether to also include the original raw (string) content.
include_source_metadata: bool
pydantic-field
¶Whether to include a table with metadata about the source files.
merge_into_single_table: bool
pydantic-field
¶Whether to merge all csv files into a single table.
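As a rough sketch (the values are illustrative, not defaults), these options could be combined in a module configuration dict like this:
create_database_config = {
    "ignore_errors": True,                 # skip source files that fail to convert
    "merge_into_single_table": False,      # keep one table per csv file
    "include_source_metadata": True,       # add a 'source_files_metadata' table
    "include_source_file_content": False,  # don't copy the raw file content into that table
}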
create__database__from__csv_file(self, source_value)
¶Create a database from a csv_file value.
Source code in tabular/modules/db/__init__.py
def create__database__from__csv_file(self, source_value: Value) -> Any:
"""Create a database from a csv_file value."""
temp_f = tempfile.mkdtemp()
db_path = os.path.join(temp_f, "db.sqlite")
def cleanup():
shutil.rmtree(db_path, ignore_errors=True)
atexit.register(cleanup)
file_item: FileModel = source_value.data
table_name = file_item.file_name_without_extension
table_name = table_name.replace("-", "_")
table_name = table_name.replace(".", "_")
try:
create_sqlite_table_from_tabular_file(
target_db_file=db_path, file_item=file_item, table_name=table_name
)
except Exception as e:
if self.get_config_value("ignore_errors") is True or True:
log_message("ignore.import_file", file=file_item.path, reason=str(e))
else:
raise KiaraProcessingException(e)
include_raw_content_in_file_info: bool = self.get_config_value(
"include_source_metadata"
)
if include_raw_content_in_file_info:
db = KiaraDatabase(db_file_path=db_path)
db.create_if_not_exists()
include_content: bool = self.get_config_value("include_source_file_content")
db._unlock_db()
included_files = {file_item.file_name: file_item}
file_bundle = FileBundle.create_from_file_models(
files=included_files, bundle_name=file_item.file_name
)
insert_db_table_from_file_bundle(
database=db,
file_bundle=file_bundle,
table_name="source_files_metadata",
include_content=include_content,
)
db._lock_db()
return db_path
create__database__from__csv_file_bundle(self, source_value)
¶Create a database from a csv_file_bundle value.
Unless 'merge_into_single_table' is set to 'True', each csv file will create one table in the resulting database. If this option is set, only a single table with all the values of all csv files will be created. For this to work, all csv files should follow the same schema.
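For illustration only, the resulting sqlite file can be inspected with the standard library to confirm that one table per csv file was created (plus the optional 'source_files_metadata' table); the path 'db.sqlite' and the table names are hypothetical:
import sqlite3

con = sqlite3.connect("db.sqlite")
tables = [row[0] for row in con.execute("SELECT name FROM sqlite_master WHERE type = 'table'")]
print(tables)  # e.g. ['journals', 'authors', 'source_files_metadata']
con.close()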
Source code in tabular/modules/db/__init__.py
def create__database__from__csv_file_bundle(self, source_value: Value) -> Any:
"""Create a database from a csv_file_bundle value.
Unless 'merge_into_single_table' is set to 'True', each csv file will create one table
in the resulting database. If this option is set, only a single table with all the values of all
csv files will be created. For this to work, all csv files should follow the same schema.
"""
merge_into_single_table = self.get_config_value("merge_into_single_table")
if merge_into_single_table:
raise NotImplementedError("Not supported (yet).")
include_raw_content_in_file_info: Union[bool, None] = self.get_config_value(
"include_source_metadata"
)
temp_f = tempfile.mkdtemp()
db_path = os.path.join(temp_f, "db.sqlite")
def cleanup():
shutil.rmtree(db_path, ignore_errors=True)
atexit.register(cleanup)
db = KiaraDatabase(db_file_path=db_path)
db.create_if_not_exists()
# TODO: check whether/how to add indexes
bundle: FileBundle = source_value.data
table_names: List[str] = []
for rel_path in sorted(bundle.included_files.keys()):
file_item = bundle.included_files[rel_path]
table_name = find_free_id(
stem=file_item.file_name_without_extension, current_ids=table_names
)
try:
table_names.append(table_name)
create_sqlite_table_from_tabular_file(
target_db_file=db_path, file_item=file_item, table_name=table_name
)
except Exception as e:
if self.get_config_value("ignore_errors") is True or True:
log_message("ignore.import_file", file=rel_path, reason=str(e))
continue
raise KiaraProcessingException(e)
if include_raw_content_in_file_info in [None, True]:
include_content: bool = self.get_config_value("include_source_file_content")
db._unlock_db()
insert_db_table_from_file_bundle(
database=db,
file_bundle=source_value.data,
table_name="source_files_metadata",
include_content=include_content,
)
db._lock_db()
return db_path
create__database__from__table(self, source_value, optional)
¶Create a database value from a table.
Source code in tabular/modules/db/__init__.py
def create__database__from__table(
self, source_value: Value, optional: ValueMap
) -> Any:
"""Create a database value from a table."""
table_name = optional.get_value_data("table_name")
if not table_name:
table_name = "imported_table"
table: KiaraTable = source_value.data
arrow_table = table.arrow_table
column_map = None
index_columns = None
sqlite_schema = create_sqlite_schema_data_from_arrow_table(
table=arrow_table, index_columns=index_columns, column_map=column_map
)
db = KiaraDatabase.create_in_temp_dir()
db._unlock_db()
engine = db.get_sqlalchemy_engine()
_table = sqlite_schema.create_table(table_name=table_name, engine=engine)
with engine.connect() as conn:
for batch in arrow_table.to_batches(
max_chunksize=DEFAULT_TABULAR_DATA_CHUNK_SIZE
):
conn.execute(insert(_table), batch.to_pylist())
conn.commit()
db._lock_db()
return db
create_optional_inputs(self, source_type, target_type)
¶Source code in tabular/modules/db/__init__.py
def create_optional_inputs(
self, source_type: str, target_type
) -> Union[Mapping[str, Mapping[str, Any]], None]:
if target_type == "database" and source_type == "table":
return {
"table_name": {
"type": "string",
"doc": "The name of the table in the new database.",
"default": "imported_table",
}
}
else:
return None
CreateDatabaseModuleConfig (CreateFromModuleConfig)
pydantic-model
¶Source code in tabular/modules/db/__init__.py
class CreateDatabaseModuleConfig(CreateFromModuleConfig):
ignore_errors: bool = Field(
description="Whether to ignore convert errors and omit the failed items.",
default=False,
)
merge_into_single_table: bool = Field(
description="Whether to merge all csv files into a single table.", default=False
)
include_source_metadata: Union[bool, None] = Field(
description="Whether to include a table with metadata about the source files.",
default=None,
)
include_source_file_content: bool = Field(
description="When including source metadata, whether to also include the original raw (string) content.",
default=False,
)
ignore_errors: bool
pydantic-field
¶Whether to ignore convert errors and omit the failed items.
include_source_file_content: bool
pydantic-field
¶When including source metadata, whether to also include the original raw (string) content.
include_source_metadata: bool
pydantic-field
¶Whether to include a table with metadata about the source files.
merge_into_single_table: bool
pydantic-field
¶Whether to merge all csv files into a single table.
LoadDatabaseFromDiskModule (DeserializeValueModule)
¶Source code in tabular/modules/db/__init__.py
class LoadDatabaseFromDiskModule(DeserializeValueModule):
_module_type_name = "load.database"
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
return {"python_object": KiaraDatabase}
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "database"
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
return "copy"
def to__python_object(self, data: SerializedData, **config: Any):
assert "db.sqlite" in data.get_keys() and len(list(data.get_keys())) == 1
chunks = data.get_serialized_data("db.sqlite")
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1
db_file = files[0]
db = KiaraDatabase(db_file_path=db_file)
return db
retrieve_serialized_value_type()
classmethod
¶Source code in tabular/modules/db/__init__.py
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "database"
retrieve_supported_serialization_profile()
classmethod
¶Source code in tabular/modules/db/__init__.py
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
return "copy"
retrieve_supported_target_profiles()
classmethod
¶Source code in tabular/modules/db/__init__.py
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
return {"python_object": KiaraDatabase}
to__python_object(self, data, **config)
¶Source code in tabular/modules/db/__init__.py
def to__python_object(self, data: SerializedData, **config: Any):
assert "db.sqlite" in data.get_keys() and len(list(data.get_keys())) == 1
chunks = data.get_serialized_data("db.sqlite")
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1
db_file = files[0]
db = KiaraDatabase(db_file_path=db_file)
return db
QueryDatabaseConfig (KiaraModuleConfig)
pydantic-model
¶
QueryDatabaseModule (KiaraModule)
¶Execute a sql query against a (sqlite) database.
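The module runs the query via SQLAlchemy, collects the result into per-column lists and builds an Arrow table from them. A standalone sketch of that pattern (outside of kiara; 'db.sqlite' and the query are hypothetical, and row._mapping is the SQLAlchemy 1.4+ spelling of the dict(row) access used in the source below):
import pyarrow as pa
from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///db.sqlite")
columns = {}
with engine.connect() as con:
    for row in con.execute(text("SELECT * FROM source_files_metadata LIMIT 10")):
        for key, value in row._mapping.items():
            columns.setdefault(key, []).append(value)
result_table = pa.Table.from_pydict(columns)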
Source code in tabular/modules/db/__init__.py
class QueryDatabaseModule(KiaraModule):
"""Execute a sql query against a (sqlite) database."""
_config_cls = QueryDatabaseConfig
_module_type_name = "query.database"
def create_inputs_schema(
self,
) -> ValueMapSchema:
result: Dict[str, Dict[str, Any]] = {
"database": {"type": "database", "doc": "The database to query."}
}
if not self.get_config_value("query"):
result["query"] = {"type": "string", "doc": "The query to execute."}
return result
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {"query_result": {"type": "table", "doc": "The query result."}}
def process(self, inputs: ValueMap, outputs: ValueMap):
import pyarrow as pa
database: KiaraDatabase = inputs.get_value_data("database")
query = self.get_config_value("query")
if query is None:
query = inputs.get_value_data("query")
# TODO: make this memory efficient
result_columns: Dict[str, List[Any]] = {}
with database.get_sqlalchemy_engine().connect() as con:
result = con.execute(text(query))
for r in result:
for k, v in dict(r).items():
result_columns.setdefault(k, []).append(v)
table = pa.Table.from_pydict(result_columns)
outputs.set_value("query_result", table)
_config_cls (KiaraModuleConfig)
private
pydantic-model
¶create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in tabular/modules/db/__init__.py
def create_inputs_schema(
self,
) -> ValueMapSchema:
result: Dict[str, Dict[str, Any]] = {
"database": {"type": "database", "doc": "The database to query."}
}
if not self.get_config_value("query"):
result["query"] = {"type": "string", "doc": "The query to execute."}
return result
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in tabular/modules/db/__init__.py
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {"query_result": {"type": "table", "doc": "The query result."}}
process(self, inputs, outputs)
¶Source code in tabular/modules/db/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap):
import pyarrow as pa
database: KiaraDatabase = inputs.get_value_data("database")
query = self.get_config_value("query")
if query is None:
query = inputs.get_value_data("query")
# TODO: make this memory efficient
result_columns: Dict[str, List[Any]] = {}
with database.get_sqlalchemy_engine().connect() as con:
result = con.execute(text(query))
for r in result:
for k, v in dict(r).items():
result_columns.setdefault(k, []).append(v)
table = pa.Table.from_pydict(result_columns)
outputs.set_value("query_result", table)
RenderDatabaseModule (RenderDatabaseModuleBase)
¶Source code in tabular/modules/db/__init__.py
class RenderDatabaseModule(RenderDatabaseModuleBase):
_module_type_name = "render.database"
def render__database__as__string(
self, value: Value, render_config: Mapping[str, Any]
):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
table_name = render_config.get("table_name", None)
wrap, data_related_scenes = self.preprocess_database(
value=value,
table_name=table_name,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_string(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
rendered=pretty,
related_scenes=data_related_scenes,
render_config=render_config,
render_manifest=self.manifest.manifest_hash,
)
def render__database__as__terminal_renderable(
self, value: Value, render_config: Mapping[str, Any]
):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
table_name = render_config.get("table_name", None)
wrap, data_related_scenes = self.preprocess_database(
value=value,
table_name=table_name,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_terminal_renderable(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
rendered=pretty,
related_scenes=data_related_scenes,
render_manifest=self.manifest.manifest_hash,
)
render__database__as__string(self, value, render_config)
¶Source code in tabular/modules/db/__init__.py
def render__database__as__string(
self, value: Value, render_config: Mapping[str, Any]
):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
table_name = render_config.get("table_name", None)
wrap, data_related_scenes = self.preprocess_database(
value=value,
table_name=table_name,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_string(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
rendered=pretty,
related_scenes=data_related_scenes,
render_config=render_config,
render_manifest=self.manifest.manifest_hash,
)
render__database__as__terminal_renderable(self, value, render_config)
¶Source code in tabular/modules/db/__init__.py
def render__database__as__terminal_renderable(
self, value: Value, render_config: Mapping[str, Any]
):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
table_name = render_config.get("table_name", None)
wrap, data_related_scenes = self.preprocess_database(
value=value,
table_name=table_name,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_terminal_renderable(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
rendered=pretty,
related_scenes=data_related_scenes,
render_manifest=self.manifest.manifest_hash,
)
RenderDatabaseModuleBase (RenderValueModule)
¶Source code in tabular/modules/db/__init__.py
class RenderDatabaseModuleBase(RenderValueModule):
_module_type_name: str = None # type: ignore
def preprocess_database(
self,
value: Value,
table_name: Union[str, None],
input_number_of_rows: int,
input_row_offset: int,
):
database: KiaraDatabase = value.data
table_names = database.table_names
if not table_name:
table_name = list(table_names)[0]
if table_name not in table_names:
raise Exception(
f"Invalid table name: {table_name}. Available: {', '.join(table_names)}"
)
related_scenes_tables: Dict[str, Union[RenderScene, None]] = {
t: RenderScene.construct(
title=t,
description=f"Display the '{t}' table.",
manifest_hash=self.manifest.manifest_hash,
render_config={"table_name": t},
)
for t in database.table_names
}
query = f"""SELECT * FROM {table_name} LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""
result: Dict[str, List[Any]] = {}
# TODO: this could be written much more efficiently
with database.get_sqlalchemy_engine().connect() as con:
num_rows_result = con.execute(text(f"SELECT count(*) from {table_name}"))
table_num_rows = num_rows_result.fetchone()[0]
rs = con.execute(text(query))
for r in rs:
for k, v in dict(r).items():
result.setdefault(k, []).append(v)
wrap = DictTabularWrap(data=result)
row_offset = table_num_rows - input_number_of_rows
related_scenes: Dict[str, Union[RenderScene, None]] = {}
if row_offset > 0:
if input_row_offset > 0:
related_scenes["first"] = RenderScene.construct(
title="first",
description=f"Display the first {input_number_of_rows} rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": 0,
"number_of_rows": input_number_of_rows,
"table_name": table_name,
},
)
p_offset = input_row_offset - input_number_of_rows
if p_offset < 0:
p_offset = 0
previous = {
"row_offset": p_offset,
"number_of_rows": input_number_of_rows,
"table_name": table_name,
}
related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous) # type: ignore
else:
related_scenes["first"] = None
related_scenes["previous"] = None
n_offset = input_row_offset + input_number_of_rows
if n_offset < table_num_rows:
next = {
"row_offset": n_offset,
"number_of_rows": input_number_of_rows,
"table_name": table_name,
}
related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next) # type: ignore
else:
related_scenes["next"] = None
last_page = int(table_num_rows / input_number_of_rows)
current_start = last_page * input_number_of_rows
if (input_row_offset + input_number_of_rows) > table_num_rows:
related_scenes["last"] = None
else:
related_scenes["last"] = RenderScene.construct(
title="last",
description="Display the final rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": current_start, # type: ignore
"number_of_rows": input_number_of_rows, # type: ignore
"table_name": table_name,
},
)
related_scenes_tables[table_name].disabled = True # type: ignore
related_scenes_tables[table_name].related_scenes = related_scenes # type: ignore
return wrap, related_scenes_tables
preprocess_database(self, value, table_name, input_number_of_rows, input_row_offset)
¶Source code in tabular/modules/db/__init__.py
def preprocess_database(
self,
value: Value,
table_name: Union[str, None],
input_number_of_rows: int,
input_row_offset: int,
):
database: KiaraDatabase = value.data
table_names = database.table_names
if not table_name:
table_name = list(table_names)[0]
if table_name not in table_names:
raise Exception(
f"Invalid table name: {table_name}. Available: {', '.join(table_names)}"
)
related_scenes_tables: Dict[str, Union[RenderScene, None]] = {
t: RenderScene.construct(
title=t,
description=f"Display the '{t}' table.",
manifest_hash=self.manifest.manifest_hash,
render_config={"table_name": t},
)
for t in database.table_names
}
query = f"""SELECT * FROM {table_name} LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""
result: Dict[str, List[Any]] = {}
# TODO: this could be written much more efficiently
with database.get_sqlalchemy_engine().connect() as con:
num_rows_result = con.execute(text(f"SELECT count(*) from {table_name}"))
table_num_rows = num_rows_result.fetchone()[0]
rs = con.execute(text(query))
for r in rs:
for k, v in dict(r).items():
result.setdefault(k, []).append(v)
wrap = DictTabularWrap(data=result)
row_offset = table_num_rows - input_number_of_rows
related_scenes: Dict[str, Union[RenderScene, None]] = {}
if row_offset > 0:
if input_row_offset > 0:
related_scenes["first"] = RenderScene.construct(
title="first",
description=f"Display the first {input_number_of_rows} rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": 0,
"number_of_rows": input_number_of_rows,
"table_name": table_name,
},
)
p_offset = input_row_offset - input_number_of_rows
if p_offset < 0:
p_offset = 0
previous = {
"row_offset": p_offset,
"number_of_rows": input_number_of_rows,
"table_name": table_name,
}
related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous) # type: ignore
else:
related_scenes["first"] = None
related_scenes["previous"] = None
n_offset = input_row_offset + input_number_of_rows
if n_offset < table_num_rows:
next = {
"row_offset": n_offset,
"number_of_rows": input_number_of_rows,
"table_name": table_name,
}
related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next) # type: ignore
else:
related_scenes["next"] = None
last_page = int(table_num_rows / input_number_of_rows)
current_start = last_page * input_number_of_rows
if (input_row_offset + input_number_of_rows) > table_num_rows:
related_scenes["last"] = None
else:
related_scenes["last"] = RenderScene.construct(
title="last",
description="Display the final rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": current_start, # type: ignore
"number_of_rows": input_number_of_rows, # type: ignore
"table_name": table_name,
},
)
related_scenes_tables[table_name].disabled = True # type: ignore
related_scenes_tables[table_name].related_scenes = related_scenes # type: ignore
return wrap, related_scenes_tables
table
special
¶
EMPTY_COLUMN_NAME_MARKER
¶Classes¶
CreateTableModule (CreateFromModule)
¶Source code in tabular/modules/table/__init__.py
class CreateTableModule(CreateFromModule):
_module_type_name = "create.table"
_config_cls = CreateTableModuleConfig
def create__table__from__csv_file(self, source_value: Value) -> Any:
"""Create a table from a csv_file value."""
from pyarrow import csv
input_file: FileModel = source_value.data
imported_data = csv.read_csv(input_file.path)
# import pandas as pd
# df = pd.read_csv(input_file.path)
# imported_data = pa.Table.from_pandas(df)
return KiaraTable.create_table(imported_data)
def create__table__from__text_file_bundle(self, source_value: Value) -> Any:
"""Create a table value from a text file_bundle.
The resulting table will have (at a minimum) the following columns:
- id: an auto-assigned index
- rel_path: the relative path of the file (from the provided base path)
- content: the text file content
"""
import pyarrow as pa
bundle: FileBundle = source_value.data
columns = FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS
ignore_errors = self.get_config_value("ignore_errors")
file_dict = bundle.read_text_file_contents(ignore_errors=ignore_errors)
# TODO: use chunks to save on memory
tabular: Dict[str, List[Any]] = {}
for column in columns:
for index, rel_path in enumerate(sorted(file_dict.keys())):
if column == "content":
_value: Any = file_dict[rel_path]
elif column == "id":
_value = index
elif column == "rel_path":
_value = rel_path
else:
file_model = bundle.included_files[rel_path]
_value = getattr(file_model, column)
tabular.setdefault(column, []).append(_value)
table = pa.Table.from_pydict(tabular)
return KiaraTable.create_table(table)
_config_cls (CreateFromModuleConfig)
private
pydantic-model
¶Source code in tabular/modules/table/__init__.py
class CreateTableModuleConfig(CreateFromModuleConfig):
ignore_errors: bool = Field(
description="Whether to ignore convert errors and omit the failed items.",
default=False,
)
create__table__from__csv_file(self, source_value)
¶Create a table from a csv_file value.
Source code in tabular/modules/table/__init__.py
def create__table__from__csv_file(self, source_value: Value) -> Any:
"""Create a table from a csv_file value."""
from pyarrow import csv
input_file: FileModel = source_value.data
imported_data = csv.read_csv(input_file.path)
# import pandas as pd
# df = pd.read_csv(input_file.path)
# imported_data = pa.Table.from_pandas(df)
return KiaraTable.create_table(imported_data)
create__table__from__text_file_bundle(self, source_value)
¶Create a table value from a text file_bundle.
¶The resulting table will have (at a minimum) the following columns:
- id: an auto-assigned index
- rel_path: the relative path of the file (from the provided base path)
- content: the text file content
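As a rough illustration of that shape (made-up values), the minimal columns map onto an Arrow table like this:
import pyarrow as pa

table = pa.Table.from_pydict({
    "id": [0, 1],                                # auto-assigned index
    "rel_path": ["notes/a.txt", "notes/b.txt"],  # path relative to the bundle base path
    "content": ["first file text", "second file text"],
})
print(table.num_rows, table.column_names)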
Source code in tabular/modules/table/__init__.py
def create__table__from__text_file_bundle(self, source_value: Value) -> Any:
"""Create a table value from a text file_bundle.
The resulting table will have (at a minimum) the following columns:
- id: an auto-assigned index
- rel_path: the relative path of the file (from the provided base path)
- content: the text file content
"""
import pyarrow as pa
bundle: FileBundle = source_value.data
columns = FILE_BUNDLE_IMPORT_AVAILABLE_COLUMNS
ignore_errors = self.get_config_value("ignore_errors")
file_dict = bundle.read_text_file_contents(ignore_errors=ignore_errors)
# TODO: use chunks to save on memory
tabular: Dict[str, List[Any]] = {}
for column in columns:
for index, rel_path in enumerate(sorted(file_dict.keys())):
if column == "content":
_value: Any = file_dict[rel_path]
elif column == "id":
_value = index
elif column == "rel_path":
_value = rel_path
else:
file_model = bundle.included_files[rel_path]
_value = getattr(file_model, column)
tabular.setdefault(column, []).append(_value)
table = pa.Table.from_pydict(tabular)
return KiaraTable.create_table(table)
CreateTableModuleConfig (CreateFromModuleConfig)
pydantic-model
¶Source code in tabular/modules/table/__init__.py
class CreateTableModuleConfig(CreateFromModuleConfig):
ignore_errors: bool = Field(
description="Whether to ignore convert errors and omit the failed items.",
default=False,
)
CutColumnModule (KiaraModule)
¶Cut off one column from a table, returning an array.
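Conceptually this is the pyarrow operation sketched below (standalone, with made-up data); the module adds column-name validation and wraps the result as a kiara 'array' value:
import pyarrow as pa

table = pa.table({"city": ["Paris", "Berlin"], "year": [1850, 1923]})
column = table.column("year")  # a pyarrow ChunkedArray, i.e. the content of the 'array' output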
Source code in tabular/modules/table/__init__.py
class CutColumnModule(KiaraModule):
"""Cut off one column from a table, returning an array."""
_module_type_name = "table.cut_column"
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs: Mapping[str, Any] = {
"table": {"type": "table", "doc": "A table."},
"column_name": {
"type": "string",
"doc": "The name of the column to extract.",
},
}
return inputs
def create_outputs_schema(
self,
) -> ValueMapSchema:
outputs: Mapping[str, Any] = {"array": {"type": "array", "doc": "The column."}}
return outputs
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
import pyarrow as pa
column_name: str = inputs.get_value_data("column_name")
table_value: Value = inputs.get_value_obj("table")
table_metadata: KiaraTableMetadata = table_value.get_property_data(
"metadata.table"
)
available = table_metadata.table.column_names
if column_name not in available:
raise KiaraProcessingException(
f"Invalid column name '{column_name}'. Available column names: {', '.join(available)}"
)
table: pa.Table = table_value.data.arrow_table
column = table.column(column_name)
outputs.set_value("array", column)
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs: Mapping[str, Any] = {
"table": {"type": "table", "doc": "A table."},
"column_name": {
"type": "string",
"doc": "The name of the column to extract.",
},
}
return inputs
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
self,
) -> ValueMapSchema:
outputs: Mapping[str, Any] = {"array": {"type": "array", "doc": "The column."}}
return outputs
process(self, inputs, outputs)
¶Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
import pyarrow as pa
column_name: str = inputs.get_value_data("column_name")
table_value: Value = inputs.get_value_obj("table")
table_metadata: KiaraTableMetadata = table_value.get_property_data(
"metadata.table"
)
available = table_metadata.table.column_names
if column_name not in available:
raise KiaraProcessingException(
f"Invalid column name '{column_name}'. Available column names: {', '.join(available)}"
)
table: pa.Table = table_value.data.arrow_table
column = table.column(column_name)
outputs.set_value("array", column)
DeserializeTableModule (DeserializeValueModule)
¶Source code in tabular/modules/table/__init__.py
class DeserializeTableModule(DeserializeValueModule):
_module_type_name = "load.table"
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
return {"python_object": KiaraTable}
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "table"
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
return "feather"
def to__python_object(self, data: SerializedData, **config: Any):
import pyarrow as pa
columns = {}
for column_name in data.get_keys():
chunks = data.get_serialized_data(column_name)
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1
file = files[0]
with pa.memory_map(file, "r") as column_chunk:
loaded_arrays: pa.Table = pa.ipc.open_file(column_chunk).read_all()
column = loaded_arrays.column(column_name)
if column_name == EMPTY_COLUMN_NAME_MARKER:
columns[""] = column
else:
columns[column_name] = column
arrow_table = pa.table(columns)
table = KiaraTable.create_table(arrow_table)
return table
retrieve_serialized_value_type()
classmethod
¶Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_serialized_value_type(cls) -> str:
return "table"
retrieve_supported_serialization_profile()
classmethod
¶Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_supported_serialization_profile(cls) -> str:
return "feather"
retrieve_supported_target_profiles()
classmethod
¶Source code in tabular/modules/table/__init__.py
@classmethod
def retrieve_supported_target_profiles(cls) -> Mapping[str, Type]:
return {"python_object": KiaraTable}
to__python_object(self, data, **config)
¶Source code in tabular/modules/table/__init__.py
def to__python_object(self, data: SerializedData, **config: Any):
import pyarrow as pa
columns = {}
for column_name in data.get_keys():
chunks = data.get_serialized_data(column_name)
# TODO: support multiple chunks
assert chunks.get_number_of_chunks() == 1
files = list(chunks.get_chunks(as_files=True, symlink_ok=True))
assert len(files) == 1
file = files[0]
with pa.memory_map(file, "r") as column_chunk:
loaded_arrays: pa.Table = pa.ipc.open_file(column_chunk).read_all()
column = loaded_arrays.column(column_name)
if column_name == EMPTY_COLUMN_NAME_MARKER:
columns[""] = column
else:
columns[column_name] = column
arrow_table = pa.table(columns)
table = KiaraTable.create_table(arrow_table)
return table
ExportTableModule (DataExportModule)
¶Export table data items.
Source code in tabular/modules/table/__init__.py
class ExportTableModule(DataExportModule):
"""Export table data items."""
_module_type_name = "export.table"
def export__table__as__csv_file(self, value: KiaraTable, base_path: str, name: str):
"""Export a table as csv file."""
import pyarrow.csv as csv
target_path = os.path.join(base_path, f"{name}.csv")
csv.write_csv(value.arrow_table, target_path)
return {"files": target_path}
# def export__table__as__sqlite_db(
# self, value: KiaraTable, base_path: str, name: str
# ):
#
# target_path = os.path.abspath(os.path.join(base_path, f"{name}.sqlite"))
#
# raise NotImplementedError()
# # shutil.copy2(value.db_file_path, target_path)
#
# return {"files": target_path}
export__table__as__csv_file(self, value, base_path, name)
¶Export a table as csv file.
Source code in tabular/modules/table/__init__.py
def export__table__as__csv_file(self, value: KiaraTable, base_path: str, name: str):
"""Export a table as csv file."""
import pyarrow.csv as csv
target_path = os.path.join(base_path, f"{name}.csv")
csv.write_csv(value.arrow_table, target_path)
return {"files": target_path}
MergeTableConfig (KiaraModuleConfig)
pydantic-model
¶Source code in tabular/modules/table/__init__.py
class MergeTableConfig(KiaraModuleConfig):
inputs_schema: Dict[str, ValueSchema] = Field(
description="A dict describing the inputs for this merge process."
)
column_map: Dict[str, str] = Field(
description="A map describing", default_factory=dict
)
MergeTableModule (KiaraModule)
¶Create a table from other tables and/or arrays.
This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary number of tables/arrays; all tables to be merged must be specified in the module configuration.
Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the desired column name as key, and a field-name in the following format as value:
- '[inputs_schema key]' for inputs of type 'array'
- '[inputs_schema_key].orig_column_name' for inputs of type 'table'
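A hedged sketch of what such a module configuration could look like (the input field names 'my_table' and 'extra_column' are purely illustrative):
merge_config = {
    "inputs_schema": {
        "my_table": {"type": "table", "doc": "The base table."},
        "extra_column": {"type": "array", "doc": "An extra column to append."},
    },
    "column_map": {
        "id": "my_table.id",        # '[inputs_schema_key].orig_column_name' for a 'table' input
        "label": "my_table.label",
        "notes": "extra_column",    # '[inputs_schema key]' for an 'array' input
    },
}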
Source code in tabular/modules/table/__init__.py
class MergeTableModule(KiaraModule):
"""Create a table from other tables and/or arrays.
This module needs configuration to be set (for now). It's currently not possible to merge an arbitrary
number of tables/arrays; all tables to be merged must be specified in the module configuration.
Column names of the resulting table can be controlled by the 'column_map' configuration, which takes the
desired column name as key, and a field-name in the following format as value:
- '[inputs_schema key]' for inputs of type 'array'
- '[inputs_schema_key].orig_column_name' for inputs of type 'table'
"""
_module_type_name = "table.merge"
_config_cls = MergeTableConfig
def create_inputs_schema(
self,
) -> ValueMapSchema:
input_schema_dict = self.get_config_value("inputs_schema")
return input_schema_dict
def create_outputs_schema(
self,
) -> ValueMapSchema:
outputs = {
"table": {
"type": "table",
"doc": "The merged table, including all source tables and columns.",
}
}
return outputs
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:
import pyarrow as pa
inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
column_map: Dict[str, str] = self.get_config_value("column_map")
sources = {}
for field_name in inputs_schema.keys():
sources[field_name] = inputs.get_value_data(field_name)
len_dict = {}
arrays = {}
column_map_final = dict(column_map)
for source_key, table_or_array in sources.items():
if isinstance(table_or_array, KiaraTable):
rows = table_or_array.num_rows
for name in table_or_array.column_names:
array_name = f"{source_key}.{name}"
if column_map and array_name not in column_map.values():
job_log.add_log(
f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
)
continue
column = table_or_array.arrow_table.column(name)
arrays[array_name] = column
if not column_map:
if name in column_map_final:
raise Exception(
f"Can't merge table, duplicate column name: {name}."
)
column_map_final[name] = array_name
elif isinstance(table_or_array, KiaraArray):
if column_map and source_key not in column_map.values():
job_log.add_log(
f"Ignoring array '{source_key}': not listed in column_map."
)
continue
rows = len(table_or_array)
arrays[source_key] = table_or_array.arrow_array
if not column_map:
if source_key in column_map_final.keys():
raise Exception(
f"Can't merge table, duplicate column name: {source_key}."
)
column_map_final[source_key] = source_key
else:
raise KiaraProcessingException(
f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
)
len_dict[source_key] = rows
all_rows = None
for source_key, rows in len_dict.items():
if all_rows is None:
all_rows = rows
else:
if all_rows != rows:
all_rows = None
break
if all_rows is None:
len_str = ""
for name, rows in len_dict.items():
len_str = f" {name} ({rows})"
raise KiaraProcessingException(
f"Can't merge table, sources have different lengths: {len_str}"
)
column_names = []
columns = []
for column_name, ref in column_map_final.items():
column_names.append(column_name)
column = arrays[ref]
columns.append(column)
table = pa.Table.from_arrays(arrays=columns, names=column_names)
outputs.set_value("table", table)
_config_cls (KiaraModuleConfig)
private
pydantic-model
¶Source code in tabular/modules/table/__init__.py
class MergeTableConfig(KiaraModuleConfig):
inputs_schema: Dict[str, ValueSchema] = Field(
description="A dict describing the inputs for this merge process."
)
column_map: Dict[str, str] = Field(
description="A map describing", default_factory=dict
)
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
self,
) -> ValueMapSchema:
input_schema_dict = self.get_config_value("inputs_schema")
return input_schema_dict
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
self,
) -> ValueMapSchema:
outputs = {
"table": {
"type": "table",
"doc": "The merged table, including all source tables and columns.",
}
}
return outputs
process(self, inputs, outputs, job_log)
¶Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:
import pyarrow as pa
inputs_schema: Dict[str, Any] = self.get_config_value("inputs_schema")
column_map: Dict[str, str] = self.get_config_value("column_map")
sources = {}
for field_name in inputs_schema.keys():
sources[field_name] = inputs.get_value_data(field_name)
len_dict = {}
arrays = {}
column_map_final = dict(column_map)
for source_key, table_or_array in sources.items():
if isinstance(table_or_array, KiaraTable):
rows = table_or_array.num_rows
for name in table_or_array.column_names:
array_name = f"{source_key}.{name}"
if column_map and array_name not in column_map.values():
job_log.add_log(
f"Ignoring column '{name}' of input table '{source_key}': not listed in column_map."
)
continue
column = table_or_array.arrow_table.column(name)
arrays[array_name] = column
if not column_map:
if name in column_map_final:
raise Exception(
f"Can't merge table, duplicate column name: {name}."
)
column_map_final[name] = array_name
elif isinstance(table_or_array, KiaraArray):
if column_map and source_key not in column_map.values():
job_log.add_log(
f"Ignoring array '{source_key}': not listed in column_map."
)
continue
rows = len(table_or_array)
arrays[source_key] = table_or_array.arrow_array
if not column_map:
if source_key in column_map_final.keys():
raise Exception(
f"Can't merge table, duplicate column name: {source_key}."
)
column_map_final[source_key] = source_key
else:
raise KiaraProcessingException(
f"Can't merge table: invalid type '{type(table_or_array)}' for source '{source_key}'."
)
len_dict[source_key] = rows
all_rows = None
for source_key, rows in len_dict.items():
if all_rows is None:
all_rows = rows
else:
if all_rows != rows:
all_rows = None
break
if all_rows is None:
len_str = ""
for name, rows in len_dict.items():
len_str = f" {name} ({rows})"
raise KiaraProcessingException(
f"Can't merge table, sources have different lengths: {len_str}"
)
column_names = []
columns = []
for column_name, ref in column_map_final.items():
column_names.append(column_name)
column = arrays[ref]
columns.append(column)
table = pa.Table.from_arrays(arrays=columns, names=column_names)
outputs.set_value("table", table)
QueryTableSQL (KiaraModule)
¶Execute a sql query against an (Arrow) table.
The default relation name for the sql query is 'data', but it can be modified by the 'relation_name' config option/input.
If the 'query' module config option is not set, users can provide their own query; otherwise the pre-set one will be used.
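Under the hood the module registers the Arrow table under the relation name and runs the query with duckdb, roughly like this standalone sketch (made-up data; 'data' is the default relation name):
import duckdb
import pyarrow as pa

arrow_table = pa.table({"year": [1850, 1923, 1999], "freq": [3, 7, 2]})
relation = duckdb.arrow(arrow_table)
result = relation.query("data", "SELECT year, freq FROM data WHERE year > 1900")
print(result.arrow())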
Source code in tabular/modules/table/__init__.py
class QueryTableSQL(KiaraModule):
"""Execute a sql query against an (Arrow) table.
The default relation name for the sql query is 'data', but it can be modified by the 'relation_name' config option/input.
If the 'query' module config option is not set, users can provide their own query; otherwise the pre-set
one will be used.
"""
_module_type_name = "query.table"
_config_cls = QueryTableSQLModuleConfig
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs = {
"table": {
"type": "table",
"doc": "The table to query",
}
}
if self.get_config_value("query") is None:
inputs["query"] = {"type": "string", "doc": "The query."}
inputs["relation_name"] = {
"type": "string",
"doc": "The name the table is referred to in the sql query.",
"default": "data",
}
return inputs
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {"query_result": {"type": "table", "doc": "The query result."}}
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
import duckdb
if self.get_config_value("query") is None:
_query: str = inputs.get_value_data("query")
_relation_name: str = inputs.get_value_data("relation_name")
else:
_query = self.get_config_value("query")
_relation_name = self.get_config_value("relation_name")
if _relation_name.upper() in RESERVED_SQL_KEYWORDS:
raise KiaraProcessingException(
f"Invalid relation name '{_relation_name}': this is a reserved sql keyword, please select a different name."
)
_table: KiaraTable = inputs.get_value_data("table")
rel_from_arrow = duckdb.arrow(_table.arrow_table)
result: duckdb.DuckDBPyRelation = rel_from_arrow.query(_relation_name, _query)
outputs.set_value("query_result", result.arrow())
_config_cls (KiaraModuleConfig)
private
pydantic-model
¶Source code in tabular/modules/table/__init__.py
class QueryTableSQLModuleConfig(KiaraModuleConfig):
query: Union[str, None] = Field(
description="The query to execute. If not specified, the user will be able to provide their own.",
default=None,
)
relation_name: Union[str, None] = Field(
description="The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.",
default="data",
)
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in tabular/modules/table/__init__.py
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs = {
"table": {
"type": "table",
"doc": "The table to query",
}
}
if self.get_config_value("query") is None:
inputs["query"] = {"type": "string", "doc": "The query."}
inputs["relation_name"] = {
"type": "string",
"doc": "The name the table is referred to in the sql query.",
"default": "data",
}
return inputs
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in tabular/modules/table/__init__.py
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {"query_result": {"type": "table", "doc": "The query result."}}
process(self, inputs, outputs)
¶Source code in tabular/modules/table/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
import duckdb
if self.get_config_value("query") is None:
_query: str = inputs.get_value_data("query")
_relation_name: str = inputs.get_value_data("relation_name")
else:
_query = self.get_config_value("query")
_relation_name = self.get_config_value("relation_name")
if _relation_name.upper() in RESERVED_SQL_KEYWORDS:
raise KiaraProcessingException(
f"Invalid relation name '{_relation_name}': this is a reserved sql keyword, please select a different name."
)
_table: KiaraTable = inputs.get_value_data("table")
rel_from_arrow = duckdb.arrow(_table.arrow_table)
result: duckdb.DuckDBPyRelation = rel_from_arrow.query(_relation_name, _query)
outputs.set_value("query_result", result.arrow())
QueryTableSQLModuleConfig (KiaraModuleConfig)
pydantic-model
¶Source code in tabular/modules/table/__init__.py
class QueryTableSQLModuleConfig(KiaraModuleConfig):
query: Union[str, None] = Field(
description="The query to execute. If not specified, the user will be able to provide their own.",
default=None,
)
relation_name: Union[str, None] = Field(
description="The name the table is referred to in the sql query. If not specified, the user will be able to provide their own.",
default="data",
)
RenderTableModule (RenderTableModuleBase)
¶Source code in tabular/modules/table/__init__.py
class RenderTableModule(RenderTableModuleBase):
_module_type_name = "render.table"
def render__table__as__string(self, value: Value, render_config: Mapping[str, Any]):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
wrap, data_related_scenes = self.preprocess_table(
value=value,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_string(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
render_manifest=self.manifest.manifest_hash,
rendered=pretty,
related_scenes=data_related_scenes,
)
def render__table__as__terminal_renderable(
self, value: Value, render_config: Mapping[str, Any]
):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
wrap, data_related_scenes = self.preprocess_table(
value=value,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_terminal_renderable(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
render_manifest=self.manifest.manifest_hash,
rendered=pretty,
related_scenes=data_related_scenes,
)
render__table__as__string(self, value, render_config)
¶Source code in tabular/modules/table/__init__.py
def render__table__as__string(self, value: Value, render_config: Mapping[str, Any]):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
wrap, data_related_scenes = self.preprocess_table(
value=value,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_string(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
render_manifest=self.manifest.manifest_hash,
rendered=pretty,
related_scenes=data_related_scenes,
)
render__table__as__terminal_renderable(self, value, render_config)
¶Source code in tabular/modules/table/__init__.py
def render__table__as__terminal_renderable(
self, value: Value, render_config: Mapping[str, Any]
):
input_number_of_rows = render_config.get("number_of_rows", 20)
input_row_offset = render_config.get("row_offset", 0)
wrap, data_related_scenes = self.preprocess_table(
value=value,
input_number_of_rows=input_number_of_rows,
input_row_offset=input_row_offset,
)
pretty = wrap.as_terminal_renderable(max_row_height=1)
return RenderValueResult(
value_id=value.value_id,
render_config=render_config,
render_manifest=self.manifest.manifest_hash,
rendered=pretty,
related_scenes=data_related_scenes,
)
RenderTableModuleBase (RenderValueModule)
¶Source code in tabular/modules/table/__init__.py
class RenderTableModuleBase(RenderValueModule):
_module_type_name: str = None # type: ignore
def preprocess_table(
self, value: Value, input_number_of_rows: int, input_row_offset: int
):
import duckdb
import pyarrow as pa
if value.data_type_name == "array":
array: KiaraArray = value.data
arrow_table = pa.table(data=[array.arrow_array], names=["array"])
column_names: Iterable[str] = ["array"]
else:
table: KiaraTable = value.data
arrow_table = table.arrow_table
column_names = table.column_names
columns = [f'"{x}"' if not x.startswith('"') else x for x in column_names]
query = f"""SELECT {', '.join(columns)} FROM data LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""
rel_from_arrow = duckdb.arrow(arrow_table)
query_result: duckdb.DuckDBPyRelation = rel_from_arrow.query("data", query)
result_table = query_result.arrow()
wrap = ArrowTabularWrap(table=result_table)
related_scenes: Dict[str, Union[None, RenderScene]] = {}
row_offset = arrow_table.num_rows - input_number_of_rows
if row_offset > 0:
if input_row_offset > 0:
related_scenes["first"] = RenderScene.construct(
title="first",
description=f"Display the first {input_number_of_rows} rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": 0,
"number_of_rows": input_number_of_rows,
},
)
p_offset = input_row_offset - input_number_of_rows
if p_offset < 0:
p_offset = 0
previous = {
"row_offset": p_offset,
"number_of_rows": input_number_of_rows,
}
related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous) # type: ignore
else:
related_scenes["first"] = None
related_scenes["previous"] = None
n_offset = input_row_offset + input_number_of_rows
if n_offset < arrow_table.num_rows:
next = {"row_offset": n_offset, "number_of_rows": input_number_of_rows}
related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next) # type: ignore
else:
related_scenes["next"] = None
last_page = int(arrow_table.num_rows / input_number_of_rows)
current_start = last_page * input_number_of_rows
if (input_row_offset + input_number_of_rows) > arrow_table.num_rows:
related_scenes["last"] = None
else:
related_scenes["last"] = RenderScene.construct(
title="last",
description="Display the final rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": current_start, # type: ignore
"number_of_rows": input_number_of_rows, # type: ignore
},
)
else:
related_scenes["first"] = None
related_scenes["previous"] = None
related_scenes["next"] = None
related_scenes["last"] = None
return wrap, related_scenes
preprocess_table(self, value, input_number_of_rows, input_row_offset)
¶Source code in tabular/modules/table/__init__.py
def preprocess_table(
self, value: Value, input_number_of_rows: int, input_row_offset: int
):
import duckdb
import pyarrow as pa
if value.data_type_name == "array":
array: KiaraArray = value.data
arrow_table = pa.table(data=[array.arrow_array], names=["array"])
column_names: Iterable[str] = ["array"]
else:
table: KiaraTable = value.data
arrow_table = table.arrow_table
column_names = table.column_names
columnns = [f'"{x}"' if not x.startswith('"') else x for x in column_names]
query = f"""SELECT {', '.join(columnns)} FROM data LIMIT {input_number_of_rows} OFFSET {input_row_offset}"""
rel_from_arrow = duckdb.arrow(arrow_table)
query_result: duckdb.DuckDBPyRelation = rel_from_arrow.query("data", query)
result_table = query_result.arrow()
wrap = ArrowTabularWrap(table=result_table)
related_scenes: Dict[str, Union[None, RenderScene]] = {}
row_offset = arrow_table.num_rows - input_number_of_rows
if row_offset > 0:
if input_row_offset > 0:
related_scenes["first"] = RenderScene.construct(
title="first",
description=f"Display the first {input_number_of_rows} rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": 0,
"number_of_rows": input_number_of_rows,
},
)
p_offset = input_row_offset - input_number_of_rows
if p_offset < 0:
p_offset = 0
previous = {
"row_offset": p_offset,
"number_of_rows": input_number_of_rows,
}
related_scenes["previous"] = RenderScene.construct(title="previous", description=f"Display the previous {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=previous) # type: ignore
else:
related_scenes["first"] = None
related_scenes["previous"] = None
n_offset = input_row_offset + input_number_of_rows
if n_offset < arrow_table.num_rows:
next = {"row_offset": n_offset, "number_of_rows": input_number_of_rows}
related_scenes["next"] = RenderScene.construct(title="next", description=f"Display the next {input_number_of_rows} rows of this table.", manifest_hash=self.manifest.manifest_hash, render_config=next) # type: ignore
else:
related_scenes["next"] = None
last_page = int(arrow_table.num_rows / input_number_of_rows)
current_start = last_page * input_number_of_rows
if (input_row_offset + input_number_of_rows) > arrow_table.num_rows:
related_scenes["last"] = None
else:
related_scenes["last"] = RenderScene.construct(
title="last",
description="Display the final rows of this table.",
manifest_hash=self.manifest.manifest_hash,
render_config={
"row_offset": current_start, # type: ignore
"number_of_rows": input_number_of_rows, # type: ignore
},
)
else:
related_scenes["first"] = None
related_scenes["previous"] = None
related_scenes["next"] = None
related_scenes["last"] = None
return wrap, related_scenes
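For orientation, here is a minimal standalone sketch of the pagination pattern preprocess_table uses: the Arrow table is registered with DuckDB under the name data and paged with a LIMIT/OFFSET query. Only pyarrow and duckdb are assumed to be installed; the table contents, page size and offset below are illustrative and not part of the kiara API.
import duckdb
import pyarrow as pa

# illustrative table, stands in for the data behind a kiara 'table' value
arrow_table = pa.table(
    {"city": ["Berlin", "Paris", "Rome", "Oslo"], "population": [3_700_000, 2_100_000, 2_800_000, 700_000]}
)

number_of_rows = 2
row_offset = 2

# quote column names so reserved words or unusual characters don't break the query
columns = ", ".join(f'"{name}"' for name in arrow_table.column_names)
query = f"SELECT {columns} FROM data LIMIT {number_of_rows} OFFSET {row_offset}"

# register the Arrow table as a DuckDB relation and run the paging query against it
rel = duckdb.arrow(arrow_table)
page = rel.query("data", query).arrow()

print(page.to_pydict())  # the second 'page' of two rows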
filters
¶
TableFiltersModule (FilterModule)
¶Source code in tabular/modules/table/filters.py
class TableFiltersModule(FilterModule):
_module_type_name = "table.filters"
@classmethod
def retrieve_supported_type(cls) -> Union[Dict[str, Any], str]:
return "table"
def create_filter_inputs(self, filter_name: str) -> Union[None, ValueMapSchema]:
if filter_name in ["select_columns", "drop_columns"]:
return {
"columns": {
"type": "list",
"doc": "The name of the columns to include.",
"optional": True,
},
"ignore_invalid_column_names": {
"type": "boolean",
"doc": "Whether to ignore invalid column names.",
"default": True,
},
}
return None
def filter__select_columns(self, value: Value, filter_inputs: Mapping[str, Any]):
import pyarrow as pa
ignore_invalid = filter_inputs["ignore_invalid_column_names"]
column_names = filter_inputs["columns"]
if not column_names:
return value
table: KiaraTable = value.data
arrow_table = table.arrow_table
_column_names = []
_columns = []
for column_name in column_names:
if column_name not in arrow_table.column_names:
if ignore_invalid:
continue
else:
raise KiaraProcessingException(
f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
)
column = arrow_table.column(column_name)
_column_names.append(column_name)
_columns.append(column)
return pa.table(data=_columns, names=_column_names)
def filter__drop_columns(self, value: Value, filter_inputs: Mapping[str, Any]):
import pyarrow as pa
ignore_invalid = filter_inputs["ignore_invalid_column_names"]
column_names_to_ignore = filter_inputs["columns"]
if not column_names_to_ignore:
return value
table: KiaraTable = value.data
arrow_table = table.arrow_table
for column_name in column_names_to_ignore:
if column_name not in arrow_table.column_names:
if ignore_invalid:
continue
else:
raise KiaraProcessingException(
f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
)
_column_names = []
_columns = []
for column_name in arrow_table.column_names:
if column_name in column_names_to_ignore:
continue
column = arrow_table.column(column_name)
_column_names.append(column_name)
_columns.append(column)
return pa.table(data=_columns, names=_column_names)
def filter__select_rows(self, value: Value, filter_inputs: Mapping[str, Any]):
pass
create_filter_inputs(self, filter_name)
¶Source code in tabular/modules/table/filters.py
def create_filter_inputs(self, filter_name: str) -> Union[None, ValueMapSchema]:
if filter_name in ["select_columns", "drop_columns"]:
return {
"columns": {
"type": "list",
"doc": "The name of the columns to include.",
"optional": True,
},
"ignore_invalid_column_names": {
"type": "boolean",
"doc": "Whether to ignore invalid column names.",
"default": True,
},
}
return None
filter__drop_columns(self, value, filter_inputs)
¶Source code in tabular/modules/table/filters.py
def filter__drop_columns(self, value: Value, filter_inputs: Mapping[str, Any]):
import pyarrow as pa
ignore_invalid = filter_inputs["ignore_invalid_column_names"]
column_names_to_ignore = filter_inputs["columns"]
if not column_names_to_ignore:
return value
table: KiaraTable = value.data
arrow_table = table.arrow_table
for column_name in column_names_to_ignore:
if column_name not in arrow_table.column_names:
if ignore_invalid:
continue
else:
raise KiaraProcessingException(
f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
)
_column_names = []
_columns = []
for column_name in arrow_table.column_names:
if column_name in column_names_to_ignore:
continue
column = arrow_table.column(column_name)
_column_names.append(column_name)
_columns.append(column)
return pa.table(data=_columns, names=_column_names)
filter__select_columns(self, value, filter_inputs)
¶Source code in tabular/modules/table/filters.py
def filter__select_columns(self, value: Value, filter_inputs: Mapping[str, Any]):
import pyarrow as pa
ignore_invalid = filter_inputs["ignore_invalid_column_names"]
column_names = filter_inputs["columns"]
if not column_names:
return value
table: KiaraTable = value.data
arrow_table = table.arrow_table
_column_names = []
_columns = []
for column_name in column_names:
if column_name not in arrow_table.column_names:
if ignore_invalid:
continue
else:
raise KiaraProcessingException(
f"Can't select column '{column_name}' from table: column name not available. Available columns: {', '.join(arrow_table.column_names)}."
)
column = arrow_table.column(column_name)
_column_names.append(column_name)
_columns.append(column)
return pa.table(data=_columns, names=_column_names)
filter__select_rows(self, value, filter_inputs)
¶Source code in tabular/modules/table/filters.py
def filter__select_rows(self, value: Value, filter_inputs: Mapping[str, Any]):
pass
retrieve_supported_type()
classmethod
¶Source code in tabular/modules/table/filters.py
@classmethod
def retrieve_supported_type(cls) -> Union[Dict[str, Any], str]:
return "table"
pipelines
special
¶
Default (empty) module that is used as a base path for pipelines contained in this package.
utils
¶
Functions¶
convert_arrow_column_types_to_sqlite(table)
¶
Source code in tabular/utils.py
def convert_arrow_column_types_to_sqlite(
table: "pa.Table",
) -> Dict[str, SqliteDataType]:
result: Dict[str, SqliteDataType] = {}
for column_name in table.column_names:
field = table.field(column_name)
sqlite_type = convert_arrow_type_to_sqlite(str(field.type))
result[column_name] = sqlite_type
return result
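A small usage sketch; the table is illustrative, and the import path assumes the function lives in kiara_plugin.tabular.utils, as the source location above suggests:
import pyarrow as pa
from kiara_plugin.tabular.utils import convert_arrow_column_types_to_sqlite

table = pa.table({"id": [1, 2, 3], "name": ["a", "b", "c"], "score": [0.5, 0.7, 0.9]})
print(convert_arrow_column_types_to_sqlite(table))
# {'id': 'INTEGER', 'name': 'TEXT', 'score': 'REAL'}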
convert_arrow_type_to_sqlite(data_type)
¶
Source code in tabular/utils.py
def convert_arrow_type_to_sqlite(data_type: str) -> SqliteDataType:
if data_type.startswith("int") or data_type.startswith("uint"):
return "INTEGER"
if (
data_type.startswith("float")
or data_type.startswith("decimal")
or data_type.startswith("double")
):
return "REAL"
if data_type.startswith("time") or data_type.startswith("date"):
return "TEXT"
if data_type == "bool":
return "INTEGER"
if data_type in ["string", "utf8", "large_string", "large_utf8"]:
return "TEXT"
if data_type in ["binary", "large_binary"]:
return "BLOB"
raise Exception(f"Can't convert to sqlite type: {data_type}")
create_sqlite_schema_data_from_arrow_table(table, column_map=None, index_columns=None, nullable_columns=None, unique_columns=None, primary_key=None)
¶
Create an SQL schema statement from an Arrow table object.
Parameters:
Name | Type | Description | Default
---|---|---|---
table | pa.Table | the Arrow table object | required
column_map | Optional[Mapping[str, str]] | a map that contains column names that should be changed in the new table | None
index_columns | Optional[Iterable[str]] | a list of column names (after mapping) to create module_indexes for | None
extra_column_info | | a list of extra schema instructions per column name (after mapping) | required
Source code in tabular/utils.py
def create_sqlite_schema_data_from_arrow_table(
table: "pa.Table",
column_map: Union[Mapping[str, str], None] = None,
index_columns: Union[Iterable[str], None] = None,
nullable_columns: Union[Iterable[str], None] = None,
unique_columns: Union[Iterable[str], None] = None,
primary_key: Union[str, None] = None,
) -> SqliteTableSchema:
"""Create a sql schema statement from an Arrow table object.
Arguments:
table: the Arrow table object
column_map: a map that contains column names that should be changed in the new table
index_columns: a list of column names (after mapping) to create module_indexes for
extra_column_info: a list of extra schema instructions per column name (after mapping)
"""
columns = convert_arrow_column_types_to_sqlite(table=table)
if column_map is None:
column_map = {}
temp: Dict[str, SqliteDataType] = {}
if index_columns is None:
index_columns = []
if nullable_columns is None:
nullable_columns = []
if unique_columns is None:
unique_columns = []
for cn, sqlite_data_type in columns.items():
if cn in column_map.keys():
new_key = column_map[cn]
index_columns = [
x if x not in column_map.keys() else column_map[x]
for x in index_columns
]
unique_columns = [
x if x not in column_map.keys() else column_map[x]
for x in unique_columns
]
nullable_columns = [
x if x not in column_map.keys() else column_map[x]
for x in nullable_columns
]
else:
new_key = cn
temp[new_key] = sqlite_data_type
columns = temp
if not columns:
raise Exception("Resulting table schema has no columns.")
else:
for ic in index_columns:
if ic not in columns.keys():
raise Exception(
f"Can't create schema, requested index column name not available: {ic}"
)
schema = SqliteTableSchema(
columns=columns,
index_columns=index_columns,
nullable_columns=nullable_columns,
unique_columns=unique_columns,
primary_key=primary_key,
)
return schema
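A hedged usage sketch: building a schema for an illustrative Arrow table and renaming one column via column_map (index and primary-key column names refer to the names after mapping); the import path assumes kiara_plugin.tabular.utils:
import pyarrow as pa
from kiara_plugin.tabular.utils import create_sqlite_schema_data_from_arrow_table

table = pa.table({"id": [1, 2], "label": ["x", "y"]})  # illustrative data
schema = create_sqlite_schema_data_from_arrow_table(
    table=table,
    column_map={"label": "name"},  # store the 'label' column as 'name' in the target schema
    index_columns=["name"],        # refers to the mapped column name
    primary_key="id",
)
print(schema.columns)  # {'id': 'INTEGER', 'name': 'TEXT'}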
create_sqlite_table_from_tabular_file(target_db_file, file_item, table_name=None, is_csv=True, is_tsv=False, is_nl=False, primary_key_column_names=None, flatten_nested_json_objects=False, csv_delimiter=None, quotechar=None, sniff=True, no_headers=False, encoding='utf-8', batch_size=100, detect_types=True)
¶
Source code in tabular/utils.py
def create_sqlite_table_from_tabular_file(
target_db_file: str,
file_item: FileModel,
table_name: Union[str, None] = None,
is_csv: bool = True,
is_tsv: bool = False,
is_nl: bool = False,
primary_key_column_names: Union[Iterable[str], None] = None,
flatten_nested_json_objects: bool = False,
csv_delimiter: Union[str, None] = None,
quotechar: Union[str, None] = None,
sniff: bool = True,
no_headers: bool = False,
encoding: str = "utf-8",
batch_size: int = 100,
detect_types: bool = True,
):
if not table_name:
table_name = file_item.file_name_without_extension
f = open(file_item.path, "rb")
try:
insert_upsert_implementation(
path=target_db_file,
table=table_name,
file=f,
pk=primary_key_column_names,
flatten=flatten_nested_json_objects,
nl=is_nl,
csv=is_csv,
tsv=is_tsv,
lines=False,
text=False,
convert=None,
imports=None,
delimiter=csv_delimiter,
quotechar=quotechar,
sniff=sniff,
no_headers=no_headers,
encoding=encoding,
batch_size=batch_size,
alter=False,
upsert=False,
ignore=False,
replace=False,
truncate=False,
not_null=None,
default=None,
detect_types=detect_types,
analyze=False,
load_extension=None,
silent=True,
bulk_sql=None,
)
except Exception as e:
log_exception(e)
finally:
f.close()
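A hedged usage sketch, assuming file_item is an existing kiara FileModel instance (for example the .data of an onboarded 'file' value pointing at a CSV); the output path and column names are placeholders:
from kiara_plugin.tabular.utils import create_sqlite_table_from_tabular_file

create_sqlite_table_from_tabular_file(
    target_db_file="/tmp/example.sqlite",  # hypothetical output database file
    file_item=file_item,                   # assumed: an existing FileModel instance
    table_name="my_data",                  # falls back to the file name (without extension) if omitted
    is_csv=True,
    primary_key_column_names=["id"],       # assumes the CSV has an 'id' column
)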
insert_db_table_from_file_bundle(database, file_bundle, table_name='file_items', include_content=True)
¶
Source code in tabular/utils.py
def insert_db_table_from_file_bundle(
database: KiaraDatabase,
file_bundle: FileBundle,
table_name: str = "file_items",
include_content: bool = True,
):
# TODO: check if table with that name exists
from sqlalchemy import Column, Integer, MetaData, String, Table, Text, insert
from sqlalchemy.engine import Engine
# if db_file_path is None:
# temp_f = tempfile.mkdtemp()
# db_file_path = os.path.join(temp_f, "db.sqlite")
#
# def cleanup():
# shutil.rmtree(db_file_path, ignore_errors=True)
#
# atexit.register(cleanup)
metadata_obj = MetaData()
file_items = Table(
table_name,
metadata_obj,
Column("id", Integer, primary_key=True),
Column("size", Integer(), nullable=False),
Column("mime_type", String(length=64), nullable=False),
Column("rel_path", String(), nullable=False),
Column("file_name", String(), nullable=False),
Column("content", Text(), nullable=not include_content),
)
engine: Engine = database.get_sqlalchemy_engine()
metadata_obj.create_all(engine)
with engine.connect() as con:
# TODO: commit in batches for better performance
for index, rel_path in enumerate(sorted(file_bundle.included_files.keys())):
f: FileModel = file_bundle.included_files[rel_path]
# only read the file's text when it is actually going to be stored in the 'content' column
if include_content:
content: Union[str, None] = f.read_text() # type: ignore
else:
content = None
_values = {
"id": index,
"size": f.size,
"mime_type": f.mime_type,
"rel_path": rel_path,
"file_name": f.file_name,
"content": content,
}
stmt = insert(file_items).values(**_values)
con.execute(stmt)
con.commit()
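Finally, a hedged sketch for insert_db_table_from_file_bundle, assuming database is a KiaraDatabase and file_bundle a FileBundle obtained elsewhere (for example from existing kiara values); neither object is constructed in this snippet:
from kiara_plugin.tabular.utils import insert_db_table_from_file_bundle

insert_db_table_from_file_bundle(
    database=database,        # assumed: an existing KiaraDatabase instance
    file_bundle=file_bundle,  # assumed: an existing FileBundle instance
    table_name="file_items",
    include_content=True,     # also store each file's text content in the table
)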