utils
CAMEL_TO_SNAKE_REGEX
¶
SUBCLASS_TYPE
¶
WORD_REGEX_PATTERN
¶
logger
¶
Functions¶
camel_case_to_snake_case(camel_text, repl='_')
¶
Source code in kiara/utils/__init__.py
def camel_case_to_snake_case(camel_text: str, repl: str = "_"):
    """Convert a CamelCase string to snake_case, inserting *repl* between words."""
    separated = CAMEL_TO_SNAKE_REGEX.sub(repl, camel_text)
    return separated.lower()
check_valid_field_names(*field_names)
¶
Check whether the provided field names are all valid.
Returns:
Type | Description |
---|---|
List[str] |
an iterable of strings with invalid field names |
Source code in kiara/utils/__init__.py
def check_valid_field_names(*field_names) -> List[str]:
    """Check whether the provided field names are all valid.

    A name is invalid if it is reserved (contained in INVALID_VALUE_NAMES) or
    starts with an underscore.

    Returns:
        the invalid field names (empty list if all names are valid)
    """
    invalid = []
    for name in field_names:
        if name in INVALID_VALUE_NAMES or name.startswith("_"):
            invalid.append(name)
    return invalid
find_free_id(stem, current_ids, sep='_')
¶
Find a free var (or other name) based on a stem string, based on a list of provided existing names.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stem |
str |
the base string to use |
required |
current_ids |
Iterable[str] |
currently existing names |
required |
sep |
str |
the separator placed between the stem and the appended counter |
'_' |
Returns:
Type | Description |
---|---|
str |
a free name |
Source code in kiara/utils/__init__.py
def find_free_id(
    stem: str,
    current_ids: Iterable[str],
    sep: str = "_",
) -> str:
    """Find a free var (or other name) based on a stem string and a set of existing names.

    If *stem* itself is unused it is returned as-is, otherwise an integer
    counter (starting at 1) is appended until a free name is found.

    Args:
        stem: the base string to use
        current_ids: currently existing names
        sep: the separator placed between stem and counter

    Returns:
        a free name
    """
    # docstring fixed: it previously documented nonexistent 'method'/'method_args'
    # parameters (with a 'prototing_config' typo) instead of 'sep'
    if stem not in current_ids:
        return stem

    idx = 1
    candidate = f"{stem}{sep}{idx}"
    while candidate in current_ids:
        idx += 1
        candidate = f"{stem}{sep}{idx}"
    return candidate
first_line(text)
¶
Source code in kiara/utils/__init__.py
def first_line(text: str):
    """Return the first line of *text*, stripped.

    Text containing no newline is returned unchanged (and unstripped), matching
    the original behavior.
    """
    head, newline, _rest = text.partition("\n")
    if newline:
        return head.strip()
    return text
get_auto_workflow_alias(module_type, use_incremental_ids=False)
¶
Return an id for a workflow obj of a provided module class.
If 'use_incremental_ids' is set to True, a unique id is returned.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
module_type |
str |
the name of the module type |
required |
use_incremental_ids |
bool |
whether to return a unique (incremental) id |
False |
Returns:
Type | Description |
---|---|
str |
a module id |
Source code in kiara/utils/__init__.py
def get_auto_workflow_alias(module_type: str, use_incremental_ids: bool = False) -> str:
    """Return an id for a workflow obj of a provided module class.

    If 'use_incremental_ids' is set to True, a unique id is returned by
    appending (and advancing) a per-module-type counter.

    Args:
        module_type: the name of the module type
        use_incremental_ids: whether to return a unique (incremental) id

    Returns:
        a module id
    """
    if use_incremental_ids:
        counter = _AUTO_MODULE_ID.get(module_type, 0)
        _AUTO_MODULE_ID[module_type] = counter + 1
        return f"{module_type}_{counter}"
    return module_type
get_dev_config()
¶
Source code in kiara/utils/__init__.py
def get_dev_config() -> "KiaraDevSettings":
    """Return the global kiara development settings object.

    Imported lazily inside the function — presumably to avoid import cycles or
    startup cost; TODO confirm.
    """
    from kiara.utils.develop import KIARA_DEV_SETTINGS

    return KIARA_DEV_SETTINGS
is_debug()
¶
Source code in kiara/utils/__init__.py
def is_debug() -> bool:
    """Whether debug mode is enabled via the 'DEBUG' environment variable.

    Returns:
        True only when DEBUG is set to 'true' (case-insensitive)
    """
    # collapsed the if/else into a single boolean expression; behavior unchanged
    return os.environ.get("DEBUG", "").lower() == "true"
is_develop()
¶
Source code in kiara/utils/__init__.py
def is_develop() -> bool:
    """Whether develop mode is enabled via the 'DEVELOP' (fallback: 'DEV') env var.

    Any non-empty value other than (case-insensitive) 'false' enables it.
    """
    flag = os.environ.get("DEVELOP", "") or os.environ.get("DEV", "")
    return bool(flag) and flag.lower() != "false"
is_jupyter()
¶
Source code in kiara/utils/__init__.py
def is_jupyter() -> bool:
    """Best-effort detection of a Jupyter(-like) notebook environment.

    Returns True inside a ZMQ-based kernel or Google Colab, False in a plain
    interpreter or an IPython terminal shell.
    """
    try:
        get_ipython  # type: ignore  # noqa
    except NameError:
        # no IPython at all
        return False
    shell_cls = get_ipython().__class__  # type: ignore  # noqa
    if "google.colab" in str(shell_cls):
        return True
    return shell_cls.__name__ == "ZMQInteractiveShell"
log_exception(exc)
¶
Source code in kiara/utils/__init__.py
def log_exception(exc: Exception):
    """Log an exception, with richer detail in debug/develop mode.

    In debug mode the exception is logged via the structured logger. In develop
    mode a rich traceback is additionally emitted through the dev-message
    channel, with verbosity controlled by the dev settings' 'log.exc' level.
    """
    if is_debug():
        logger.exception(exc)

    if is_develop():
        from kiara.utils.develop import DetailLevel

        config = get_dev_config()
        if config.log.exc in [DetailLevel.NONE, "none"]:
            return

        # 'full' detail also renders local variables in the traceback frames
        show_locals = config.log.exc in [DetailLevel.FULL, "full"]

        from kiara.interfaces import get_console
        from kiara.utils.develop import log_dev_message

        exc_info = sys.exc_info()
        # NOTE(review): sys.exc_info() always returns a 3-tuple (possibly all
        # None), which is truthy, so this fallback branch may never trigger --
        # confirm whether the intent was 'exc_info[0] is None'
        if not exc_info:
            # TODO: create exc_info from exception?
            # avoid double-logging when debug mode already logged it above
            if not is_debug():
                logger.exception(exc)
        else:
            console = get_console()
            log_dev_message(
                Traceback.from_exception(
                    *exc_info, show_locals=show_locals, width=console.width - 4  # type: ignore
                ),
                title="Exception details",
            )
log_message(msg, **data)
¶
Source code in kiara/utils/__init__.py
def log_message(msg: str, **data):
    """Emit *msg* (plus structured *data*) at debug level, but only in debug mode."""
    if is_debug():
        logger.debug(msg, **data)
to_camel_case(text)
¶
Source code in kiara/utils/__init__.py
def to_camel_case(text: str) -> str:
    """Convert a delimited string to CamelCase, splitting on WORD_REGEX_PATTERN.

    Args:
        text: the input string

    Returns:
        the CamelCase version of the input
    """
    # removed a pointless 'enumerate' whose index was never used
    words = WORD_REGEX_PATTERN.split(text)
    return "".join(word.title() for word in words)
Modules¶
class_loading
¶
KiaraEntryPointItem
¶
KiaraEntryPointIterable
¶
SUBCLASS_TYPE
¶
logger
¶
Functions¶
find_all_archive_types()
¶
Find all KiaraArchive subclasses via package entry points.
Source code in kiara/utils/class_loading.py
def find_all_archive_types() -> Dict[str, Type["KiaraArchive"]]:
    """Find all [KiaraArchive][kiara.registries.KiaraArchive] subclasses via package entry points.

    Returns a map of archive type id -> class; ids come from the
    '_archive_type_name' class attribute (falling back to the name function).
    """
    from kiara.registries import KiaraArchive

    return load_all_subclasses_for_entry_point(
        entry_point_name="kiara.archive_type",
        base_class=KiaraArchive,  # type: ignore
        type_id_key="_archive_type_name",
        type_id_func=_cls_name_id_func,
        attach_python_metadata=False,
    )
find_all_cli_subcommands()
¶
Source code in kiara/utils/class_loading.py
def find_all_cli_subcommands():
    """Load all plugins registered under the 'kiara.cli_subcommands' entry point."""
    entry_point_name = "kiara.cli_subcommands"

    # route stevedore's own log output to stdout with a recognizable prefix
    log2 = logging.getLogger("stevedore")
    out_hdlr = logging.StreamHandler(sys.stdout)
    out_hdlr.setFormatter(
        logging.Formatter(
            f"{entry_point_name} plugin search message/error -> %(message)s"
        )
    )
    out_hdlr.setLevel(logging.INFO)
    log2.addHandler(out_hdlr)
    if is_debug():
        log2.setLevel(logging.DEBUG)
    else:
        # NOTE(review): the handler level was already set to INFO above, so this
        # branch is effectively redundant -- confirm whether the debug branch
        # was meant to lower the handler level instead
        out_hdlr.setLevel(logging.INFO)
        log2.setLevel(logging.INFO)

    log_message("events.loading.entry_points", entry_point_name=entry_point_name)

    mgr = ExtensionManager(
        namespace=entry_point_name,
        invoke_on_load=False,
        propagate_map_exceptions=True,
    )
    return [plugin.plugin for plugin in mgr]
find_all_data_types()
¶
Find all [DataType][kiara.data_types.DataType] subclasses via package entry points.
TODO
Source code in kiara/utils/class_loading.py
def find_all_data_types() -> Dict[str, Type["DataType"]]:
    """Find all [DataType][kiara.data_types.DataType] subclasses via package entry points.

    Type names containing a '.' are rejected.
    """
    # docstring fixed: it previously (incorrectly) claimed this finds KiaraModule subclasses
    from kiara.data_types import DataType

    all_data_types = load_all_subclasses_for_entry_point(
        entry_point_name="kiara.data_types",
        base_class=DataType,  # type: ignore
        type_id_key="_data_type_name",
        type_id_func=_cls_name_id_func,
    )

    invalid = [x for x in all_data_types.keys() if "." in x]
    if invalid:
        raise Exception(
            f"Invalid value type name(s), type names can't contain '.': {', '.join(invalid)}"
        )
    return all_data_types
find_all_kiara_model_classes()
¶
Find all [KiaraModel][kiara.models.KiaraModel] subclasses via package entry points.
TODO
Source code in kiara/utils/class_loading.py
def find_all_kiara_model_classes() -> Dict[str, Type["KiaraModel"]]:
    """Find all [KiaraModel][kiara.models.KiaraModel] subclasses via package entry points."""
    # docstring fixed: it previously (incorrectly) claimed this finds KiaraModule subclasses
    from kiara.models import KiaraModel

    return load_all_subclasses_for_entry_point(
        entry_point_name="kiara.model_classes",
        base_class=KiaraModel,  # type: ignore
        type_id_key="_kiara_model_id",
        type_id_func=_cls_name_id_func,
        attach_python_metadata=False,
    )
find_all_kiara_modules()
¶
Find all [KiaraModule][kiara.module.KiaraModule] subclasses via package entry points.
TODO
Source code in kiara/utils/class_loading.py
def find_all_kiara_modules() -> Dict[str, Type["KiaraModule"]]:
    """Find all [KiaraModule][kiara.module.KiaraModule] subclasses via package entry points.

    Classes that lack a 'process' method are logged and excluded from the result.
    """
    from kiara.modules import KiaraModule

    modules = load_all_subclasses_for_entry_point(
        entry_point_name="kiara.modules",
        base_class=KiaraModule,  # type: ignore
        type_id_key="_module_type_name",
        attach_python_metadata=True,
    )

    result = {}
    # need to test this, since I couldn't add an abstract method to the KiaraModule class itself (mypy complained because it is potentially overloaded)
    for k, cls in modules.items():
        if not hasattr(cls, "process"):
            if is_develop():
                msg = f"Invalid kiara module: **{cls.__module__}.{cls.__name__}**\n\nMissing method(s):\n- *process*"
                log_dev_message(msg=Markdown(msg))
            # TODO: check signature of process method
            log_message(
                "ignore.subclass",
                sub_class=cls,
                base_class=KiaraModule,
                reason="'process' method is missing",
            )
            continue
        result[k] = cls
    return result
find_all_kiara_pipeline_paths(skip_errors=False)
¶
Source code in kiara/utils/class_loading.py
def find_all_kiara_pipeline_paths(
    skip_errors: bool = False,
) -> Dict[str, Union[Mapping[str, Any], None]]:
    """Find all pipeline base paths registered under the 'kiara.pipelines' entry point.

    Each entry point must be a callable (or a tuple '(callable, path[, metadata])')
    that returns a path string or an iterable of path strings.

    Args:
        skip_errors: log and skip broken entry points instead of raising

    Returns:
        a map of pipeline base path -> optional metadata mapping
    """
    import logging

    # route stevedore's own log output to stdout with a recognizable prefix
    log2 = logging.getLogger("stevedore")
    out_hdlr = logging.StreamHandler(sys.stdout)
    out_hdlr.setFormatter(
        logging.Formatter("kiara pipeline search plugin error -> %(message)s")
    )
    out_hdlr.setLevel(logging.INFO)
    log2.addHandler(out_hdlr)
    log2.setLevel(logging.INFO)

    log_message("events.loading.pipelines")

    mgr = ExtensionManager(
        namespace="kiara.pipelines", invoke_on_load=False, propagate_map_exceptions=True
    )

    paths: Dict[str, Union[Mapping[str, Any], None]] = {}
    # TODO: make sure we load 'core' first?
    for plugin in mgr:
        name = plugin.name

        # guard clause: reject entry points that are neither a callable nor a
        # tuple starting with a callable
        if not (
            isinstance(plugin.plugin, tuple)
            and len(plugin.plugin) >= 1
            and callable(plugin.plugin[0])
        ) and not callable(plugin.plugin):
            if skip_errors:
                # NOTE: 'pipline' typo kept in the event key for log compatibility
                log_message(
                    "ignore.pipline_entrypoint",
                    entrypoint_name=name,
                    reason=f"invalid plugin type '{type(plugin.plugin)}'",
                )
                continue
            msg = f"Can't load pipelines for entrypoint '{name}': invalid plugin type '{type(plugin.plugin)}'"
            raise Exception(msg)

        try:
            if callable(plugin.plugin):
                func = plugin.plugin
                args = []
            else:
                func = plugin.plugin[0]
                args = plugin.plugin[1:]
            f_args = []
            metadata: Union[Mapping[str, Any], None] = None
            if len(args) >= 1:
                f_args.append(args[0])
            if len(args) >= 2:
                metadata = args[1]
                assert isinstance(metadata, Mapping)
            # bug fix: was 'len(args) > 3', which silently swallowed the case of
            # exactly one surplus argument even though args[2:] is what's unused
            if len(args) > 2:
                logger.debug(
                    "ignore.pipeline_lookup_arguments",
                    reason="more than 2 arguments provided",
                    surplus_args=args[2:],
                    path=f_args[0],
                )

            result = func(f_args[0])
            if not result:
                continue
            if isinstance(result, str):
                paths[result] = metadata
            else:
                # bug fix: was 'for path in paths', which iterated the accumulator
                # instead of the entry point's result, silently dropping
                # multi-path results
                for path in result:
                    assert path not in paths.keys()
                    paths[path] = metadata
        except Exception as e:
            log_exception(e)
            if skip_errors:
                log_message(
                    "ignore.pipline_entrypoint", entrypoint_name=name, reason=str(e)
                )
                continue
            raise Exception(f"Error trying to load plugin '{plugin.plugin}': {e}")

    return paths
find_all_operation_types()
¶
Source code in kiara/utils/class_loading.py
def find_all_operation_types() -> Dict[str, Type["OperationType"]]:
    """Find all OperationType subclasses registered via package entry points."""
    from kiara.operations import OperationType

    return load_all_subclasses_for_entry_point(
        entry_point_name="kiara.operation_types",
        base_class=OperationType,  # type: ignore
        type_id_key="_operation_type_name",
    )
find_data_types_under(module)
¶
Source code in kiara/utils/class_loading.py
def find_data_types_under(module: Union[str, ModuleType]) -> List[Type["DataType"]]:
    """Collect all DataType subclasses defined (recursively) under *module*."""
    from kiara.data_types import DataType

    found = find_subclasses_under(
        base_class=DataType,  # type: ignore
        python_module=module,
    )
    return found
find_kiara_model_classes_under(module)
¶
Source code in kiara/utils/class_loading.py
def find_kiara_model_classes_under(
    module: Union[str, ModuleType]
) -> List[Type["KiaraModel"]]:
    """Collect all KiaraModel subclasses defined (recursively) under *module*."""
    from kiara.models import KiaraModel

    return find_subclasses_under(
        base_class=KiaraModel,  # type: ignore
        python_module=module,
    )
find_kiara_modules_under(module)
¶
Source code in kiara/utils/class_loading.py
def find_kiara_modules_under(
    module: Union[str, ModuleType],
) -> List[Type["KiaraModule"]]:
    """Collect all KiaraModule subclasses defined (recursively) under *module*."""
    from kiara.modules import KiaraModule

    matches = find_subclasses_under(
        base_class=KiaraModule,  # type: ignore
        python_module=module,
    )
    return matches
find_operations_under(module)
¶
Source code in kiara/utils/class_loading.py
def find_operations_under(
    module: Union[str, ModuleType]
) -> List[Type["OperationType"]]:
    """Collect all OperationType subclasses defined (recursively) under *module*."""
    from kiara.operations import OperationType

    matches = find_subclasses_under(
        base_class=OperationType,  # type: ignore
        python_module=module,
    )
    return matches
find_pipeline_base_path_for_module(module)
¶
Source code in kiara/utils/class_loading.py
def find_pipeline_base_path_for_module(
    module: Union[str, ModuleType]
) -> Union[str, None]:
    """Return the filesystem folder containing *module*, for use as a pipeline base path.

    Args:
        module: a module object, or an importable module name

    Returns:
        the module's directory, or None if that directory does not exist

    Raises:
        NotImplementedError: when running from a pyinstaller bundle
    """
    if hasattr(sys, "frozen"):
        raise NotImplementedError("Pyinstaller bundling not supported yet.")

    if isinstance(module, str):
        module = importlib.import_module(module)

    module_file = module.__file__
    assert module_file is not None
    path = os.path.dirname(module_file)

    # bug fix: was 'if not os.path.exists:' (missing call) -- that tested the
    # truthiness of the function object itself, so the branch never triggered
    if not os.path.exists(path):
        log_message("ignore.pipeline_folder", path=path, reason="folder does not exist")
        return None

    return path
find_subclasses_under(base_class, python_module)
¶
Find all (non-abstract) subclasses of a base class that live under a module (recursively).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
base_class |
Type[~SUBCLASS_TYPE] |
the parent class |
required |
python_module |
Union[str, module] |
the Python module to search |
required |
Returns:
Type | Description |
---|---|
List[Type[~SUBCLASS_TYPE]] |
a list of all subclasses |
Source code in kiara/utils/class_loading.py
def find_subclasses_under(
    base_class: Type[SUBCLASS_TYPE],
    python_module: Union[str, ModuleType],
) -> List[Type[SUBCLASS_TYPE]]:
    """Find all (non-abstract) subclasses of a base class that live under a module (recursively).

    Arguments:
        base_class: the parent class
        python_module: the Python module to search

    Returns:
        a list of all subclasses
    """
    # NOTE(review): no explicit abstract-class filter is visible here; the
    # 'non-abstract' claim may be handled inside _get_all_subclasses -- confirm
    if hasattr(sys, "frozen"):
        raise NotImplementedError("Pyinstaller bundling not supported yet.")

    try:
        if isinstance(python_module, str):
            python_module = importlib.import_module(python_module)
        # import everything below the module so all subclasses get registered
        _import_modules_recursively(python_module)
    except Exception as e:
        log_exception(e)
        log_message("ignore.python_module", module=str(python_module), reason=str(e))
        return []

    subclasses: Iterable[Type[SUBCLASS_TYPE]] = _get_all_subclasses(base_class)
    result = []
    for sc in subclasses:
        # keep only subclasses defined under the requested module namespace
        if not sc.__module__.startswith(python_module.__name__):
            continue
        result.append(sc)
    return result
load_all_subclasses_for_entry_point(entry_point_name, base_class, ignore_abstract_classes=True, type_id_key=None, type_id_func=None, type_id_no_attach=False, attach_python_metadata=False)
¶
Find all subclasses of a base class via package entry points.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
entry_point_name |
str |
the entry point name to query entries for |
required |
base_class |
Type[~SUBCLASS_TYPE] |
the base class to look for |
required |
ignore_abstract_classes |
bool |
whether to include abstract classes in the result |
True |
type_id_key |
Optional[str] |
if provided, the found classes will have their id attached as an attribute, using the value of this as the name. if an attribute of this name already exists, it will be used as id without further processing |
None |
type_id_func |
Callable |
a function to take the found class as input, and returns a string representing the id of the class. By default, the module path + "." + class name (snake-case) is used (minus the string 'kiara_modules. |
None |
type_id_no_attach |
bool |
in case you want to use the type_id_key to set the id, but don't want it attached to classes that don't have it, set this to true. In most cases, you won't need this option |
False |
attach_python_metadata |
Union[bool, str] |
whether to attach a PythonClass metadata model to the class. By default, '_python_class' is used as attribute name if this argument is 'True', If this argument is a string, that will be used as name instead. |
False |
Source code in kiara/utils/class_loading.py
def load_all_subclasses_for_entry_point(
    entry_point_name: str,
    base_class: Type[SUBCLASS_TYPE],
    ignore_abstract_classes: bool = True,
    type_id_key: Union[str, None] = None,
    type_id_func: Union[Callable, None] = None,
    type_id_no_attach: bool = False,
    attach_python_metadata: Union[bool, str] = False,
) -> Dict[str, Type[SUBCLASS_TYPE]]:
    """Find all subclasses of a base class via package entry points.

    An entry point may reference a class directly, or a callable (optionally a
    '(callable, *args)' tuple) that returns an iterable of candidate classes.

    Arguments:
        entry_point_name: the entry point name to query entries for
        base_class: the base class to look for
        ignore_abstract_classes: whether to include abstract classes in the result
        type_id_key: if provided, the found classes will have their id attached as an attribute, using the value of this as the name. if an attribute of this name already exists, it will be used as id without further processing
        type_id_func: a function to take the found class as input, and returns a string representing the id of the class. By default, the module path + "." + class name (snake-case) is used (minus the string 'kiara_modules.<project_name>'', if it exists at the beginning
        type_id_no_attach: in case you want to use the type_id_key to set the id, but don't want it attached to classes that don't have it, set this to true. In most cases, you won't need this option
        attach_python_metadata: whether to attach a [PythonClass][kiara.models.python_class.PythonClass] metadata model to the class. By default, '_python_class' is used as attribute name if this argument is 'True', If this argument is a string, that will be used as name instead.
    """
    # route stevedore's own log output to stdout with a recognizable prefix
    log2 = logging.getLogger("stevedore")
    out_hdlr = logging.StreamHandler(sys.stdout)
    out_hdlr.setFormatter(
        logging.Formatter(
            f"{entry_point_name} plugin search message/error -> %(message)s"
        )
    )
    out_hdlr.setLevel(logging.INFO)
    log2.addHandler(out_hdlr)
    if is_debug():
        log2.setLevel(logging.DEBUG)
    else:
        # NOTE(review): the handler level was already set to INFO above, so this
        # branch is effectively redundant -- confirm intent
        out_hdlr.setLevel(logging.INFO)
        log2.setLevel(logging.INFO)

    log_message("events.loading.entry_points", entry_point_name=entry_point_name)

    mgr = ExtensionManager(
        namespace=entry_point_name,
        invoke_on_load=False,
        propagate_map_exceptions=True,
    )

    # classes referenced directly from an entry point
    result_entrypoints: Dict[str, Type[SUBCLASS_TYPE]] = {}
    # classes produced dynamically by a callable entry point
    result_dynamic: Dict[str, Type[SUBCLASS_TYPE]] = {}
    for plugin in mgr:
        name = plugin.name
        if isinstance(plugin.plugin, type):
            # this means an actual (sub-)class was provided in the entrypoint
            cls = plugin.plugin
            if not issubclass(cls, base_class):
                log_message(
                    "ignore.entrypoint",
                    entry_point=name,
                    base_class=base_class,
                    sub_class=plugin.plugin,
                    reason=f"Entry point reference not a subclass of '{base_class}'.",
                )
                continue
            _process_subclass(
                sub_class=cls,
                base_class=base_class,
                type_id_key=type_id_key,
                type_id_func=type_id_func,
                type_id_no_attach=type_id_no_attach,
                attach_python_metadata=attach_python_metadata,
                ignore_abstract_classes=ignore_abstract_classes,
            )
            result_entrypoints[name] = cls
        elif (
            isinstance(plugin.plugin, tuple)
            and len(plugin.plugin) >= 1
            and callable(plugin.plugin[0])
        ) or callable(plugin.plugin):
            # entry point is a callable (or '(callable, *args)' tuple) that
            # produces the candidate classes
            try:
                if callable(plugin.plugin):
                    func = plugin.plugin
                    args = []
                else:
                    func = plugin.plugin[0]
                    args = plugin.plugin[1:]
                classes = func(*args)
            except Exception as e:
                log_exception(e)
                raise Exception(f"Error trying to load plugin '{plugin.plugin}': {e}")

            for sub_class in classes:
                type_id = _process_subclass(
                    sub_class=sub_class,
                    base_class=base_class,
                    type_id_key=type_id_key,
                    type_id_func=type_id_func,
                    type_id_no_attach=type_id_no_attach,
                    attach_python_metadata=attach_python_metadata,
                    ignore_abstract_classes=ignore_abstract_classes,
                )
                if type_id is None:
                    continue
                if type_id in result_dynamic.keys():
                    raise Exception(
                        f"Duplicate type id '{type_id}' for type {entry_point_name}: {result_dynamic[type_id]} -- {sub_class}"
                    )
                result_dynamic[type_id] = sub_class
        else:
            raise Exception(
                f"Can't load subclasses for entry point {entry_point_name} and base class {base_class}: invalid plugin type {type(plugin.plugin)}"
            )

    # merge the dynamically produced classes, flagging name collisions
    for k, v in result_dynamic.items():
        if k in result_entrypoints.keys():
            msg = f"Duplicate item name '{k}' for type {entry_point_name}: {v} -- {result_entrypoints[k]}."
            try:
                if type_id_key not in v.__dict__.keys():
                    msg = f"{msg} Most likely the name is picked up from a subclass, try to add a '{type_id_key}' class attribute to your implementing class, with the name you want to give your type as value."
            except Exception:
                pass
            raise Exception(msg)
        result_entrypoints[k] = v

    return result_entrypoints
cli
special
¶
F
¶
FC
¶
Classes¶
OutputFormat (Enum)
¶
An enumeration.
Source code in kiara/utils/cli/__init__.py
class OutputFormat(Enum):
    """Supported output formats for kiara CLI rendering."""

    TERMINAL = "terminal"
    HTML = "html"
    JSON = "json"
    JSON_INCL_SCHEMA = "json-incl-schema"
    JSON_SCHEMA = "json-schema"

    @classmethod
    def as_dict(cls):
        """Map member names to their string values."""
        return {member.name: member.value for member in cls}

    @classmethod
    def keys_as_list(cls):
        """All member names, in declaration order."""
        return cls._member_names_

    @classmethod
    def values_as_list(cls):
        """All member values, in declaration order."""
        return [member.value for member in cls]
Functions¶
dict_from_cli_args(*args, list_keys=None)
¶
Source code in kiara/utils/cli/__init__.py
def dict_from_cli_args(
    *args: str, list_keys: Union[Iterable[str], None] = None
) -> Dict[str, Any]:
    """Assemble a single config dict from command-line argument strings.

    Each argument may be 'key=value' (value parsed as json, falling back to the
    raw string), a path to an existing data file, or an inline json object.

    Args:
        *args: the raw cli argument strings
        list_keys: keys whose values are collected into a list instead of overwritten

    Returns:
        the merged configuration dict

    Raises:
        Exception: if an argument can't be parsed into data
    """
    if not args:
        return {}

    # hoisted out of the per-argument loop (it was re-checked every iteration);
    # materializing also makes one-shot iterables (generators) safe to test
    # membership against repeatedly
    list_keys = [] if list_keys is None else list(list_keys)

    config: Dict[str, Any] = {}
    for arg in args:
        if "=" in arg:
            key, value = arg.split("=", maxsplit=1)
            try:
                _v = json.loads(value)
            except Exception:
                # not valid json: keep the raw string value
                _v = value
            part_config = {key: _v}
        elif os.path.isfile(os.path.realpath(os.path.expanduser(arg))):
            path = os.path.realpath(os.path.expanduser(arg))
            part_config = get_data_from_file(path)
            assert isinstance(part_config, Mapping)
        else:
            try:
                part_config = json.loads(arg)
                assert isinstance(part_config, Mapping)
            except Exception:
                raise Exception(f"Could not parse argument into data: {arg}")

        for k, v in part_config.items():
            if k in list_keys:
                config.setdefault(k, []).append(v)
            else:
                if k in config.keys():
                    logger.warning("duplicate.key", old_value=k, new_value=v)
                config[k] = v

    return config
is_rich_renderable(item)
¶
Source code in kiara/utils/cli/__init__.py
def is_rich_renderable(item: Any):
    """Whether *item* can be handed directly to a rich console for printing."""
    renderable_types = (ConsoleRenderable, RichCast, str)
    return isinstance(item, renderable_types)
output_format_option(*param_decls)
¶
Attaches an option to the command. All positional arguments are
passed as parameter declarations to :class:Option
; all keyword
arguments are forwarded unchanged (except cls
).
This is equivalent to creating an :class:Option
instance manually
and attaching it to the :attr:Command.params
list.
:param cls: the option class to instantiate. This defaults to
:class:Option
.
Source code in kiara/utils/cli/__init__.py
def output_format_option(*param_decls: str) -> Callable[[FC], FC]:
    """Decorator that attaches an output-format option to a click command.

    The option is a 'click.Choice' over all OutputFormat values; when no
    parameter declarations are given, '--format'/'-f' is used.

    Args:
        *param_decls: parameter declarations passed to the Option (default: '--format', '-f')

    Returns:
        a decorator that registers the option on the decorated command
    """
    # docstring rewritten: the previous one was copied verbatim from click's
    # generic 'option' helper and described parameters this function doesn't take
    if not param_decls:
        param_decls = ("--format", "-f")

    attrs = {
        "help": "The output format. Defaults to 'terminal'.",
        "type": click.Choice(OutputFormat.values_as_list()),
    }

    def decorator(f: FC) -> FC:
        # Issue 926, copy attrs, so pre-defined options can re-use the same cls=
        option_attrs = attrs.copy()
        OptionClass = option_attrs.pop("cls", None) or Option
        _param_memo(f, OptionClass(param_decls, **option_attrs))  # type: ignore
        return f

    return decorator
render_json_schema_str(model)
¶
Source code in kiara/utils/cli/__init__.py
def render_json_schema_str(model: BaseModel):
    """Render *model*'s json schema as an indented json string.

    Tries the orjson-style 'option' keyword first, falling back to the stdlib
    'indent' keyword when the backend raises TypeError.
    """
    try:
        return model.schema_json(option=orjson.OPT_INDENT_2)
    except TypeError:
        return model.schema_json(indent=2)
render_json_str(model)
¶
Source code in kiara/utils/cli/__init__.py
def render_json_str(model: BaseModel):
    """Render *model* as an indented json string (non-string keys allowed).

    Tries the orjson-style 'option' keyword first, falling back to the stdlib
    'indent' keyword when the backend raises TypeError.
    """
    try:
        return model.json(option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS)
    except TypeError:
        return model.json(indent=2)
terminal_print(msg=None, in_panel=None, rich_config=None, empty_line_before=False, **config)
¶
Source code in kiara/utils/cli/__init__.py
def terminal_print(
    msg: Any = None,
    in_panel: Union[str, None] = None,
    rich_config: Union[Mapping[str, Any], None] = None,
    empty_line_before: bool = False,
    **config: Any,
) -> None:
    """Print *msg* to the kiara rich console.

    Args:
        msg: the object to print (converted via 'extract_renderable'); None prints an empty string
        in_panel: if set, wrap the output in a rich Panel with this title
        rich_config: extra keyword arguments forwarded to 'console.print'
        empty_line_before: whether to print a blank line first
        **config: render config forwarded to 'extract_renderable'
    """
    if msg is None:
        msg = ""
    console = get_console()

    msg = extract_renderable(msg, render_config=config)
    # if hasattr(msg, "create_renderable"):
    #     msg = msg.create_renderable(**config)  # type: ignore
    if in_panel is not None:
        msg = Panel(msg, title_align="left", title=in_panel)

    if empty_line_before:
        console.print()
    if rich_config:
        console.print(msg, **rich_config)
    else:
        console.print(msg)
terminal_print_model(*models, format=None, empty_line_before=None, in_panel=None, **render_config)
¶
Source code in kiara/utils/cli/__init__.py
def terminal_print_model(
    *models: BaseModel,
    format: Union[None, OutputFormat, str] = None,
    empty_line_before: Union[bool, None] = None,
    in_panel: Union[str, None] = None,
    **render_config: Any,
):
    """Print one or more pydantic models to the terminal in the requested format.

    Args:
        *models: the models to render
        format: an OutputFormat (or its string value); defaults to 'terminal'
        empty_line_before: print a leading blank line; defaults to True only for terminal output
        in_panel: optional panel title (used for terminal format only)
        **render_config: extra render configuration forwarded to the renderers
    """
    if format is None:
        format = OutputFormat.TERMINAL
    if isinstance(format, str):
        format = OutputFormat(format)

    if empty_line_before is None:
        if format == OutputFormat.TERMINAL:
            empty_line_before = True
        else:
            empty_line_before = False

    if format == OutputFormat.TERMINAL:
        if len(models) == 1:
            terminal_print(
                models[0],
                in_panel=in_panel,
                empty_line_before=empty_line_before,
                **render_config,
            )
        else:
            # multiple models: stack them, separated by horizontal rules
            rg = []
            if not models:
                return
            for model in models[0:-1]:
                renderable = extract_renderable(model, render_config)
                rg.append(renderable)
                rg.append(Rule(style="b"))
            last = extract_renderable(models[-1], render_config)
            rg.append(last)
            group = Group(*rg)
            terminal_print(group, in_panel=in_panel, **render_config)
    elif format == OutputFormat.JSON:
        if len(models) == 1:
            json_str = render_json_str(models[0])
            syntax = Syntax(json_str, "json", background_color="default")
            terminal_print(
                syntax,
                empty_line_before=empty_line_before,
                rich_config={"soft_wrap": True},
            )
        else:
            # render each model separately, then join into a json array by hand
            json_strs = []
            for model in models:
                json_str = render_json_str(model)
                json_strs.append(json_str)
            json_str_full = "[" + ",\n".join(json_strs) + "]"
            syntax = Syntax(json_str_full, "json", background_color="default")
            terminal_print(
                syntax,
                empty_line_before=empty_line_before,
                rich_config={"soft_wrap": True},
            )
    elif format == OutputFormat.JSON_SCHEMA:
        if len(models) == 1:
            # NOTE(review): calls schema_json(option=...) directly rather than
            # render_json_schema_str, so there is no TypeError fallback here
            # unlike the multi-model branch below -- confirm intent
            syntax = Syntax(
                models[0].schema_json(option=orjson.OPT_INDENT_2),
                "json",
                background_color="default",
            )
            terminal_print(
                syntax,
                empty_line_before=empty_line_before,
                rich_config={"soft_wrap": True},
            )
        else:
            json_strs = []
            for model in models:
                json_strs.append(render_json_schema_str(model))
            json_str_full = "[" + ",\n".join(json_strs) + "]"
            syntax = Syntax(json_str_full, "json", background_color="default")
            terminal_print(
                syntax,
                empty_line_before=empty_line_before,
                rich_config={"soft_wrap": True},
            )
    elif format == OutputFormat.JSON_INCL_SCHEMA:
        # emit both the data and its schema side by side
        if len(models) == 1:
            data = models[0].dict()
            schema = models[0].schema()
            all = {"data": data, "schema": schema}
            json_str = orjson_dumps(all, option=orjson.OPT_INDENT_2)
            syntax = Syntax(json_str, "json", background_color="default")
            terminal_print(
                syntax,
                empty_line_before=empty_line_before,
                rich_config={"soft_wrap": True},
            )
        else:
            all_data = []
            for model in models:
                data = model.dict()
                schema = model.schema()
                all_data.append({"data": data, "schema": schema})
            json_str = orjson_dumps(all_data, option=orjson.OPT_INDENT_2)
            # print(json_str)
            syntax = Syntax(json_str, "json", background_color="default")
            terminal_print(
                syntax,
                empty_line_before=empty_line_before,
                rich_config={"soft_wrap": True},
            )
    elif format == OutputFormat.HTML:
        # models must implement 'create_html'; anything else is unsupported
        all_html = ""
        for model in models:
            if hasattr(model, "create_html"):
                html = model.create_html()  # type: ignore
                all_html = f"{all_html}\n{html}"
            else:
                raise NotImplementedError()
        syntax = Syntax(all_html, "html", background_color="default")
        terminal_print(
            syntax, empty_line_before=empty_line_before, rich_config={"soft_wrap": True}
        )
Modules¶
rich_click
¶
Functions¶
rich_format_operation_help(obj, ctx, operation)
¶Print nicely formatted help text using rich.
Based on original code from rich-cli, by @willmcgugan. https://github.com/Textualize/rich-cli/blob/8a2767c7a340715fc6fbf4930ace717b9b2fc5e5/src/rich_cli/__main__.py#L162-L236
Replacement for the click function format_help(). Takes a command or group and builds the help text output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
click.Command or click.Group |
Command or group to build help text for |
required |
ctx |
click.Context |
Click Context object |
required |
table |
a rich table, including all the inputs of the current operation |
required |
Source code in kiara/utils/cli/rich_click.py
def rich_format_operation_help(
obj: Union[click.Command, click.Group],
ctx: click.Context,
operation: KiaraOperation,
) -> None:
"""Print nicely formatted help text using rich.
Based on original code from rich-cli, by @willmcgugan.
https://github.com/Textualize/rich-cli/blob/8a2767c7a340715fc6fbf4930ace717b9b2fc5e5/src/rich_cli/__main__.py#L162-L236
Replacement for the click function format_help().
Takes a command or group and builds the help text output.
Args:
obj (click.Command or click.Group): Command or group to build help text for
ctx (click.Context): Click Context object
table: a rich table, including all the inputs of the current operation
"""
renderables: List[RenderableType] = []
# Header text if we have it
if HEADER_TEXT:
renderables.append(
Padding(_make_rich_rext(HEADER_TEXT, STYLE_HEADER_TEXT), (1, 1, 0, 1))
)
# Print usage
cmd_arg = ctx.params["module_or_operation"]
_cmd = f"[yellow bold]Usage: [/yellow bold][bold]kiara run [OPTIONS] [i]{cmd_arg}[/i] [INPUTS][/bold]"
renderables.append(Padding(_cmd, 1))
# renderables.append(obj.get_usage(ctx))
# renderables.append(Panel(Padding(highlighter(obj.get_usage(ctx)), 1), style=STYLE_USAGE_COMMAND, box=box.MINIMAL))
# Print command / group help if we have some
desc = operation.operation.doc.full_doc
renderables.append(
Padding(
Align(Markdown(desc), width=MAX_WIDTH, pad=False),
(0, 1, 1, 1),
)
)
# if obj.help:
#
# # Print with a max width and some padding
# renderables.append(
# Padding(
# Align(_get_help_text(obj), width=MAX_WIDTH, pad=False),
# (0, 1, 1, 1),
# )
# )
# Look through OPTION_GROUPS for this command
# stick anything unmatched into a default group at the end
option_groups = OPTION_GROUPS.get(ctx.command_path, []).copy()
option_groups.append({"options": []})
argument_group_options = []
for param in obj.get_params(ctx):
# Skip positional arguments - they don't have opts or helptext and are covered in usage
# See https://click.palletsprojects.com/en/8.0.x/documentation/#documenting-arguments
if type(param) is click.core.Argument and not SHOW_ARGUMENTS:
continue
# Skip if option is hidden
if getattr(param, "hidden", False):
continue
# Already mentioned in a config option group
for option_group in option_groups:
if any([opt in option_group.get("options", []) for opt in param.opts]):
break
# No break, no mention - add to the default group
else:
if type(param) is click.core.Argument and not GROUP_ARGUMENTS_OPTIONS:
argument_group_options.append(param.opts[0])
else:
list_of_option_groups: List = option_groups[-1]["options"] # type: ignore
list_of_option_groups.append(param.opts[0])
# If we're not grouping arguments and we got some, prepend before default options
if len(argument_group_options) > 0:
extra_option_group = {
"name": ARGUMENTS_PANEL_TITLE,
"options": argument_group_options,
}
option_groups.insert(len(option_groups) - 1, extra_option_group) # type: ignore
# Print each option group panel
for option_group in option_groups:
options_rows = []
for opt in option_group.get("options", []):
# Get the param
for param in obj.get_params(ctx):
if any([opt in param.opts]):
break
# Skip if option is not listed in this group
else:
continue
# Short and long form
opt_long_strs = []
opt_short_strs = []
for idx, opt in enumerate(param.opts):
opt_str = opt
try:
opt_str += "/" + param.secondary_opts[idx]
except IndexError:
pass
if "--" in opt:
opt_long_strs.append(opt_str)
else:
opt_short_strs.append(opt_str)
# Column for a metavar, if we have one
metavar = Text(style=STYLE_METAVAR, overflow="fold")
metavar_str = param.make_metavar()
# Do it ourselves if this is a positional argument
if type(param) is click.core.Argument and metavar_str == param.name.upper(): # type: ignore
metavar_str = param.type.name.upper()
# Skip booleans and choices (handled above)
if metavar_str != "BOOLEAN":
metavar.append(metavar_str)
# Range - from
# https://github.com/pallets/click/blob/c63c70dabd3f86ca68678b4f00951f78f52d0270/src/click/core.py#L2698-L2706 # noqa: E501
try:
# skip count with default range type
if isinstance(param.type, click.types._NumberRangeBase) and not (
param.count and param.type.min == 0 and param.type.max is None # type: ignore
):
range_str = param.type._describe_range()
if range_str:
metavar.append(RANGE_STRING.format(range_str))
except AttributeError:
# click.types._NumberRangeBase is only in Click 8x onwards
pass
# Required asterisk
required: RenderableType = ""
if param.required:
required = Text(REQUIRED_SHORT_STRING, style=STYLE_REQUIRED_SHORT)
# Highlighter to make [ | ] and <> dim
class MetavarHighlighter(RegexHighlighter):
highlights = [
r"^(?P<metavar_sep>(\[|<))",
r"(?P<metavar_sep>\|)",
r"(?P<metavar_sep>(\]|>)$)",
]
metavar_highlighter = MetavarHighlighter()
rows = [
required,
highlighter(highlighter(",".join(opt_long_strs))),
highlighter(highlighter(",".join(opt_short_strs))),
metavar_highlighter(metavar),
_get_parameter_help(param, ctx), # type: ignore
]
# Remove metavar if specified in config
if not SHOW_METAVARS_COLUMN:
rows.pop(3)
options_rows.append(rows)
if len(options_rows) > 0:
t_styles = {
"show_lines": STYLE_OPTIONS_TABLE_SHOW_LINES,
"leading": STYLE_OPTIONS_TABLE_LEADING,
"box": STYLE_OPTIONS_TABLE_BOX,
"border_style": STYLE_OPTIONS_TABLE_BORDER_STYLE,
"row_styles": STYLE_OPTIONS_TABLE_ROW_STYLES,
"pad_edge": STYLE_OPTIONS_TABLE_PAD_EDGE,
"padding": STYLE_OPTIONS_TABLE_PADDING,
}
t_styles.update(option_group.get("table_styles", {})) # type: ignore
box_style = getattr(box, t_styles.pop("box"), None) # type: ignore
options_table = Table(
highlight=True,
show_header=False,
expand=True,
box=box_style,
**t_styles, # type: ignore
)
# Strip the required column if none are required
if all([x[0] == "" for x in options_rows]):
options_rows = [x[1:] for x in options_rows]
for row in options_rows:
options_table.add_row(*row)
renderables.append(
Panel(
options_table,
border_style=STYLE_OPTIONS_PANEL_BORDER, # type: ignore
title=option_group.get("name", OPTIONS_PANEL_TITLE), # type: ignore
title_align=ALIGN_OPTIONS_PANEL, # type: ignore
width=MAX_WIDTH, # type: ignore
)
)
#
# Groups only:
# List click command groups
#
if hasattr(obj, "list_commands"):
# Look through COMMAND_GROUPS for this command
# stick anything unmatched into a default group at the end
cmd_groups = COMMAND_GROUPS.get(ctx.command_path, []).copy()
cmd_groups.append({"commands": []})
for command in obj.list_commands(ctx): # type: ignore
for cmd_group in cmd_groups:
if command in cmd_group.get("commands", []):
break
else:
commands: List = cmd_groups[-1]["commands"] # type: ignore
commands.append(command)
# Print each command group panel
for cmd_group in cmd_groups:
t_styles = {
"show_lines": STYLE_COMMANDS_TABLE_SHOW_LINES,
"leading": STYLE_COMMANDS_TABLE_LEADING,
"box": STYLE_COMMANDS_TABLE_BOX,
"border_style": STYLE_COMMANDS_TABLE_BORDER_STYLE,
"row_styles": STYLE_COMMANDS_TABLE_ROW_STYLES,
"pad_edge": STYLE_COMMANDS_TABLE_PAD_EDGE,
"padding": STYLE_COMMANDS_TABLE_PADDING,
}
t_styles.update(cmd_group.get("table_styles", {})) # type: ignore
box_style = getattr(box, t_styles.pop("box"), None) # type: ignore
commands_table = Table(
highlight=False,
show_header=False,
expand=True,
box=box_style, # type: ignore
**t_styles, # type: ignore
)
# Define formatting in first column, as commands don't match highlighter regex
commands_table.add_column(style="bold cyan", no_wrap=True)
for command in cmd_group.get("commands", []):
# Skip if command does not exist
if command not in obj.list_commands(ctx): # type: ignore
continue
cmd = obj.get_command(ctx, command) # type: ignore
assert cmd is not None
if cmd.hidden:
continue
# Use the truncated short text as with vanilla text if requested
if USE_CLICK_SHORT_HELP:
helptext = cmd.get_short_help_str()
else:
# Use short_help function argument if used, or the full help
helptext = cmd.short_help or cmd.help or ""
commands_table.add_row(command, _make_command_help(helptext))
if commands_table.row_count > 0:
renderables.append(
Panel(
commands_table,
border_style=STYLE_COMMANDS_PANEL_BORDER, # type: ignore
title=cmd_group.get("name", COMMANDS_PANEL_TITLE), # type: ignore
title_align=ALIGN_COMMANDS_PANEL, # type: ignore
width=MAX_WIDTH, # type: ignore
)
)
inputs_table = operation.create_renderable(
show_operation_name=False,
show_operation_doc=False,
show_inputs=True,
show_outputs_schema=False,
show_headers=False,
)
inputs_panel = Panel(
inputs_table,
title="Inputs",
border_style=STYLE_COMMANDS_PANEL_BORDER, # type: ignore
title_align=ALIGN_COMMANDS_PANEL, # type: ignore
width=MAX_WIDTH, # type: ignore
)
renderables.append(inputs_panel)
# Epilogue if we have it
if obj.epilog:
# Remove single linebreaks, replace double with single
lines = obj.epilog.split("\n\n")
epilogue = "\n".join([x.replace("\n", " ").strip() for x in lines])
renderables.append(
Padding(Align(highlighter(epilogue), width=MAX_WIDTH, pad=False), 1)
)
# Footer text if we have it
if FOOTER_TEXT:
renderables.append(
Padding(_make_rich_rext(FOOTER_TEXT, STYLE_FOOTER_TEXT), (1, 1, 0, 1))
)
group = Group(*renderables)
terminal_print(group)
concurrency
¶
Classes¶
ThreadSaveCounter
¶
A thread-safe counter that can be used in kiara modules to update completion percentage.
Source code in kiara/utils/concurrency.py
class ThreadSaveCounter(object):
    """A thread-safe counter, can be used in kiara modules to update completion percentage."""

    def __init__(self):
        # Every mutation goes through the lock, so concurrent updates are
        # never lost.
        self._lock = threading.Lock()
        self._current = 0

    @property
    def current(self):
        """The current counter value."""
        return self._current

    def current_percent(self, total: int) -> int:
        """Return progress as an integer percentage of ``total``."""
        return int(self.current / total * 100)

    def increment(self):
        """Atomically increase the counter by one and return the new value."""
        with self._lock:
            self._current = self._current + 1
            new_value = self._current
        return new_value

    def decrement(self):
        """Atomically decrease the counter by one and return the new value."""
        with self._lock:
            self._current = self._current - 1
            new_value = self._current
        return new_value
current
property
readonly
¶current_percent(self, total)
¶Source code in kiara/utils/concurrency.py
def current_percent(self, total: int) -> int:
    """Return the counter's progress as an integer percentage of ``total``."""
    ratio = self.current / total
    return int(ratio * 100)
decrement(self)
¶Source code in kiara/utils/concurrency.py
def decrement(self):
    """Atomically decrease the counter by one and return the new value."""
    with self._lock:
        new_value = self._current - 1
        self._current = new_value
    return new_value
increment(self)
¶Source code in kiara/utils/concurrency.py
def increment(self):
    """Atomically increase the counter by one and return the new value."""
    with self._lock:
        new_value = self._current + 1
        self._current = new_value
    return new_value
data
¶
logger
¶
pretty_print_data(kiara, value_id, target_type='terminal_renderable', **render_config)
¶
Source code in kiara/utils/data.py
def pretty_print_data(
    kiara: "Kiara",
    value_id: uuid.UUID,
    target_type="terminal_renderable",
    **render_config: Any,
) -> Any:
    """Pretty-print the value with the given id via a 'pretty_print' operation.

    Args:
        kiara: the kiara context
        value_id: id of the value to render
        target_type: the render target type (default: a terminal renderable)
        **render_config: extra render configuration
            # NOTE(review): render_config is accepted but not passed to the
            # operation below -- confirm whether that is intended.

    Returns:
        the rendered representation of the value

    Raises:
        Exception: if no suitable pretty_print operation is registered
    """
    value = kiara.data_registry.get_value(value_id=value_id)
    op_type: PrettyPrintOperationType = kiara.operation_registry.get_operation_type("pretty_print")  # type: ignore
    try:
        op: Union[Operation, None] = op_type.get_operation_for_render_combination(
            source_type=value.value_schema.type, target_type=target_type
        )
    except Exception as e:
        # No direct source/target combination registered; log and try fallback.
        logger.debug(
            "error.pretty_print",
            source_type=value.value_schema.type,
            target_type=target_type,
            error=e,
        )
        op = None
        # Fallback for terminal output: use the generic 'any' -> 'string'
        # pretty-print operation, if one is registered.
        if target_type == "terminal_renderable":
            try:
                op = op_type.get_operation_for_render_combination(
                    source_type="any", target_type="string"
                )
            except Exception:
                pass
    if op is None:
        # NOTE(review): message is missing the closing quote after
        # {target_type}.
        raise Exception(
            f"Can't find operation to render '{value.value_schema.type}' as '{target_type}."
        )
    result = op.run(kiara=kiara, inputs={"value": value})
    rendered = result.get_value_data("rendered_value")
    return rendered
render_value(kiara, value_id, target_type='terminal_renderable', render_instruction=None)
¶
Source code in kiara/utils/data.py
def render_value(
    kiara: "Kiara",
    value_id: uuid.UUID,
    target_type="terminal_renderable",
    render_instruction: Union[RenderInstruction, None] = None,
) -> RenderValueResult:
    """Render the value with the given id via a registered 'render_value' operation.

    Args:
        kiara: the kiara context
        value_id: id of the value to render
        target_type: the render target type (default: a terminal renderable)
        render_instruction: optional instruction forwarded to the operation

    Returns:
        the rendered value plus its render metadata

    Raises:
        Exception: if no render operation is registered for the combination of
            the value's data type and ``target_type``
    """
    value = kiara.data_registry.get_value(value_id=value_id)
    op_type: RenderValueOperationType = kiara.operation_registry.get_operation_type("render_value")  # type: ignore
    ops = op_type.get_render_operations_for_source_type(value.data_type_name)
    if target_type not in ops.keys():
        if not ops:
            msg = f"No render operations registered for source type '{value.data_type_name}'."
        else:
            # BUG FIX: previously interpolated 'value.data' (the value payload)
            # instead of the data type name, producing a nonsensical message.
            msg = f"Registered target types for source type '{value.data_type_name}': {', '.join(ops.keys())}."
        raise Exception(
            f"No render operation for source type '{value.data_type_name}' to target type '{target_type}' registered. {msg}"
        )
    op = ops[target_type]
    result = op.run(
        kiara=kiara, inputs={"value": value, "render_instruction": render_instruction}
    )
    return RenderValueResult(
        rendered=result.get_value_data("rendered_value"),
        metadata=result.get_value_data("render_metadata"),
    )
db
¶
get_kiara_db_url(base_path)
¶
Source code in kiara/utils/db.py
def get_kiara_db_url(base_path: str):
    """Return the sqlite connection URL for the kiara database under ``base_path``.

    '~' is expanded and the path normalized to an absolute one.
    """
    root = os.path.abspath(os.path.expanduser(base_path))
    return f"sqlite+pysqlite:///{root}/kiara.db"
orm_json_deserialize(obj)
¶
Source code in kiara/utils/db.py
def orm_json_deserialize(obj: str) -> Any:
    """Deserialize a JSON string into Python data (counterpart to 'orm_json_serialize')."""
    return orjson.loads(obj)
orm_json_serialize(obj)
¶
Source code in kiara/utils/db.py
def orm_json_serialize(obj: Any) -> str:
    """Serialize ``obj`` to a JSON string for the ORM layer.

    Objects exposing a ``json()`` method serialize themselves; plain strings
    pass through unchanged; mappings are dumped via orjson.

    Raises:
        Exception: for any other, unsupported type.
    """
    if hasattr(obj, "json"):
        return obj.json()
    if isinstance(obj, str):
        return obj
    if isinstance(obj, Mapping):
        return orjson_dumps(obj, default=None)
    raise Exception(f"Unsupported type for json serialization: {type(obj)}")
debug
¶
DEFAULT_VALUE_MAP_RENDER_CONFIG
¶
create_module_preparation_table(kiara, job_config, job_id, **render_config)
¶
Source code in kiara/utils/debug.py
def create_module_preparation_table(
    kiara: "Kiara", job_config: JobConfig, job_id: uuid.UUID, **render_config: Any
) -> Table:
    """Build a debug table describing a job before it is run.

    The amount of module/input detail included is controlled by the
    ``log.pre_run`` section of the kiara dev config.

    Args:
        kiara: the kiara context
        job_config: manifest and inputs of the job about to run
        job_id: id of the job
        **render_config: passed through to the value renderers

    Returns:
        a rich table with job id, module and input details
    """
    dev_config = get_dev_config()
    table = Table(show_header=False, box=box.SIMPLE)
    table.add_column("key", style="i")
    table.add_column("value")
    table.add_row("job_id", str(job_id))
    module_details = dev_config.log.pre_run.module_info
    # The settings models use 'use_enum_values', so the attribute may hold
    # either the enum member or its raw value -- compare against both.
    if module_details not in [DetailLevel.NONE.value, DetailLevel.NONE]:
        if module_details in [DetailLevel.MINIMAL.value, DetailLevel.MINIMAL]:
            table.add_row("module", job_config.module_type)
            table.add_row(
                "module desc",
                kiara.context_info.module_types.item_infos[
                    job_config.module_type
                ].documentation.description,
            )
        elif module_details in [DetailLevel.FULL.value, DetailLevel.FULL]:
            table.add_row("module", job_config.module_type)
            table.add_row(
                "module doc",
                kiara.context_info.module_types.item_infos[
                    job_config.module_type
                ].documentation.full_doc,
            )
            # Full detail also includes the module configuration.
            if module_config_is_empty(job_config.module_config):
                table.add_row("module_config", "-- no config --")
            else:
                module = kiara.module_registry.create_module(manifest=job_config)
                table.add_row("module_config", module.config)
    inputs_details = dev_config.log.pre_run.inputs_info
    if inputs_details not in [DetailLevel.NONE.value, DetailLevel.NONE]:
        if inputs_details in [DetailLevel.MINIMAL, DetailLevel.MINIMAL.value]:
            # Minimal: render the raw input map, without the type column.
            render_config["show_type"] = False
            value_map_rend = create_value_map_renderable(
                value_map=job_config.inputs, **render_config
            )
            table.add_row("inputs", value_map_rend)
        elif inputs_details in [DetailLevel.FULL, DetailLevel.FULL.value]:
            # Full: resolve the actual Value objects and use their renderable.
            value_map = kiara.data_registry.load_values(values=job_config.inputs)
            table.add_row("inputs", value_map.create_renderable(**render_config))
    return table
create_post_run_table(kiara, job, module, job_config, **render_config)
¶
Source code in kiara/utils/debug.py
def create_post_run_table(
    kiara: "Kiara",
    job: ActiveJob,
    module: "KiaraModule",
    job_config: JobConfig,
    **render_config: Any
) -> Table:
    """Build a debug table describing a finished job (module, inputs, outputs).

    The amount of detail included is controlled by the ``log.post_run``
    section of the kiara dev config.

    Args:
        kiara: the kiara context
        job: the finished job (holds job id and results)
        module: the module that was run
        job_config: the manifest/inputs the job was started with
        **render_config: passed through to the value renderers

    Returns:
        a rich table with job id, module, input and output details
    """
    dev_config = get_dev_config()
    table = Table(show_header=False, box=box.SIMPLE)
    table.add_column("key", style="i")
    table.add_column("value")
    table.add_row("job_id", str(job.job_id))
    module_details = dev_config.log.post_run.module_info
    # The settings models use 'use_enum_values', so compare against both the
    # enum member and its raw value.
    if module_details not in [DetailLevel.NONE.value, DetailLevel.NONE]:
        if module_details in [DetailLevel.MINIMAL.value, DetailLevel.MINIMAL]:
            table.add_row("module", module.module_type_name)
            table.add_row(
                "module desc",
                kiara.context_info.module_types.item_infos[
                    module.module_type_name
                ].documentation.description,
            )
        elif module_details in [DetailLevel.FULL.value, DetailLevel.FULL]:
            table.add_row("module", module.module_type_name)
            table.add_row(
                "module doc",
                kiara.context_info.module_types.item_infos[
                    module.module_type_name
                ].documentation.full_doc,
            )
            # Full detail also includes the module configuration.
            if module_config_is_empty(module.config.dict()):
                table.add_row("module_config", "-- no config --")
            else:
                table.add_row("module_config", module.config)
    inputs_details = dev_config.log.post_run.inputs_info
    if inputs_details not in [DetailLevel.NONE.value, DetailLevel.NONE]:
        if inputs_details in [DetailLevel.MINIMAL, DetailLevel.MINIMAL.value]:
            # Minimal: render the raw input map, without the type column.
            render_config["show_type"] = False
            value_map_rend: RenderableType = create_value_map_renderable(
                value_map=job_config.inputs, **render_config
            )
            table.add_row("inputs", value_map_rend)
        elif inputs_details in [DetailLevel.FULL, DetailLevel.FULL.value]:
            # Full: resolve the actual Value objects and use their renderable.
            value_map = kiara.data_registry.load_values(values=job_config.inputs)
            table.add_row("inputs", value_map.create_renderable(**render_config))
    outputs_details = dev_config.log.post_run.outputs_info
    if outputs_details not in [DetailLevel.NONE.value, DetailLevel.NONE]:
        if outputs_details in [DetailLevel.MINIMAL, DetailLevel.MINIMAL.value]:
            render_config["show_type"] = False
            # 'job.results' can be None, e.g. when the run failed.
            if job.results is None:
                value_map_rend = "-- no results --"
            else:
                value_map_rend = create_value_map_renderable(
                    value_map=job.results, **render_config
                )
            table.add_row("outputs", value_map_rend)
        elif outputs_details in [DetailLevel.FULL, DetailLevel.FULL.value]:
            if job.results is None:
                value_map_rend = "-- no results --"
            else:
                value_map = kiara.data_registry.load_values(values=job.results)
                value_map_rend = value_map.create_renderable(**render_config)
            table.add_row("outputs", value_map_rend)
    return table
create_value_map_renderable(value_map, **render_config)
¶
Source code in kiara/utils/debug.py
def create_value_map_renderable(value_map: Mapping[str, Any], **render_config: Any):
    """Build a rich table listing field name, (optionally) type, and value.

    Value objects get their own renderable; value ids and raw data are shown
    as strings. The 'show_type' entry of ``render_config`` toggles the type
    column.
    """
    show_type = render_config.get("show_type", True)
    conf = dict(DEFAULT_VALUE_MAP_RENDER_CONFIG)
    conf.update(render_config)

    table = Table(show_header=True, box=box.SIMPLE)
    table.add_column("field name", style="i")
    if show_type:
        table.add_column("type")
    table.add_column("value")

    for field_name, item in value_map.items():
        if isinstance(item, Value):
            type_desc = "value object"
            rendered = item.create_renderable(**conf)
        elif isinstance(item, uuid.UUID):
            type_desc = "value id"
            rendered = str(item)
        else:
            type_desc = "raw data"
            rendered = str(item)
        if show_type:
            table.add_row(field_name, type_desc, rendered)
        else:
            table.add_row(field_name, rendered)
    return table
terminal_print_manifest(manifest)
¶
Source code in kiara/utils/debug.py
def terminal_print_manifest(manifest: Manifest):
    """Print the manifest's renderable representation to the terminal."""
    renderable = manifest.create_renderable()
    terminal_print(renderable)
develop
special
¶
KIARA_DEV_SETTINGS
¶
Classes¶
DetailLevel (Enum)
¶
KiaraDevLogSettings (BaseModel)
pydantic-model
¶
Source code in kiara/utils/develop/__init__.py
class KiaraDevLogSettings(BaseModel):
    # Named presets selectable via the dev-profile environment variables (see
    # 'profile_settings_source'); keys are profile names, values are field
    # overrides for this model.
    PROFILES: ClassVar[Dict[str, Any]] = {
        "full": {
            "log_pre_run": True,
            "pre_run": {
                "pipeline_steps": True,
                "module_info": "full",
                "inputs_info": "full",
            },
            "log_post_run": True,
            "post_run": {
                "pipeline_steps": True,
                "module_info": "minimal",
                "inputs_info": "minimal",
                "outputs_info": "full",
            },
        },
        "internal": {
            "pre_run": {"internal_modules": True},
            "post_run": {"internal_modules": True},
        },
    }

    class Config:
        # Reject unknown fields, re-validate on assignment, store raw enum
        # values instead of enum members.
        extra = Extra.forbid
        validate_assignment = True
        use_enum_values = True

    exc: DetailLevel = Field(
        description="How detailed to print exceptions", default=DetailLevel.MINIMAL
    )
    log_pre_run: bool = Field(
        description="Print details about a module and its inputs before running it.",
        default=True,
    )
    pre_run: PreRunMsgDetails = Field(
        description="Fine-grained settings about what to display in the pre-run message.",
        default_factory=PreRunMsgDetails,
    )
    log_post_run: bool = Field(
        description="Print details about the results of a module run.", default=True
    )
    post_run: PostRunMsgDetails = Field(
        # typo fix in the user-facing description: 'aobut' -> 'about'
        description="Fine-grained settings about what to display in the post-run message.",
        default_factory=PostRunMsgDetails,
    )
Attributes¶
PROFILES: ClassVar[Dict[str, Any]]
¶exc: DetailLevel
pydantic-field
¶How detailed to print exceptions
log_post_run: bool
pydantic-field
¶Print details about the results of a module run.
log_pre_run: bool
pydantic-field
¶Print details about a module and its inputs before running it.
post_run: PostRunMsgDetails
pydantic-field
¶Fine-grained settings about what to display in the post-run message.
pre_run: PreRunMsgDetails
pydantic-field
¶Fine-grained settings about what to display in the pre-run message.
KiaraDevSettings (BaseSettings)
pydantic-model
¶
Source code in kiara/utils/develop/__init__.py
class KiaraDevSettings(BaseSettings):
    # Top-level settings object for kiara's 'develop' mode.
    class Config:
        # Reject unknown fields, re-validate on assignment, read env vars
        # prefixed with 'dev_' (nested fields via '__' delimiter), and store
        # raw enum values instead of enum members.
        extra = Extra.forbid
        validate_assignment = True
        env_prefix = "dev_"
        use_enum_values = True
        env_nested_delimiter = "__"

        @classmethod
        def customise_sources(
            cls,
            init_settings,
            env_settings,
            file_secret_settings,
        ):
            # Settings precedence (highest first): constructor kwargs, dev
            # profile overrides, dev config file, environment variables.
            return (
                init_settings,
                profile_settings_source,
                dev_config_file_settings_source,
                env_settings,
            )

    log: KiaraDevLogSettings = Field(
        description="Settings about what messages to print in 'develop' mode, and what details to include.",
        default_factory=KiaraDevLogSettings,
    )
    # NOTE(review): the description says 'disable' but the default is True --
    # confirm intended semantics against the code that reads this flag.
    job_cache: bool = Field(
        description="Whether to always disable the job cache (ignores the runtime_job_cache setting in the kiara configuration).",
        default=True,
    )

    def create_renderable(self, **render_config: Any):
        """Create a rich renderable (table) representation of these settings."""
        # Local import, presumably to avoid a circular import -- TODO confirm.
        from kiara.utils.output import create_recursive_table_from_model_object

        return create_recursive_table_from_model_object(
            self, render_config=render_config
        )
Attributes¶
job_cache: bool
pydantic-field
¶Whether to always disable the job cache (ignores the runtime_job_cache setting in the kiara configuration).
log: KiaraDevLogSettings
pydantic-field
¶Settings about what messages to print in 'develop' mode, and what details to include.
Config
¶Source code in kiara/utils/develop/__init__.py
class Config:
    # Pydantic settings config: reject unknown fields, re-validate on
    # assignment, read env vars prefixed with 'dev_' (nested fields via the
    # '__' delimiter), and store raw enum values instead of enum members.
    extra = Extra.forbid
    validate_assignment = True
    env_prefix = "dev_"
    use_enum_values = True
    env_nested_delimiter = "__"

    @classmethod
    def customise_sources(
        cls,
        init_settings,
        env_settings,
        file_secret_settings,
    ):
        # Settings precedence (highest first): constructor kwargs, dev
        # profile overrides, dev config file, environment variables.
        return (
            init_settings,
            profile_settings_source,
            dev_config_file_settings_source,
            env_settings,
        )
env_nested_delimiter
¶env_prefix
¶extra
¶use_enum_values
¶validate_assignment
¶customise_sources(init_settings, env_settings, file_secret_settings)
classmethod
¶Source code in kiara/utils/develop/__init__.py
@classmethod
def customise_sources(
    cls,
    init_settings,
    env_settings,
    file_secret_settings,
):
    # The returned order defines precedence (highest first): constructor
    # kwargs beat the dev profile, which beats the dev config file, which
    # beats environment variables.
    return (
        init_settings,
        profile_settings_source,
        dev_config_file_settings_source,
        env_settings,
    )
create_renderable(self, **render_config)
¶Source code in kiara/utils/develop/__init__.py
def create_renderable(self, **render_config: Any):
    """Create a rich renderable (table) representation of these settings."""
    # Local import, presumably to avoid a circular import -- TODO confirm.
    from kiara.utils.output import create_recursive_table_from_model_object

    return create_recursive_table_from_model_object(
        self, render_config=render_config
    )
PostRunMsgDetails (BaseModel)
pydantic-model
¶
Source code in kiara/utils/develop/__init__.py
class PostRunMsgDetails(BaseModel):
    # Controls what gets logged after a module run in 'develop' mode.
    class Config:
        extra = Extra.forbid
        validate_assignment = True
        use_enum_values = True

    pipeline_steps: bool = Field(
        description="Whether to also display information for modules that are run as part of a pipeline",
        default=False,
    )
    module_info: DetailLevel = Field(
        description="Whether to display details about the module that was run.",
        default=DetailLevel.NONE,
    )
    internal_modules: bool = Field(
        # consistency fix: 'internal module.' -> 'internal modules.' (matches
        # the wording of the same field on PreRunMsgDetails)
        description="Whether to also print details about runs of internal modules.",
        default=False,
    )
    inputs_info: DetailLevel = Field(
        description="Whether to display details about the run inputs.",
        default=DetailLevel.NONE,
    )
    outputs_info: DetailLevel = Field(
        description="Whether to display details about the run outputs.",
        default=DetailLevel.MINIMAL,
    )
Attributes¶
inputs_info: DetailLevel
pydantic-field
¶Whether to display details about the run inputs.
internal_modules: bool
pydantic-field
¶Whether to also print details about runs of internal module.
module_info: DetailLevel
pydantic-field
¶Whether to display details about the module that was run.
outputs_info: DetailLevel
pydantic-field
¶Whether to display details about the run outputs.
pipeline_steps: bool
pydantic-field
¶Whether to also display information for modules that are run as part of a pipeline
PreRunMsgDetails (BaseModel)
pydantic-model
¶
Source code in kiara/utils/develop/__init__.py
class PreRunMsgDetails(BaseModel):
    # Controls what gets logged before a module run in 'develop' mode.
    class Config:
        extra = Extra.forbid
        validate_assignment = True
        use_enum_values = True

    pipeline_steps: bool = Field(
        description="Whether to also display information for modules that are run as part of a pipeline.",
        default=False,
    )
    module_info: DetailLevel = Field(
        description="Whether to display details about the module to be run.",
        default=DetailLevel.MINIMAL,
    )
    internal_modules: bool = Field(
        description="Whether to also print details about runs of internal modules.",
        default=False,
    )
    inputs_info: DetailLevel = Field(
        description="Whether to display details about the run inputs.",
        default=DetailLevel.MINIMAL,
    )
Attributes¶
inputs_info: DetailLevel
pydantic-field
¶Whether to display details about the run inputs.
internal_modules: bool
pydantic-field
¶Whether to also print details about runs of internal modules.
module_info: DetailLevel
pydantic-field
¶Whether to display details about the module to be run.
pipeline_steps: bool
pydantic-field
¶Whether to also display information for modules that are run as part of a pipeline.
Functions¶
dev_config_file_settings_source(settings)
¶
A simple settings source that loads variables from a JSON file at the project's root.
The file read is the kiara dev config file (`KIARA_DEV_CONFIG_FILE`); when it does not exist, no settings are contributed.
Source code in kiara/utils/develop/__init__.py
def dev_config_file_settings_source(settings: BaseSettings) -> Dict[str, Any]:
    """Settings source that loads values from the kiara dev config file.

    Returns an empty dict when ``KIARA_DEV_CONFIG_FILE`` does not exist.
    """
    if not os.path.exists(KIARA_DEV_CONFIG_FILE):
        return {}
    return get_data_from_file(KIARA_DEV_CONFIG_FILE)
log_dev_message(msg, title=None)
¶
Source code in kiara/utils/develop/__init__.py
def log_dev_message(msg: RenderableType, title: Union[str, None] = None):
    """Print ``msg`` inside a titled panel, but only when running in develop mode."""
    if not is_develop():
        return
    header = title if title else "Develop-mode message"
    panel = Panel(
        Group("", msg), title=f"[yellow]{header}[/yellow]", title_align="left"
    )
    # Local import, presumably to avoid a circular import -- TODO confirm.
    from kiara.utils.cli import terminal_print

    terminal_print(panel)
profile_settings_source(settings)
¶
Source code in kiara/utils/develop/__init__.py
def profile_settings_source(settings: BaseSettings) -> Dict[str, Any]:
    """Settings source that resolves a dev profile name from environment variables.

    The first non-empty value among DEVELOP, develop, DEV, dev, DEV_PROFILE and
    dev_profile selects the profile; any nested settings model exposing a
    matching entry in its PROFILES class attribute contributes its overrides.
    """
    profile_name = None
    for env_var in ("DEVELOP", "develop", "DEV", "dev", "DEV_PROFILE", "dev_profile"):
        profile_name = os.environ.get(env_var, None)
        if profile_name:
            break

    result: Dict[str, Any] = {}
    if not profile_name:
        return result

    profile_name = profile_name.lower()

    from pydantic.fields import ModelField

    model: ModelField
    for model in KiaraDevSettings.__fields__.values():
        if not issubclass(model.type_, BaseModel):
            continue
        profiles = getattr(model.type_, "PROFILES", None)
        if not profiles:
            continue
        overrides = profiles.get(profile_name, None)
        if overrides:
            result[model.name] = overrides
    return result
dicts
¶
merge_dicts(*dicts)
¶
Source code in kiara/utils/dicts.py
def merge_dicts(*dicts: Mapping[str, Any]) -> Dict[str, Any]:
    """Deep-merge the given mappings into a new dict; later mappings win.

    Inputs are deep-copied before merging, so none of them is mutated.
    """
    merged: Dict[str, Any] = {}
    for mapping in dicts:
        dpath.util.merge(merged, copy.deepcopy(mapping))
    return merged
doc
¶
extract_doc_from_cls(cls, only_first_line=False)
¶
Source code in kiara/utils/doc.py
def extract_doc_from_cls(cls: typing.Type, only_first_line: bool = False):
    """Return the cleaned docstring of ``cls``, or the default 'no description' value.

    Args:
        cls: the class to inspect
        only_first_line: if True, return only the doc's first line
    """
    raw_doc = cls.__doc__
    doc = cleandoc(raw_doc) if raw_doc else DEFAULT_NO_DESC_VALUE
    if only_first_line:
        return first_line(doc)
    return doc.strip()
files
¶
yaml
¶
get_data_from_file(path, content_type=None)
¶
Source code in kiara/utils/files.py
def get_data_from_file(
    path: Union[str, Path], content_type: Union[str, None] = None
) -> Any:
    """Read and parse a json or yaml file.

    The format is taken from ``content_type`` when given; otherwise it is
    guessed from the file extension.

    Raises:
        ValueError: if the format can't be determined from the file name.
    """
    if isinstance(path, str):
        path = Path(os.path.expanduser(path))

    content = path.read_text()

    if content_type:
        assert content_type in ["json", "yaml"]
    elif path.name.endswith(".json"):
        content_type = "json"
    elif path.name.endswith(".yaml") or path.name.endswith(".yml"):
        content_type = "yaml"
    else:
        raise ValueError(
            "Invalid data format, only 'json' or 'yaml' are supported currently."
        )

    if content_type == "json":
        return json.loads(content)
    return yaml.load(content)
global_metadata
¶
get_metadata_for_python_module_or_class(module_or_class)
¶
Source code in kiara/utils/global_metadata.py
@lru_cache()
def get_metadata_for_python_module_or_class(
    module_or_class: typing.Union[ModuleType, typing.Type]
) -> typing.List[typing.Dict[str, typing.Any]]:
    """Collect kiara metadata dicts attached to a class and/or its module hierarchy.

    Looks for the KIARA_MODULE_METADATA_ATTRIBUTE on the class itself (when a
    class is given), then on its module and every parent package, walking up
    the dotted module path. The collected list is reversed before returning,
    so the outermost package's metadata comes first.
    """
    metadata: typing.List[typing.Dict[str, typing.Any]] = []
    if isinstance(module_or_class, type):
        # Class-level metadata is collected first (it ends up last after the
        # final reverse()).
        if hasattr(module_or_class, KIARA_MODULE_METADATA_ATTRIBUTE):
            md = getattr(module_or_class, KIARA_MODULE_METADATA_ATTRIBUTE)
            assert isinstance(md, typing.Mapping)
            metadata.append(md)  # type: ignore
        # Continue the walk from the module the class was defined in.
        _module_or_class: typing.Union[
            str, ModuleType, typing.Type
        ] = module_or_class.__module__
    else:
        _module_or_class = module_or_class
    current_module = _module_or_class
    # 'current_module' alternates between a dotted module name (str) and the
    # imported module object; an empty string terminates the loop.
    while current_module:
        if isinstance(current_module, str):
            current_module = importlib.import_module(current_module)
        if hasattr(current_module, KIARA_MODULE_METADATA_ATTRIBUTE):
            md = getattr(current_module, KIARA_MODULE_METADATA_ATTRIBUTE)
            assert isinstance(md, typing.Mapping)
            metadata.append(md)  # type: ignore
        if "." in current_module.__name__:
            # Continue with the parent package's dotted name.
            current_module = ".".join(current_module.__name__.split(".")[0:-1])
        else:
            current_module = ""
    metadata.reverse()
    return metadata
graphs
¶
print_ascii_graph(graph)
¶
Source code in kiara/utils/graphs.py
def print_ascii_graph(graph: nx.Graph):
    """Print an ascii representation of a network graph to stdout.

    Requires the optional 'asciinet' package and a Java runtime; when either
    is unavailable, an explanatory message is printed and the function
    returns without raising.
    """
    try:
        from asciinet import graph_to_ascii
    except Exception:  # was a bare 'except:'; narrowed so e.g. KeyboardInterrupt propagates
        print(
            "\nCan't print graph on terminal, package 'asciinet' not available. Please install it into the current virtualenv using:\n\npip install 'git+https://github.com/cosminbasca/asciinet.git#egg=asciinet&subdirectory=pyasciinet'"
        )
        return

    try:
        from asciinet._libutil import check_java

        check_java("Java ")
    except Exception as e:
        print(e)
        print(
            "\nJava is currently necessary to print ascii graph. This might change in the future, but to use this functionality please install a JRE."
        )
        return

    print(graph_to_ascii(graph))
hashfs
special
¶
HashFS is a content-addressable file management system. What does that mean? Simply, that HashFS manages a directory where files are saved based on the file's hash.
Typical use cases for this kind of system are ones where:
- Files are written once and never change (e.g. image storage).
- It's desirable to have no duplicate files (e.g. user uploads).
- File metadata is stored elsewhere (e.g. in a database).
Adapted from: https://github.com/dgilland/hashfs
License¶
The MIT License (MIT)
Copyright (c) 2015, Derrick Gilland
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Classes¶
HashAddress (HashAddress)
¶
File address containing a file's path on disk and its content hash ID.
Attributes:
Name | Type | Description |
---|---|---|
id |
str |
Hash ID (hexdigest) of file contents. |
relpath |
str |
Relative path location to :attr: |
abspath |
str |
Absolute path location of file on disk. |
is_duplicate |
boolean |
Whether the hash address created was
a duplicate of a previously existing file. Can only be |
Source code in kiara/utils/hashfs/__init__.py
class HashAddress(
    namedtuple("HashAddress", ["id", "relpath", "abspath", "is_duplicate"])
):
    """File address containing a file's path on disk and its content hash ID.

    Attributes:
        id (str): Hash ID (hexdigest) of file contents.
        relpath (str): Relative path location to :attr:`HashFS.root`.
        abspath (str): Absolute path location of file on disk.
        is_duplicate (boolean, optional): Whether the hash address created was
            a duplicate of a previously existing file. Can only be ``True``
            after a put operation. Defaults to ``False``.
    """

    def __new__(cls, id, relpath, abspath, is_duplicate=False):
        # Defaulting 'is_duplicate' lets callers omit it (e.g. HashFS.get).
        return super(HashAddress, cls).__new__(cls, id, relpath, abspath, is_duplicate)  # type: ignore
HashFS
¶
Content addressable file manager.
Attributes:
Name | Type | Description |
---|---|---|
root |
str |
Directory path used as root of storage space. |
depth |
int |
Depth of subfolders to create when saving a file. |
width |
int |
Width of each subfolder to create when saving a file. |
algorithm |
str |
Hash algorithm to use when computing file hash.
Algorithm should be available in |
fmode |
int |
File mode permission to set when adding files to
directory. Defaults to |
dmode |
int |
Directory mode permission to set for
subdirectories. Defaults to |
Source code in kiara/utils/hashfs/__init__.py
class HashFS(object):
    """Content addressable file manager.

    Attributes:
        root (str): Directory path used as root of storage space.
        depth (int, optional): Depth of subfolders to create when saving a
            file.
        width (int, optional): Width of each subfolder to create when saving a
            file.
        algorithm (str): Hash algorithm to use when computing file hash.
            Algorithm should be available in ``hashlib`` module. Defaults to
            ``'sha256'``.
        fmode (int, optional): File mode permission to set when adding files to
            directory. Defaults to ``0o664`` which allows owner/group to
            read/write and everyone else to read.
        dmode (int, optional): Directory mode permission to set for
            subdirectories. Defaults to ``0o755`` which allows the owner to
            read/write/execute and everyone else to read and execute.
    """
    def __init__(
        self,
        root: str,
        depth: int = 4,
        width: int = 1,
        algorithm: str = "sha256",
        fmode=0o664,
        dmode=0o755,
    ):
        """Initialize a content-addressable store rooted at ``root``.

        Args:
            root: directory used as root of the storage space.
            depth: number of nested shard-subfolder levels.
            width: number of characters per shard subfolder.
            algorithm: hashlib algorithm name used to hash file contents.
            fmode: permission bits applied to stored files.
            dmode: permission bits applied to created subdirectories.
        """
        self.root: str = os.path.realpath(root)
        self.depth: int = depth
        self.width: int = width
        self.algorithm: str = algorithm
        self.fmode = fmode
        self.dmode = dmode
    def put(self, file: BinaryIO) -> "HashAddress":
        """Store contents of `file` on disk using its content hash for the
        address.

        Args:
            file (mixed): Readable object or path to file.

        Returns:
            HashAddress: File's hash address.
        """
        stream = Stream(file)
        with closing(stream):
            # Hash first, then copy; _copy only writes when the content is
            # not already present in the store.
            id = self.computehash(stream)
            filepath, is_duplicate = self._copy(stream, id)
        return HashAddress(id, self.relpath(filepath), filepath, is_duplicate)
    def put_with_precomputed_hash(
        self, file: Union[str, Path, BinaryIO], hash_id: str
    ) -> "HashAddress":
        """Store `file` under the given, already computed hash id.

        Skips re-hashing the content; the caller is responsible for `hash_id`
        actually matching the file's content.
        """
        stream = Stream(file)
        with closing(stream):
            filepath, is_duplicate = self._copy(stream=stream, id=hash_id)
        return HashAddress(hash_id, self.relpath(filepath), filepath, is_duplicate)
    def _copy(self, stream: "Stream", id: str):
        """Copy the contents of `stream` onto disk with an optional file
        extension appended. The copy process uses a temporary file to store the
        initial contents and then moves that file to its final location.

        Returns:
            tuple: (final file path, whether the content already existed)
        """
        filepath = self.idpath(id)
        if not os.path.isfile(filepath):
            # Only move file if it doesn't already exist.
            is_duplicate = False
            # Write to a temp file first, then move into place.
            fname = self._mktempfile(stream)
            self.makepath(os.path.dirname(filepath))
            shutil.move(fname, filepath)
        else:
            is_duplicate = True
        return (filepath, is_duplicate)
    def _mktempfile(self, stream):
        """Create a named temporary file from a :class:`Stream` object and
        return its filename.
        """
        # delete=False: the file must outlive this scope so _copy can move it
        # into the store.
        tmp = NamedTemporaryFile(delete=False)
        if self.fmode is not None:
            # Temporarily clear the umask so fmode is applied exactly.
            oldmask = os.umask(0)
            try:
                os.chmod(tmp.name, self.fmode)
            finally:
                os.umask(oldmask)
        for data in stream:
            tmp.write(to_bytes(data))
        tmp.close()
        return tmp.name
def get(self, file):
    """Return the :class:`HashAddress` for a given file id or path.

    Args:
        file (str): Address ID or path of file.

    Returns:
        HashAddress or None: the file's hash address, or ``None`` when
        `file` does not resolve to a stored file.
    """
    resolved = self.realpath(file)
    if resolved is None:
        return None
    return HashAddress(self.unshard(resolved), self.relpath(resolved), resolved)
def open(self, file, mode="rb"):
    """Return an open buffer object for a given id or path.

    Args:
        file (str): Address ID or path of file.
        mode (str, optional): Mode to open file in. Defaults to ``'rb'``.

    Returns:
        Buffer: An ``io`` buffer dependent on the `mode`.

    Raises:
        IOError: If file doesn't exist.
    """
    resolved = self.realpath(file)
    if resolved is None:
        raise IOError("Could not locate file: {0}".format(file))
    return io.open(resolved, mode)
def delete(self, file):
    """Delete a file by id or path, then prune empty parent directories.

    No exception is raised if the file doesn't exist.

    Args:
        file (str): Address ID or path of file.
    """
    target = self.realpath(file)
    if target is None:
        return
    try:
        os.remove(target)
    except OSError:  # pragma: no cover
        # Best-effort: a concurrent delete or permission issue is ignored.
        pass
    else:
        self.remove_empty(os.path.dirname(target))
def remove_empty(self, subpath):
    """Remove empty folders from `subpath` upward, stopping at :attr:`root`.

    Does nothing when `subpath` does not lie inside the root directory.
    """
    if not self.haspath(subpath):
        return
    current = subpath
    while current != self.root:
        # Stop at the first non-empty directory or symlink.
        if os.listdir(current) or os.path.islink(current):
            break
        os.rmdir(current)
        current = os.path.dirname(current)
def files(self):
"""Return generator that yields all files in the :attr:`root`
directory.
"""
for folder, subfolders, files in walk(self.root):
for file in files:
yield os.path.abspath(os.path.join(folder, file))
def folders(self):
    """Yield every folder under :attr:`root` that directly contains files."""
    for dirpath, _dirnames, filenames in walk(self.root):
        if filenames:
            yield dirpath
def count(self):
    """Return the number of files in the :attr:`root` directory."""
    # Iterating self yields one entry per stored file (see ``__iter__``).
    return sum(1 for _ in self)
def size(self):
    """Return the total size in bytes of all files under :attr:`root`."""
    return sum(os.path.getsize(path) for path in self.files())
def exists(self, file):
    """Return ``True`` when the given file id or path exists on disk."""
    return self.realpath(file) is not None
def haspath(self, path):
    """Return whether `path` lies inside the :attr:`root` directory."""
    root_dir = self.root
    return issubdir(path, root_dir)
def makepath(self, path):
    """Physically create the folder path on disk.

    New directories are created with :attr:`dmode`. An already-existing
    directory is accepted silently.

    Raises:
        AssertionError: if `path` exists but is not a directory.
    """
    try:
        os.makedirs(path, self.dmode)
    except FileExistsError:
        # Explicit raise instead of a bare `assert`: the check must survive
        # `python -O` (asserts are stripped); the exception type callers
        # might catch (AssertionError) is preserved.
        if not os.path.isdir(path):
            raise AssertionError("expected {} to be a directory".format(path))
def relpath(self, path):
    """Return `path` expressed relative to the :attr:`root` directory."""
    root_dir = self.root
    return os.path.relpath(path, root_dir)
def realpath(self, file):
    """Resolve a file id or path to an on-disk path, or ``None``.

    Candidates are tried in order: an absolute path, a path relative to
    :attr:`root`, the sharded id path, and finally the sharded id path
    with any file extension appended.
    """
    # Absolute path?
    if os.path.isfile(file):
        return file
    # Relative to the root directory?
    candidate = os.path.join(self.root, file)
    if os.path.isfile(candidate):
        return candidate
    # Sharded path derived from the id?
    sharded = self.idpath(file)
    if os.path.isfile(sharded):
        return sharded
    # Sharded path with any extension?
    matches = glob.glob("{0}.*".format(sharded))
    if matches:
        return matches[0]
    # No match found.
    return None
def idpath(self, id):
    """Return the sharded file path inside :attr:`root` for a hash id."""
    tokens = self.shard(id)
    return os.path.join(self.root, *tokens)
def computehash(self, stream) -> str:
    """Return the hex digest of `stream` using :attr:`algorithm`."""
    hasher = hashlib.new(self.algorithm)
    for chunk in stream:
        hasher.update(to_bytes(chunk))
    return hasher.hexdigest()
def shard(self, id):
    """Shard a content ID into subfolder tokens.

    Delegates to the module-level ``shard`` helper using this store's
    configured :attr:`depth` and :attr:`width`.
    """
    return shard(id, self.depth, self.width)
def unshard(self, path):
    """Recover the hash id encoded in a sharded `path`.

    Raises:
        ValueError: if `path` is not under the :attr:`root` directory.
    """
    if not self.haspath(path):
        raise ValueError(
            "Cannot unshard path. The path {0!r} is not "
            "a subdirectory of the root directory {1!r}".format(path, self.root)
        )
    relative = self.relpath(path)
    # Drop the extension and re-join the shard tokens into the id.
    return os.path.splitext(relative)[0].replace(os.sep, "")
def repair(self):
    """Relocate files whose on-disk path doesn't match their content hash.

    Returns:
        list: ``(old_path, address)`` tuples for each file processed.
    """
    fixed = []
    corrupted_entries = tuple(self.corrupted())
    # Zero the umask so chmod applies self.fmode exactly; restore after.
    saved_umask = os.umask(0)
    try:
        for old_path, address in corrupted_entries:
            if os.path.isfile(address.abspath):
                # Correct copy already present; drop the misplaced one.
                os.remove(old_path)
            else:
                # Move the file into its correct sharded location.
                self.makepath(os.path.dirname(address.abspath))
                shutil.move(old_path, address.abspath)
                os.chmod(address.abspath, self.fmode)
            fixed.append((old_path, address))
    finally:
        os.umask(saved_umask)
    return fixed
def corrupted(self):
    """Yield ``(path, address)`` for files stored at the wrong location.

    ``path`` is the misplaced file's current location and ``address`` the
    :class:`HashAddress` of the location its content hash dictates.
    """
    for actual_path in self.files():
        stream = Stream(actual_path)
        with closing(stream):
            content_id = self.computehash(stream)
        expected = self.idpath(content_id)
        if expected != actual_path:
            yield (
                actual_path,
                HashAddress(content_id, self.relpath(expected), expected),
            )
def __contains__(self, file):
    """Return whether a given file id or path is stored (enables ``x in store``).

    Delegates to :meth:`exists`.
    """
    return self.exists(file)
def __iter__(self):
    """Iterate over all stored files (delegates to :meth:`files`)."""
    return self.files()
def __len__(self):
    """Return the number of stored files (delegates to :meth:`count`)."""
    return self.count()
Methods¶
computehash(self, stream)
¶Compute hash of file using :attr:algorithm
.
Source code in kiara/utils/hashfs/__init__.py
def computehash(self, stream) -> str:
"""Compute hash of file using :attr:`algorithm`."""
hashobj = hashlib.new(self.algorithm)
for data in stream:
hashobj.update(to_bytes(data))
return hashobj.hexdigest()
corrupted(self)
¶Return generator that yields corrupted files as (path, address)
where path
is the path of the corrupted file and address
is
the :class:HashAddress
of the expected location.
Source code in kiara/utils/hashfs/__init__.py
def corrupted(self):
"""Return generator that yields corrupted files as ``(path, address)``
where ``path`` is the path of the corrupted file and ``address`` is
the :class:`HashAddress` of the expected location.
"""
for path in self.files():
stream = Stream(path)
with closing(stream):
id = self.computehash(stream)
expected_path = self.idpath(id)
if expected_path != path:
yield (
path,
HashAddress(id, self.relpath(expected_path), expected_path),
)
count(self)
¶Return count of the number of files in the :attr:root
directory.
Source code in kiara/utils/hashfs/__init__.py
def count(self):
"""Return count of the number of files in the :attr:`root` directory."""
count = 0
for _ in self:
count += 1
return count
delete(self, file)
¶Delete file using id or path. Remove any empty directories after deleting. No exception is raised if file doesn't exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file |
str |
Address ID or path of file. |
required |
Source code in kiara/utils/hashfs/__init__.py
def delete(self, file):
"""Delete file using id or path. Remove any empty directories after
deleting. No exception is raised if file doesn't exist.
Args:
file (str): Address ID or path of file.
"""
realpath = self.realpath(file)
if realpath is None:
return
try:
os.remove(realpath)
except OSError: # pragma: no cover
pass
else:
self.remove_empty(os.path.dirname(realpath))
exists(self, file)
¶Check whether a given file id or path exists on disk.
Source code in kiara/utils/hashfs/__init__.py
def exists(self, file):
"""Check whether a given file id or path exists on disk."""
return bool(self.realpath(file))
files(self)
¶Return generator that yields all files in the :attr:root
directory.
Source code in kiara/utils/hashfs/__init__.py
def files(self):
"""Return generator that yields all files in the :attr:`root`
directory.
"""
for folder, subfolders, files in walk(self.root):
for file in files:
yield os.path.abspath(os.path.join(folder, file))
folders(self)
¶Return generator that yields all folders in the :attr:root
directory that contain files.
Source code in kiara/utils/hashfs/__init__.py
def folders(self):
"""Return generator that yields all folders in the :attr:`root`
directory that contain files.
"""
for folder, subfolders, files in walk(self.root):
if files:
yield folder
get(self, file)
¶Return :class:HashAddress
from given id or path. If file
does not
refer to a valid file, then None
is returned.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file |
str |
Address ID or path of file. |
required |
Returns:
Type | Description |
---|---|
HashAddress |
File's hash address. |
Source code in kiara/utils/hashfs/__init__.py
def get(self, file):
"""Return :class:`HashAdress` from given id or path. If `file` does not
refer to a valid file, then ``None`` is returned.
Args:
file (str): Address ID or path of file.
Returns:
HashAddress: File's hash address.
"""
realpath = self.realpath(file)
if realpath is None:
return None
else:
return HashAddress(self.unshard(realpath), self.relpath(realpath), realpath)
haspath(self, path)
¶Return whether path
is a subdirectory of the :attr:root
directory.
Source code in kiara/utils/hashfs/__init__.py
def haspath(self, path):
"""Return whether `path` is a subdirectory of the :attr:`root`
directory.
"""
return issubdir(path, self.root)
idpath(self, id)
¶Build the file path for a given hash id. Optionally, append a file extension.
Source code in kiara/utils/hashfs/__init__.py
def idpath(self, id):
"""Build the file path for a given hash id. Optionally, append a
file extension.
"""
paths = self.shard(id)
return os.path.join(self.root, *paths)
makepath(self, path)
¶Physically create the folder path on disk.
Source code in kiara/utils/hashfs/__init__.py
def makepath(self, path):
"""Physically create the folder path on disk."""
try:
os.makedirs(path, self.dmode)
except FileExistsError:
assert os.path.isdir(path), "expected {} to be a directory".format(path)
open(self, file, mode='rb')
¶Return open buffer object from given id or path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file |
str |
Address ID or path of file. |
required |
mode |
str |
Mode to open file in. Defaults to |
'rb' |
Returns:
Type | Description |
---|---|
Buffer |
An |
Exceptions:
Type | Description |
---|---|
IOError |
If file doesn't exist. |
Source code in kiara/utils/hashfs/__init__.py
def open(self, file, mode="rb"):
"""Return open buffer object from given id or path.
Args:
file (str): Address ID or path of file.
mode (str, optional): Mode to open file in. Defaults to ``'rb'``.
Returns:
Buffer: An ``io`` buffer dependent on the `mode`.
Raises:
IOError: If file doesn't exist.
"""
realpath = self.realpath(file)
if realpath is None:
raise IOError("Could not locate file: {0}".format(file))
return io.open(realpath, mode)
put(self, file)
¶Store contents of file
on disk using its content hash for the
address.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file |
mixed |
Readable object or path to file. |
required |
Returns:
Type | Description |
---|---|
HashAddress |
File's hash address. |
Source code in kiara/utils/hashfs/__init__.py
def put(self, file: BinaryIO) -> "HashAddress":
"""Store contents of `file` on disk using its content hash for the
address.
Args:
file (mixed): Readable object or path to file.
Returns:
HashAddress: File's hash address.
"""
stream = Stream(file)
with closing(stream):
id = self.computehash(stream)
filepath, is_duplicate = self._copy(stream, id)
return HashAddress(id, self.relpath(filepath), filepath, is_duplicate)
put_with_precomputed_hash(self, file, hash_id)
¶Source code in kiara/utils/hashfs/__init__.py
def put_with_precomputed_hash(
self, file: Union[str, Path, BinaryIO], hash_id: str
) -> "HashAddress":
stream = Stream(file)
with closing(stream):
filepath, is_duplicate = self._copy(stream=stream, id=hash_id)
return HashAddress(hash_id, self.relpath(filepath), filepath, is_duplicate)
realpath(self, file)
¶Attempt to determine the real path of a file id or path through successive checking of candidate paths. If the real path is stored with an extension, the path is considered a match if the basename matches the expected file path of the id.
Source code in kiara/utils/hashfs/__init__.py
def realpath(self, file):
"""Attempt to determine the real path of a file id or path through
successive checking of candidate paths. If the real path is stored with
an extension, the path is considered a match if the basename matches
the expected file path of the id.
"""
# Check for absolute path.
if os.path.isfile(file):
return file
# Check for relative path.
relpath = os.path.join(self.root, file)
if os.path.isfile(relpath):
return relpath
# Check for sharded path.
filepath = self.idpath(file)
if os.path.isfile(filepath):
return filepath
# Check for sharded path with any extension.
paths = glob.glob("{0}.*".format(filepath))
if paths:
return paths[0]
# Could not determine a match.
return None
relpath(self, path)
¶Return path
relative to the :attr:root
directory.
Source code in kiara/utils/hashfs/__init__.py
def relpath(self, path):
"""Return `path` relative to the :attr:`root` directory."""
return os.path.relpath(path, self.root)
remove_empty(self, subpath)
¶Successively remove all empty folders starting with subpath
and
proceeding "up" through directory tree until reaching the :attr:root
folder.
Source code in kiara/utils/hashfs/__init__.py
def remove_empty(self, subpath):
"""Successively remove all empty folders starting with `subpath` and
proceeding "up" through directory tree until reaching the :attr:`root`
folder.
"""
# Don't attempt to remove any folders if subpath is not a
# subdirectory of the root directory.
if not self.haspath(subpath):
return
while subpath != self.root:
if len(os.listdir(subpath)) > 0 or os.path.islink(subpath):
break
os.rmdir(subpath)
subpath = os.path.dirname(subpath)
repair(self)
¶Repair any file locations whose content address doesn't match its file path.
Source code in kiara/utils/hashfs/__init__.py
def repair(self):
"""Repair any file locations whose content address doesn't match it's
file path.
"""
repaired = []
corrupted = tuple(self.corrupted())
oldmask = os.umask(0)
try:
for path, address in corrupted:
if os.path.isfile(address.abspath):
# File already exists so just delete corrupted path.
os.remove(path)
else:
# File doesn't exists so move it.
self.makepath(os.path.dirname(address.abspath))
shutil.move(path, address.abspath)
os.chmod(address.abspath, self.fmode)
repaired.append((path, address))
finally:
os.umask(oldmask)
return repaired
shard(self, id)
¶Shard content ID into subfolders.
Source code in kiara/utils/hashfs/__init__.py
def shard(self, id):
"""Shard content ID into subfolders."""
return shard(id, self.depth, self.width)
size(self)
¶Return the total size in bytes of all files in the :attr:root
directory.
Source code in kiara/utils/hashfs/__init__.py
def size(self):
"""Return the total size in bytes of all files in the :attr:`root`
directory.
"""
total = 0
for path in self.files():
total += os.path.getsize(path)
return total
unshard(self, path)
¶Unshard path to determine hash value.
Source code in kiara/utils/hashfs/__init__.py
def unshard(self, path):
"""Unshard path to determine hash value."""
if not self.haspath(path):
raise ValueError(
"Cannot unshard path. The path {0!r} is not "
"a subdirectory of the root directory {1!r}".format(path, self.root)
)
return os.path.splitext(self.relpath(path))[0].replace(os.sep, "")
Stream
¶
Common interface for file-like objects.
The input obj
can be a file-like object or a path to a file. If obj
is
a path to a file, then it will be opened until :meth:close
is called.
If obj
is a file-like object, then its original position will be
restored when :meth:close
is called instead of closing the object
automatically. Closing of the stream is deferred to whatever process passed
the stream in.
Successive readings of the stream are supported without having to manually
set its position back to 0
.
Source code in kiara/utils/hashfs/__init__.py
class Stream(object):
    """Uniform iteration interface over file-like objects and file paths.

    If `obj` is a path to a file, the file is opened and owned by this
    stream until :meth:`close` closes it. If `obj` is already a file-like
    object, its position is recorded and restored by :meth:`close` instead
    of closing the object -- ownership stays with the caller. The stream
    can be iterated multiple times; each iteration rewinds to position
    ``0`` first.
    """

    def __init__(self, obj: Union[BinaryIO, str, Path]):
        if hasattr(obj, "read"):
            # Caller-owned object: remember where it currently is.
            original_pos = obj.tell()  # type: ignore
        elif os.path.isfile(obj):  # type: ignore
            # We own this handle and will close it in close().
            obj = io.open(obj, "rb")  # type: ignore
            original_pos = None
        else:
            raise ValueError("Object must be a valid file path or a readable object")
        try:
            # Prefer the filesystem's native block size for chunked reads.
            buffer_size = os.stat(obj.name).st_blksize  # type: ignore
        except Exception:
            buffer_size = 8192
        self._obj = obj
        self._pos = original_pos
        self._buffer_size = buffer_size

    def __iter__(self):
        """Yield chunks from position 0; restore the caller's position after."""
        self._obj.seek(0)
        while True:
            chunk = self._obj.read(self._buffer_size)
            if not chunk:
                break
            yield chunk
        if self._pos is not None:
            self._obj.seek(self._pos)

    def close(self):
        """Close the object if we opened it, else restore its position."""
        if self._pos is None:
            self._obj.close()
        else:
            self._obj.seek(self._pos)
Methods¶
close(self)
¶Close underlying IO object if we opened it, else return it to original position.
Source code in kiara/utils/hashfs/__init__.py
def close(self):
"""Close underlying IO object if we opened it, else return it to
original position.
"""
if self._pos is None:
self._obj.close()
else:
self._obj.seek(self._pos)
Functions¶
compact(items)
¶
Return only truthy elements of items
.
Source code in kiara/utils/hashfs/__init__.py
def compact(items: List[Any]) -> List[Any]:
    """Return a new list containing only the truthy elements of `items`."""
    return list(filter(None, items))
issubdir(subpath, path)
¶
Return whether subpath
is a sub-directory of path
.
Source code in kiara/utils/hashfs/__init__.py
def issubdir(subpath: str, path: str):
    """Return whether `subpath` is a sub-directory of `path`."""
    # The trailing separator prevents /usr/var2/log from matching /usr/var.
    parent = os.path.realpath(path) + os.sep
    child = os.path.realpath(subpath)
    return child.startswith(parent)
shard(digest, depth, width)
¶
Source code in kiara/utils/hashfs/__init__.py
def shard(digest, depth, width):
    """Split `digest` into `depth` tokens of `width` chars plus the remainder.

    Empty tokens (produced when the digest is shorter than
    ``depth * width``) are dropped.
    """
    tokens = [digest[i * width : (i + 1) * width] for i in range(depth)]  # noqa
    tokens.append(digest[depth * width :])  # noqa
    return [token for token in tokens if token]
to_bytes(text)
¶
Source code in kiara/utils/hashfs/__init__.py
def to_bytes(text: Union[str, bytes]):
    """Encode `text` to UTF-8 bytes; bytes input passes through unchanged."""
    if isinstance(text, bytes):
        return text
    return bytes(text, "utf8")
hashing
¶
NONE_CID
¶
compute_cid(data, hash_codec='sha2-256', encode='base58btc')
¶
Source code in kiara/utils/hashing.py
def compute_cid(
    data: EncodableType,
    hash_codec: str = "sha2-256",
    encode: str = "base58btc",
) -> Tuple[bytes, CID]:
    """Encode `data` as DAG-CBOR and return ``(encoded_bytes, cid)``.

    The CID is version 1 with codec ``dag-cbor``, hashed with `hash_codec`
    and rendered in the `encode` base.
    """
    serialized = dag_cbor.encode(data)
    digest = multihash.get(hash_codec).digest(serialized)
    content_id = CID(encode, 1, codec="dag-cbor", digest=digest)
    return serialized, content_id
compute_cid_from_file(file, codec='raw', hash_codec='sha2-256')
¶
Source code in kiara/utils/hashing.py
def compute_cid_from_file(
    file: str, codec: Union[str, int, Multicodec] = "raw", hash_codec: str = "sha2-256"
):
    """Hash a file's contents (streamed in chunks) and return its CID digest.

    Only ``sha2-256`` is supported as `hash_codec` for now.
    """
    assert hash_codec == "sha2-256"
    chunk_size = 65536
    file_hash = hashlib.sha256()
    with open(file, "rb") as f:
        chunk = f.read(chunk_size)
        while chunk:
            file_hash.update(chunk)
            chunk = f.read(chunk_size)
    wrapped = multihash.wrap(file_hash.digest(), "sha2-256")
    return create_cid_digest(digest=wrapped, codec=codec)
create_cid_digest(digest, codec='raw')
¶
Source code in kiara/utils/hashing.py
def create_cid_digest(
    digest: Union[
        str, BytesLike, Tuple[Union[str, int, Multihash], Union[str, BytesLike]]
    ],
    codec: Union[str, int, Multicodec] = "raw",
) -> CID:
    """Wrap a multihash digest into a version-1, base58btc-encoded CID."""
    return CID("base58btc", 1, codec, digest)
json
¶
orjson_dumps(v, *, default=None, **args)
¶
Source code in kiara/utils/json.py
def orjson_dumps(v, *, default=None, **args):
    """Serialize `v` to a JSON string using orjson.

    ``orjson.dumps`` returns ``bytes``; the result is decoded so the
    function matches the ``json.dumps`` return type. In debug mode the
    offending data is dumped before the exception propagates.
    """
    try:
        return orjson.dumps(v, default=default, **args).decode()
    except Exception as e:
        if is_debug():
            print(f"Error dumping json data: {e}")
            from kiara import dbg

            dbg(v)
        # Bare `raise` re-raises with the original traceback intact
        # (`raise e` would add a misleading extra frame).
        raise
jupyter
¶
create_image(graph)
¶
Source code in kiara/utils/jupyter.py
def create_image(graph: nx.Graph):
    """Render `graph` to PNG bytes using graphviz' ``dot`` layout.

    Returns:
        bytes: the rendered PNG, or an error message string when
        pygraphviz is not installed.
    """
    try:
        import pygraphviz as pgv  # noqa
    except Exception:  # noqa
        # `except Exception` instead of a bare `except:` so SystemExit /
        # KeyboardInterrupt are not swallowed; still broad because a broken
        # pygraphviz install can raise more than ImportError.
        return "pygraphviz not available, please install it manually into the current virtualenv"
    # graph = nx.relabel_nodes(graph, lambda x: hash(x))
    G = nx.nx_agraph.to_agraph(graph)
    G.node_attr["shape"] = "box"
    # G.unflatten().layout(prog="dot")
    G.layout(prog="dot")
    b = G.draw(format="png")
    return b
graph_to_image(graph, return_bytes=False)
¶
Source code in kiara/utils/jupyter.py
def graph_to_image(graph: nx.Graph, return_bytes: bool = False):
    """Render `graph` to PNG, as raw bytes or wrapped in an IPython ``Image``."""
    rendered = create_image(graph=graph)
    return rendered if return_bytes else Image(rendered)
save_image(graph, path)
¶
Source code in kiara/utils/jupyter.py
def save_image(graph: nx.Graph, path: str):
    """Render `graph` to PNG and write the bytes to `path`."""
    rendered = create_image(graph=graph)
    with open(path, "wb") as out_file:
        out_file.write(rendered)
metadata
¶
find_metadata_models(alias=None, only_for_package=None)
¶
Source code in kiara/utils/metadata.py
def find_metadata_models(
    alias: Union[str, None] = None, only_for_package: Union[str, None] = None
) -> MetadataTypeClassesInfo:
    """Collect all registered ``ValueMetadata`` model classes.

    Args:
        alias: group alias for the resulting class collection.
        only_for_package: when set, keep only models whose context label
            ``package`` equals this name.
    """
    model_registry = ModelRegistry.instance()
    _group = model_registry.get_models_of_type(ValueMetadata)  # type: ignore
    classes: Dict[str, Type[ValueMetadata]] = {
        model_id: info.python_class.get_class()  # type: ignore
        for model_id, info in _group.items()
    }
    group: MetadataTypeClassesInfo = MetadataTypeClassesInfo.create_from_type_items(group_alias=alias, **classes)  # type: ignore
    if not only_for_package:
        return group
    # Filter down to models declared by the requested package.
    filtered = {
        key: info
        for key, info in group.items()
        if info.context.labels.get("package") == only_for_package
    }
    return MetadataTypeClassesInfo.construct(
        group_id=group.instance_id, group_alias=group.group_alias, item_infos=filtered  # type: ignore
    )
models
¶
Functions¶
assemble_subcomponent_tree(data)
¶
Source code in kiara/utils/models.py
def assemble_subcomponent_tree(data: "KiaraModel") -> Union[nx.DiGraph, None]:
    """Build a directed graph of `data` and all of its nested subcomponents.

    Node ids are dotted paths rooted at ``KIARA_DEFAULT_ROOT_NODE_ID``;
    each node stores the model object (``obj``) and its nesting ``level``.
    """
    graph = nx.DiGraph()

    def _add_subtree(model: "KiaraModel", node_id, level: int = 0):
        graph.add_node(node_id, obj=model, level=level)
        for child_key in model.subcomponent_keys:
            child = model.get_subcomponent(child_key)
            child_id = f"{node_id}.{child_key}"
            graph.add_edge(node_id, child_id)
            _add_subtree(child, child_id, level + 1)

    _add_subtree(data, KIARA_DEFAULT_ROOT_NODE_ID)
    return graph
create_pydantic_model(model_cls, _use_pydantic_construct=False, **field_values)
¶
Source code in kiara/utils/models.py
def create_pydantic_model(
    model_cls: Type[BaseModel],
    _use_pydantic_construct: bool = PYDANTIC_USE_CONSTRUCT,
    **field_values: Any,
):
    """Instantiate `model_cls` with `field_values`.

    The ``construct`` fast path (which would skip validation) is not
    implemented yet and always raises.

    Raises:
        NotImplementedError: when `_use_pydantic_construct` is true.
    """
    if _use_pydantic_construct:
        # Removed dead code: a `return model_cls.construct(**field_values)`
        # previously followed this raise and could never execute.
        raise NotImplementedError()
    return model_cls(**field_values)
get_subcomponent_from_model(data, path)
¶
Return subcomponents of a model under a specified path.
Source code in kiara/utils/models.py
def get_subcomponent_from_model(data: "KiaraModel", path: str) -> "KiaraModel":
    """Return subcomponents of a model under a specified path.

    Dotted paths recurse one segment at a time. For pydantic custom-root
    models whose root is a mapping, only exact key matches are returned;
    prefix matches are collected but currently always raise
    ``NotImplementedError`` (grouping is not implemented -- see the
    commented-out code below). For regular models, `path` must name a
    declared field.

    Raises:
        KeyError: when `path` matches no field or key.
        NotImplementedError: for non-mapping custom roots, or for prefix
            (grouped) matches on mapping roots.
    """
    if "." in path:
        # Recurse: resolve the first segment, delegate the rest to the child.
        first_token, rest = path.split(".", maxsplit=1)
        sc = data.get_subcomponent(first_token)
        return sc.get_subcomponent(rest)
    if hasattr(data, "__custom_root_type__") and data.__custom_root_type__:
        if isinstance(data.__root__, Mapping):  # type: ignore
            if path in data.__root__.keys():  # type: ignore
                return data.__root__[path]  # type: ignore
            else:
                # Collect entries nested under "<path>.".
                matches = {}
                for k in data.__root__.keys():  # type: ignore
                    if k.startswith(f"{path}."):
                        rest = k[len(path) + 1 :]  # noqa
                        matches[rest] = data.__root__[k]  # type: ignore
                if not matches:
                    raise KeyError(f"No child models under '{path}'.")
                else:
                    # NOTE(review): matches are gathered but returning a
                    # grouped model is not implemented yet.
                    raise NotImplementedError()
                # subcomponent_group = KiaraModelGroup.create_from_child_models(**matches)
                # return subcomponent_group
        else:
            raise NotImplementedError()
    else:
        if path in data.__fields__.keys():
            return getattr(data, path)
        else:
            raise KeyError(
                f"No subcomponent for key '{path}' in model: {data.instance_id}."
            )
retrieve_data_subcomponent_keys(data)
¶
Source code in kiara/utils/models.py
def retrieve_data_subcomponent_keys(data: Any) -> Iterable[str]:
    """Return the keys of child models nested inside `data`.

    Supports pydantic custom-root mapping models (top-level segments of
    keys whose values are ``BaseModel`` instances) and regular pydantic
    models (fields whose declared type is a ``BaseModel`` subclass). Any
    other input yields no keys and logs a message.
    """
    if hasattr(data, "__custom_root_type__") and data.__custom_root_type__:
        if isinstance(data.__root__, Mapping):  # type: ignore
            return {
                key.split(".")[0]
                for key, value in data.__root__.items()  # type: ignore
                if isinstance(value, BaseModel)
            }
        return []
    elif isinstance(data, BaseModel):
        matching_fields = []
        for field_name in data.__fields__.keys():
            declared_type = data.__fields__[field_name].type_
            if isinstance(declared_type, type) and issubclass(declared_type, BaseModel):
                matching_fields.append(field_name)
        return matching_fields
    else:
        log_message(
            f"No subcomponents retrieval supported for data of type: {type(data)}"
        )
        return []
modules
¶
module_config_is_empty(config)
¶
Source code in kiara/utils/modules.py
def module_config_is_empty(config: Mapping[str, Any]):
    """Return ``True`` when `config` holds nothing of substance.

    A config counts as empty when ``defaults`` and ``constants`` are
    absent/falsy and no other keys are present.
    """
    remaining = dict(config)
    if remaining.pop("defaults", None):
        return False
    if remaining.pop("constants", None):
        return False
    return not remaining
operations
¶
create_operation(module_or_operation, operation_config=None, kiara=None)
¶
Source code in kiara/utils/operations.py
def create_operation(
    module_or_operation: str,
    operation_config: Union[None, Mapping[str, Any]] = None,
    kiara: Union[None, "Kiara"] = None,
) -> Operation:
    """Resolve a run target string into an ``Operation``.

    `module_or_operation` is tried, in order, as: a registered operation
    id, a registered module type name (with `operation_config` as module
    config), and finally a path to a pipeline description file.

    Args:
        module_or_operation: operation id, module type name, or pipeline file path.
        operation_config: extra module configuration; only valid for module targets.
        kiara: the context to use; defaults to the global ``Kiara`` instance.

    Returns:
        Operation: the assembled operation.

    Raises:
        Exception: when config is supplied for an operation target, or the
            target cannot be resolved at all.
    """
    operation: Union[Operation, None]
    if kiara is None:
        # Imported lazily to avoid a circular import at module load time.
        from kiara.context import Kiara

        kiara = Kiara.instance()
    if module_or_operation in kiara.operation_registry.operation_ids:
        # Target is an already-registered operation; extra config is invalid.
        operation = kiara.operation_registry.get_operation(module_or_operation)
        if operation_config:
            raise Exception(
                f"Specified run target '{module_or_operation}' is an operation, additional module configuration is not allowed."
            )
    elif module_or_operation in kiara.module_type_names:
        # Target is a module type: instantiate it with the provided config.
        manifest = Manifest(
            module_type=module_or_operation, module_config=operation_config
        )
        module = kiara.create_module(manifest=manifest)
        operation = Operation.create_from_module(module)
    elif os.path.isfile(module_or_operation):
        # Target is a pipeline description file on disk.
        data = get_data_from_file(module_or_operation)
        pipeline_name = data.pop("pipeline_name", None)
        if pipeline_name is None:
            # Fall back to the file name as the pipeline name.
            pipeline_name = os.path.basename(module_or_operation)
        # self._defaults = data.pop("inputs", {})
        execution_context = ExecutionContext(
            pipeline_dir=os.path.abspath(os.path.dirname(module_or_operation))
        )
        pipeline_config = PipelineConfig.from_config(
            pipeline_name=pipeline_name,
            data=data,
            kiara=kiara,
            execution_context=execution_context,
        )
        manifest = kiara.create_manifest("pipeline", config=pipeline_config.dict())
        module = kiara.create_module(manifest=manifest)
        operation = Operation.create_from_module(module, doc=pipeline_config.doc)
    else:
        raise Exception(
            f"Can't assemble operation, invalid operation/module name: {module_or_operation}. Must be registered module or operation name, or file."
        )
    # manifest = Manifest(
    #     module_type=module_or_operation,
    #     module_config=self._operation_config,
    # )
    # module = self._kiara.create_module(manifest=manifest)
    # operation = Operation.create_from_module(module=module)
    if operation is None:
        # NOTE(review): this branch looks unreachable -- every branch above
        # either assigns `operation` or raises. Kept as a safety net.
        merged = set(kiara.module_type_names)
        merged.update(kiara.operation_registry.operation_ids)
        raise NoSuchExecutionTargetException(
            selected_target=module_or_operation,
            msg=f"Invalid run target name '{module_or_operation}'. Must be a path to a pipeline file, or one of the available modules/operations.",
            available_targets=sorted(merged),
        )
    return operation
filter_operations(kiara, pkg_name=None, **operations)
¶
Source code in kiara/utils/operations.py
def filter_operations(
    kiara: "Kiara", pkg_name: Union[str, None] = None, **operations: "Operation"
) -> OperationGroupInfo:
    """Wrap `operations` into ``OperationInfo`` objects, optionally keeping
    only those belonging to the package `pkg_name`.

    Non-pipeline operations qualify when their module type is in the
    (possibly package-filtered) module registry metadata; pipeline
    operations qualify via the ``package`` label in their own metadata.
    """
    result: Dict[str, OperationInfo] = {}
    # op_infos = kiara.operation_registry.get_context_metadata(only_for_package=pkg_name)
    # Module metadata, already filtered by package when pkg_name is set.
    modules = kiara.module_registry.get_context_metadata(only_for_package=pkg_name)
    for op_id, op in operations.items():
        if op.module.module_type_name != "pipeline":
            # NOTE(review): non-pipeline operations whose module type is
            # absent from `modules` are silently dropped -- confirm intended.
            if op.module.module_type_name in modules.keys():
                result[op_id] = OperationInfo.create_from_operation(
                    kiara=kiara, operation=op
                )
                continue
        else:
            # Pipelines carry their owning package in a metadata label.
            package: Union[str, None] = op.metadata.get("labels", {}).get(
                "package", None
            )
            if not pkg_name or (package and package == pkg_name):
                result[op_id] = OperationInfo.create_from_operation(
                    kiara=kiara, operation=op
                )
    # opt_types = kiara.operation_registry.find_all_operation_types(op_id)
    # match = False
    # for ot in opt_types:
    #     if ot in op_infos.keys():
    #         match = True
    #         break
    #
    # if match:
    #     result[op_id] = OperationInfo.create_from_operation(
    #         kiara=kiara, operation=op
    #     )
    return OperationGroupInfo.construct(item_infos=result)  # type: ignore
output
¶
log
¶
Classes¶
ArrowTabularWrap (TabularWrap)
¶
Source code in kiara/utils/output.py
class ArrowTabularWrap(TabularWrap):
    """``TabularWrap`` adapter around a pyarrow ``Table``."""

    def __init__(self, table: "ArrowTable"):
        # The wrapped pyarrow table.
        self._table: "ArrowTable" = table
        super().__init__()

    def retrieve_column_names(self) -> Iterable[str]:
        """Return the column names of the wrapped table."""
        return self._table.column_names

    def retrieve_number_of_rows(self) -> int:
        """Return the row count of the wrapped table."""
        return self._table.num_rows

    def slice(self, offset: int = 0, length: Union[int, None] = None):
        """Return a slice of the wrapped table (delegates to pyarrow)."""
        return self._table.slice(offset=offset, length=length)

    def to_pydict(self) -> Mapping:
        """Convert the wrapped table to a plain column dict."""
        return self._table.to_pydict()
retrieve_column_names(self)
¶Source code in kiara/utils/output.py
def retrieve_column_names(self) -> Iterable[str]:
return self._table.column_names
retrieve_number_of_rows(self)
¶Source code in kiara/utils/output.py
def retrieve_number_of_rows(self) -> int:
return self._table.num_rows
slice(self, offset=0, length=None)
¶Source code in kiara/utils/output.py
def slice(self, offset: int = 0, length: Union[int, None] = None):
return self._table.slice(offset=offset, length=length)
to_pydict(self)
¶Source code in kiara/utils/output.py
def to_pydict(self) -> Mapping:
return self._table.to_pydict()
DictTabularWrap (TabularWrap)
¶
Source code in kiara/utils/output.py
class DictTabularWrap(TabularWrap):
    """``TabularWrap`` adapter around a plain mapping of columns.

    The wrapped mapping presumably maps column names to equal-length lists
    of cell values -- TODO confirm against callers.
    """

    def __init__(self, data: Mapping[str, Any]):
        # Column name -> column values mapping.
        self._data: Mapping[str, Any] = data

    def retrieve_number_of_rows(self) -> int:
        # NOTE(review): ``len(self._data)`` is the number of *columns*
        # (mapping keys), not the length of a column -- confirm this is
        # the intended row count.
        return len(self._data)

    def retrieve_column_names(self) -> Iterable[str]:
        # Column names are the mapping's keys.
        return self._data.keys()

    def to_pydict(self) -> Mapping:
        # Returns the underlying mapping itself, not a copy.
        return self._data

    def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":
        """Return a new wrap with every column sliced to the same window."""
        result = {}
        start = None
        end = None
        for cn in self._data.keys():
            if start is None:
                # NOTE(review): offset and end are clamped against
                # ``len(self._data)`` (the column count), not the column
                # length -- confirm intended.
                if offset > len(self._data):
                    return DictTabularWrap({cn: [] for cn in self._data.keys()})
                start = offset
                if not length:
                    end = len(self._data)
                else:
                    end = start + length
                if end > len(self._data):
                    end = len(self._data)
            result[cn] = self._data[cn][start:end]
        return DictTabularWrap(result)
retrieve_column_names(self)
¶Source code in kiara/utils/output.py
def retrieve_column_names(self) -> Iterable[str]:
return self._data.keys()
retrieve_number_of_rows(self)
¶Source code in kiara/utils/output.py
def retrieve_number_of_rows(self) -> int:
return len(self._data)
slice(self, offset=0, length=None)
¶Source code in kiara/utils/output.py
def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":
result = {}
start = None
end = None
for cn in self._data.keys():
if start is None:
if offset > len(self._data):
return DictTabularWrap({cn: [] for cn in self._data.keys()})
start = offset
if not length:
end = len(self._data)
else:
end = start + length
if end > len(self._data):
end = len(self._data)
result[cn] = self._data[cn][start:end]
return DictTabularWrap(result)
to_pydict(self)
¶Source code in kiara/utils/output.py
def to_pydict(self) -> Mapping:
return self._data
OutputDetails (BaseModel)
pydantic-model
¶
Source code in kiara/utils/output.py
class OutputDetails(BaseModel):
    """Parsed output configuration: an output format, a target, and free-form options.

    pydantic (v1) model; the `root_validator` below derives defaults for
    'format'/'target' and collects all remaining keys into 'config'.
    """

    @classmethod
    def from_data(cls, data: Any):
        """Build an instance from CLI-style input.

        Accepts a single string (either a bare format like 'json', or a
        'key=value' assignment) or an iterable of 'key=value' strings;
        anything else raises a TypeError.
        """
        if isinstance(data, str):
            # a bare string is shorthand for the output format, unless it
            # already looks like a 'key=value' assignment
            if "=" in data:
                data = [data]
            else:
                data = [f"format={data}"]

        if isinstance(data, Iterable):
            # local import -- presumably to avoid an import cycle with the
            # cli module; TODO confirm
            from kiara.utils.cli import dict_from_cli_args

            data = list(data)
            # same bare-format shorthand, for a single-element iterable
            if len(data) == 1 and isinstance(data[0], str) and "=" not in data[0]:
                data = [f"format={data[0]}"]
            output_details_dict = dict_from_cli_args(*data)
        else:
            raise TypeError(
                f"Can't parse output detail config: invalid input type '{type(data)}'."
            )

        output_details = OutputDetails(**output_details_dict)
        return output_details

    format: str = Field(description="The output format.")
    target: str = Field(description="The output target.")
    config: Dict[str, Any] = Field(
        description="Output configuration.", default_factory=dict
    )

    @root_validator(pre=True)
    def _set_defaults(cls, values):
        # Pop 'target'/'format' out of the raw input and derive a sensible
        # format from the target when none was given; everything that
        # remains in 'values' becomes the free-form 'config' mapping.
        target: str = values.pop("target", "terminal")
        format: Union[str, None] = values.pop("format", None)
        if format is None:
            if target == "terminal":
                format = "terminal"
            else:
                if target == "file":
                    format = "json"
                else:
                    # target looks like a file path: guess format from its extension
                    ext = target.split(".")[-1]
                    if ext in ["yaml", "json"]:
                        format = ext
                    else:
                        format = "json"

        result = {"format": format, "target": target, "config": dict(values)}
        return result
Attributes¶
config: Dict[str, Any]
pydantic-field
¶Output configuration.
format: str
pydantic-field
required
¶The output format.
target: str
pydantic-field
required
¶The output target.
from_data(data)
classmethod
¶Source code in kiara/utils/output.py
@classmethod
def from_data(cls, data: Any):
if isinstance(data, str):
if "=" in data:
data = [data]
else:
data = [f"format={data}"]
if isinstance(data, Iterable):
from kiara.utils.cli import dict_from_cli_args
data = list(data)
if len(data) == 1 and isinstance(data[0], str) and "=" not in data[0]:
data = [f"format={data[0]}"]
output_details_dict = dict_from_cli_args(*data)
else:
raise TypeError(
f"Can't parse output detail config: invalid input type '{type(data)}'."
)
output_details = OutputDetails(**output_details_dict)
return output_details
RenderConfig (BaseModel)
pydantic-model
¶
SqliteTabularWrap (TabularWrap)
¶
Source code in kiara/utils/output.py
class SqliteTabularWrap(TabularWrap):
    """A `TabularWrap` that reads its data from a table in an SQLite database.

    NOTE(review): '_table_name' is interpolated directly into the SQL
    strings below (parameter binding cannot be used for identifiers), so
    callers must ensure it is a trusted table name.
    """

    def __init__(self, engine: "Engine", table_name: str):
        self._engine: Engine = engine
        self._table_name: str = table_name
        super().__init__()

    def retrieve_number_of_rows(self) -> int:
        """Count the rows of the wrapped table via a COUNT(*) query."""
        from sqlalchemy import text

        with self._engine.connect() as con:
            result = con.execute(text(f"SELECT count(*) from {self._table_name}"))
            num_rows = result.fetchone()[0]

        return num_rows

    def retrieve_column_names(self) -> Iterable[str]:
        """Return the table's column names, via sqlalchemy schema inspection."""
        from sqlalchemy import inspect

        engine = self._engine
        inspector = inspect(engine)
        columns = inspector.get_columns(self._table_name)
        result = [column["name"] for column in columns]
        return result

    def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":
        """Return a dict-backed wrap with at most `length` rows, starting at `offset`.

        When no length is given, `LIMIT {self.num_rows}` is emitted anyway --
        presumably because SQLite requires a LIMIT clause for OFFSET to be
        valid; TODO confirm.
        """
        from sqlalchemy import text

        query = f"SELECT * FROM {self._table_name}"
        if length:
            query = f"{query} LIMIT {length}"
        else:
            query = f"{query} LIMIT {self.num_rows}"
        if offset > 0:
            query = f"{query} OFFSET {offset}"
        with self._engine.connect() as con:
            result = con.execute(text(query))
            result_dict: Dict[str, List[Any]] = {}
            for cn in self.column_names:
                result_dict[cn] = []
            # transpose the row-oriented result set into per-column lists
            for r in result:
                for i, cn in enumerate(self.column_names):
                    result_dict[cn].append(r[i])

        return DictTabularWrap(result_dict)

    def to_pydict(self) -> Mapping:
        """Fetch the whole table as a column-name -> list-of-values mapping."""
        from sqlalchemy import text

        query = f"SELECT * FROM {self._table_name}"

        with self._engine.connect() as con:
            result = con.execute(text(query))
            result_dict: Dict[str, List[Any]] = {}
            for cn in self.column_names:
                result_dict[cn] = []
            # transpose the row-oriented result set into per-column lists
            for r in result:
                for i, cn in enumerate(self.column_names):
                    result_dict[cn].append(r[i])

        return result_dict
retrieve_column_names(self)
¶Source code in kiara/utils/output.py
def retrieve_column_names(self) -> Iterable[str]:
from sqlalchemy import inspect
engine = self._engine
inspector = inspect(engine)
columns = inspector.get_columns(self._table_name)
result = [column["name"] for column in columns]
return result
retrieve_number_of_rows(self)
¶Source code in kiara/utils/output.py
def retrieve_number_of_rows(self) -> int:
from sqlalchemy import text
with self._engine.connect() as con:
result = con.execute(text(f"SELECT count(*) from {self._table_name}"))
num_rows = result.fetchone()[0]
return num_rows
slice(self, offset=0, length=None)
¶Source code in kiara/utils/output.py
def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":
from sqlalchemy import text
query = f"SELECT * FROM {self._table_name}"
if length:
query = f"{query} LIMIT {length}"
else:
query = f"{query} LIMIT {self.num_rows}"
if offset > 0:
query = f"{query} OFFSET {offset}"
with self._engine.connect() as con:
result = con.execute(text(query))
result_dict: Dict[str, List[Any]] = {}
for cn in self.column_names:
result_dict[cn] = []
for r in result:
for i, cn in enumerate(self.column_names):
result_dict[cn].append(r[i])
return DictTabularWrap(result_dict)
to_pydict(self)
¶Source code in kiara/utils/output.py
def to_pydict(self) -> Mapping:
from sqlalchemy import text
query = f"SELECT * FROM {self._table_name}"
with self._engine.connect() as con:
result = con.execute(text(query))
result_dict: Dict[str, List[Any]] = {}
for cn in self.column_names:
result_dict[cn] = []
for r in result:
for i, cn in enumerate(self.column_names):
result_dict[cn].append(r[i])
return result_dict
TabularWrap (ABC)
¶
Source code in kiara/utils/output.py
class TabularWrap(ABC):
    """Abstract base for table-like data: cached metadata plus terminal rendering.

    Subclasses implement the four abstract primitives; this class adds
    caching 'num_rows'/'column_names' properties and 'pretty_print'.
    """

    def __init__(self):
        # lazily-populated caches for the two properties below
        self._num_rows: Union[int, None] = None
        self._column_names: Union[Iterable[str], None] = None

    @property
    def num_rows(self) -> int:
        """The total number of rows (cached after the first retrieval)."""
        if self._num_rows is None:
            self._num_rows = self.retrieve_number_of_rows()
        return self._num_rows

    @property
    def column_names(self) -> Iterable[str]:
        """The column names (cached after the first retrieval)."""
        if self._column_names is None:
            self._column_names = self.retrieve_column_names()
        return self._column_names

    @abstractmethod
    def retrieve_column_names(self) -> Iterable[str]:
        """Return the names of all columns."""
        pass

    @abstractmethod
    def retrieve_number_of_rows(self) -> int:
        """Return the total number of rows."""
        pass

    @abstractmethod
    def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":
        """Return a new wrap with at most 'length' rows, starting at 'offset'."""
        pass

    @abstractmethod
    def to_pydict(self) -> Mapping:
        """Return the data as a column-name -> column-values mapping."""
        pass

    def pretty_print(
        self,
        rows_head: Union[int, None] = None,
        rows_tail: Union[int, None] = None,
        max_row_height: Union[int, None] = None,
        max_cell_length: Union[int, None] = None,
        show_table_header: bool = True,
    ) -> RenderableType:
        """Render the table for terminal display as a rich table.

        Arguments:
            rows_head: if set, render (up to) this many rows from the top
            rows_tail: if set (only honored together with 'rows_head'),
                additionally render this many rows from the bottom
            max_row_height: maximum number of text lines per cell
            max_cell_length: maximum number of characters per cell line
            show_table_header: whether to print the column names
        """
        rich_table = RichTable(box=box.SIMPLE, show_header=show_table_header)
        for cn in self.retrieve_column_names():
            rich_table.add_column(cn)

        # number of '...' separator rows rendered between head and tail
        num_split_rows = 2
        if rows_head is not None:
            if rows_head < 0:
                rows_head = 0
            if rows_head > self.retrieve_number_of_rows():
                # the head already covers all rows: drop tail and separator
                rows_head = self.retrieve_number_of_rows()
                rows_tail = None
                num_split_rows = 0
            if rows_tail is not None:
                if rows_head + rows_tail >= self.num_rows:  # type: ignore
                    # head and tail would overlap: render everything as the head
                    rows_head = self.retrieve_number_of_rows()
                    rows_tail = None
                    num_split_rows = 0
            else:
                num_split_rows = 0

        if rows_head is not None:
            head = self.slice(0, rows_head)
            num_rows = rows_head
        else:
            head = self
            num_rows = self.retrieve_number_of_rows()

        table_dict = head.to_pydict()
        for i in range(0, num_rows):
            row = []
            for cn in self.retrieve_column_names():
                cell = table_dict[cn][i]
                cell_str = str(cell)
                # truncate multi-line cells vertically, keeping start and end
                if max_row_height and max_row_height > 0 and "\n" in cell_str:
                    lines = cell_str.split("\n")
                    if len(lines) > max_row_height:
                        if max_row_height == 1:
                            lines = lines[0:1]
                        else:
                            half = int(max_row_height / 2)
                            lines = lines[0:half] + [".."] + lines[-half:]
                    cell_str = "\n".join(lines)

                # truncate each remaining line horizontally
                if max_cell_length and max_cell_length > 0:
                    lines = []
                    for line in cell_str.split("\n"):
                        if len(line) > max_cell_length:
                            line = line[0:max_cell_length] + " ..."
                        else:
                            line = line
                        lines.append(line)
                    cell_str = "\n".join(lines)

                row.append(cell_str)

            rich_table.add_row(*row)

        if num_split_rows:
            # '...' placeholder rows marking the elided middle section
            for i in range(0, num_split_rows):
                row = []
                for _ in self.retrieve_column_names():
                    row.append("...")
                rich_table.add_row(*row)

        if rows_head:
            if rows_tail is not None:
                if rows_tail < 0:
                    rows_tail = 0
                tail = self.slice(self.retrieve_number_of_rows() - rows_tail)
                table_dict = tail.to_pydict()
                # NOTE(review): this iterates 'num_rows' (== rows_head) times
                # over a tail slice that only has 'rows_tail' rows -- looks
                # like it should be 'range(0, rows_tail)'; confirm upstream.
                for i in range(0, num_rows):
                    row = []
                    for cn in self.retrieve_column_names():
                        cell = table_dict[cn][i]
                        cell_str = str(cell)
                        if max_row_height and max_row_height > 0 and "\n" in cell_str:
                            lines = cell_str.split("\n")
                            if len(lines) > max_row_height:
                                if max_row_height == 1:
                                    lines = lines[0:1]
                                else:
                                    # NOTE(review): uses len(lines) here where
                                    # the head loop used max_row_height --
                                    # possibly an unintended inconsistency.
                                    half = int(len(lines) / 2)
                                    lines = lines[0:half] + [".."] + lines[-half:]
                            cell_str = "\n".join(lines)

                        if max_cell_length and max_cell_length > 0:
                            lines = []
                            for line in cell_str.split("\n"):
                                if len(line) > max_cell_length:
                                    line = line[0:(max_cell_length)] + " ..."
                                else:
                                    line = line
                                lines.append(line)
                            cell_str = "\n".join(lines)

                        row.append(cell_str)

                    rich_table.add_row(*row)

        return rich_table
column_names: Iterable[str]
property
readonly
¶num_rows: int
property
readonly
¶pretty_print(self, rows_head=None, rows_tail=None, max_row_height=None, max_cell_length=None, show_table_header=True)
¶Source code in kiara/utils/output.py
def pretty_print(
self,
rows_head: Union[int, None] = None,
rows_tail: Union[int, None] = None,
max_row_height: Union[int, None] = None,
max_cell_length: Union[int, None] = None,
show_table_header: bool = True,
) -> RenderableType:
rich_table = RichTable(box=box.SIMPLE, show_header=show_table_header)
for cn in self.retrieve_column_names():
rich_table.add_column(cn)
num_split_rows = 2
if rows_head is not None:
if rows_head < 0:
rows_head = 0
if rows_head > self.retrieve_number_of_rows():
rows_head = self.retrieve_number_of_rows()
rows_tail = None
num_split_rows = 0
if rows_tail is not None:
if rows_head + rows_tail >= self.num_rows: # type: ignore
rows_head = self.retrieve_number_of_rows()
rows_tail = None
num_split_rows = 0
else:
num_split_rows = 0
if rows_head is not None:
head = self.slice(0, rows_head)
num_rows = rows_head
else:
head = self
num_rows = self.retrieve_number_of_rows()
table_dict = head.to_pydict()
for i in range(0, num_rows):
row = []
for cn in self.retrieve_column_names():
cell = table_dict[cn][i]
cell_str = str(cell)
if max_row_height and max_row_height > 0 and "\n" in cell_str:
lines = cell_str.split("\n")
if len(lines) > max_row_height:
if max_row_height == 1:
lines = lines[0:1]
else:
half = int(max_row_height / 2)
lines = lines[0:half] + [".."] + lines[-half:]
cell_str = "\n".join(lines)
if max_cell_length and max_cell_length > 0:
lines = []
for line in cell_str.split("\n"):
if len(line) > max_cell_length:
line = line[0:max_cell_length] + " ..."
else:
line = line
lines.append(line)
cell_str = "\n".join(lines)
row.append(cell_str)
rich_table.add_row(*row)
if num_split_rows:
for i in range(0, num_split_rows):
row = []
for _ in self.retrieve_column_names():
row.append("...")
rich_table.add_row(*row)
if rows_head:
if rows_tail is not None:
if rows_tail < 0:
rows_tail = 0
tail = self.slice(self.retrieve_number_of_rows() - rows_tail)
table_dict = tail.to_pydict()
for i in range(0, num_rows):
row = []
for cn in self.retrieve_column_names():
cell = table_dict[cn][i]
cell_str = str(cell)
if max_row_height and max_row_height > 0 and "\n" in cell_str:
lines = cell_str.split("\n")
if len(lines) > max_row_height:
if max_row_height == 1:
lines = lines[0:1]
else:
half = int(len(lines) / 2)
lines = lines[0:half] + [".."] + lines[-half:]
cell_str = "\n".join(lines)
if max_cell_length and max_cell_length > 0:
lines = []
for line in cell_str.split("\n"):
if len(line) > max_cell_length:
line = line[0:(max_cell_length)] + " ..."
else:
line = line
lines.append(line)
cell_str = "\n".join(lines)
row.append(cell_str)
rich_table.add_row(*row)
return rich_table
retrieve_column_names(self)
¶Source code in kiara/utils/output.py
@abstractmethod
def retrieve_column_names(self) -> Iterable[str]:
pass
retrieve_number_of_rows(self)
¶Source code in kiara/utils/output.py
@abstractmethod
def retrieve_number_of_rows(self) -> int:
pass
slice(self, offset=0, length=None)
¶Source code in kiara/utils/output.py
@abstractmethod
def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":
pass
to_pydict(self)
¶Source code in kiara/utils/output.py
@abstractmethod
def to_pydict(self) -> Mapping:
pass
Functions¶
create_pipeline_steps_tree(pipeline_structure, pipeline_details)
¶
Source code in kiara/utils/output.py
def create_pipeline_steps_tree(
    pipeline_structure: "PipelineStructure", pipeline_details: "PipelineDetails"
) -> Tree:
    """Build a rich `Tree` of all processing stages, their steps and each step's status."""

    from kiara.models.module.pipeline import StepStatus

    root = Tree("steps")

    for stage_idx, stage_steps in enumerate(
        pipeline_structure.processing_stages, start=1
    ):
        stage_branch = root.add(f"stage: [i]{stage_idx}[/i]")
        for current_step_id in sorted(stage_steps):
            step_branch = stage_branch.add(f"step: [i]{current_step_id}[/i]")
            details = pipeline_details.step_states[current_step_id]
            state = details.status
            if state is StepStatus.RESULTS_READY:
                step_branch.add("status: [green]results ready[/green]")
            elif state is StepStatus.INPUTS_READY:
                step_branch.add("status: [yellow]inputs ready[/yellow]")
            else:
                # anything else counts as invalid inputs: list the reasons
                problem_branch = step_branch.add("status: [red]inputs invalid[/red]")
                for field, reason in details.invalid_details.items():
                    problem_branch.add(f"[i]{field}[/i]: {reason}")

    return root
create_recursive_table_from_model_object(model, render_config=None)
¶
Source code in kiara/utils/output.py
def create_recursive_table_from_model_object(
    model: BaseModel,
    render_config: Union[Mapping[str, Any], None] = None,
):
    """Render a pydantic model as a Field/Value table, recursing into sub-models.

    Arguments:
        model: the model instance to render
        render_config: supports 'show_lines' and 'show_header' (both default True)
    """
    if render_config is None:
        render_config = {}

    show_lines = render_config.get("show_lines", True)
    show_header = render_config.get("show_header", True)

    model_cls = model.__class__

    table = RichTable(box=box.SIMPLE, show_lines=show_lines, show_header=show_header)
    table.add_column("Field")
    table.add_column("Value")

    props = model_cls.schema().get("properties", {})

    for field_name in sorted(model_cls.__fields__.keys()):
        data = getattr(model, field_name)

        p = props.get(field_name, None)
        p_type = None
        if p is not None:
            p_type = p.get("type", None)
            # TODO: check 'anyOf' keys
            if p_type is not None:
                p_type = f"[i]{p_type}[/i]"
        # NOTE(review): assumes every field appears in the schema
        # 'properties'; if 'p' is None this raises an AttributeError.
        desc = p.get("description", None)

        if not isinstance(data, BaseModel):
            # plain value: render it directly
            data_renderable = extract_renderable(data, render_config=render_config)
            sub_model = None
        else:
            # nested model: recurse, using a compact sub-table style
            sub_model = create_recursive_table_from_model_object(
                data, render_config={"show_lines": True, "show_header": False}
            )
            data_renderable = None

        group = []
        if data_renderable:
            group.append(data_renderable)
            group.append("")
        if desc:
            group.append(f"[i]{desc}[/i]")
        if sub_model:
            group.append(sub_model)

        if p_type:
            field_name = f"[b i]{field_name}[/b i] ([i]{p_type}[/i])"
        else:
            field_name = f"[b i]{field_name}[/b i]"
        table.add_row(field_name, Group(*group))

    return table
create_renderable_from_values(values, config=None)
¶
Create a renderable for this module configuration.
Source code in kiara/utils/output.py
def create_renderable_from_values(
    values: Mapping[str, "Value"], config: Union[Mapping[str, Any], None] = None
) -> RenderableType:
    """Create a renderable for this module configuration.

    Renders one row per value (sorted by data type); 'config' toggles the
    optional columns via 'show_pedigree', 'show_data' and 'show_hash'.
    """
    if config is None:
        config = {}

    # only terminal rendering is implemented so far
    render_format = config.get("render_format", "terminal")
    if render_format not in ["terminal"]:
        raise Exception(f"Invalid render format: {render_format}")

    show_pedigree = config.get("show_pedigree", False)
    show_data = config.get("show_data", False)
    show_hash = config.get("show_hash", True)
    # show_load_config = config.get("show_load_config", False)

    table = RichTable(show_lines=True, box=box.MINIMAL_DOUBLE_HEAD)
    table.add_column("value_id", "i")
    table.add_column("data_type")
    table.add_column("size")
    if show_hash:
        table.add_column("hash")
    if show_pedigree:
        table.add_column("pedigree")
    if show_data:
        table.add_column("data")

    for id, value in sorted(values.items(), key=lambda item: item[1].value_schema.type):
        row: List[RenderableType] = [id, value.value_schema.type, str(value.value_size)]
        if show_hash:
            row.append(str(value.value_hash))
        if show_pedigree:
            # values without provenance are marked with the ORPHAN sentinel
            if value.pedigree == ORPHAN:
                pedigree = "-- n/a --"
            else:
                pedigree = value.pedigree.json(option=orjson.OPT_INDENT_2)

            row.append(pedigree)
        if show_data:
            # NOTE(review): reaches into the value's private '_data_registry'
            data = value._data_registry.pretty_print_data(
                value_id=value.value_id, target_type="terminal_renderable", **config
            )
            row.append(data)
        # if show_load_config:
        #     load_config = value.retrieve_load_config()
        #     if load_config is None:
        #         load_config_str: RenderableType = "-- not stored (yet) --"
        #     else:
        #         load_config_str = load_config.create_renderable()
        #     row.append(load_config_str)
        table.add_row(*row)

    return table
create_table_from_base_model_cls(model_cls)
¶
Source code in kiara/utils/output.py
def create_table_from_base_model_cls(model_cls: Type[BaseModel]):
    """Render the schema of a pydantic model class as a rich table.

    Columns: field name, type, description, required flag, and the
    (JSON-formatted, when possible) default value.
    """
    table = RichTable(box=box.SIMPLE, show_lines=True)
    table.add_column("Field")
    table.add_column("Type")
    table.add_column("Description")
    table.add_column("Required")
    table.add_column("Default")

    props = model_cls.schema().get("properties", {})

    for field_name, field in sorted(model_cls.__fields__.items()):
        row = [field_name]

        p = props.get(field_name, None)
        p_type = None
        if p is not None:
            p_type = p.get("type", None)
            # TODO: check 'anyOf' keys
        if p_type is None:
            p_type = "-- check source --"
        row.append(p_type)

        # guard against a field that is missing from the schema 'properties'
        # (the original crashed with an AttributeError on 'None.get' here)
        desc = p.get("description", "") if p is not None else ""
        row.append(desc)
        row.append("yes" if field.required else "no")

        default = field.default
        if callable(default):
            # default factories are called to show the produced value
            default = default()
        if default is None:
            default = ""
        else:
            try:
                default = json.dumps(default, indent=2)
            except Exception:
                # not JSON-serializable: fall back to its string form
                default = str(default)
        row.append(default)
        table.add_row(*row)

    return table
create_table_from_field_schemas(fields, _add_default=True, _add_required=True, _show_header=False, _constants=None)
¶
Source code in kiara/utils/output.py
def create_table_from_field_schemas(
    fields: Mapping[str, "ValueSchema"],
    _add_default: bool = True,
    _add_required: bool = True,
    _show_header: bool = False,
    _constants: Union[Mapping[str, Any], None] = None,
) -> RichTable:
    """Render a mapping of field name -> value schema as a rich table.

    Arguments:
        fields: the field schemas, keyed by field name
        _add_default: include a 'Default' (or 'Default / Constant') column
        _add_required: include a 'Required' column
        _show_header: show the table header row
        _constants: optional constant values, rendered in place of defaults
    """
    table = RichTable(box=box.SIMPLE, show_header=_show_header)
    table.add_column("field name", style="i", overflow="fold")
    table.add_column("type")
    table.add_column("description")
    if _add_required:
        table.add_column("Required")
    if _add_default:
        if _constants:
            table.add_column("Default / Constant")
        else:
            table.add_column("Default")

    for field_name, schema in fields.items():
        row: List[RenderableType] = [field_name, schema.type, schema.doc]
        if _add_required:
            req = schema.is_required()
            if not req:
                req_str = "no"
            else:
                # only a required field without a usable default truly needs input
                if schema.default in [
                    None,
                    SpecialValue.NO_VALUE,
                    SpecialValue.NOT_SET,
                ]:
                    req_str = "[b]yes[b]"
                else:
                    req_str = "no"
            row.append(req_str)

        if _add_default:
            if _constants and field_name in _constants.keys():
                d = f"[b]{_constants[field_name]}[/b] (constant)"
            else:
                if schema.default in [
                    None,
                    SpecialValue.NO_VALUE,
                    SpecialValue.NOT_SET,
                ]:
                    d = "-- no default --"
                else:
                    d = str(schema.default)
            row.append(d)

        table.add_row(*row)

    return table
create_table_from_model_object(model, render_config=None, exclude_fields=None)
¶
Source code in kiara/utils/output.py
def create_table_from_model_object(
    model: BaseModel,
    render_config: Union[Mapping[str, Any], None] = None,
    exclude_fields: Union[Set[str], None] = None,
):
    """Render a pydantic model instance as a rich table.

    Columns: field name, schema type, rendered value, and description.

    Arguments:
        model: the model instance to render
        render_config: passed through to 'extract_renderable' for the values
        exclude_fields: field names to leave out of the table
    """
    model_cls = model.__class__

    table = RichTable(box=box.SIMPLE, show_lines=True)
    table.add_column("Field")
    table.add_column("Type")
    table.add_column("Value")
    table.add_column("Description")

    props = model_cls.schema().get("properties", {})

    for field_name, field in sorted(model_cls.__fields__.items()):
        if exclude_fields and field_name in exclude_fields:
            continue
        row = [field_name]

        p = props.get(field_name, None)
        p_type = None
        if p is not None:
            p_type = p.get("type", None)
            # TODO: check 'anyOf' keys
        if p_type is None:
            p_type = "-- check source --"
        row.append(p_type)

        data = getattr(model, field_name)
        row.append(extract_renderable(data, render_config=render_config))
        # guard against a field that is missing from the schema 'properties'
        # (the original crashed with an AttributeError on 'None.get' here)
        desc = p.get("description", "") if p is not None else ""
        row.append(desc)
        table.add_row(*row)

    return table
create_value_map_status_renderable(inputs, render_config=None, fields=None)
¶
Source code in kiara/utils/output.py
def create_value_map_status_renderable(
    inputs: ValueMap,
    render_config: Union[Mapping[str, Any], None] = None,
    fields: Union[None, Iterable[str]] = None,
) -> RichTable:
    """Render the validity status of every field in a value map as a rich table.

    Arguments:
        inputs: the value map to render
        render_config: toggles for the optional columns ('show_description',
            'show_type', 'show_required', 'show_default', 'show_value_ids')
        fields: explicit field order; defaults to all fields, sorted
    """
    if render_config is None:
        render_config = {}

    show_description: bool = render_config.get("show_description", True)
    show_type: bool = render_config.get("show_type", True)
    show_required: bool = render_config.get("show_required", True)
    show_default: bool = render_config.get("show_default", True)
    show_value_ids: bool = render_config.get("show_value_ids", False)

    table = RichTable(box=box.SIMPLE, show_header=True)
    table.add_column("field name", style="i")
    table.add_column("status", style="b")
    if show_type:
        table.add_column("type")
    if show_description:
        table.add_column("description")
    if show_required:
        table.add_column("required")
    if show_default:
        table.add_column("default")
    if show_value_ids:
        table.add_column("value id", overflow="fold")

    invalid = inputs.check_invalid()

    if fields:
        field_order = fields
    else:
        field_order = sorted(inputs.keys())

    for field_name in field_order:
        value = inputs.get(field_name, None)
        if value is None:
            # requested field does not exist in the map: log and skip
            log.debug(
                "ignore.field", field_name=field_name, available_fields=inputs.keys()
            )
            continue

        row: List[RenderableType] = [field_name]

        if field_name in invalid.keys():
            row.append(f"[red]{invalid[field_name]}[/red]")
        else:
            row.append("[green]valid[/green]")

        value_schema = inputs.values_schema[field_name]
        if show_type:
            row.append(value_schema.type)
        if show_description:
            row.append(value_schema.doc.description)

        if show_required:
            req = value_schema.is_required()
            if not req:
                req_str = "no"
            else:
                # required and no usable default: the caller must supply a value
                if value_schema.default in [
                    None,
                    SpecialValue.NO_VALUE,
                    SpecialValue.NOT_SET,
                ]:
                    req_str = "[b]yes[b]"
                else:
                    req_str = "no"
            row.append(req_str)

        if show_default:
            default = value_schema.default
            if callable(default):
                # default factories are called to display the produced value
                default_val = default()
            else:
                default_val = default
            if default_val in [None, SpecialValue.NOT_SET, SpecialValue.NO_VALUE]:
                default_str = ""
            else:
                default_str = str(default_val)
            row.append(default_str)

        if show_value_ids:
            row.append(str(inputs.get_value_obj(field_name=field_name).value_id))

        table.add_row(*row)

    return table
extract_renderable(item, render_config=None)
¶
Try to automatically find and extract or create an object that is renderable by the 'rich' library.
Source code in kiara/utils/output.py
def extract_renderable(item: Any, render_config: Union[Mapping[str, Any], None] = None):
    """Try to automatically find and extract or create an object that is renderable by the 'rich' library.

    The isinstance checks are ordered from most to least specific, and the
    order matters (e.g. a 'str' is also an Iterable).
    """
    if render_config is None:
        render_config = {}
    else:
        # copy before mutating: 'setdefault' below writes into the mapping
        render_config = dict(render_config)

    inline_models_as_json = render_config.setdefault("inline_models_as_json", True)

    # objects that know how to render themselves take precedence
    if hasattr(item, "create_renderable"):
        return item.create_renderable(**render_config)
    elif isinstance(item, (ConsoleRenderable, RichCast, str)):
        # already directly renderable by rich
        return item
    elif isinstance(item, BaseModel) and not inline_models_as_json:
        return create_table_from_model_object(item)
    elif isinstance(item, BaseModel):
        return item.json(indent=2)
    elif isinstance(item, Mapping) and not inline_models_as_json:
        # key/value table, recursing into the values
        table = RichTable(show_header=False, box=box.SIMPLE)
        table.add_column("Key", style="i")
        table.add_column("Value")
        for k, v in item.items():
            table.add_row(k, extract_renderable(v, render_config=render_config))
        return table
    elif isinstance(item, Mapping):
        # render the mapping as indented JSON, serializing contained models
        result = {}
        for k, v in item.items():
            if isinstance(v, BaseModel):
                v = v.dict()
            result[k] = v
        return orjson_dumps(
            result, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS
        )
    elif isinstance(item, Iterable):
        # render each element and stack them vertically
        _all = []
        for i in item:
            _all.append(extract_renderable(i))
        rg = Group(*_all)
        return rg
    elif isinstance(item, Enum):
        return item.value
    else:
        return str(item)
pipelines
¶
Functions¶
check_doc_sidecar(path, data)
¶
Source code in kiara/utils/pipelines.py
def check_doc_sidecar(
    path: Union[Path, str], data: Mapping[str, Any]
) -> Mapping[str, Any]:
    """If `data` lacks a 'documentation' entry, try loading it from a '<path>.md' sidecar file."""

    if isinstance(path, str):
        path = Path(os.path.expanduser(path))

    # already documented: nothing to do
    if data["data"].get("documentation", None) is not None:
        return data

    sidecar = Path(path.as_posix() + ".md")
    if sidecar.is_file():
        content = sidecar.read_text()
        if content:
            data["data"]["documentation"] = content

    return data
create_step_value_address(value_address_config, default_field_name)
¶
Source code in kiara/utils/pipelines.py
def create_step_value_address(
    value_address_config: Union[str, Mapping[str, Any]],
    default_field_name: str,
) -> "StepValueAddress":
    """Create a `StepValueAddress` from a string or mapping configuration.

    String form: 'step_id', 'step_id.output_name' or
    'step_id.output_name.sub_value_config'; if only a step id is given,
    `default_field_name` is used as the output name. Mapping form expects
    'step_id' and 'value_name' keys, plus an optional 'sub_value' mapping.

    Raises:
        TypeError: if the config is neither a string nor a mapping
        ValueError: if 'sub_value' is present but not a mapping
        NotImplementedError: if a string config has more than three tokens
    """
    # pass-through: despite the annotation, an already-constructed address
    # is accepted and returned unchanged
    if isinstance(value_address_config, StepValueAddress):
        return value_address_config

    sub_value: Union[Mapping[str, Any], None] = None

    if isinstance(value_address_config, str):
        tokens = value_address_config.split(".")
        if len(tokens) == 1:
            step_id = value_address_config
            output_name = default_field_name
        elif len(tokens) == 2:
            step_id = tokens[0]
            output_name = tokens[1]
        elif len(tokens) == 3:
            step_id = tokens[0]
            output_name = tokens[1]
            sub_value = {"config": tokens[2]}
        else:
            raise NotImplementedError()

    elif isinstance(value_address_config, Mapping):
        step_id = value_address_config["step_id"]
        output_name = value_address_config["value_name"]
        sub_value = value_address_config.get("sub_value", None)
    else:
        raise TypeError(
            f"Invalid type for creating step value address: {type(value_address_config)}"
        )

    if sub_value is not None and not isinstance(sub_value, Mapping):
        raise ValueError(
            f"Invalid type '{type(sub_value)}' for sub_value (step_id: {step_id}, value name: {output_name}): {sub_value}"
        )

    input_link = StepValueAddress(
        step_id=step_id, value_name=output_name, sub_value=sub_value
    )
    return input_link
ensure_step_value_addresses(link, default_field_name)
¶
Source code in kiara/utils/pipelines.py
def ensure_step_value_addresses(
    link: Union[str, Mapping, Iterable], default_field_name: str
) -> List["StepValueAddress"]:
    """Normalize a link spec (or a collection of them) into a list of step value addresses."""

    # a single spec (string or mapping) becomes a one-element list
    if isinstance(link, (str, Mapping)):
        return [
            create_step_value_address(
                value_address_config=link, default_field_name=default_field_name
            )
        ]

    # any other iterable: convert each entry individually
    if isinstance(link, Iterable):
        return [
            create_step_value_address(
                value_address_config=entry, default_field_name=default_field_name
            )
            for entry in link
        ]

    raise TypeError(f"Can't parse input map, invalid type for output: {link}")
get_pipeline_details_from_path(path, module_type_name=None, base_module=None)
¶
Load a pipeline description, save its content, and determine the pipeline base name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[str, pathlib.Path] |
the path to the pipeline file |
required |
module_type_name |
Optional[str] |
if specified, overwrites any auto-detected or assigned pipeline name
None |
base_module |
Optional[str] |
overrides the base module the assembled pipeline module will be located in the python hierarchy |
None |
Source code in kiara/utils/pipelines.py
def get_pipeline_details_from_path(
    path: Union[str, Path],
    module_type_name: Union[str, None] = None,
    base_module: Union[str, None] = None,
) -> Mapping[str, Any]:
    """Load a pipeline description, save its content, and determine the pipeline base name.

    Arguments:
        path: the path to the pipeline file
        module_type_name: if specified, overwrites any auto-detected or assigned pipeline name
        base_module: overrides the base module the assembled pipeline module will be located in within the python hierarchy

    Raises:
        Exception: if the path is not a file, the file has no content, or
            the content is not a mapping
    """
    if isinstance(path, str):
        path = Path(os.path.expanduser(path))

    if not path.is_file():
        raise Exception(
            f"Can't add pipeline description '{path.as_posix()}': not a file"
        )

    data = get_data_from_file(path)

    if not data:
        raise Exception(
            f"Can't register pipeline file '{path.as_posix()}': no content."
        )

    # an explicit name overrides whatever the file declares
    if module_type_name:
        data[MODULE_TYPE_NAME_KEY] = module_type_name

    if not isinstance(data, Mapping):
        raise Exception("Not a dictionary type.")

    # filename = path.name
    # name = data.get(MODULE_TYPE_NAME_KEY, None)
    # if name is None:
    #     name = filename.split(".", maxsplit=1)[0]

    result = {"data": data, "source": path.as_posix(), "source_type": "file"}
    if base_module:
        result["base_module"] = base_module
    return result
string_vars
¶
log
¶
create_var_regex(delimiter_start=None, delimiter_end=None)
¶
Source code in kiara/utils/string_vars.py
def create_var_regex(
    delimiter_start: Union[str, None] = None, delimiter_end: Union[str, None] = None
) -> Pattern:
    """Compile a regex that matches template variables, '${ name }' by default.

    The delimiters are spliced into the pattern verbatim, so custom
    delimiters must already be regex-escaped.
    """
    # TODO: make this smarter
    start = "\\$\\{" if delimiter_start is None else delimiter_start
    end = "\\}" if delimiter_end is None else delimiter_end
    return re.compile(start + "\\s*(.+?)\\s*" + end)
find_regex_matches_in_obj(source_obj, regex, current=None)
¶
Source code in kiara/utils/string_vars.py
def find_regex_matches_in_obj(
    source_obj: Any, regex: Pattern, current: Union[Set[str], None] = None
) -> Set[str]:
    """Recursively collect all `regex` capture-group matches found in strings inside `source_obj`.

    Mappings are searched in both keys and values; sequences element-wise;
    any other (or falsy) object is ignored.
    """
    matches: Set[str] = set() if current is None else current

    if not source_obj:
        return matches

    if isinstance(source_obj, Mapping):
        for key, value in source_obj.items():
            find_regex_matches_in_obj(key, regex=regex, current=matches)
            find_regex_matches_in_obj(value, regex=regex, current=matches)
    elif isinstance(source_obj, str):
        matches.update(regex.findall(source_obj))
    elif isinstance(source_obj, Sequence):
        for element in source_obj:
            find_regex_matches_in_obj(element, regex=regex, current=matches)

    return matches
find_var_names_in_obj(template_obj, delimiter=None, delimiter_end=None)
¶
Source code in kiara/utils/string_vars.py
def find_var_names_in_obj(
    template_obj: Any,
    delimiter: Union[Pattern, str, None] = None,
    delimiter_end: Union[str, None] = None,
) -> Set[str]:
    """Find all template variable names used anywhere in `template_obj`."""

    # a pre-compiled pattern is used as-is; otherwise build one from the delimiters
    if isinstance(delimiter, Pattern):
        pattern = delimiter
    else:
        pattern = create_var_regex(
            delimiter_start=delimiter, delimiter_end=delimiter_end
        )

    return find_regex_matches_in_obj(template_obj, regex=pattern)
replace_var_names_in_obj(template_obj, repl_dict, delimiter=None, delimiter_end=None, ignore_missing_keys=False)
¶
Source code in kiara/utils/string_vars.py
def replace_var_names_in_obj(
    template_obj: Any,
    repl_dict: typing.Mapping[str, Any],
    delimiter: Union[Pattern, str, None] = None,
    delimiter_end: Union[str, None] = None,
    ignore_missing_keys: bool = False,
) -> Any:
    """Recursively substitute template variables in all strings inside `template_obj`.

    Mappings are rebuilt with both keys and values substituted, sequences
    element-wise; any other (or falsy) object is returned unchanged.
    """
    if isinstance(delimiter, Pattern):
        regex = delimiter
    else:
        regex = create_var_regex(delimiter_start=delimiter, delimiter_end=delimiter_end)

    if not template_obj:
        return template_obj

    if isinstance(template_obj, str):
        return replace_var_names_in_string(
            template_obj,
            repl_dict=repl_dict,
            regex=regex,
            ignore_missing_keys=ignore_missing_keys,
        )

    if isinstance(template_obj, Mapping):
        replaced: Any = {}
        for key, value in template_obj.items():
            new_key = replace_var_names_in_obj(
                template_obj=key,
                repl_dict=repl_dict,
                delimiter=regex,
                ignore_missing_keys=ignore_missing_keys,
            )
            replaced[new_key] = replace_var_names_in_obj(
                template_obj=value,
                repl_dict=repl_dict,
                delimiter=regex,
                ignore_missing_keys=ignore_missing_keys,
            )
        return replaced

    if isinstance(template_obj, Sequence):
        return [
            replace_var_names_in_obj(
                element,
                repl_dict=repl_dict,
                delimiter=regex,
                ignore_missing_keys=ignore_missing_keys,
            )
            for element in template_obj
        ]

    return template_obj
replace_var_names_in_string(template_string, repl_dict, regex, ignore_missing_keys=False)
¶
Source code in kiara/utils/string_vars.py
def replace_var_names_in_string(
    template_string: str,
    repl_dict: typing.Mapping[str, Any],
    regex: Pattern,
    ignore_missing_keys: bool = False,
) -> str:
    """Substitute every `regex` match in `template_string` with its value from `repl_dict`.

    Arguments:
        template_string: the string to substitute into
        repl_dict: variable name -> replacement value
        regex: pattern whose first capture group is the variable name
        ignore_missing_keys: if True, leave unknown variables untouched
            instead of raising

    Raises:
        Exception: if a referenced variable is missing from `repl_dict` and
            `ignore_missing_keys` is False
    """

    def sub(match):
        key = match.groups()[0]
        if key not in repl_dict.keys():
            if not ignore_missing_keys:
                # BUG FIX: the original called Exception(msg=...); 'msg' is
                # not a valid keyword, so a TypeError was raised instead of
                # the intended error message.
                raise Exception(
                    f"Can't insert variable '{key}'. Key not in provided input values, available keys: {', '.join(repl_dict.keys())}"
                )
            else:
                # keep the original placeholder text untouched
                return match[0]
        else:
            result = repl_dict[key]
            return result

    result = regex.sub(sub, template_string)
    return result
values
¶
augment_values(values, schemas, constants=None)
¶
Source code in kiara/utils/values.py
def augment_values(
    values: Mapping[str, Any],
    schemas: Mapping[str, ValueSchema],
    constants: Union[Mapping[str, ValueSchema], None] = None,
) -> Dict[str, Any]:
    """Merge user-supplied values with schema defaults and constants into one dict.

    Constants may not be provided by the caller and always come from their
    schema default; for the remaining fields, missing inputs fall back to the
    schema default (or NOT_SET), and an explicit ``None`` becomes NO_VALUE.
    """
    # TODO: check if extra fields were provided
    result: Dict[str, Any] = {}

    if constants:
        # a constant must never be supplied as a regular input
        for field in constants.keys():
            if field in values.keys():
                raise Exception(f"Invalid input: value provided for constant '{field}'")
        for field, const_schema in constants.items():
            default = const_schema.default
            assert default not in [None, SpecialValue.NO_VALUE, SpecialValue.NOT_SET]
            # callables act as factories; plain values are deep-copied so the
            # schema default can't be mutated through the result
            result[field] = default() if callable(default) else copy.deepcopy(default)

    for field, schema in schemas.items():
        if field in result.keys():
            raise Exception(
                f"Duplicate field '{field}', this is most likely a bug."
            )
        if field not in values.keys():
            if schema.default != SpecialValue.NOT_SET:
                result[field] = (
                    schema.default()
                    if callable(schema.default)
                    else copy.deepcopy(schema.default)
                )
            else:
                result[field] = SpecialValue.NOT_SET
        else:
            value = values[field]
            result[field] = SpecialValue.NO_VALUE if value is None else value

    return result
create_schema_dict(schema_config)
¶
Source code in kiara/utils/values.py
def create_schema_dict(
    schema_config: Mapping[str, Union[ValueSchema, Mapping[str, Any]]],
) -> Mapping[str, ValueSchema]:
    """Normalize a schema configuration into a mapping of field name -> ValueSchema.

    Entries that are already ``ValueSchema`` instances are taken over
    unchanged; mappings are used as keyword arguments for a new
    ``ValueSchema`` (adding a default 'doc' when missing); anything else is
    rejected. Field names are validated against the reserved keywords first.
    """
    invalid = check_valid_field_names(*schema_config.keys())
    if invalid:
        raise Exception(
            f"Can't assemble schema because it contains invalid input field name(s) '{', '.join(invalid)}'. Change the input schema to not contain any of the reserved keywords: {', '.join(INVALID_VALUE_NAMES)}"
        )

    def _as_schema(field: str, data: Any) -> "ValueSchema":
        # convert a single config entry into a ValueSchema instance
        if isinstance(data, ValueSchema):
            return data
        if isinstance(data, Mapping):
            kwargs = dict(data)
            kwargs.setdefault("doc", DEFAULT_NO_DESC_VALUE)
            return ValueSchema(**kwargs)
        msg = "None" if data is None else data.__class__
        raise Exception(
            f"Invalid return type '{msg}' for field '{field}' when trying to create schema."
        )

    return {k: _as_schema(k, v) for k, v in schema_config.items()}
extract_raw_value(kiara, value_id)
¶
Source code in kiara/utils/values.py
def extract_raw_value(kiara: "Kiara", value_id: uuid.UUID):
    """Return a raw representation of the registered value with the given id.

    Values with a pedigree (i.e. not ORPHAN) are rendered as a quoted
    ``"value:<id>"`` reference instead of being inlined.
    """
    value = kiara.data_registry.get_value(value_id=value_id)

    if value.pedigree != ORPHAN:
        # TODO: find alias?
        return f'"value:{value_id}"'

    data_type = value.value_schema.type
    if data_type == "string":
        # quote strings so the rendered form stays valid
        return f'"{value.data}"'
    if data_type == "list":
        return value.data.list_data
    return value.data
extract_raw_values(kiara, **value_ids)
¶
Source code in kiara/utils/values.py
def extract_raw_values(kiara: "Kiara", **value_ids: uuid.UUID) -> Dict[str, Any]:
    """Extract the raw representation of several values at once, keyed by field name."""
    return {
        field: extract_raw_value(kiara=kiara, value_id=vid)
        for field, vid in value_ids.items()
    }
overlay_constants_and_defaults(schemas, defaults, constants)
¶
Source code in kiara/utils/values.py
def overlay_constants_and_defaults(
    schemas: Mapping[str, ValueSchema],
    defaults: Mapping[str, Any],
    constants: Mapping[str, Any],
):
    """Apply configured defaults/constants onto value schemas and split them.

    NOTE: this mutates the schema objects in ``schemas`` in place, setting
    ``default`` (and, for constants, ``is_constant``).

    Args:
        schemas: field name -> value schema
        defaults: field name -> configured default value
        constants: field name -> configured constant value

    Returns:
        a 2-tuple ``(input_schemas, constant_schemas)``: the schemas callers
        must still provide values for, and the ones fixed to a constant

    Raises:
        Exception: if a field appears in both 'defaults' and 'constants'
    """
    # was: 'for k, v in schemas.items()' with 'v' unused
    for field_name in schemas.keys():
        default_value = defaults.get(field_name, None)
        constant_value = constants.get(field_name, None)
        if default_value is not None and constant_value is not None:
            raise Exception(
                f"Module configuration error. Value '{field_name}' set in both 'constants' and 'defaults', this is not allowed."
            )
        # TODO: perform validation for constants/defaults
        if default_value is not None:
            schemas[field_name].default = default_value
        if constant_value is not None:
            schemas[field_name].default = constant_value
            schemas[field_name].is_constant = True

    # split into regular inputs vs constants; use a fresh name instead of
    # rebinding the 'constants' parameter (which previously shadowed the input)
    input_schemas: Dict[str, Any] = {}
    constant_schemas: Dict[str, Any] = {}
    for field_name, schema in schemas.items():
        if schema.is_constant:
            constant_schemas[field_name] = schema
        else:
            input_schemas[field_name] = schema
    return input_schemas, constant_schemas
yaml
¶
StringYAML (YAML)
¶
Source code in kiara/utils/yaml.py
class StringYAML(YAML):
    """A ``YAML`` subclass whose ``dump`` can return the serialized document as a string.

    When no stream is passed, the output is collected in an in-memory
    ``StringIO`` buffer and returned instead of written out.
    """

    def dump(self, data, stream=None, **kw):
        """Serialize ``data`` to ``stream``, or return the result as a string if ``stream`` is None."""
        return_as_string = stream is None
        if return_as_string:
            # buffer the output in memory so it can be handed back
            stream = StringIO()
        YAML.dump(self, data, stream, **kw)
        if return_as_string:
            return stream.getvalue()
dump(self, data, stream=None, **kw)
¶
Source code in kiara/utils/yaml.py
def dump(self, data, stream=None, **kw):
    """Serialize ``data`` via ``YAML.dump``; if no stream is given, return the output as a string."""
    to_string = stream is None
    if to_string:
        # collect output in memory instead of writing to an external stream
        stream = StringIO()
    YAML.dump(self, data, stream, **kw)
    if to_string:
        return stream.getvalue()