Skip to content

utils

CAMEL_TO_SNAKE_REGEX

SUBCLASS_TYPE

WORD_REGEX_PATTERN

logger

Functions

camel_case_to_snake_case(camel_text, repl='_')

Source code in kiara/utils/__init__.py
def camel_case_to_snake_case(camel_text: str, repl: str = "_"):
    """Convert a camelCase string to snake_case, joining the words with *repl*."""
    separated = CAMEL_TO_SNAKE_REGEX.sub(repl, camel_text)
    return separated.lower()

check_valid_field_names(*field_names)

Check whether the provided field names are all valid.

Returns:

Type Description
List[str]

an iterable of strings with invalid field names

Source code in kiara/utils/__init__.py
def check_valid_field_names(*field_names) -> List[str]:
    """Check whether the provided field names are all valid.

    Returns:
        an iterable of strings with invalid field names
    """

    invalid = []
    for name in field_names:
        # a name is invalid if it is reserved, or marked private via underscore
        if name in INVALID_VALUE_NAMES or name.startswith("_"):
            invalid.append(name)
    return invalid

find_free_id(stem, current_ids, sep='_')

Find a free var (or other name) based on a stem string, based on a list of provided existing names.

Parameters:

Name Type Description Default
stem str

the base string to use

required
current_ids Iterable[str]

currently existing names

required
sep str

the separator to put between the stem and the appended counter

'_'

Returns:

Type Description
str

a free name

Source code in kiara/utils/__init__.py
def find_free_id(
    stem: str,
    current_ids: Iterable[str],
    sep="_",
) -> str:
    """Find a free var (or other name) based on a stem string, based on a list of provided existing names.

    If the stem itself is unused, it is returned as-is; otherwise '<sep><counter>'
    is appended, counting up from 1 until a free name is found.

    Args:
        stem (str): the base string to use
        current_ids (Iterable[str]): currently existing names
        sep (str): the separator placed between the stem and the appended counter

    Returns:
        str: a free name
    """

    # fast path: the stem itself is still available
    if stem not in current_ids:
        return stem

    # bump the counter until an unused '<stem><sep><i>' name is found
    i = 1
    while f"{stem}{sep}{i}" in current_ids:
        i += 1
    return f"{stem}{sep}{i}"

first_line(text)

Source code in kiara/utils/__init__.py
def first_line(text: str):
    """Return the first line of *text*, stripped; text without a newline is returned unchanged."""

    if "\n" not in text:
        return text
    head, _, _ = text.partition("\n")
    return head.strip()

get_auto_workflow_alias(module_type, use_incremental_ids=False)

Return an id for a workflow obj of a provided module class.

If 'use_incremental_ids' is set to True, a unique id is returned.

Parameters:

Name Type Description Default
module_type str

the name of the module type

required
use_incremental_ids bool

whether to return a unique (incremental) id

False

Returns:

Type Description
str

a module id

Source code in kiara/utils/__init__.py
def get_auto_workflow_alias(module_type: str, use_incremental_ids: bool = False) -> str:
    """Return an id for a workflow obj of a provided module class.

    If 'use_incremental_ids' is set to True, a unique id is returned.

    Args:
        module_type (str): the name of the module type
        use_incremental_ids (bool): whether to return a unique (incremental) id

    Returns:
        str: a module id
    """

    if use_incremental_ids:
        # read the current counter for this module type (starting at 0), then bump it
        current = _AUTO_MODULE_ID.setdefault(module_type, 0)
        _AUTO_MODULE_ID[module_type] = current + 1
        return f"{module_type}_{current}"

    return module_type

get_dev_config()

Source code in kiara/utils/__init__.py
def get_dev_config() -> "KiaraDevSettings":
    """Return the global kiara development settings object."""

    # imported lazily to avoid a circular import at module load time
    from kiara.utils.develop import KIARA_DEV_SETTINGS

    settings = KIARA_DEV_SETTINGS
    return settings

is_debug()

Source code in kiara/utils/__init__.py
def is_debug() -> bool:
    """Whether debug mode is enabled via the 'DEBUG' environment variable (case-insensitive 'true')."""

    return os.environ.get("DEBUG", "").lower() == "true"

is_develop()

Source code in kiara/utils/__init__.py
def is_develop() -> bool:
    """Whether develop mode is enabled via the 'DEVELOP' (or, as fallback, 'DEV') env var.

    Any non-empty value other than (case-insensitive) 'false' enables develop mode.
    """

    flag = os.environ.get("DEVELOP", "") or os.environ.get("DEV", "")
    return bool(flag) and flag.lower() != "false"

is_jupyter()

Source code in kiara/utils/__init__.py
def is_jupyter() -> bool:
    """Whether the current process runs inside a Jupyter (or Google Colab) notebook kernel."""

    try:
        get_ipython  # type: ignore
    except NameError:
        # not running under IPython at all
        return False

    ipython = get_ipython()  # type: ignore  # noqa
    shell_cls_name = ipython.__class__.__name__
    if shell_cls_name == "TerminalInteractiveShell":
        # plain IPython terminal, not a notebook
        return False
    return "google.colab" in str(ipython.__class__) or shell_cls_name == "ZMQInteractiveShell"

log_exception(exc)

Source code in kiara/utils/__init__.py
def log_exception(exc: Exception):
    """Log an exception, with the amount of detail depending on debug/develop mode.

    In debug mode the exception is logged via the structured logger; in develop
    mode a rich traceback is additionally rendered (unless the dev settings
    disable exception logging).
    """

    if is_debug():
        logger.exception(exc)

    if is_develop():
        from kiara.utils.develop import DetailLevel

        config = get_dev_config()
        # dev settings can disable exception detail logging entirely
        if config.log.exc in [DetailLevel.NONE, "none"]:
            return

        # 'full' detail includes local variables in the rendered traceback
        show_locals = config.log.exc in [DetailLevel.FULL, "full"]

        from kiara.interfaces import get_console
        from kiara.utils.develop import log_dev_message

        # uses the currently-handled exception, not 'exc' itself; outside an
        # 'except' block this may be empty
        exc_info = sys.exc_info()

        if not exc_info:
            # TODO: create exc_info from exception?
            # fall back to plain logging, unless debug mode already logged it above
            if not is_debug():
                logger.exception(exc)
        else:
            console = get_console()

            # width is reduced by 4 so the traceback fits inside the dev-message panel
            log_dev_message(
                Traceback.from_exception(
                    *exc_info, show_locals=show_locals, width=console.width - 4  # type: ignore
                ),
                title="Exception details",
            )

log_message(msg, **data)

Source code in kiara/utils/__init__.py
def log_message(msg: str, **data):
    """Emit a structured debug log message, but only when debug mode is enabled."""

    if not is_debug():
        return
    logger.debug(msg, **data)

to_camel_case(text)

Source code in kiara/utils/__init__.py
def to_camel_case(text: str) -> str:
    """Convert a word-separated string (as split by WORD_REGEX_PATTERN) to CamelCase."""

    words = WORD_REGEX_PATTERN.split(text)
    # the index from the original 'enumerate' was never used, so iterate directly
    return "".join(w.title() for w in words)

Modules

class_loading

KiaraEntryPointItem
KiaraEntryPointIterable
SUBCLASS_TYPE
logger

Functions

find_all_archive_types()

Find all KiaraArchive subclasses via package entry points.

Source code in kiara/utils/class_loading.py
def find_all_archive_types() -> Dict[str, Type["KiaraArchive"]]:
    """Find all [KiaraArchive][kiara.registries.KiaraArchive] subclasses via package entry points."""

    # imported lazily to avoid a circular import at module load time
    from kiara.registries import KiaraArchive

    archive_types = load_all_subclasses_for_entry_point(
        entry_point_name="kiara.archive_type",
        base_class=KiaraArchive,  # type: ignore
        type_id_key="_archive_type_name",
        type_id_func=_cls_name_id_func,
        attach_python_metadata=False,
    )
    return archive_types
find_all_cli_subcommands()
Source code in kiara/utils/class_loading.py
def find_all_cli_subcommands():
    """Collect all plugins registered under the 'kiara.cli_subcommands' entry point."""

    entry_point_name = "kiara.cli_subcommands"

    # route stevedore's own log output to stdout with a recognizable prefix
    stevedore_logger = logging.getLogger("stevedore")
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(
        logging.Formatter(
            f"{entry_point_name} plugin search message/error -> %(message)s"
        )
    )
    handler.setLevel(logging.INFO)
    stevedore_logger.addHandler(handler)
    if is_debug():
        stevedore_logger.setLevel(logging.DEBUG)
    else:
        handler.setLevel(logging.INFO)
        stevedore_logger.setLevel(logging.INFO)

    log_message("events.loading.entry_points", entry_point_name=entry_point_name)

    mgr = ExtensionManager(
        namespace=entry_point_name,
        invoke_on_load=False,
        propagate_map_exceptions=True,
    )

    return [plugin.plugin for plugin in mgr]
find_all_data_types()

Find all [DataType][kiara.data_types.DataType] subclasses via package entry points.

TODO

Source code in kiara/utils/class_loading.py
def find_all_data_types() -> Dict[str, Type["DataType"]]:
    """Find all [DataType][kiara.data_types.DataType] subclasses via package entry points.

    Raises:
        Exception: if any discovered data type name contains a '.' character

    Returns:
        a mapping of data type name -> data type class
    """

    from kiara.data_types import DataType

    all_data_types = load_all_subclasses_for_entry_point(
        entry_point_name="kiara.data_types",
        base_class=DataType,  # type: ignore
        type_id_key="_data_type_name",
        type_id_func=_cls_name_id_func,
    )

    # data type names are used in dotted identifiers elsewhere, so '.' is forbidden
    invalid = [x for x in all_data_types.keys() if "." in x]
    if invalid:
        raise Exception(
            f"Invalid value type name(s), type names can't contain '.': {', '.join(invalid)}"
        )

    return all_data_types
find_all_kiara_model_classes()

Find all [KiaraModel][kiara.models.KiaraModel] subclasses via package entry points.

TODO

Source code in kiara/utils/class_loading.py
def find_all_kiara_model_classes() -> Dict[str, Type["KiaraModel"]]:
    """Find all [KiaraModel][kiara.models.KiaraModel] subclasses via package entry points.

    Returns:
        a mapping of kiara model id -> model class
    """

    from kiara.models import KiaraModel

    return load_all_subclasses_for_entry_point(
        entry_point_name="kiara.model_classes",
        base_class=KiaraModel,  # type: ignore
        type_id_key="_kiara_model_id",
        type_id_func=_cls_name_id_func,
        attach_python_metadata=False,
    )
find_all_kiara_modules()

Find all [KiaraModule][kiara.module.KiaraModule] subclasses via package entry points.

TODO

Source code in kiara/utils/class_loading.py
def find_all_kiara_modules() -> Dict[str, Type["KiaraModule"]]:
    """Find all [KiaraModule][kiara.module.KiaraModule] subclasses via package entry points.

    TODO
    """

    from kiara.modules import KiaraModule

    modules = load_all_subclasses_for_entry_point(
        entry_point_name="kiara.modules",
        base_class=KiaraModule,  # type: ignore
        type_id_key="_module_type_name",
        attach_python_metadata=True,
    )

    # need to test this, since I couldn't add an abstract method to the KiaraModule class itself (mypy complained because it is potentially overloaded)
    valid_modules = {}
    for type_name, module_cls in modules.items():

        if hasattr(module_cls, "process"):
            valid_modules[type_name] = module_cls
            continue

        # no 'process' method: report (in develop mode) and skip this class
        if is_develop():
            msg = f"Invalid kiara module: **{module_cls.__module__}.{module_cls.__name__}**\n\nMissing method(s):\n- *process*"
            log_dev_message(msg=Markdown(msg))

        # TODO: check signature of process method
        log_message(
            "ignore.subclass",
            sub_class=module_cls,
            base_class=KiaraModule,
            reason="'process' method is missing",
        )

    return valid_modules
find_all_kiara_pipeline_paths(skip_errors=False)
Source code in kiara/utils/class_loading.py
def find_all_kiara_pipeline_paths(
    skip_errors: bool = False,
) -> Dict[str, Union[Mapping[str, Any], None]]:
    """Find all pipeline paths registered under the 'kiara.pipelines' entry point.

    Each entry point must be a callable, or a tuple whose first element is a
    callable, optionally followed by a base path argument and a metadata mapping.
    The callable returns a single path string, or an iterable of path strings.

    Args:
        skip_errors: if True, log and skip invalid entry points instead of raising

    Returns:
        a mapping of pipeline path -> (optional) metadata for that path
    """

    import logging

    # route stevedore's own log output to stdout with a recognizable prefix
    log2 = logging.getLogger("stevedore")
    out_hdlr = logging.StreamHandler(sys.stdout)
    out_hdlr.setFormatter(
        logging.Formatter("kiara pipeline search plugin error -> %(message)s")
    )
    out_hdlr.setLevel(logging.INFO)
    log2.addHandler(out_hdlr)
    log2.setLevel(logging.INFO)

    log_message("events.loading.pipelines")

    mgr = ExtensionManager(
        namespace="kiara.pipelines", invoke_on_load=False, propagate_map_exceptions=True
    )

    paths: Dict[str, Union[Mapping[str, Any], None]] = {}
    # TODO: make sure we load 'core' first?
    for plugin in mgr:

        name = plugin.name
        if (
            isinstance(plugin.plugin, tuple)
            and len(plugin.plugin) >= 1
            and callable(plugin.plugin[0])
        ) or callable(plugin.plugin):
            try:
                if callable(plugin.plugin):
                    func = plugin.plugin
                    args = []
                else:
                    func = plugin.plugin[0]
                    args = plugin.plugin[1:]

                f_args = []
                metadata: Union[Mapping[str, Any], None] = None
                if len(args) >= 1:
                    f_args.append(args[0])
                if len(args) >= 2:
                    metadata = args[1]
                    assert isinstance(metadata, Mapping)
                # fixed off-by-one: warn as soon as there are surplus arguments
                # (anything beyond the 2 that are used), not only from 4 on
                if len(args) > 2:
                    logger.debug(
                        "ignore.pipeline_lookup_arguments",
                        reason="more than 2 arguments provided",
                        surplus_args=args[2:],
                        path=f_args[0],
                    )

                # fixed: call with the collected args; 'func(f_args[0])' raised
                # IndexError for bare-callable entry points (empty f_args)
                result = func(*f_args)
                if not result:
                    continue
                if isinstance(result, str):
                    paths[result] = metadata
                else:
                    # fixed: iterate the returned paths ('result'); previously the
                    # accumulator dict itself was iterated, so non-string results
                    # were silently dropped (or tripped the duplicate assert)
                    for path in result:
                        assert path not in paths.keys()
                        paths[path] = metadata

            except Exception as e:
                log_exception(e)
                if skip_errors:
                    log_message(
                        "ignore.pipline_entrypoint", entrypoint_name=name, reason=str(e)
                    )
                    continue
                raise Exception(f"Error trying to load plugin '{plugin.plugin}': {e}")
        else:
            if skip_errors:
                log_message(
                    "ignore.pipline_entrypoint",
                    entrypoint_name=name,
                    reason=f"invalid plugin type '{type(plugin.plugin)}'",
                )
                continue
            msg = f"Can't load pipelines for entrypoint '{name}': invalid plugin type '{type(plugin.plugin)}'"
            raise Exception(msg)

    return paths
find_all_operation_types()
Source code in kiara/utils/class_loading.py
def find_all_operation_types() -> Dict[str, Type["OperationType"]]:
    """Find all OperationType subclasses registered via package entry points."""

    # imported lazily to avoid a circular import at module load time
    from kiara.operations import OperationType

    return load_all_subclasses_for_entry_point(
        entry_point_name="kiara.operation_types",
        base_class=OperationType,  # type: ignore
        type_id_key="_operation_type_name",
    )
find_data_types_under(module)
Source code in kiara/utils/class_loading.py
def find_data_types_under(module: Union[str, ModuleType]) -> List[Type["DataType"]]:
    """Find all DataType subclasses defined (recursively) under the given Python module."""

    from kiara.data_types import DataType

    matches = find_subclasses_under(
        base_class=DataType,  # type: ignore
        python_module=module,
    )
    return matches
find_kiara_model_classes_under(module)
Source code in kiara/utils/class_loading.py
def find_kiara_model_classes_under(
    module: Union[str, ModuleType]
) -> List[Type["KiaraModel"]]:
    """Find all KiaraModel subclasses defined (recursively) under the given Python module."""

    from kiara.models import KiaraModel

    return find_subclasses_under(
        base_class=KiaraModel,  # type: ignore
        python_module=module,
    )
find_kiara_modules_under(module)
Source code in kiara/utils/class_loading.py
def find_kiara_modules_under(
    module: Union[str, ModuleType],
) -> List[Type["KiaraModule"]]:
    """Find all KiaraModule subclasses defined (recursively) under the given Python module."""

    from kiara.modules import KiaraModule

    matches = find_subclasses_under(
        base_class=KiaraModule,  # type: ignore
        python_module=module,
    )
    return matches
find_operations_under(module)
Source code in kiara/utils/class_loading.py
def find_operations_under(
    module: Union[str, ModuleType]
) -> List[Type["OperationType"]]:
    """Find all OperationType subclasses defined (recursively) under the given Python module."""

    from kiara.operations import OperationType

    operation_types = find_subclasses_under(
        base_class=OperationType,  # type: ignore
        python_module=module,
    )
    return operation_types
find_pipeline_base_path_for_module(module)
Source code in kiara/utils/class_loading.py
def find_pipeline_base_path_for_module(
    module: Union[str, ModuleType]
) -> Union[str, None]:
    """Return the folder containing the given module's source file.

    Args:
        module: a Python module object, or its importable (dotted) name

    Returns:
        the module's directory, or None if that directory does not exist

    Raises:
        NotImplementedError: when running from a Pyinstaller bundle
    """

    if hasattr(sys, "frozen"):
        raise NotImplementedError("Pyinstaller bundling not supported yet.")

    if isinstance(module, str):
        module = importlib.import_module(module)

    module_file = module.__file__
    assert module_file is not None
    path = os.path.dirname(module_file)

    # fixed: 'os.path.exists' was referenced but never called, so it was always
    # truthy and the missing-folder branch was unreachable
    if not os.path.exists(path):
        log_message("ignore.pipeline_folder", path=path, reason="folder does not exist")
        return None

    return path
find_subclasses_under(base_class, python_module)

Find all (non-abstract) subclasses of a base class that live under a module (recursively).

Parameters:

Name Type Description Default
base_class Type[~SUBCLASS_TYPE]

the parent class

required
python_module Union[str, module]

the Python module to search

required

Returns:

Type Description
List[Type[~SUBCLASS_TYPE]]

a list of all subclasses

Source code in kiara/utils/class_loading.py
def find_subclasses_under(
    base_class: Type[SUBCLASS_TYPE],
    python_module: Union[str, ModuleType],
) -> List[Type[SUBCLASS_TYPE]]:
    """Find all (non-abstract) subclasses of a base class that live under a module (recursively).

    Arguments:
        base_class: the parent class
        python_module: the Python module to search

    Returns:
        a list of all subclasses
    """

    if hasattr(sys, "frozen"):
        raise NotImplementedError("Pyinstaller bundling not supported yet.")

    try:
        if isinstance(python_module, str):
            python_module = importlib.import_module(python_module)

        # importing every submodule ensures all subclass definitions are executed
        _import_modules_recursively(python_module)
    except Exception as e:
        log_exception(e)
        log_message("ignore.python_module", module=str(python_module), reason=str(e))
        return []

    subclasses: Iterable[Type[SUBCLASS_TYPE]] = _get_all_subclasses(base_class)

    # keep only subclasses that are defined somewhere under the requested module
    module_prefix = python_module.__name__
    return [sc for sc in subclasses if sc.__module__.startswith(module_prefix)]
load_all_subclasses_for_entry_point(entry_point_name, base_class, ignore_abstract_classes=True, type_id_key=None, type_id_func=None, type_id_no_attach=False, attach_python_metadata=False)

Find all subclasses of a base class via package entry points.

Parameters:

Name Type Description Default
entry_point_name str

the entry point name to query entries for

required
base_class Type[~SUBCLASS_TYPE]

the base class to look for

required
ignore_abstract_classes bool

whether to include abstract classes in the result

True
type_id_key Optional[str]

if provided, the found classes will have their id attached as an attribute, using the value of this as the name. if an attribute of this name already exists, it will be used as id without further processing

None
type_id_func Callable

a function to take the found class as input, and returns a string representing the id of the class. By default, the module path + "." + class name (snake-case) is used (minus the string 'kiara_modules.'', if it exists at the beginning

None
type_id_no_attach bool

in case you want to use the type_id_key to set the id, but don't want it attached to classes that don't have it, set this to true. In most cases, you won't need this option

False
attach_python_metadata Union[bool, str]

whether to attach a PythonClass metadata model to the class. By default, '_python_class' is used as attribute name if this argument is 'True', If this argument is a string, that will be used as name instead.

False
Source code in kiara/utils/class_loading.py
def load_all_subclasses_for_entry_point(
    entry_point_name: str,
    base_class: Type[SUBCLASS_TYPE],
    ignore_abstract_classes: bool = True,
    type_id_key: Union[str, None] = None,
    type_id_func: Union[Callable, None] = None,
    type_id_no_attach: bool = False,
    attach_python_metadata: Union[bool, str] = False,
) -> Dict[str, Type[SUBCLASS_TYPE]]:
    """Find all subclasses of a base class via package entry points.

    Arguments:
        entry_point_name: the entry point name to query entries for
        base_class: the base class to look for
        ignore_abstract_classes: whether to include abstract classes in the result
        type_id_key: if provided, the found classes will have their id attached as an attribute, using the value of this as the name. if an attribute of this name already exists, it will be used as id without further processing
        type_id_func: a function to take the found class as input, and returns a string representing the id of the class. By default, the module path + "." + class name (snake-case) is used (minus the string 'kiara_modules.<project_name>'', if it exists at the beginning
        type_id_no_attach: in case you want to use the type_id_key to set the id, but don't want it attached to classes that don't have it, set this to true. In most cases, you won't need this option
        attach_python_metadata: whether to attach a [PythonClass][kiara.models.python_class.PythonClass] metadata model to the class. By default, '_python_class' is used as attribute name if this argument is 'True', If this argument is a string, that will be used as name instead.

    Returns:
        a mapping of type id -> subclass, merged from direct class entry points
        and callable (factory) entry points

    Raises:
        Exception: on duplicate type ids, or on an entry point that is neither a
        class nor a callable
    """

    # route stevedore's own log output to stdout with a recognizable prefix
    log2 = logging.getLogger("stevedore")
    out_hdlr = logging.StreamHandler(sys.stdout)
    out_hdlr.setFormatter(
        logging.Formatter(
            f"{entry_point_name} plugin search message/error -> %(message)s"
        )
    )
    out_hdlr.setLevel(logging.INFO)
    log2.addHandler(out_hdlr)
    if is_debug():
        log2.setLevel(logging.DEBUG)
    else:
        out_hdlr.setLevel(logging.INFO)
        log2.setLevel(logging.INFO)

    log_message("events.loading.entry_points", entry_point_name=entry_point_name)

    mgr = ExtensionManager(
        namespace=entry_point_name,
        invoke_on_load=False,
        propagate_map_exceptions=True,
    )

    # classes referenced directly by an entry point, keyed by entry-point name
    result_entrypoints: Dict[str, Type[SUBCLASS_TYPE]] = {}
    # classes produced by callable entry points, keyed by computed type id
    result_dynamic: Dict[str, Type[SUBCLASS_TYPE]] = {}

    for plugin in mgr:
        name = plugin.name

        if isinstance(plugin.plugin, type):
            # this means an actual (sub-)class was provided in the entrypoint

            cls = plugin.plugin
            if not issubclass(cls, base_class):
                log_message(
                    "ignore.entrypoint",
                    entry_point=name,
                    base_class=base_class,
                    sub_class=plugin.plugin,
                    reason=f"Entry point reference not a subclass of '{base_class}'.",
                )
                continue

            # attach id / python metadata attributes as configured
            _process_subclass(
                sub_class=cls,
                base_class=base_class,
                type_id_key=type_id_key,
                type_id_func=type_id_func,
                type_id_no_attach=type_id_no_attach,
                attach_python_metadata=attach_python_metadata,
                ignore_abstract_classes=ignore_abstract_classes,
            )

            result_entrypoints[name] = cls
        elif (
            isinstance(plugin.plugin, tuple)
            and len(plugin.plugin) >= 1
            and callable(plugin.plugin[0])
        ) or callable(plugin.plugin):
            # a callable (or a (callable, *args) tuple) that produces classes
            try:
                if callable(plugin.plugin):
                    func = plugin.plugin
                    args = []
                else:
                    func = plugin.plugin[0]
                    args = plugin.plugin[1:]
                classes = func(*args)
            except Exception as e:
                log_exception(e)
                raise Exception(f"Error trying to load plugin '{plugin.plugin}': {e}")

            for sub_class in classes:
                type_id = _process_subclass(
                    sub_class=sub_class,
                    base_class=base_class,
                    type_id_key=type_id_key,
                    type_id_func=type_id_func,
                    type_id_no_attach=type_id_no_attach,
                    attach_python_metadata=attach_python_metadata,
                    ignore_abstract_classes=ignore_abstract_classes,
                )

                # None means the class was filtered out (e.g. abstract)
                if type_id is None:
                    continue

                if type_id in result_dynamic.keys():
                    raise Exception(
                        f"Duplicate type id '{type_id}' for type {entry_point_name}: {result_dynamic[type_id]} -- {sub_class}"
                    )
                result_dynamic[type_id] = sub_class

        else:
            raise Exception(
                f"Can't load subclasses for entry point {entry_point_name} and base class {base_class}: invalid plugin type {type(plugin.plugin)}"
            )

    # merge dynamically-produced classes into the direct-entrypoint result,
    # refusing name collisions between the two
    for k, v in result_dynamic.items():
        if k in result_entrypoints.keys():
            msg = f"Duplicate item name '{k}' for type {entry_point_name}: {v} -- {result_entrypoints[k]}."
            try:
                if type_id_key not in v.__dict__.keys():
                    msg = f"{msg} Most likely the name is picked up from a subclass, try to add a '{type_id_key}' class attribute to your implementing class, with the name you want to give your type as value."
            except Exception:
                pass

            raise Exception(msg)
        result_entrypoints[k] = v

    return result_entrypoints

cli special

F
FC

Classes

OutputFormat (Enum)

An enumeration.

Source code in kiara/utils/cli/__init__.py
class OutputFormat(Enum):
    """The output formats supported by kiara's CLI rendering helpers."""

    TERMINAL = "terminal"
    HTML = "html"
    JSON = "json"
    JSON_INCL_SCHEMA = "json-incl-schema"
    JSON_SCHEMA = "json-schema"

    @classmethod
    def as_dict(cls):
        """Return a mapping of member name -> member value."""
        return {member.name: member.value for member in cls}

    @classmethod
    def keys_as_list(cls):
        """Return all member names, in definition order."""
        return cls._member_names_

    @classmethod
    def values_as_list(cls):
        """Return all member values, in definition order."""
        return [member.value for member in cls]
HTML
JSON
JSON_INCL_SCHEMA
JSON_SCHEMA
TERMINAL

Functions

dict_from_cli_args(*args, list_keys=None)
Source code in kiara/utils/cli/__init__.py
def dict_from_cli_args(
    *args: str, list_keys: Union[Iterable[str], None] = None
) -> Dict[str, Any]:
    """Assemble a dict from command-line argument strings.

    Each argument can be a 'key=value' pair (the value parsed as json, falling
    back to the raw string), a path to a data file, or a json object string.

    Args:
        args: the argument strings to parse and merge, in order
        list_keys: keys whose values are collected into a list instead of
            overwriting each other

    Returns:
        the merged dict

    Raises:
        Exception: when an argument can't be parsed into a mapping
    """

    if not args:
        return {}

    # hoisted out of the loop; it was re-checked per argument before
    if list_keys is None:
        list_keys = []

    config: Dict[str, Any] = {}
    for arg in args:
        if "=" in arg:
            key, value = arg.split("=", maxsplit=1)
            try:
                _v = json.loads(value)
            except Exception:
                # not valid json: keep the raw string value
                _v = value
            part_config = {key: _v}
        elif os.path.isfile(os.path.realpath(os.path.expanduser(arg))):
            path = os.path.realpath(os.path.expanduser(arg))
            part_config = get_data_from_file(path)
            assert isinstance(part_config, Mapping)
        else:
            try:
                part_config = json.loads(arg)
                assert isinstance(part_config, Mapping)
            except Exception:
                raise Exception(f"Could not parse argument into data: {arg}")

        for k, v in part_config.items():
            if k in list_keys:
                config.setdefault(k, []).append(v)
            else:
                if k in config.keys():
                    # fixed: log the value being overwritten, not the key
                    logger.warning("duplicate.key", old_value=config[k], new_value=v)
                config[k] = v

    return config
is_rich_renderable(item)
Source code in kiara/utils/cli/__init__.py
def is_rich_renderable(item: Any):
    """Whether 'item' can be printed by a rich console (renderable, rich-castable, or plain string)."""
    renderable_types = (ConsoleRenderable, RichCast, str)
    return isinstance(item, renderable_types)
output_format_option(*param_decls)

Attaches an option to the command. All positional arguments are passed as parameter declarations to :class:Option; all keyword arguments are forwarded unchanged (except cls). This is equivalent to creating an :class:Option instance manually and attaching it to the :attr:Command.params list.

:param cls: the option class to instantiate. This defaults to :class:Option.

Source code in kiara/utils/cli/__init__.py
def output_format_option(*param_decls: str) -> Callable[[FC], FC]:
    """Attaches an option to the command.  All positional arguments are
    passed as parameter declarations to :class:`Option`; all keyword
    arguments are forwarded unchanged (except ``cls``).
    This is equivalent to creating an :class:`Option` instance manually
    and attaching it to the :attr:`Command.params` list.

    :param cls: the option class to instantiate.  This defaults to
                :class:`Option`.
    """

    # default to '--format' / '-f' when no declarations were provided
    if not param_decls:
        param_decls = ("--format", "-f")

    attrs = {
        "help": "The output format. Defaults to 'terminal'.",
        "type": click.Choice(OutputFormat.values_as_list()),
    }

    def decorator(f: FC) -> FC:
        # Issue 926, copy attrs, so pre-defined options can re-use the same cls=
        option_attrs = dict(attrs)
        option_cls = option_attrs.pop("cls", None) or Option
        _param_memo(f, option_cls(param_decls, **option_attrs))  # type: ignore
        return f

    return decorator
render_json_schema_str(model)
Source code in kiara/utils/cli/__init__.py
def render_json_schema_str(model: BaseModel):
    """Render the json schema of a pydantic model as an indented json string."""

    try:
        # orjson-backed serialization path
        return model.schema_json(option=orjson.OPT_INDENT_2)
    except TypeError:
        # fall back to the stdlib-json keyword signature
        return model.schema_json(indent=2)
render_json_str(model)
Source code in kiara/utils/cli/__init__.py
def render_json_str(model: BaseModel):
    """Render a pydantic model instance as an indented json string."""

    try:
        # orjson-backed serialization path
        return model.json(option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS)
    except TypeError:
        # fall back to the stdlib-json keyword signature
        return model.json(indent=2)
terminal_print(msg=None, in_panel=None, rich_config=None, empty_line_before=False, **config)
Source code in kiara/utils/cli/__init__.py
def terminal_print(
    msg: Any = None,
    in_panel: Union[str, None] = None,
    rich_config: Union[Mapping[str, Any], None] = None,
    empty_line_before: bool = False,
    **config: Any,
) -> None:
    """Print a message (or any renderable object) to the kiara console.

    Args:
        msg: the object to print; None prints an empty string
        in_panel: if set, wrap the output in a panel with this title
        rich_config: extra keyword arguments passed to console.print
        empty_line_before: whether to print a blank line first
        config: render configuration passed to extract_renderable
    """

    console = get_console()

    renderable = extract_renderable("" if msg is None else msg, render_config=config)

    if in_panel is not None:
        renderable = Panel(renderable, title_align="left", title=in_panel)

    if empty_line_before:
        console.print()

    if rich_config:
        console.print(renderable, **rich_config)
    else:
        console.print(renderable)
terminal_print_model(*models, format=None, empty_line_before=None, in_panel=None, **render_config)
Source code in kiara/utils/cli/__init__.py
def terminal_print_model(
    *models: BaseModel,
    format: Union[None, OutputFormat, str] = None,
    empty_line_before: Union[bool, None] = None,
    in_panel: Union[str, None] = None,
    **render_config: Any,
):
    """Print one or more pydantic models to the terminal in the requested format.

    Args:
        *models: the model instances to print
        format: output format name or 'OutputFormat' member; defaults to terminal
        empty_line_before: print a blank line first; defaults to True for the
            terminal format and False for every other format
        in_panel: optional panel title (only used for terminal output)
        **render_config: render configuration forwarded to the renderables
    """

    if format is None:
        format = OutputFormat.TERMINAL

    # accept plain strings and normalize to the enum
    if isinstance(format, str):
        format = OutputFormat(format)

    if empty_line_before is None:
        if format == OutputFormat.TERMINAL:
            empty_line_before = True
        else:
            empty_line_before = False

    if format == OutputFormat.TERMINAL:
        if len(models) == 1:
            terminal_print(
                models[0],
                in_panel=in_panel,
                empty_line_before=empty_line_before,
                **render_config,
            )
        else:
            # multiple models: render each one, separated by horizontal rules
            rg = []
            if not models:
                return
            for model in models[0:-1]:
                renderable = extract_renderable(model, render_config)
                rg.append(renderable)
                rg.append(Rule(style="b"))
            last = extract_renderable(models[-1], render_config)
            rg.append(last)
            group = Group(*rg)
            terminal_print(group, in_panel=in_panel, **render_config)
    elif format == OutputFormat.JSON:
        if len(models) == 1:
            json_str = render_json_str(models[0])
            syntax = Syntax(json_str, "json", background_color="default")
            terminal_print(
                syntax,
                empty_line_before=empty_line_before,
                rich_config={"soft_wrap": True},
            )
        else:
            # multiple models are emitted as a single JSON array
            json_strs = []
            for model in models:
                json_str = render_json_str(model)
                json_strs.append(json_str)

            json_str_full = "[" + ",\n".join(json_strs) + "]"
            syntax = Syntax(json_str_full, "json", background_color="default")
            terminal_print(
                syntax,
                empty_line_before=empty_line_before,
                rich_config={"soft_wrap": True},
            )

    elif format == OutputFormat.JSON_SCHEMA:
        if len(models) == 1:
            syntax = Syntax(
                models[0].schema_json(option=orjson.OPT_INDENT_2),
                "json",
                background_color="default",
            )
            terminal_print(
                syntax,
                empty_line_before=empty_line_before,
                rich_config={"soft_wrap": True},
            )
        else:
            json_strs = []
            for model in models:
                json_strs.append(render_json_schema_str(model))
            json_str_full = "[" + ",\n".join(json_strs) + "]"
            syntax = Syntax(json_str_full, "json", background_color="default")
            terminal_print(
                syntax,
                empty_line_before=empty_line_before,
                rich_config={"soft_wrap": True},
            )
    elif format == OutputFormat.JSON_INCL_SCHEMA:
        # data and schema are combined in a {"data": ..., "schema": ...} wrapper
        if len(models) == 1:
            data = models[0].dict()
            schema = models[0].schema()
            all = {"data": data, "schema": schema}
            json_str = orjson_dumps(all, option=orjson.OPT_INDENT_2)
            syntax = Syntax(json_str, "json", background_color="default")
            terminal_print(
                syntax,
                empty_line_before=empty_line_before,
                rich_config={"soft_wrap": True},
            )
        else:
            all_data = []
            for model in models:
                data = model.dict()
                schema = model.schema()
                all_data.append({"data": data, "schema": schema})
            json_str = orjson_dumps(all_data, option=orjson.OPT_INDENT_2)
            # print(json_str)
            syntax = Syntax(json_str, "json", background_color="default")
            terminal_print(
                syntax,
                empty_line_before=empty_line_before,
                rich_config={"soft_wrap": True},
            )

    elif format == OutputFormat.HTML:

        # models must implement 'create_html'; anything else is unsupported
        all_html = ""
        for model in models:
            if hasattr(model, "create_html"):
                html = model.create_html()  # type: ignore
                all_html = f"{all_html}\n{html}"
            else:
                raise NotImplementedError()

        syntax = Syntax(all_html, "html", background_color="default")
        terminal_print(
            syntax, empty_line_before=empty_line_before, rich_config={"soft_wrap": True}
        )

Modules

rich_click
Functions
rich_format_operation_help(obj, ctx, operation)

Print nicely formatted help text using rich.

Based on original code from rich-cli, by @willmcgugan. https://github.com/Textualize/rich-cli/blob/8a2767c7a340715fc6fbf4930ace717b9b2fc5e5/src/rich_cli/__main__.py#L162-L236

Replacement for the click function format_help(). Takes a command or group and builds the help text output.

Parameters:

Name Type Description Default
obj click.Command or click.Group

Command or group to build help text for

required
ctx click.Context

Click Context object

required
table

a rich table, including all the inputs of the current operation

required
Source code in kiara/utils/cli/rich_click.py
def rich_format_operation_help(
    obj: Union[click.Command, click.Group],
    ctx: click.Context,
    operation: KiaraOperation,
) -> None:
    """Print nicely formatted help text using rich.

    Based on original code from rich-cli, by @willmcgugan.
    https://github.com/Textualize/rich-cli/blob/8a2767c7a340715fc6fbf4930ace717b9b2fc5e5/src/rich_cli/__main__.py#L162-L236

    Replacement for the click function format_help().
    Takes a command or group and builds the help text output.

    Args:
        obj (click.Command or click.Group): Command or group to build help text for
        ctx (click.Context): Click Context object
        operation (KiaraOperation): the operation whose doc and inputs are rendered
    """

    renderables: List[RenderableType] = []
    # Header text if we have it
    if HEADER_TEXT:
        renderables.append(
            Padding(_make_rich_rext(HEADER_TEXT, STYLE_HEADER_TEXT), (1, 1, 0, 1))
        )

    # Print usage
    cmd_arg = ctx.params["module_or_operation"]
    _cmd = f"[yellow bold]Usage: [/yellow bold][bold]kiara run [OPTIONS] [i]{cmd_arg}[/i] [INPUTS][/bold]"
    renderables.append(Padding(_cmd, 1))
    # renderables.append(obj.get_usage(ctx))
    # renderables.append(Panel(Padding(highlighter(obj.get_usage(ctx)), 1), style=STYLE_USAGE_COMMAND, box=box.MINIMAL))

    # Print command / group help if we have some
    desc = operation.operation.doc.full_doc
    renderables.append(
        Padding(
            Align(Markdown(desc), width=MAX_WIDTH, pad=False),
            (0, 1, 1, 1),
        )
    )

    # if obj.help:
    #
    #     # Print with a max width and some padding
    #     renderables.append(
    #         Padding(
    #             Align(_get_help_text(obj), width=MAX_WIDTH, pad=False),
    #             (0, 1, 1, 1),
    #         )
    #     )

    # Look through OPTION_GROUPS for this command
    # stick anything unmatched into a default group at the end
    option_groups = OPTION_GROUPS.get(ctx.command_path, []).copy()
    option_groups.append({"options": []})
    argument_group_options = []

    for param in obj.get_params(ctx):

        # Skip positional arguments - they don't have opts or helptext and are covered in usage
        # See https://click.palletsprojects.com/en/8.0.x/documentation/#documenting-arguments
        if type(param) is click.core.Argument and not SHOW_ARGUMENTS:
            continue

        # Skip if option is hidden
        if getattr(param, "hidden", False):
            continue

        # Already mentioned in a config option group
        for option_group in option_groups:
            if any([opt in option_group.get("options", []) for opt in param.opts]):
                break

        # No break, no mention - add to the default group
        else:
            if type(param) is click.core.Argument and not GROUP_ARGUMENTS_OPTIONS:
                argument_group_options.append(param.opts[0])
            else:
                list_of_option_groups: List = option_groups[-1]["options"]  # type: ignore
                list_of_option_groups.append(param.opts[0])

    # If we're not grouping arguments and we got some, prepend before default options
    if len(argument_group_options) > 0:
        extra_option_group = {
            "name": ARGUMENTS_PANEL_TITLE,
            "options": argument_group_options,
        }
        option_groups.insert(len(option_groups) - 1, extra_option_group)  # type: ignore

    # Print each option group panel
    for option_group in option_groups:

        options_rows = []
        for opt in option_group.get("options", []):

            # Get the param
            for param in obj.get_params(ctx):
                if any([opt in param.opts]):
                    break
            # Skip if option is not listed in this group
            else:
                continue

            # Short and long form
            opt_long_strs = []
            opt_short_strs = []
            for idx, opt in enumerate(param.opts):
                opt_str = opt
                try:
                    # pair each opt with its secondary form, e.g. --flag/--no-flag
                    opt_str += "/" + param.secondary_opts[idx]
                except IndexError:
                    pass
                if "--" in opt:
                    opt_long_strs.append(opt_str)
                else:
                    opt_short_strs.append(opt_str)

            # Column for a metavar, if we have one
            metavar = Text(style=STYLE_METAVAR, overflow="fold")
            metavar_str = param.make_metavar()

            # Do it ourselves if this is a positional argument
            if type(param) is click.core.Argument and metavar_str == param.name.upper():  # type: ignore
                metavar_str = param.type.name.upper()

            # Skip booleans and choices (handled above)
            if metavar_str != "BOOLEAN":
                metavar.append(metavar_str)

            # Range - from
            # https://github.com/pallets/click/blob/c63c70dabd3f86ca68678b4f00951f78f52d0270/src/click/core.py#L2698-L2706  # noqa: E501
            try:
                # skip count with default range type
                if isinstance(param.type, click.types._NumberRangeBase) and not (
                    param.count and param.type.min == 0 and param.type.max is None  # type: ignore
                ):
                    range_str = param.type._describe_range()
                    if range_str:
                        metavar.append(RANGE_STRING.format(range_str))
            except AttributeError:
                # click.types._NumberRangeBase is only in Click 8x onwards
                pass

            # Required asterisk
            required: RenderableType = ""
            if param.required:
                required = Text(REQUIRED_SHORT_STRING, style=STYLE_REQUIRED_SHORT)

            # Highlighter to make [ | ] and <> dim
            class MetavarHighlighter(RegexHighlighter):
                highlights = [
                    r"^(?P<metavar_sep>(\[|<))",
                    r"(?P<metavar_sep>\|)",
                    r"(?P<metavar_sep>(\]|>)$)",
                ]

            metavar_highlighter = MetavarHighlighter()

            rows = [
                required,
                highlighter(highlighter(",".join(opt_long_strs))),
                highlighter(highlighter(",".join(opt_short_strs))),
                metavar_highlighter(metavar),
                _get_parameter_help(param, ctx),  # type: ignore
            ]

            # Remove metavar if specified in config
            if not SHOW_METAVARS_COLUMN:
                rows.pop(3)

            options_rows.append(rows)

        if len(options_rows) > 0:
            t_styles = {
                "show_lines": STYLE_OPTIONS_TABLE_SHOW_LINES,
                "leading": STYLE_OPTIONS_TABLE_LEADING,
                "box": STYLE_OPTIONS_TABLE_BOX,
                "border_style": STYLE_OPTIONS_TABLE_BORDER_STYLE,
                "row_styles": STYLE_OPTIONS_TABLE_ROW_STYLES,
                "pad_edge": STYLE_OPTIONS_TABLE_PAD_EDGE,
                "padding": STYLE_OPTIONS_TABLE_PADDING,
            }
            # per-group style overrides, then resolve the box name to a rich box object
            t_styles.update(option_group.get("table_styles", {}))  # type: ignore
            box_style = getattr(box, t_styles.pop("box"), None)  # type: ignore

            options_table = Table(
                highlight=True,
                show_header=False,
                expand=True,
                box=box_style,
                **t_styles,  # type: ignore
            )
            # Strip the required column if none are required
            if all([x[0] == "" for x in options_rows]):
                options_rows = [x[1:] for x in options_rows]
            for row in options_rows:
                options_table.add_row(*row)
            renderables.append(
                Panel(
                    options_table,
                    border_style=STYLE_OPTIONS_PANEL_BORDER,  # type: ignore
                    title=option_group.get("name", OPTIONS_PANEL_TITLE),  # type: ignore
                    title_align=ALIGN_OPTIONS_PANEL,  # type: ignore
                    width=MAX_WIDTH,  # type: ignore
                )
            )

    #
    # Groups only:
    # List click command groups
    #
    if hasattr(obj, "list_commands"):
        # Look through COMMAND_GROUPS for this command
        # stick anything unmatched into a default group at the end
        cmd_groups = COMMAND_GROUPS.get(ctx.command_path, []).copy()
        cmd_groups.append({"commands": []})
        for command in obj.list_commands(ctx):  # type: ignore
            for cmd_group in cmd_groups:
                if command in cmd_group.get("commands", []):
                    break
            else:
                commands: List = cmd_groups[-1]["commands"]  # type: ignore
                commands.append(command)

        # Print each command group panel
        for cmd_group in cmd_groups:
            t_styles = {
                "show_lines": STYLE_COMMANDS_TABLE_SHOW_LINES,
                "leading": STYLE_COMMANDS_TABLE_LEADING,
                "box": STYLE_COMMANDS_TABLE_BOX,
                "border_style": STYLE_COMMANDS_TABLE_BORDER_STYLE,
                "row_styles": STYLE_COMMANDS_TABLE_ROW_STYLES,
                "pad_edge": STYLE_COMMANDS_TABLE_PAD_EDGE,
                "padding": STYLE_COMMANDS_TABLE_PADDING,
            }
            t_styles.update(cmd_group.get("table_styles", {}))  # type: ignore
            box_style = getattr(box, t_styles.pop("box"), None)  # type: ignore

            commands_table = Table(
                highlight=False,
                show_header=False,
                expand=True,
                box=box_style,  # type: ignore
                **t_styles,  # type: ignore
            )
            # Define formatting in first column, as commands don't match highlighter regex
            commands_table.add_column(style="bold cyan", no_wrap=True)
            for command in cmd_group.get("commands", []):
                # Skip if command does not exist
                if command not in obj.list_commands(ctx):  # type: ignore
                    continue
                cmd = obj.get_command(ctx, command)  # type: ignore
                assert cmd is not None
                if cmd.hidden:
                    continue
                # Use the truncated short text as with vanilla text if requested
                if USE_CLICK_SHORT_HELP:
                    helptext = cmd.get_short_help_str()
                else:
                    # Use short_help function argument if used, or the full help
                    helptext = cmd.short_help or cmd.help or ""
                commands_table.add_row(command, _make_command_help(helptext))
            if commands_table.row_count > 0:
                renderables.append(
                    Panel(
                        commands_table,
                        border_style=STYLE_COMMANDS_PANEL_BORDER,  # type: ignore
                        title=cmd_group.get("name", COMMANDS_PANEL_TITLE),  # type: ignore
                        title_align=ALIGN_COMMANDS_PANEL,  # type: ignore
                        width=MAX_WIDTH,  # type: ignore
                    )
                )

    # Render the operation's own inputs as a dedicated panel
    inputs_table = operation.create_renderable(
        show_operation_name=False,
        show_operation_doc=False,
        show_inputs=True,
        show_outputs_schema=False,
        show_headers=False,
    )

    inputs_panel = Panel(
        inputs_table,
        title="Inputs",
        border_style=STYLE_COMMANDS_PANEL_BORDER,  # type: ignore
        title_align=ALIGN_COMMANDS_PANEL,  # type: ignore
        width=MAX_WIDTH,  # type: ignore
    )
    renderables.append(inputs_panel)

    # Epilogue if we have it
    if obj.epilog:
        # Remove single linebreaks, replace double with single
        lines = obj.epilog.split("\n\n")
        epilogue = "\n".join([x.replace("\n", " ").strip() for x in lines])
        renderables.append(
            Padding(Align(highlighter(epilogue), width=MAX_WIDTH, pad=False), 1)
        )

    # Footer text if we have it
    if FOOTER_TEXT:
        renderables.append(
            Padding(_make_rich_rext(FOOTER_TEXT, STYLE_FOOTER_TEXT), (1, 1, 0, 1))
        )

    group = Group(*renderables)
    terminal_print(group)

concurrency

Classes

ThreadSaveCounter

A thread-safe counter, can be used in kiara modules to update completion percentage.

Source code in kiara/utils/concurrency.py
class ThreadSaveCounter(object):
    """A thread-safe counter, can be used in kiara modules to update completion percentage."""

    def __init__(self):
        # counter value; all mutations happen under the lock
        self._current = 0
        self._lock = threading.Lock()

    @property
    def current(self):
        """The current counter value."""
        return self._current

    def current_percent(self, total: int) -> int:
        """Return the counter value as an integer percentage of *total*."""
        return int((self.current / total) * 100)

    def _shift(self, delta: int) -> int:
        # apply a delta atomically and return the new value
        with self._lock:
            self._current += delta
            return self._current

    def increment(self):
        """Increase the counter by one, returning the new value."""
        return self._shift(1)

    def decrement(self):
        """Decrease the counter by one, returning the new value."""
        return self._shift(-1)
current property readonly
current_percent(self, total)
Source code in kiara/utils/concurrency.py
def current_percent(self, total: int) -> int:

    return int((self.current / total) * 100)
decrement(self)
Source code in kiara/utils/concurrency.py
def decrement(self):

    with self._lock:
        self._current -= 1
        return self._current
increment(self)
Source code in kiara/utils/concurrency.py
def increment(self):

    with self._lock:
        self._current += 1
        return self._current

data

logger
pretty_print_data(kiara, value_id, target_type='terminal_renderable', **render_config)
Source code in kiara/utils/data.py
def pretty_print_data(
    kiara: "Kiara",
    value_id: uuid.UUID,
    target_type="terminal_renderable",
    **render_config: Any,
) -> Any:
    """Pretty print the value with the specified id using a 'pretty_print' operation.

    If no operation is registered for the value's own data type, and the target
    is 'terminal_renderable', this falls back to the generic 'any' -> 'string'
    render combination.

    Args:
        kiara: the kiara context
        value_id: id of the value to render
        target_type: the render target type (default: 'terminal_renderable')
        **render_config: currently unused render configuration

    Returns:
        the rendered representation of the value

    Raises:
        Exception: if no suitable pretty print operation can be found
    """

    value = kiara.data_registry.get_value(value_id=value_id)

    op_type: PrettyPrintOperationType = kiara.operation_registry.get_operation_type("pretty_print")  # type: ignore

    try:
        op: Union[Operation, None] = op_type.get_operation_for_render_combination(
            source_type=value.value_schema.type, target_type=target_type
        )
    except Exception as e:

        logger.debug(
            "error.pretty_print",
            source_type=value.value_schema.type,
            target_type=target_type,
            error=e,
        )

        op = None
        # fall back to a generic string renderer for terminal output
        if target_type == "terminal_renderable":
            try:
                op = op_type.get_operation_for_render_combination(
                    source_type="any", target_type="string"
                )
            except Exception:
                pass

    if op is None:
        # fix: the closing quote around the target type was missing
        raise Exception(
            f"Can't find operation to render '{value.value_schema.type}' as '{target_type}'."
        )

    result = op.run(kiara=kiara, inputs={"value": value})
    rendered = result.get_value_data("rendered_value")
    return rendered
render_value(kiara, value_id, target_type='terminal_renderable', render_instruction=None)
Source code in kiara/utils/data.py
def render_value(
    kiara: "Kiara",
    value_id: uuid.UUID,
    target_type="terminal_renderable",
    render_instruction: Union[RenderInstruction, None] = None,
) -> RenderValueResult:
    """Render the value with the specified id into the requested target type.

    Args:
        kiara: the kiara context
        value_id: id of the value to render
        target_type: the render target type (default: 'terminal_renderable')
        render_instruction: optional instruction forwarded to the render operation

    Returns:
        the rendered value plus render metadata

    Raises:
        Exception: if no render operation is registered for the value's data
            type / target type combination
    """

    value = kiara.data_registry.get_value(value_id=value_id)
    op_type: RenderValueOperationType = kiara.operation_registry.get_operation_type("render_value")  # type: ignore

    ops = op_type.get_render_operations_for_source_type(value.data_type_name)
    if target_type not in ops.keys():
        if not ops:
            msg = f"No render operations registered for source type '{value.data_type_name}'."
        else:
            # fix: previously interpolated 'value.data' (the actual value payload)
            # instead of the data type name, which was both wrong and potentially
            # expensive to load
            msg = f"Registered target types for source type '{value.data_type_name}': {', '.join(ops.keys())}."
        raise Exception(
            f"No render operation for source type '{value.data_type_name}' to target type '{target_type}' registered. {msg}"
        )

    op = ops[target_type]
    result = op.run(
        kiara=kiara, inputs={"value": value, "render_instruction": render_instruction}
    )

    return RenderValueResult(
        rendered=result.get_value_data("rendered_value"),
        metadata=result.get_value_data("render_metadata"),
    )

db

get_kiara_db_url(base_path)
Source code in kiara/utils/db.py
def get_kiara_db_url(base_path: str):
    """Return the sqlite connection URL for the kiara database under *base_path*.

    The path is user-expanded and made absolute before being embedded in the URL.
    """
    resolved_path = os.path.abspath(os.path.expanduser(base_path))
    return f"sqlite+pysqlite:///{resolved_path}/kiara.db"
orm_json_deserialize(obj)
Source code in kiara/utils/db.py
def orm_json_deserialize(obj: str) -> Any:
    """Parse a JSON string (as stored in the database) into Python data."""
    parsed = orjson.loads(obj)
    return parsed
orm_json_serialize(obj)
Source code in kiara/utils/db.py
def orm_json_serialize(obj: Any) -> str:
    """Serialize an object to a JSON string for database storage.

    Objects exposing a ``json()`` method (e.g. pydantic models) serialize
    themselves; strings pass through unchanged; mappings are dumped via
    orjson. Anything else is unsupported.
    """
    if hasattr(obj, "json"):
        return obj.json()
    if isinstance(obj, str):
        return obj
    if isinstance(obj, Mapping):
        return orjson_dumps(obj, default=None)
    raise Exception(f"Unsupported type for json serialization: {type(obj)}")

debug

DEFAULT_VALUE_MAP_RENDER_CONFIG
create_module_preparation_table(kiara, job_config, job_id, **render_config)
Source code in kiara/utils/debug.py
def create_module_preparation_table(
    kiara: "Kiara", job_config: JobConfig, job_id: uuid.UUID, **render_config: Any
) -> Table:
    """Build a table describing a job before it runs, for 'develop' mode logging.

    The amount of module and input detail shown is driven by the dev config's
    ``log.pre_run`` settings.

    Args:
        kiara: the kiara context
        job_config: the job about to be run
        job_id: id of the job
        **render_config: forwarded to the value map / value renderables

    Returns:
        a rich table with job id, module and input details
    """

    dev_config = get_dev_config()
    table = Table(show_header=False, box=box.SIMPLE)
    table.add_column("key", style="i")
    table.add_column("value")

    table.add_row("job_id", str(job_id))

    module_details = dev_config.log.pre_run.module_info
    # detail levels are compared against both the enum member and its value,
    # since the settings model uses 'use_enum_values' and may hold either form
    if module_details not in [DetailLevel.NONE.value, DetailLevel.NONE]:
        if module_details in [DetailLevel.MINIMAL.value, DetailLevel.MINIMAL]:
            table.add_row("module", job_config.module_type)
            table.add_row(
                "module desc",
                kiara.context_info.module_types.item_infos[
                    job_config.module_type
                ].documentation.description,
            )
        elif module_details in [DetailLevel.FULL.value, DetailLevel.FULL]:
            table.add_row("module", job_config.module_type)
            table.add_row(
                "module doc",
                kiara.context_info.module_types.item_infos[
                    job_config.module_type
                ].documentation.full_doc,
            )
            if module_config_is_empty(job_config.module_config):
                table.add_row("module_config", "-- no config --")
            else:
                # instantiate the module so its resolved config can be shown
                module = kiara.module_registry.create_module(manifest=job_config)
                table.add_row("module_config", module.config)

    inputs_details = dev_config.log.pre_run.inputs_info
    if inputs_details not in [DetailLevel.NONE.value, DetailLevel.NONE]:
        if inputs_details in [DetailLevel.MINIMAL, DetailLevel.MINIMAL.value]:
            render_config["show_type"] = False
            value_map_rend = create_value_map_renderable(
                value_map=job_config.inputs, **render_config
            )
            table.add_row("inputs", value_map_rend)
        elif inputs_details in [DetailLevel.FULL, DetailLevel.FULL.value]:
            # full detail: resolve the actual values before rendering
            value_map = kiara.data_registry.load_values(values=job_config.inputs)
            table.add_row("inputs", value_map.create_renderable(**render_config))

    return table
create_post_run_table(kiara, job, module, job_config, **render_config)
Source code in kiara/utils/debug.py
def create_post_run_table(
    kiara: "Kiara",
    job: ActiveJob,
    module: "KiaraModule",
    job_config: JobConfig,
    **render_config: Any
) -> Table:
    """Build a table describing a finished job, for 'develop' mode logging.

    The amount of module / input / output detail shown is driven by the dev
    config's ``log.post_run`` settings.

    Args:
        kiara: the kiara context
        job: the finished job (holds the results)
        module: the module instance that was run
        job_config: the job configuration that was used
        **render_config: forwarded to the value map / value renderables

    Returns:
        a rich table with job id, module, input and output details
    """

    dev_config = get_dev_config()
    table = Table(show_header=False, box=box.SIMPLE)
    table.add_column("key", style="i")
    table.add_column("value")

    table.add_row("job_id", str(job.job_id))
    module_details = dev_config.log.post_run.module_info
    # detail levels are compared against both the enum member and its value,
    # since the settings model uses 'use_enum_values' and may hold either form
    if module_details not in [DetailLevel.NONE.value, DetailLevel.NONE]:
        if module_details in [DetailLevel.MINIMAL.value, DetailLevel.MINIMAL]:
            table.add_row("module", module.module_type_name)
            table.add_row(
                "module desc",
                kiara.context_info.module_types.item_infos[
                    module.module_type_name
                ].documentation.description,
            )
        elif module_details in [DetailLevel.FULL.value, DetailLevel.FULL]:
            table.add_row("module", module.module_type_name)
            table.add_row(
                "module doc",
                kiara.context_info.module_types.item_infos[
                    module.module_type_name
                ].documentation.full_doc,
            )
            if module_config_is_empty(module.config.dict()):
                table.add_row("module_config", "-- no config --")
            else:
                table.add_row("module_config", module.config)

    inputs_details = dev_config.log.post_run.inputs_info
    if inputs_details not in [DetailLevel.NONE.value, DetailLevel.NONE]:
        if inputs_details in [DetailLevel.MINIMAL, DetailLevel.MINIMAL.value]:
            render_config["show_type"] = False
            value_map_rend: RenderableType = create_value_map_renderable(
                value_map=job_config.inputs, **render_config
            )
            table.add_row("inputs", value_map_rend)
        elif inputs_details in [DetailLevel.FULL, DetailLevel.FULL.value]:
            # full detail: resolve the actual values before rendering
            value_map = kiara.data_registry.load_values(values=job_config.inputs)
            table.add_row("inputs", value_map.create_renderable(**render_config))

    outputs_details = dev_config.log.post_run.outputs_info
    if outputs_details not in [DetailLevel.NONE.value, DetailLevel.NONE]:
        if outputs_details in [DetailLevel.MINIMAL, DetailLevel.MINIMAL.value]:
            render_config["show_type"] = False
            # job.results can be None, e.g. when the job failed
            if job.results is None:
                value_map_rend = "-- no results --"
            else:
                value_map_rend = create_value_map_renderable(
                    value_map=job.results, **render_config
                )
            table.add_row("outputs", value_map_rend)
        elif outputs_details in [DetailLevel.FULL, DetailLevel.FULL.value]:
            if job.results is None:
                value_map_rend = "-- no results --"
            else:
                value_map = kiara.data_registry.load_values(values=job.results)
                value_map_rend = value_map.create_renderable(**render_config)
            table.add_row("outputs", value_map_rend)

    return table
create_value_map_renderable(value_map, **render_config)
Source code in kiara/utils/debug.py
def create_value_map_renderable(value_map: Mapping[str, Any], **render_config: Any):
    """Build a rich table listing every entry of a value map.

    Each entry is rendered depending on what it holds: a full 'Value' object,
    a value id (UUID), or raw data. The 'show_type' render option (default
    True) controls whether that kind is shown as an extra column.
    """
    show_type = render_config.get("show_type", True)

    conf = dict(DEFAULT_VALUE_MAP_RENDER_CONFIG)
    conf.update(render_config)

    table = Table(show_header=True, box=box.SIMPLE)
    table.add_column("field name", style="i")
    if show_type:
        table.add_column("type")
    table.add_column("value")

    for field_name, item in value_map.items():
        if isinstance(item, Value):
            type_label = "value object"
            rendered = item.create_renderable(**conf)
        elif isinstance(item, uuid.UUID):
            type_label = "value id"
            rendered = str(item)
        else:
            type_label = "raw data"
            rendered = str(item)

        if show_type:
            table.add_row(field_name, type_label, rendered)
        else:
            table.add_row(field_name, rendered)

    return table
terminal_print_manifest(manifest)
Source code in kiara/utils/debug.py
def terminal_print_manifest(manifest: Manifest):
    """Print a manifest's default renderable representation to the terminal."""
    renderable = manifest.create_renderable()
    terminal_print(renderable)

develop special

KIARA_DEV_SETTINGS

Classes

DetailLevel (Enum)

An enumeration.

Source code in kiara/utils/develop/__init__.py
class DetailLevel(Enum):
    """How much detail to include in 'develop' mode log output."""

    NONE = "none"  # print nothing
    MINIMAL = "minimal"  # print a short summary
    FULL = "full"  # print all available details
FULL
MINIMAL
NONE
KiaraDevLogSettings (BaseModel) pydantic-model
Source code in kiara/utils/develop/__init__.py
class KiaraDevLogSettings(BaseModel):
    """Settings controlling what kiara logs while running in 'develop' mode.

    'PROFILES' holds named preset configurations ('full', 'internal') that can
    be applied instead of setting every field by hand.
    """

    PROFILES: ClassVar[Dict[str, Any]] = {
        "full": {
            "log_pre_run": True,
            "pre_run": {
                "pipeline_steps": True,
                "module_info": "full",
                "inputs_info": "full",
            },
            "log_post_run": True,
            "post_run": {
                "pipeline_steps": True,
                "module_info": "minimal",
                "inputs_info": "minimal",
                "outputs_info": "full",
            },
        },
        "internal": {
            "pre_run": {"internal_modules": True},
            "post_run": {"internal_modules": True},
        },
    }

    class Config:
        extra = Extra.forbid
        validate_assignment = True
        use_enum_values = True

    exc: DetailLevel = Field(
        description="How detailed to print exceptions", default=DetailLevel.MINIMAL
    )
    log_pre_run: bool = Field(
        description="Print details about a module and its inputs before running it.",
        default=True,
    )
    pre_run: PreRunMsgDetails = Field(
        description="Fine-grained settings about what to display in the pre-run message.",
        default_factory=PreRunMsgDetails,
    )
    log_post_run: bool = Field(
        description="Print details about the results of a module run.", default=True
    )
    post_run: PostRunMsgDetails = Field(
        # fix: typo 'aobut' -> 'about' in the user-facing description
        description="Fine-grained settings about what to display in the post-run message.",
        default_factory=PostRunMsgDetails,
    )
Attributes
PROFILES: ClassVar[Dict[str, Any]]
exc: DetailLevel pydantic-field

How detailed to print exceptions

log_post_run: bool pydantic-field

Print details about the results of a module run.

log_pre_run: bool pydantic-field

Print details about a module and its inputs before running it.

post_run: PostRunMsgDetails pydantic-field

Fine-grained settings about what to display in the post-run message.

pre_run: PreRunMsgDetails pydantic-field

Fine-grained settings about what to display in the pre-run message.

Config
Source code in kiara/utils/develop/__init__.py
class Config:
    extra = Extra.forbid
    validate_assignment = True
    use_enum_values = True
extra
use_enum_values
validate_assignment
KiaraDevSettings (BaseSettings) pydantic-model
Source code in kiara/utils/develop/__init__.py
class KiaraDevSettings(BaseSettings):
    """Top-level settings for kiara's 'develop' mode.

    Values can come from (highest precedence first): explicit constructor
    arguments, an active dev profile, the kiara dev config file, and
    environment variables prefixed with ``dev_``.
    """

    class Config:
        # Pydantic (v1) settings config: 'dev_' env prefix, '__' separates
        # nested fields (e.g. DEV_LOG__EXC), enum values stored as plain values.

        extra = Extra.forbid
        validate_assignment = True
        env_prefix = "dev_"
        use_enum_values = True
        env_nested_delimiter = "__"

        @classmethod
        def customise_sources(
            cls,
            init_settings,
            env_settings,
            file_secret_settings,
        ):
            # Source precedence, highest first: init kwargs, profile
            # overrides, dev config file, then environment variables.
            # NOTE: file_secret_settings is intentionally dropped.
            return (
                init_settings,
                profile_settings_source,
                dev_config_file_settings_source,
                env_settings,
            )

    # log: fine-grained control of develop-mode log output
    log: KiaraDevLogSettings = Field(
        description="Settings about what messages to print in 'develop' mode, and what details to include.",
        default_factory=KiaraDevLogSettings,
    )
    # job_cache: develop-mode override for the job cache
    job_cache: bool = Field(
        description="Whether to always disable the job cache (ignores the runtime_job_cache setting in the kiara configuration).",
        default=True,
    )

    def create_renderable(self, **render_config: Any):
        """Render these settings as a rich table (for terminal display)."""
        from kiara.utils.output import create_recursive_table_from_model_object

        return create_recursive_table_from_model_object(
            self, render_config=render_config
        )
Attributes
job_cache: bool pydantic-field

Whether to always disable the job cache (ignores the runtime_job_cache setting in the kiara configuration).

log: KiaraDevLogSettings pydantic-field

Settings about what messages to print in 'develop' mode, and what details to include.

Config
Source code in kiara/utils/develop/__init__.py
class Config:

    extra = Extra.forbid
    validate_assignment = True
    env_prefix = "dev_"
    use_enum_values = True
    env_nested_delimiter = "__"

    @classmethod
    def customise_sources(
        cls,
        init_settings,
        env_settings,
        file_secret_settings,
    ):
        return (
            init_settings,
            profile_settings_source,
            dev_config_file_settings_source,
            env_settings,
        )
env_nested_delimiter
env_prefix
extra
use_enum_values
validate_assignment
customise_sources(init_settings, env_settings, file_secret_settings) classmethod
Source code in kiara/utils/develop/__init__.py
@classmethod
def customise_sources(
    cls,
    init_settings,
    env_settings,
    file_secret_settings,
):
    return (
        init_settings,
        profile_settings_source,
        dev_config_file_settings_source,
        env_settings,
    )
create_renderable(self, **render_config)
Source code in kiara/utils/develop/__init__.py
def create_renderable(self, **render_config: Any):
    from kiara.utils.output import create_recursive_table_from_model_object

    return create_recursive_table_from_model_object(
        self, render_config=render_config
    )
PostRunMsgDetails (BaseModel) pydantic-model
Source code in kiara/utils/develop/__init__.py
class PostRunMsgDetails(BaseModel):
    """Fine-grained settings for the message printed after a module run."""

    class Config:
        extra = Extra.forbid
        validate_assignment = True
        use_enum_values = True

    pipeline_steps: bool = Field(
        description="Whether to also display information for modules that are run as part of a pipeline",
        default=False,
    )
    module_info: DetailLevel = Field(
        description="Whether to display details about the module that was run.",
        default=DetailLevel.NONE,
    )
    internal_modules: bool = Field(
        description="Whether to also print details about runs of internal module.",
        default=False,
    )
    inputs_info: DetailLevel = Field(
        description="Whether to display details about the run inputs.",
        default=DetailLevel.NONE,
    )
    outputs_info: DetailLevel = Field(
        description="Whether to display details about the run outputs.",
        default=DetailLevel.MINIMAL,
    )
Attributes
inputs_info: DetailLevel pydantic-field

Whether to display details about the run inputs.

internal_modules: bool pydantic-field

Whether to also print details about runs of internal module.

module_info: DetailLevel pydantic-field

Whether to display details about the module that was run.

outputs_info: DetailLevel pydantic-field

Whether to display details about the run outputs.

pipeline_steps: bool pydantic-field

Whether to also display information for modules that are run as part of a pipeline

Config
Source code in kiara/utils/develop/__init__.py
class Config:
    extra = Extra.forbid
    validate_assignment = True
    use_enum_values = True
extra
use_enum_values
validate_assignment
PreRunMsgDetails (BaseModel) pydantic-model
Source code in kiara/utils/develop/__init__.py
class PreRunMsgDetails(BaseModel):
    """Fine-grained settings for the message printed before a module run."""

    class Config:
        extra = Extra.forbid
        validate_assignment = True
        use_enum_values = True

    pipeline_steps: bool = Field(
        description="Whether to also display information for modules that are run as part of a pipeline.",
        default=False,
    )
    module_info: DetailLevel = Field(
        description="Whether to display details about the module to be run.",
        default=DetailLevel.MINIMAL,
    )
    internal_modules: bool = Field(
        description="Whether to also print details about runs of internal modules.",
        default=False,
    )
    inputs_info: DetailLevel = Field(
        description="Whether to display details about the run inputs.",
        default=DetailLevel.MINIMAL,
    )
Attributes
inputs_info: DetailLevel pydantic-field

Whether to display details about the run inputs.

internal_modules: bool pydantic-field

Whether to also print details about runs of internal modules.

module_info: DetailLevel pydantic-field

Whether to display details about the module to be run.

pipeline_steps: bool pydantic-field

Whether to also display information for modules that are run as part of a pipeline.

Config
Source code in kiara/utils/develop/__init__.py
class Config:
    extra = Extra.forbid
    validate_assignment = True
    use_enum_values = True
extra
use_enum_values
validate_assignment

Functions

dev_config_file_settings_source(settings)

A simple settings source that loads variables from a JSON file at the project's root.

Here we happen to choose to use the env_file_encoding from Config when reading config.json

Source code in kiara/utils/develop/__init__.py
def dev_config_file_settings_source(settings: BaseSettings) -> Dict[str, Any]:
    """
    Settings source that reads develop-mode overrides from the kiara dev
    config file at the project's root.

    A missing config file simply contributes no overrides (empty dict).
    """

    if not os.path.exists(KIARA_DEV_CONFIG_FILE):
        return {}
    return get_data_from_file(KIARA_DEV_CONFIG_FILE)
log_dev_message(msg, title=None)
Source code in kiara/utils/develop/__init__.py
def log_dev_message(msg: RenderableType, title: Union[str, None] = None):
    """Print a renderable inside a titled panel, but only in develop mode."""

    if not is_develop():
        # no-op unless develop mode is active
        return

    panel_title = title if title else "Develop-mode message"
    panel = Panel(
        Group("", msg), title=f"[yellow]{panel_title}[/yellow]", title_align="left"
    )

    from kiara.utils.cli import terminal_print

    terminal_print(panel)
profile_settings_source(settings)
Source code in kiara/utils/develop/__init__.py
def profile_settings_source(settings: BaseSettings) -> Dict[str, Any]:
    """Settings source that resolves profile-based defaults for dev settings.

    The profile name is taken from the first non-empty environment variable
    among ``DEVELOP``, ``develop``, ``DEV``, ``dev``, ``DEV_PROFILE``,
    ``dev_profile`` (checked in that order). For every sub-model of
    ``KiaraDevSettings`` that declares a matching entry in its ``PROFILES``
    class variable, that entry is used as the field's override.

    Returns:
        a (possibly empty) dict of settings overrides, keyed by field name
    """

    profile_name: Union[str, None] = None
    # first non-empty variable wins; empty strings count as 'not set'
    for env_var in ("DEVELOP", "develop", "DEV", "dev", "DEV_PROFILE", "dev_profile"):
        profile_name = os.environ.get(env_var, None)
        if profile_name:
            break

    result: Dict[str, Any] = {}
    if not profile_name:
        return result

    profile_name = profile_name.lower()

    from pydantic.fields import ModelField

    model: ModelField

    for model in KiaraDevSettings.__fields__.values():
        # only nested pydantic models can carry a 'PROFILES' mapping
        if not issubclass(model.type_, BaseModel):
            continue

        profiles = getattr(model.type_, "PROFILES", None)
        if not profiles:
            continue

        p = profiles.get(profile_name, None)
        if not p:
            continue
        result[model.name] = p

    return result

dicts

merge_dicts(*dicts)
Source code in kiara/utils/dicts.py
def merge_dicts(*dicts: Mapping[str, Any]) -> Dict[str, Any]:
    """Deep-merge all provided mappings (left to right) into a new dict."""

    merged: Dict[str, Any] = {}
    for source in dicts:
        # deepcopy so nested structures from the inputs are never shared
        # with (or mutated through) the merged result
        dpath.util.merge(merged, copy.deepcopy(source))

    return merged

doc

extract_doc_from_cls(cls, only_first_line=False)
Source code in kiara/utils/doc.py
def extract_doc_from_cls(cls: typing.Type, only_first_line: bool = False):
    """Return the cleaned docstring of a class, or a default description if it has none."""

    raw = cls.__doc__
    # cleandoc normalizes indentation of multi-line docstrings; the
    # fallback description is used verbatim
    doc = cleandoc(raw) if raw else DEFAULT_NO_DESC_VALUE

    if only_first_line:
        return first_line(doc)
    return doc.strip()

files

yaml
get_data_from_file(path, content_type=None)
Source code in kiara/utils/files.py
def get_data_from_file(
    path: Union[str, Path], content_type: Union[str, None] = None
) -> Any:
    """Read and parse a json or yaml file.

    If ``content_type`` is not provided, it is inferred from the file
    extension ('.json', '.yaml'/'.yml'); any other extension raises a
    ``ValueError``.
    """

    if isinstance(path, str):
        # expand '~' so string paths behave like shell paths
        path = Path(os.path.expanduser(path))

    content = path.read_text()

    if content_type:
        assert content_type in ["json", "yaml"]
    else:
        name = path.name
        if name.endswith(".json"):
            content_type = "json"
        elif name.endswith(".yaml") or name.endswith(".yml"):
            content_type = "yaml"
        else:
            raise ValueError(
                "Invalid data format, only 'json' or 'yaml' are supported currently."
            )

    if content_type == "json":
        return json.loads(content)
    return yaml.load(content)

global_metadata

get_metadata_for_python_module_or_class(module_or_class)
Source code in kiara/utils/global_metadata.py
@lru_cache()
def get_metadata_for_python_module_or_class(
    module_or_class: typing.Union[ModuleType, typing.Type]
) -> typing.List[typing.Dict[str, typing.Any]]:
    """Collect kiara metadata mappings attached to a class and its module hierarchy.

    Starting from the class (if one was given), then its module, then each
    parent package in turn, gather the mapping stored under the
    ``KIARA_MODULE_METADATA_ATTRIBUTE`` attribute wherever it is present.
    The returned list is ordered outermost package first, class last.
    """

    metadata: typing.List[typing.Dict[str, typing.Any]] = []

    if isinstance(module_or_class, type):
        # class-level metadata is collected first (it ends up last after the
        # final reverse())
        if hasattr(module_or_class, KIARA_MODULE_METADATA_ATTRIBUTE):
            md = getattr(module_or_class, KIARA_MODULE_METADATA_ATTRIBUTE)
            assert isinstance(md, typing.Mapping)
            metadata.append(md)  # type: ignore
        _module_or_class: typing.Union[
            str, ModuleType, typing.Type
        ] = module_or_class.__module__
    else:
        _module_or_class = module_or_class

    # 'current_module' alternates between a dotted module name (str) and an
    # imported module object; the empty string terminates the loop.
    current_module = _module_or_class
    while current_module:

        if isinstance(current_module, str):
            current_module = importlib.import_module(current_module)

        if hasattr(current_module, KIARA_MODULE_METADATA_ATTRIBUTE):
            md = getattr(current_module, KIARA_MODULE_METADATA_ATTRIBUTE)
            assert isinstance(md, typing.Mapping)
            metadata.append(md)  # type: ignore

        # step up to the parent package; '' ends the walk at the top level
        if "." in current_module.__name__:
            current_module = ".".join(current_module.__name__.split(".")[0:-1])
        else:
            current_module = ""

    # collected innermost-first; reverse so most general metadata comes first
    metadata.reverse()
    return metadata

graphs

print_ascii_graph(graph)
Source code in kiara/utils/graphs.py
def print_ascii_graph(graph: nx.Graph):
    """Print an ascii rendering of a network graph to the terminal.

    Degrades gracefully: if the optional 'asciinet' package is missing or
    broken, or the Java runtime it requires is unavailable, an explanatory
    message is printed instead of raising.
    """

    try:
        from asciinet import graph_to_ascii
    # narrowed from a bare 'except:' so SystemExit/KeyboardInterrupt still
    # propagate; anything else during import is treated as "not available"
    except Exception:
        print(
            "\nCan't print graph on terminal, package 'asciinet' not available. Please install it into the current virtualenv using:\n\npip install 'git+https://github.com/cosminbasca/asciinet.git#egg=asciinet&subdirectory=pyasciinet'"
        )
        return

    try:
        from asciinet._libutil import check_java

        check_java("Java ")
    except Exception as e:
        print(e)
        print(
            "\nJava is currently necessary to print ascii graph. This might change in the future, but to use this functionality please install a JRE."
        )
        return

    print(graph_to_ascii(graph))

hashfs special

HashFS is a content-addressable file management system. What does that mean? Simply, that HashFS manages a directory where files are saved based on the file's hash.

Typical use cases for this kind of system are ones where:

  • Files are written once and never change (e.g. image storage).
  • It's desirable to have no duplicate files (e.g. user uploads).
  • File metadata is stored elsewhere (e.g. in a database).

Adapted from: https://github.com/dgilland/hashfs

License

The MIT License (MIT)

Copyright (c) 2015, Derrick Gilland

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Classes

HashAddress (HashAddress)

File address containing a file's path on disk and its content hash ID.

Attributes:

Name Type Description
id str

Hash ID (hexdigest) of file contents.

relpath str

Relative path location to :attr:HashFS.root.

abspath str

Absolute path location of file on disk.

is_duplicate boolean

Whether the hash address created was a duplicate of a previously existing file. Can only be True after a put operation. Defaults to False.

Source code in kiara/utils/hashfs/__init__.py
class HashAddress(
    namedtuple("HashAddress", ["id", "relpath", "abspath", "is_duplicate"])
):
    """File address bundling a file's on-disk paths with its content hash ID.

    Attributes:
        id (str): Hash ID (hexdigest) of the file contents.
        relpath (str): Path location relative to :attr:`HashFS.root`.
        abspath (str): Absolute path location of the file on disk.
        is_duplicate (boolean, optional): Whether the hash address created was
            a duplicate of a previously existing file. Can only be ``True``
            after a put operation. Defaults to ``False``.
    """

    # namedtuple fields have no defaults here, so override __new__ purely to
    # make 'is_duplicate' optional
    def __new__(cls, id, relpath, abspath, is_duplicate=False):
        return super().__new__(cls, id, relpath, abspath, is_duplicate)  # type: ignore
HashFS

Content addressable file manager.

Attributes:

Name Type Description
root str

Directory path used as root of storage space.

depth int

Depth of subfolders to create when saving a file.

width int

Width of each subfolder to create when saving a file.

algorithm str

Hash algorithm to use when computing file hash. Algorithm should be available in hashlib module. Defaults to 'sha256'.

fmode int

File mode permission to set when adding files to directory. Defaults to 0o664 which allows owner/group to read/write and everyone else to read.

dmode int

Directory mode permission to set for subdirectories. Defaults to 0o755 which allows owner/group to read/write and everyone else to read and everyone to execute.

Source code in kiara/utils/hashfs/__init__.py
class HashFS(object):
    """Content addressable file manager.

    Attributes:
        root (str): Directory path used as root of storage space.
        depth (int, optional): Depth of subfolders to create when saving a
            file.
        width (int, optional): Width of each subfolder to create when saving a
            file.
        algorithm (str): Hash algorithm to use when computing file hash.
            Algorithm should be available in ``hashlib`` module. Defaults to
            ``'sha256'``.
        fmode (int, optional): File mode permission to set when adding files to
            directory. Defaults to ``0o664`` which allows owner/group to
            read/write and everyone else to read.
        dmode (int, optional): Directory mode permission to set for
            subdirectories. Defaults to ``0o755`` which allows owner/group to
            read/write and everyone else to read and everyone to execute.
    """

    def __init__(
        self,
        root: str,
        depth: int = 4,
        width: int = 1,
        algorithm: str = "sha256",
        fmode=0o664,
        dmode=0o755,
    ):
        # realpath resolves symlinks so haspath/relpath checks are consistent
        self.root: str = os.path.realpath(root)
        self.depth: int = depth
        self.width: int = width
        self.algorithm: str = algorithm
        self.fmode = fmode
        self.dmode = dmode

    def put(self, file: BinaryIO) -> "HashAddress":
        """Store contents of `file` on disk using its content hash for the
        address.

        Args:
            file (mixed): Readable object or path to file.

        Returns:
            HashAddress: File's hash address.
        """
        stream = Stream(file)

        with closing(stream):
            id = self.computehash(stream)
            filepath, is_duplicate = self._copy(stream, id)

        return HashAddress(id, self.relpath(filepath), filepath, is_duplicate)

    def put_with_precomputed_hash(
        self, file: Union[str, Path, BinaryIO], hash_id: str
    ) -> "HashAddress":
        """Store contents of `file` under the caller-supplied `hash_id`,
        skipping the hash computation.

        NOTE: `hash_id` is trusted as-is; no check is made that it actually
        matches the content hash.
        """

        stream = Stream(file)
        with closing(stream):
            filepath, is_duplicate = self._copy(stream=stream, id=hash_id)

        return HashAddress(hash_id, self.relpath(filepath), filepath, is_duplicate)

    def _copy(self, stream: "Stream", id: str):
        """Copy the contents of `stream` onto disk with an optional file
        extension appended. The copy process uses a temporary file to store the
        initial contents and then moves that file to its final location.
        """

        filepath = self.idpath(id)

        if not os.path.isfile(filepath):
            # Only move file if it doesn't already exist.
            is_duplicate = False
            fname = self._mktempfile(stream)
            self.makepath(os.path.dirname(filepath))
            shutil.move(fname, filepath)
        else:
            is_duplicate = True

        return (filepath, is_duplicate)

    def _mktempfile(self, stream):
        """Create a named temporary file from a :class:`Stream` object and
        return its filename.
        """
        tmp = NamedTemporaryFile(delete=False)

        if self.fmode is not None:
            # temporarily clear the umask so chmod applies fmode exactly
            oldmask = os.umask(0)

            try:
                os.chmod(tmp.name, self.fmode)
            finally:
                os.umask(oldmask)

        for data in stream:
            tmp.write(to_bytes(data))

        tmp.close()

        return tmp.name

    def get(self, file):
        """Return :class:`HashAddress` from given id or path. If `file` does not
        refer to a valid file, then ``None`` is returned.

        Args:
            file (str): Address ID or path of file.

        Returns:
            HashAddress: File's hash address.
        """
        realpath = self.realpath(file)

        if realpath is None:
            return None
        else:
            return HashAddress(self.unshard(realpath), self.relpath(realpath), realpath)

    def open(self, file, mode="rb"):
        """Return open buffer object from given id or path.

        Args:
            file (str): Address ID or path of file.
            mode (str, optional): Mode to open file in. Defaults to ``'rb'``.

        Returns:
            Buffer: An ``io`` buffer dependent on the `mode`.

        Raises:
            IOError: If file doesn't exist.
        """
        realpath = self.realpath(file)
        if realpath is None:
            raise IOError("Could not locate file: {0}".format(file))

        return io.open(realpath, mode)

    def delete(self, file):
        """Delete file using id or path. Remove any empty directories after
        deleting. No exception is raised if file doesn't exist.

        Args:
            file (str): Address ID or path of file.
        """
        realpath = self.realpath(file)
        if realpath is None:
            return

        try:
            os.remove(realpath)
        except OSError:  # pragma: no cover
            pass
        else:
            # prune now-empty shard directories left behind by the delete
            self.remove_empty(os.path.dirname(realpath))

    def remove_empty(self, subpath):
        """Successively remove all empty folders starting with `subpath` and
        proceeding "up" through directory tree until reaching the :attr:`root`
        folder.
        """
        # Don't attempt to remove any folders if subpath is not a
        # subdirectory of the root directory.
        if not self.haspath(subpath):
            return

        while subpath != self.root:
            if len(os.listdir(subpath)) > 0 or os.path.islink(subpath):
                break
            os.rmdir(subpath)
            subpath = os.path.dirname(subpath)

    def files(self):
        """Return generator that yields all files in the :attr:`root`
        directory.
        """
        for folder, subfolders, files in walk(self.root):
            for file in files:
                yield os.path.abspath(os.path.join(folder, file))

    def folders(self):
        """Return generator that yields all folders in the :attr:`root`
        directory that contain files.
        """
        for folder, subfolders, files in walk(self.root):
            if files:
                yield folder

    def count(self):
        """Return count of the number of files in the :attr:`root` directory."""
        count = 0
        for _ in self:
            count += 1
        return count

    def size(self):
        """Return the total size in bytes of all files in the :attr:`root`
        directory.
        """
        total = 0

        for path in self.files():
            total += os.path.getsize(path)

        return total

    def exists(self, file):
        """Check whether a given file id or path exists on disk."""
        return bool(self.realpath(file))

    def haspath(self, path):
        """Return whether `path` is a subdirectory of the :attr:`root`
        directory.
        """
        return issubdir(path, self.root)

    def makepath(self, path):
        """Physically create the folder path on disk."""
        try:
            os.makedirs(path, self.dmode)
        except FileExistsError:
            assert os.path.isdir(path), "expected {} to be a directory".format(path)

    def relpath(self, path):
        """Return `path` relative to the :attr:`root` directory."""
        return os.path.relpath(path, self.root)

    def realpath(self, file):
        """Attempt to determine the real path of a file id or path through
        successive checking of candidate paths. If the real path is stored with
        an extension, the path is considered a match if the basename matches
        the expected file path of the id.
        """
        # Check for absolute path.
        if os.path.isfile(file):
            return file

        # Check for relative path.
        relpath = os.path.join(self.root, file)
        if os.path.isfile(relpath):
            return relpath

        # Check for sharded path.
        filepath = self.idpath(file)
        if os.path.isfile(filepath):
            return filepath

        # Check for sharded path with any extension.
        paths = glob.glob("{0}.*".format(filepath))
        if paths:
            return paths[0]

        # Could not determine a match.
        return None

    def idpath(self, id):
        """Build the file path for a given hash id. Optionally, append a
        file extension.
        """
        paths = self.shard(id)

        return os.path.join(self.root, *paths)

    def computehash(self, stream) -> str:
        """Compute hash of file using :attr:`algorithm`."""
        hashobj = hashlib.new(self.algorithm)
        for data in stream:
            hashobj.update(to_bytes(data))
        return hashobj.hexdigest()

    def shard(self, id):
        """Shard content ID into subfolders."""
        return shard(id, self.depth, self.width)

    def unshard(self, path):
        """Unshard path to determine hash value."""
        if not self.haspath(path):
            raise ValueError(
                "Cannot unshard path. The path {0!r} is not "
                "a subdirectory of the root directory {1!r}".format(path, self.root)
            )

        # dropping the extension and all separators collapses the sharded
        # relative path back into the original hash id
        return os.path.splitext(self.relpath(path))[0].replace(os.sep, "")

    def repair(self):
        """Repair any file locations whose content address doesn't match its
        file path.
        """
        repaired = []
        corrupted = tuple(self.corrupted())
        # clear the umask so chmod below applies fmode exactly
        oldmask = os.umask(0)

        try:
            for path, address in corrupted:
                if os.path.isfile(address.abspath):
                    # File already exists so just delete corrupted path.
                    os.remove(path)
                else:
                    # File doesn't exists so move it.
                    self.makepath(os.path.dirname(address.abspath))
                    shutil.move(path, address.abspath)

                os.chmod(address.abspath, self.fmode)
                repaired.append((path, address))
        finally:
            os.umask(oldmask)

        return repaired

    def corrupted(self):
        """Return generator that yields corrupted files as ``(path, address)``
        where ``path`` is the path of the corrupted file and ``address`` is
        the :class:`HashAddress` of the expected location.
        """
        for path in self.files():
            stream = Stream(path)

            with closing(stream):
                id = self.computehash(stream)

            expected_path = self.idpath(id)

            if expected_path != path:
                yield (
                    path,
                    HashAddress(id, self.relpath(expected_path), expected_path),
                )

    def __contains__(self, file):
        """Return whether a given file id or path is contained in the
        :attr:`root` directory.
        """
        return self.exists(file)

    def __iter__(self):
        """Iterate over all files in the :attr:`root` directory."""
        return self.files()

    def __len__(self):
        """Return count of the number of files in the :attr:`root` directory."""
        return self.count()
Methods
computehash(self, stream)

Compute hash of file using :attr:algorithm.

Source code in kiara/utils/hashfs/__init__.py
def computehash(self, stream) -> str:
    """Compute hash of file using :attr:`algorithm`."""
    hashobj = hashlib.new(self.algorithm)
    for data in stream:
        hashobj.update(to_bytes(data))
    return hashobj.hexdigest()
corrupted(self)

Return generator that yields corrupted files as (path, address) where path is the path of the corrupted file and address is the :class:HashAddress of the expected location.

Source code in kiara/utils/hashfs/__init__.py
def corrupted(self):
    """Return generator that yields corrupted files as ``(path, address)``
    where ``path`` is the path of the corrupted file and ``address`` is
    the :class:`HashAddress` of the expected location.
    """
    for path in self.files():
        stream = Stream(path)

        with closing(stream):
            id = self.computehash(stream)

        expected_path = self.idpath(id)

        if expected_path != path:
            yield (
                path,
                HashAddress(id, self.relpath(expected_path), expected_path),
            )
count(self)

Return count of the number of files in the :attr:root directory.

Source code in kiara/utils/hashfs/__init__.py
def count(self):
    """Return count of the number of files in the :attr:`root` directory."""
    count = 0
    for _ in self:
        count += 1
    return count
delete(self, file)

Delete file using id or path. Remove any empty directories after deleting. No exception is raised if file doesn't exist.

Parameters:

Name Type Description Default
file str

Address ID or path of file.

required
Source code in kiara/utils/hashfs/__init__.py
def delete(self, file):
    """Delete file using id or path. Remove any empty directories after
    deleting. No exception is raised if file doesn't exist.

    Args:
        file (str): Address ID or path of file.
    """
    realpath = self.realpath(file)
    if realpath is None:
        return

    try:
        os.remove(realpath)
    except OSError:  # pragma: no cover
        pass
    else:
        self.remove_empty(os.path.dirname(realpath))
exists(self, file)

Check whether a given file id or path exists on disk.

Source code in kiara/utils/hashfs/__init__.py
def exists(self, file):
    """Check whether a given file id or path exists on disk."""
    return bool(self.realpath(file))
files(self)

Return generator that yields all files in the :attr:root directory.

Source code in kiara/utils/hashfs/__init__.py
def files(self):
    """Return generator that yields all files in the :attr:`root`
    directory.
    """
    for folder, subfolders, files in walk(self.root):
        for file in files:
            yield os.path.abspath(os.path.join(folder, file))
folders(self)

Return generator that yields all folders in the :attr:root directory that contain files.

Source code in kiara/utils/hashfs/__init__.py
def folders(self):
    """Return generator that yields all folders in the :attr:`root`
    directory that contain files.
    """
    for folder, subfolders, files in walk(self.root):
        if files:
            yield folder
get(self, file)

Return :class:HashAddress from given id or path. If file does not refer to a valid file, then None is returned.

Parameters:

Name Type Description Default
file str

Address ID or path of file.

required

Returns:

Type Description
HashAddress

File's hash address.

Source code in kiara/utils/hashfs/__init__.py
def get(self, file):
    """Return :class:`HashAdress` from given id or path. If `file` does not
    refer to a valid file, then ``None`` is returned.

    Args:
        file (str): Address ID or path of file.

    Returns:
        HashAddress: File's hash address.
    """
    realpath = self.realpath(file)

    if realpath is None:
        return None
    else:
        return HashAddress(self.unshard(realpath), self.relpath(realpath), realpath)
haspath(self, path)

Return whether path is a subdirectory of the :attr:root directory.

Source code in kiara/utils/hashfs/__init__.py
def haspath(self, path):
    """Return whether `path` is a subdirectory of the :attr:`root`
    directory.
    """
    return issubdir(path, self.root)
idpath(self, id)

Build the file path for a given hash id. Optionally, append a file extension.

Source code in kiara/utils/hashfs/__init__.py
def idpath(self, id):
    """Build the file path for a given hash id. Optionally, append a
    file extension.
    """
    paths = self.shard(id)

    return os.path.join(self.root, *paths)
makepath(self, path)

Physically create the folder path on disk.

Source code in kiara/utils/hashfs/__init__.py
def makepath(self, path):
    """Physically create the folder path on disk."""
    try:
        os.makedirs(path, self.dmode)
    except FileExistsError:
        assert os.path.isdir(path), "expected {} to be a directory".format(path)
open(self, file, mode='rb')

Return open buffer object from given id or path.

Parameters:

Name Type Description Default
file str

Address ID or path of file.

required
mode str

Mode to open file in. Defaults to 'rb'.

'rb'

Returns:

Type Description
Buffer

An io buffer dependent on the mode.

Exceptions:

Type Description
IOError

If file doesn't exist.

Source code in kiara/utils/hashfs/__init__.py
def open(self, file, mode="rb"):
    """Return an open buffer object for the given id or path.

    Args:
        file (str): Address ID or path of file.
        mode (str, optional): Mode to open file in. Defaults to ``'rb'``.

    Returns:
        Buffer: An ``io`` buffer dependent on the `mode`.

    Raises:
        IOError: If file doesn't exist.
    """
    resolved = self.realpath(file)
    if resolved is None:
        raise IOError("Could not locate file: {0}".format(file))
    return io.open(resolved, mode)
put(self, file)

Store contents of file on disk using its content hash for the address.

Parameters:

Name Type Description Default
file mixed

Readable object or path to file.

required

Returns:

Type Description
HashAddress

File's hash address.

Source code in kiara/utils/hashfs/__init__.py
def put(self, file: BinaryIO) -> "HashAddress":
    """Store the contents of `file`, addressed by its content hash.

    Args:
        file (mixed): Readable object or path to file.

    Returns:
        HashAddress: File's hash address.
    """
    stream = Stream(file)
    with closing(stream):
        content_id = self.computehash(stream)
        filepath, is_duplicate = self._copy(stream, content_id)
    return HashAddress(content_id, self.relpath(filepath), filepath, is_duplicate)
put_with_precomputed_hash(self, file, hash_id)
Source code in kiara/utils/hashfs/__init__.py
def put_with_precomputed_hash(
    self, file: Union[str, Path, BinaryIO], hash_id: str
) -> "HashAddress":
    """Store `file` under an already-computed content hash `hash_id`,
    skipping the hashing step."""
    with closing(Stream(file)) as stream:
        filepath, is_duplicate = self._copy(stream=stream, id=hash_id)
    return HashAddress(hash_id, self.relpath(filepath), filepath, is_duplicate)
realpath(self, file)

Attempt to determine the real path of a file id or path through successive checking of candidate paths. If the real path is stored with an extension, the path is considered a match if the basename matches the expected file path of the id.

Source code in kiara/utils/hashfs/__init__.py
def realpath(self, file):
    """Attempt to determine the real path of a file id or path through
    successive checking of candidate paths. If the real path is stored with
    an extension, the path is considered a match if the basename matches
    the expected file path of the id.
    """
    # Check for absolute path.
    if os.path.isfile(file):
        return file

    # Check for path relative to the store root.
    relpath = os.path.join(self.root, file)
    if os.path.isfile(relpath):
        return relpath

    # Check for sharded path derived from the id.
    filepath = self.idpath(file)
    if os.path.isfile(filepath):
        return filepath

    # Check for sharded path with any extension; the first glob match wins.
    paths = glob.glob("{0}.*".format(filepath))
    if paths:
        return paths[0]

    # Could not determine a match.
    return None
relpath(self, path)

Return path relative to the :attr:root directory.

Source code in kiara/utils/hashfs/__init__.py
def relpath(self, path):
    """Return `path` expressed relative to the :attr:`root` directory."""
    return os.path.relpath(path, start=self.root)
remove_empty(self, subpath)

Successively remove all empty folders starting with subpath and proceeding "up" through directory tree until reaching the :attr:root folder.

Source code in kiara/utils/hashfs/__init__.py
def remove_empty(self, subpath):
    """Successively remove all empty folders starting with `subpath` and
    proceeding "up" through directory tree until reaching the :attr:`root`
    folder.
    """
    # Don't attempt to remove any folders if subpath is not a
    # subdirectory of the root directory.
    if not self.haspath(subpath):
        return

    while subpath != self.root:
        # Stop at the first non-empty directory or symlink; only truly
        # empty, real directories are pruned.
        if len(os.listdir(subpath)) > 0 or os.path.islink(subpath):
            break
        os.rmdir(subpath)
        subpath = os.path.dirname(subpath)
repair(self)

Repair any file locations whose content address doesn't match its file path.

Source code in kiara/utils/hashfs/__init__.py
def repair(self):
    """Repair any file locations whose content address doesn't match its
    file path.

    Returns:
        list: ``(old_path, HashAddress)`` pairs for every repaired file.
    """
    repaired = []
    corrupted = tuple(self.corrupted())
    # Clear the umask so makepath/chmod apply exactly the configured modes.
    oldmask = os.umask(0)

    try:
        for path, address in corrupted:
            if os.path.isfile(address.abspath):
                # File already exists so just delete corrupted path.
                os.remove(path)
            else:
                # File doesn't exist so move it.
                self.makepath(os.path.dirname(address.abspath))
                shutil.move(path, address.abspath)

            os.chmod(address.abspath, self.fmode)
            repaired.append((path, address))
    finally:
        os.umask(oldmask)

    return repaired
shard(self, id)

Shard content ID into subfolders.

Source code in kiara/utils/hashfs/__init__.py
def shard(self, id):
    """Split the content `id` into :attr:`depth` folder tokens of
    :attr:`width` characters each, plus the remainder."""
    return shard(id, self.depth, self.width)
size(self)

Return the total size in bytes of all files in the :attr:root directory.

Source code in kiara/utils/hashfs/__init__.py
def size(self):
    """Return the total size in bytes of all files in the :attr:`root`
    directory.
    """
    return sum(os.path.getsize(path) for path in self.files())
unshard(self, path)

Unshard path to determine hash value.

Source code in kiara/utils/hashfs/__init__.py
def unshard(self, path):
    """Reassemble the hash id encoded in a sharded `path`.

    Raises:
        ValueError: if `path` is not below the root directory.
    """
    if not self.haspath(path):
        raise ValueError(
            "Cannot unshard path. The path {0!r} is not "
            "a subdirectory of the root directory {1!r}".format(path, self.root)
        )

    stem, _ext = os.path.splitext(self.relpath(path))
    return stem.replace(os.sep, "")
Stream

Common interface for file-like objects.

The input obj can be a file-like object or a path to a file. If obj is a path to a file, then it will be opened until :meth:close is called. If obj is a file-like object, then its original position will be restored when :meth:close is called instead of closing the object automatically. Closing of the stream is deferred to whatever process passed the stream in.

Successive readings of the stream are supported without having to manually set its position back to 0.

Source code in kiara/utils/hashfs/__init__.py
class Stream(object):
    """Common interface for file-like objects.

    The input `obj` can be a file-like object or a path to a file. If `obj`
    is a path to a file, it is opened and stays open until :meth:`close` is
    called. If `obj` is already file-like, its original position is restored
    on :meth:`close` instead of closing the object; closing is then left to
    whatever process passed the stream in.

    The stream can be read repeatedly without manually rewinding it to ``0``.
    """

    def __init__(self, obj: Union[BinaryIO, str, Path]):
        if hasattr(obj, "read"):
            # Caller-owned file object: remember its current position.
            pos = obj.tell()  # type: ignore
        elif os.path.isfile(obj):  # type: ignore
            # Path input: we own the handle, so no position to restore.
            obj = io.open(obj, "rb")  # type: ignore
            pos = None
        else:
            raise ValueError("Object must be a valid file path or a readable object")

        try:
            # Prefer the filesystem's preferred block size when available.
            buffer_size = os.stat(obj.name).st_blksize  # type: ignore
        except Exception:
            buffer_size = 8192

        self._obj = obj
        self._pos = pos
        self._buffer_size = buffer_size

    def __iter__(self):
        """Yield chunks from the start of the underlying object, then
        restore its original position if the caller owns it."""
        self._obj.seek(0)
        while chunk := self._obj.read(self._buffer_size):
            yield chunk
        if self._pos is not None:
            self._obj.seek(self._pos)

    def close(self):
        """Close the underlying object if we opened it; otherwise seek it
        back to its original position."""
        if self._pos is None:
            self._obj.close()
        else:
            self._obj.seek(self._pos)
Methods
close(self)

Close underlying IO object if we opened it, else return it to original position.

Source code in kiara/utils/hashfs/__init__.py
def close(self):
    """Close the underlying IO object if we opened it, else seek it back
    to its original position."""
    if self._pos is not None:
        self._obj.seek(self._pos)
    else:
        self._obj.close()

Functions

compact(items)

Return only truthy elements of items.

Source code in kiara/utils/hashfs/__init__.py
def compact(items: List[Any]) -> List[Any]:
    """Return a new list containing only the truthy elements of `items`."""
    # filter(None, ...) drops falsy values, same as the comprehension form
    return list(filter(None, items))
issubdir(subpath, path)

Return whether subpath is a sub-directory of path.

Source code in kiara/utils/hashfs/__init__.py
def issubdir(subpath: str, path: str):
    """Return whether `subpath` is a sub-directory of `path`."""
    # Resolve both sides first; the trailing separator prevents sibling
    # prefixes like /usr/var2/log matching /usr/var.
    parent = os.path.realpath(path) + os.sep
    return os.path.realpath(subpath).startswith(parent)
shard(digest, depth, width)
Source code in kiara/utils/hashfs/__init__.py
def shard(digest, depth, width):
    """Split `digest` into `depth` tokens of `width` characters followed by
    the remainder, dropping any empty tokens."""
    tokens = [digest[i * width : (i + 1) * width] for i in range(depth)]  # noqa
    tokens.append(digest[depth * width :])  # noqa
    return compact(tokens)
to_bytes(text)
Source code in kiara/utils/hashfs/__init__.py
def to_bytes(text: Union[str, bytes]):
    """Return `text` as bytes, encoding str input as UTF-8."""
    if isinstance(text, bytes):
        return text
    return bytes(text, "utf8")

hashing

NONE_CID
compute_cid(data, hash_codec='sha2-256', encode='base58btc')
Source code in kiara/utils/hashing.py
def compute_cid(
    data: EncodableType,
    hash_codec: str = "sha2-256",
    encode: str = "base58btc",
) -> Tuple[bytes, CID]:
    """Encode `data` as DAG-CBOR and compute its content identifier.

    Returns:
        the encoded bytes and the corresponding CIDv1 (dag-cbor codec).
    """
    cbor_bytes = dag_cbor.encode(data)
    digest = multihash.get(hash_codec).digest(cbor_bytes)
    cid = CID(encode, 1, codec="dag-cbor", digest=digest)
    return cbor_bytes, cid
compute_cid_from_file(file, codec='raw', hash_codec='sha2-256')
Source code in kiara/utils/hashing.py
def compute_cid_from_file(
    file: str, codec: Union[str, int, Multicodec] = "raw", hash_codec: str = "sha2-256"
):
    """Compute the CID of a file's content, read in chunks.

    Args:
        file: path of the file to hash.
        codec: multicodec to record in the resulting CID.
        hash_codec: hash function name; only "sha2-256" is implemented.

    Raises:
        ValueError: if `hash_codec` is not "sha2-256". (The original
            ``assert`` would be silently stripped under ``python -O``.)
    """
    if hash_codec != "sha2-256":
        raise ValueError(
            f"Unsupported hash codec '{hash_codec}': only 'sha2-256' is supported."
        )

    file_hash = hashlib.sha256()

    CHUNK_SIZE = 65536
    with open(file, "rb") as f:
        # iterate fixed-size chunks until EOF (read returns b"")
        for chunk in iter(lambda: f.read(CHUNK_SIZE), b""):
            file_hash.update(chunk)

    wrapped = multihash.wrap(file_hash.digest(), "sha2-256")
    return create_cid_digest(digest=wrapped, codec=codec)
create_cid_digest(digest, codec='raw')
Source code in kiara/utils/hashing.py
def create_cid_digest(
    digest: Union[
        str, BytesLike, Tuple[Union[str, int, Multihash], Union[str, BytesLike]]
    ],
    codec: Union[str, int, Multicodec] = "raw",
) -> CID:
    """Wrap an existing (multihash) digest in a base58btc-encoded CIDv1."""

    cid = CID("base58btc", 1, codec, digest)
    return cid

json

orjson_dumps(v, *, default=None, **args)
Source code in kiara/utils/json.py
def orjson_dumps(v, *, default=None, **args):
    """Serialize `v` to a JSON string via orjson.

    orjson.dumps returns bytes; the result is decoded so this matches the
    standard json.dumps interface. On failure, the offending value is
    dumped to the debug console (when debug is enabled) and the original
    exception is re-raised.
    """
    try:
        return orjson.dumps(v, default=default, **args).decode()
    except Exception as e:
        if is_debug():
            print(f"Error dumping json data: {e}")
            from kiara import dbg

            dbg(v)

        # bare `raise` re-raises the active exception without restarting
        # its traceback (unlike `raise e`)
        raise

jupyter

create_image(graph)
Source code in kiara/utils/jupyter.py
def create_image(graph: nx.Graph):
    """Render `graph` to PNG bytes using the graphviz 'dot' layout.

    Returns an explanatory string instead of raising when pygraphviz is
    not installed.
    """
    try:
        import pygraphviz as pgv  # noqa
    except ImportError:  # noqa
        # only a missing dependency should trigger the fallback message;
        # the previous bare `except:` also swallowed unrelated errors
        return "pygraphviz not available, please install it manually into the current virtualenv"

    G = nx.nx_agraph.to_agraph(graph)

    G.node_attr["shape"] = "box"
    G.layout(prog="dot")

    b = G.draw(format="png")
    return b
graph_to_image(graph, return_bytes=False)
Source code in kiara/utils/jupyter.py
def graph_to_image(graph: nx.Graph, return_bytes: bool = False):
    """Render `graph` to PNG; return raw bytes or a wrapped IPython Image."""
    png_bytes = create_image(graph=graph)
    return png_bytes if return_bytes else Image(png_bytes)
save_image(graph, path)
Source code in kiara/utils/jupyter.py
def save_image(graph: nx.Graph, path: str):
    """Render `graph` to PNG and write the bytes to `path`."""
    with open(path, "wb") as out:
        out.write(create_image(graph=graph))

metadata

find_metadata_models(alias=None, only_for_package=None)
Source code in kiara/utils/metadata.py
def find_metadata_models(
    alias: Union[str, None] = None, only_for_package: Union[str, None] = None
) -> MetadataTypeClassesInfo:
    """Collect all registered ValueMetadata model classes, optionally
    filtered to those contributed by a single package.

    Args:
        alias: optional group alias for the resulting group.
        only_for_package: if given, keep only models whose context
            'package' label equals this name.
    """

    model_registry = ModelRegistry.instance()
    _group = model_registry.get_models_of_type(ValueMetadata)  # type: ignore

    # model id -> concrete ValueMetadata subclass
    classes: Dict[str, Type[ValueMetadata]] = {}
    for model_id, info in _group.items():
        classes[model_id] = info.python_class.get_class()  # type: ignore

    group: MetadataTypeClassesInfo = MetadataTypeClassesInfo.create_from_type_items(group_alias=alias, **classes)  # type: ignore

    if only_for_package:
        temp = {}
        for key, info in group.items():
            if info.context.labels.get("package") == only_for_package:
                temp[key] = info

        # re-wrap the filtered items, keeping the original group identity
        group = MetadataTypeClassesInfo.construct(
            group_id=group.instance_id, group_alias=group.group_alias, item_infos=temp  # type: ignore
        )

    return group

models

Functions

assemble_subcomponent_tree(data)
Source code in kiara/utils/models.py
def assemble_subcomponent_tree(data: "KiaraModel") -> Union[nx.DiGraph, None]:
    """Build a directed graph of `data` and all its (transitive)
    subcomponents, rooted at KIARA_DEFAULT_ROOT_NODE_ID.

    Node ids are dotted paths; each node stores the model object ('obj')
    and its depth ('level').
    """
    graph = nx.DiGraph()

    def _add_subtree(model: "KiaraModel", node_id, depth: int = 0):
        graph.add_node(node_id, obj=model, level=depth)
        keys = model.subcomponent_keys
        if not keys:
            return
        for key in keys:
            child = model.get_subcomponent(key)
            child_id = f"{node_id}.{key}"
            graph.add_edge(node_id, child_id)
            _add_subtree(child, child_id, depth + 1)

    _add_subtree(data, KIARA_DEFAULT_ROOT_NODE_ID)
    return graph
create_pydantic_model(model_cls, _use_pydantic_construct=False, **field_values)
Source code in kiara/utils/models.py
def create_pydantic_model(
    model_cls: Type[BaseModel],
    _use_pydantic_construct: bool = PYDANTIC_USE_CONSTRUCT,
    **field_values: Any,
):
    """Instantiate `model_cls` with `field_values`.

    The `construct` fast-path (instantiation without validation) is not
    implemented; requesting it raises NotImplementedError. (The original
    body had an unreachable ``return model_cls.construct(...)`` after the
    raise, which has been removed.)
    """
    if _use_pydantic_construct:
        raise NotImplementedError()
    return model_cls(**field_values)
get_subcomponent_from_model(data, path)

Return subcomponents of a model under a specified path.

Source code in kiara/utils/models.py
def get_subcomponent_from_model(data: "KiaraModel", path: str) -> "KiaraModel":
    """Return subcomponents of a model under a specified path.

    Args:
        data: the model to look into.
        path: dotted path of the subcomponent to retrieve.

    Raises:
        KeyError: if no subcomponent exists under `path`.
        NotImplementedError: for grouped child matches and non-mapping
            custom-root models (not supported yet).
    """

    if "." in path:
        # resolve dotted paths recursively, one segment at a time
        first_token, rest = path.split(".", maxsplit=1)
        sc = data.get_subcomponent(first_token)
        return sc.get_subcomponent(rest)

    # pydantic (v1) custom-root models: look up keys inside __root__
    if hasattr(data, "__custom_root_type__") and data.__custom_root_type__:
        if isinstance(data.__root__, Mapping):  # type: ignore
            if path in data.__root__.keys():  # type: ignore
                return data.__root__[path]  # type: ignore
            else:
                # collect keys nested below `path` (e.g. 'path.x.y' -> 'x.y')
                matches = {}
                for k in data.__root__.keys():  # type: ignore
                    if k.startswith(f"{path}."):
                        rest = k[len(path) + 1 :]  # noqa
                        matches[rest] = data.__root__[k]  # type: ignore

                if not matches:
                    raise KeyError(f"No child models under '{path}'.")
                else:
                    # grouping matched children into a model group is not
                    # implemented yet
                    raise NotImplementedError()
                    # subcomponent_group = KiaraModelGroup.create_from_child_models(**matches)
                    # return subcomponent_group

        else:
            raise NotImplementedError()
    else:
        # plain pydantic model: declared fields act as subcomponents
        if path in data.__fields__.keys():
            return getattr(data, path)
        else:
            raise KeyError(
                f"No subcomponent for key '{path}' in model: {data.instance_id}."
            )
retrieve_data_subcomponent_keys(data)
Source code in kiara/utils/models.py
def retrieve_data_subcomponent_keys(data: Any) -> Iterable[str]:
    """Return the keys of `data`'s model-typed subcomponents.

    Supports pydantic (v1) custom-root mapping models (top-level segments
    of dotted keys whose values are models) and plain models (fields whose
    declared type is a BaseModel subclass). Anything else yields no keys.
    """

    if hasattr(data, "__custom_root_type__") and data.__custom_root_type__:
        if isinstance(data.__root__, Mapping):  # type: ignore
            result = set()
            for k, v in data.__root__.items():  # type: ignore
                if isinstance(v, BaseModel):
                    # only the segment before the first dot
                    result.add(k.split(".")[0])
            return result
        else:
            return []
    elif isinstance(data, BaseModel):
        matches = []
        for x in data.__fields__.keys():
            # pydantic v1 field introspection: `.type_` is the declared type
            _type = data.__fields__[x].type_
            if isinstance(_type, type) and issubclass(_type, BaseModel):
                matches.append(x)
        return matches
    else:
        log_message(
            f"No subcomponents retrieval supported for data of type: {type(data)}"
        )
        return []

modules

module_config_is_empty(config)
Source code in kiara/utils/modules.py
def module_config_is_empty(config: Mapping[str, Any]):
    """Return True if `config` contains nothing of substance.

    A config counts as empty when it has no keys other than (possibly
    falsy/empty) 'defaults' and 'constants' entries.
    """
    remaining = dict(config)
    if remaining.pop("defaults", None):
        return False
    if remaining.pop("constants", None):
        return False
    # idiomatic replacement for `False if remaining else True`
    return not remaining

operations

create_operation(module_or_operation, operation_config=None, kiara=None)
Source code in kiara/utils/operations.py
def create_operation(
    module_or_operation: str,
    operation_config: Union[None, Mapping[str, Any]] = None,
    kiara: Union[None, "Kiara"] = None,
) -> Operation:
    """Resolve a run target into an Operation.

    `module_or_operation` may be (checked in this order): a registered
    operation id, a registered module type name, or a path to a pipeline
    file. A default Kiara context is used when none is passed.

    Raises:
        Exception: if an operation id is combined with extra module
            config, or if the target matches nothing.
        NoSuchExecutionTargetException: defensive guard in case no
            operation could be resolved.
    """

    operation: Union[Operation, None]

    if kiara is None:
        from kiara.context import Kiara

        kiara = Kiara.instance()

    if module_or_operation in kiara.operation_registry.operation_ids:

        operation = kiara.operation_registry.get_operation(module_or_operation)
        if operation_config:
            raise Exception(
                f"Specified run target '{module_or_operation}' is an operation, additional module configuration is not allowed."
            )

    elif module_or_operation in kiara.module_type_names:

        manifest = Manifest(
            module_type=module_or_operation, module_config=operation_config
        )
        module = kiara.create_module(manifest=manifest)
        operation = Operation.create_from_module(module)

    elif os.path.isfile(module_or_operation):
        data = get_data_from_file(module_or_operation)
        pipeline_name = data.pop("pipeline_name", None)
        if pipeline_name is None:
            # fall back to the file name as the pipeline name
            pipeline_name = os.path.basename(module_or_operation)

        # self._defaults = data.pop("inputs", {})

        # resolve relative paths inside the pipeline against its folder
        execution_context = ExecutionContext(
            pipeline_dir=os.path.abspath(os.path.dirname(module_or_operation))
        )
        pipeline_config = PipelineConfig.from_config(
            pipeline_name=pipeline_name,
            data=data,
            kiara=kiara,
            execution_context=execution_context,
        )

        manifest = kiara.create_manifest("pipeline", config=pipeline_config.dict())
        module = kiara.create_module(manifest=manifest)
        operation = Operation.create_from_module(module, doc=pipeline_config.doc)

    else:
        raise Exception(
            f"Can't assemble operation, invalid operation/module name: {module_or_operation}. Must be registered module or operation name, or file."
        )
        # manifest = Manifest(
        #     module_type=module_or_operation,
        #     module_config=self._operation_config,
        # )
        # module = self._kiara.create_module(manifest=manifest)
        # operation = Operation.create_from_module(module=module)

    if operation is None:

        merged = set(kiara.module_type_names)
        merged.update(kiara.operation_registry.operation_ids)
        raise NoSuchExecutionTargetException(
            selected_target=module_or_operation,
            msg=f"Invalid run target name '{module_or_operation}'. Must be a path to a pipeline file, or one of the available modules/operations.",
            available_targets=sorted(merged),
        )
    return operation
filter_operations(kiara, pkg_name=None, **operations)
Source code in kiara/utils/operations.py
def filter_operations(
    kiara: "Kiara", pkg_name: Union[str, None] = None, **operations: "Operation"
) -> OperationGroupInfo:
    """Assemble an OperationGroupInfo for `operations`, optionally keeping
    only those contributed by the package `pkg_name`.

    Non-pipeline operations are matched via their module type's package
    context; pipeline operations via their own 'package' metadata label.
    """

    result: Dict[str, OperationInfo] = {}

    # op_infos = kiara.operation_registry.get_context_metadata(only_for_package=pkg_name)
    modules = kiara.module_registry.get_context_metadata(only_for_package=pkg_name)

    for op_id, op in operations.items():

        if op.module.module_type_name != "pipeline":
            # module-backed operation: keep it if its module type survived
            # the package filter above
            if op.module.module_type_name in modules.keys():
                result[op_id] = OperationInfo.create_from_operation(
                    kiara=kiara, operation=op
                )
                continue
        else:
            # pipeline operation: check its own package label
            package: Union[str, None] = op.metadata.get("labels", {}).get(
                "package", None
            )
            if not pkg_name or (package and package == pkg_name):
                result[op_id] = OperationInfo.create_from_operation(
                    kiara=kiara, operation=op
                )

        # opt_types = kiara.operation_registry.find_all_operation_types(op_id)
        # match = False
        # for ot in opt_types:
        #     if ot in op_infos.keys():
        #         match = True
        #         break
        #
        # if match:
        #     result[op_id] = OperationInfo.create_from_operation(
        #         kiara=kiara, operation=op
        #     )

    return OperationGroupInfo.construct(item_infos=result)  # type: ignore

output

log

Classes

ArrowTabularWrap (TabularWrap)
Source code in kiara/utils/output.py
class ArrowTabularWrap(TabularWrap):
    """TabularWrap backed by a pyarrow Table; all calls delegate directly."""

    def __init__(self, table: "ArrowTable"):
        self._table: "ArrowTable" = table
        super().__init__()

    def retrieve_column_names(self) -> Iterable[str]:
        """Column names, straight from the wrapped table."""
        return self._table.column_names

    def retrieve_number_of_rows(self) -> int:
        """Row count, straight from the wrapped table."""
        return self._table.num_rows

    def slice(self, offset: int = 0, length: Union[int, None] = None):
        """Delegate slicing to the wrapped table."""
        return self._table.slice(offset=offset, length=length)

    def to_pydict(self) -> Mapping:
        """Materialize the table as a column-name -> values mapping."""
        return self._table.to_pydict()
retrieve_column_names(self)
Source code in kiara/utils/output.py
def retrieve_column_names(self) -> Iterable[str]:
    # delegate to the wrapped pyarrow table
    return self._table.column_names
retrieve_number_of_rows(self)
Source code in kiara/utils/output.py
def retrieve_number_of_rows(self) -> int:
    # delegate to the wrapped pyarrow table
    return self._table.num_rows
slice(self, offset=0, length=None)
Source code in kiara/utils/output.py
def slice(self, offset: int = 0, length: Union[int, None] = None):
    # delegate slicing to the wrapped table
    return self._table.slice(offset=offset, length=length)
to_pydict(self)
Source code in kiara/utils/output.py
def to_pydict(self) -> Mapping:
    # materialize the wrapped table as a column dict
    return self._table.to_pydict()
DictTabularWrap (TabularWrap)
Source code in kiara/utils/output.py
class DictTabularWrap(TabularWrap):
    """TabularWrap over a plain mapping of column name -> column values."""

    def __init__(self, data: Mapping[str, Any]):

        self._data: Mapping[str, Any] = data
        # Initialize the TabularWrap caches (_num_rows/_column_names);
        # the sibling wrappers call this too, and the `num_rows` /
        # `column_names` properties fail without it.
        super().__init__()

    def retrieve_number_of_rows(self) -> int:
        # NOTE(review): len(self._data) is the number of *columns*;
        # slice() below relies on the same convention — confirm intended.
        return len(self._data)

    def retrieve_column_names(self) -> Iterable[str]:
        return self._data.keys()

    def to_pydict(self) -> Mapping:
        # returns the underlying mapping itself, not a copy
        return self._data

    def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":
        """Slice each column list and wrap the result."""
        result = {}
        start = None
        end = None
        for cn in self._data.keys():
            if start is None:
                if offset > len(self._data):
                    return DictTabularWrap({cn: [] for cn in self._data.keys()})
                start = offset
                if not length:
                    end = len(self._data)
                else:
                    end = start + length
                    if end > len(self._data):
                        end = len(self._data)
            result[cn] = self._data[cn][start:end]
        return DictTabularWrap(result)
retrieve_column_names(self)
Source code in kiara/utils/output.py
def retrieve_column_names(self) -> Iterable[str]:
    # the keys of the column dict are the column names
    return self._data.keys()
retrieve_number_of_rows(self)
Source code in kiara/utils/output.py
def retrieve_number_of_rows(self) -> int:
    # NOTE(review): len(self._data) is the number of *columns* in the
    # dict, not a row count — confirm this is intended
    return len(self._data)
slice(self, offset=0, length=None)
Source code in kiara/utils/output.py
def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":
    """Slice each column list and return the result as a new DictTabularWrap."""

    result = {}
    start = None
    end = None
    for cn in self._data.keys():
        if start is None:
            # compute the bounds once, on the first column
            # NOTE(review): bounds are checked against len(self._data),
            # i.e. the *column* count — confirm this convention is intended
            if offset > len(self._data):
                return DictTabularWrap({cn: [] for cn in self._data.keys()})
            start = offset
            if not length:
                end = len(self._data)
            else:
                end = start + length
                if end > len(self._data):
                    end = len(self._data)
        result[cn] = self._data[cn][start:end]
    return DictTabularWrap(result)
to_pydict(self)
Source code in kiara/utils/output.py
def to_pydict(self) -> Mapping:
    # returns the underlying mapping itself, not a copy
    return self._data
OutputDetails (BaseModel) pydantic-model
Source code in kiara/utils/output.py
class OutputDetails(BaseModel):
    """Parsed output specification: format, target, plus extra config."""

    @classmethod
    def from_data(cls, data: Any):
        """Create an instance from a string or iterable of 'key=value' items.

        A bare string without '=' is shorthand for 'format=<string>'.

        Raises:
            TypeError: if `data` is neither a string nor an iterable.
        """

        if isinstance(data, str):
            if "=" in data:
                data = [data]
            else:
                data = [f"format={data}"]

        if isinstance(data, Iterable):
            from kiara.utils.cli import dict_from_cli_args

            data = list(data)
            # a single bare token is still shorthand for the format
            if len(data) == 1 and isinstance(data[0], str) and "=" not in data[0]:
                data = [f"format={data[0]}"]
            output_details_dict = dict_from_cli_args(*data)
        else:
            raise TypeError(
                f"Can't parse output detail config: invalid input type '{type(data)}'."
            )

        output_details = OutputDetails(**output_details_dict)
        return output_details

    format: str = Field(description="The output format.")
    target: str = Field(description="The output target.")
    config: Dict[str, Any] = Field(
        description="Output configuration.", default_factory=dict
    )

    @root_validator(pre=True)
    def _set_defaults(cls, values):
        # Derive format/target defaults; all remaining keys become `config`.

        target: str = values.pop("target", "terminal")
        format: str = values.pop("format", None)
        if format is None:
            if target == "terminal":
                format = "terminal"
            else:
                if target == "file":
                    format = "json"
                else:
                    # guess the format from the target's file extension
                    ext = target.split(".")[-1]
                    if ext in ["yaml", "json"]:
                        format = ext
                    else:
                        format = "json"
        result = {"format": format, "target": target, "config": dict(values)}

        return result
Attributes
config: Dict[str, Any] pydantic-field

Output configuration.

format: str pydantic-field required

The output format.

target: str pydantic-field required

The output target.

from_data(data) classmethod
Source code in kiara/utils/output.py
@classmethod
def from_data(cls, data: Any):
    """Create an instance from a string or iterable of 'key=value' items.

    A bare string without '=' is shorthand for 'format=<string>'.

    Raises:
        TypeError: if `data` is neither a string nor an iterable.
    """

    if isinstance(data, str):
        if "=" in data:
            data = [data]
        else:
            data = [f"format={data}"]

    if isinstance(data, Iterable):
        from kiara.utils.cli import dict_from_cli_args

        data = list(data)
        # a single bare token is still shorthand for the format
        if len(data) == 1 and isinstance(data[0], str) and "=" not in data[0]:
            data = [f"format={data[0]}"]
        output_details_dict = dict_from_cli_args(*data)
    else:
        raise TypeError(
            f"Can't parse output detail config: invalid input type '{type(data)}'."
        )

    output_details = OutputDetails(**output_details_dict)
    return output_details
RenderConfig (BaseModel) pydantic-model
Source code in kiara/utils/output.py
class RenderConfig(BaseModel):
    """Configuration for rendering output (format selection only)."""

    render_format: str = Field(description="The output format.", default="terminal")
Attributes
render_format: str pydantic-field

The output format.

SqliteTabularWrap (TabularWrap)
Source code in kiara/utils/output.py
class SqliteTabularWrap(TabularWrap):
    """TabularWrap over one table of a SQLAlchemy-connected SQLite database.

    NOTE(review): the table name is interpolated into SQL strings (SQL
    identifiers cannot be bound parameters); it is assumed to come from
    trusted code, not user input — confirm against callers.
    """

    def __init__(self, engine: "Engine", table_name: str):
        self._engine: Engine = engine
        self._table_name: str = table_name
        super().__init__()

    def retrieve_number_of_rows(self) -> int:
        """Count the table's rows with a single aggregate query."""
        from sqlalchemy import text

        with self._engine.connect() as con:
            result = con.execute(text(f"SELECT count(*) from {self._table_name}"))
            num_rows = result.fetchone()[0]

        return num_rows

    def retrieve_column_names(self) -> Iterable[str]:
        """Read the column names from the database schema."""
        from sqlalchemy import inspect

        inspector = inspect(self._engine)
        columns = inspector.get_columns(self._table_name)
        return [column["name"] for column in columns]

    def _fetch_as_dict(self, query: str) -> Dict[str, List[Any]]:
        """Run `query` and transpose the rows into per-column lists.

        Shared by `slice` and `to_pydict`, which previously duplicated
        this logic.
        """
        from sqlalchemy import text

        result_dict: Dict[str, List[Any]] = {cn: [] for cn in self.column_names}
        with self._engine.connect() as con:
            for row in con.execute(text(query)):
                for i, cn in enumerate(self.column_names):
                    result_dict[cn].append(row[i])
        return result_dict

    def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":
        """Fetch a row slice via LIMIT/OFFSET and wrap it."""
        query = f"SELECT * FROM {self._table_name}"
        if length:
            query = f"{query} LIMIT {length}"
        else:
            # no length given: explicitly limit to the full row count
            query = f"{query} LIMIT {self.num_rows}"
        if offset > 0:
            query = f"{query} OFFSET {offset}"
        return DictTabularWrap(self._fetch_as_dict(query))

    def to_pydict(self) -> Mapping:
        """Fetch the whole table as a column-name -> values dict."""
        return self._fetch_as_dict(f"SELECT * FROM {self._table_name}")
retrieve_column_names(self)
Source code in kiara/utils/output.py
def retrieve_column_names(self) -> Iterable[str]:
    """Read the column names from the database schema."""

    from sqlalchemy import inspect

    engine = self._engine
    inspector = inspect(engine)
    columns = inspector.get_columns(self._table_name)
    result = [column["name"] for column in columns]
    return result
retrieve_number_of_rows(self)
Source code in kiara/utils/output.py
def retrieve_number_of_rows(self) -> int:
    """Count the table's rows with a single aggregate query."""

    from sqlalchemy import text

    with self._engine.connect() as con:
        result = con.execute(text(f"SELECT count(*) from {self._table_name}"))
        num_rows = result.fetchone()[0]

    return num_rows
slice(self, offset=0, length=None)
Source code in kiara/utils/output.py
def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":
    """Fetch a row slice via LIMIT/OFFSET and wrap it as a DictTabularWrap."""

    from sqlalchemy import text

    query = f"SELECT * FROM {self._table_name}"
    if length:
        query = f"{query} LIMIT {length}"
    else:
        # no length given: explicitly limit to the full row count
        query = f"{query} LIMIT {self.num_rows}"
    if offset > 0:
        query = f"{query} OFFSET {offset}"
    with self._engine.connect() as con:
        result = con.execute(text(query))
        result_dict: Dict[str, List[Any]] = {}
        for cn in self.column_names:
            result_dict[cn] = []
        # transpose result rows into per-column lists
        for r in result:
            for i, cn in enumerate(self.column_names):
                result_dict[cn].append(r[i])

    return DictTabularWrap(result_dict)
to_pydict(self)
Source code in kiara/utils/output.py
def to_pydict(self) -> Mapping:
    """Materialize the entire backing table as a column-oriented dict.

    Returns:
        a mapping of column name to the list of all values in that column
    """

    from sqlalchemy import text

    select_all = text(f"SELECT * FROM {self._table_name}")
    columns = list(self.column_names)
    data: Dict[str, List[Any]] = {name: [] for name in columns}

    with self._engine.connect() as connection:
        # rows come back positionally; map each position to its column name
        for row in connection.execute(select_all):
            for idx, name in enumerate(columns):
                data[name].append(row[idx])

    return data
TabularWrap (ABC)
Source code in kiara/utils/output.py
class TabularWrap(ABC):
    """Abstract base class presenting tabular data behind a uniform interface.

    Subclasses implement retrieval of column names, row counts, slicing and
    conversion to a column-oriented dict; `pretty_print` builds on those to
    render a (possibly truncated) terminal table.
    """

    def __init__(self):
        # caches for the lazily computed properties below
        self._num_rows: Union[int, None] = None
        self._column_names: Union[Iterable[str], None] = None

    @property
    def num_rows(self) -> int:
        """The number of rows in the table (cached after the first lookup)."""
        if self._num_rows is None:
            self._num_rows = self.retrieve_number_of_rows()
        return self._num_rows

    @property
    def column_names(self) -> Iterable[str]:
        """The column names of the table (cached after the first lookup)."""
        if self._column_names is None:
            self._column_names = self.retrieve_column_names()
        return self._column_names

    @abstractmethod
    def retrieve_column_names(self) -> Iterable[str]:
        """Return the column names of the wrapped data."""

    @abstractmethod
    def retrieve_number_of_rows(self) -> int:
        """Return the total number of rows of the wrapped data."""

    @abstractmethod
    def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":
        """Return a new wrapper holding `length` rows starting at `offset`."""

    @abstractmethod
    def to_pydict(self) -> Mapping:
        """Return the table data as a mapping of column name to value list."""

    def pretty_print(
        self,
        rows_head: Union[int, None] = None,
        rows_tail: Union[int, None] = None,
        max_row_height: Union[int, None] = None,
        max_cell_length: Union[int, None] = None,
        show_table_header: bool = True,
    ) -> RenderableType:
        """Render the table as a rich table for terminal output.

        Args:
            rows_head: if set, render only the first `rows_head` rows
            rows_tail: if set (together with rows_head), also render the last
                `rows_tail` rows, separated by '...' rows
            max_row_height: maximum number of text lines per cell
            max_cell_length: maximum number of characters per cell line
            show_table_header: whether to print the column-name header
        """

        def _render_cell(cell: Any) -> str:
            # Clamp a cell's string representation vertically (max_row_height)
            # and horizontally (max_cell_length). Used for head and tail rows
            # alike; previously the tail section truncated with
            # int(len(lines) / 2) instead of int(max_row_height / 2).
            cell_str = str(cell)
            if max_row_height and max_row_height > 0 and "\n" in cell_str:
                lines = cell_str.split("\n")
                if len(lines) > max_row_height:
                    if max_row_height == 1:
                        lines = lines[0:1]
                    else:
                        half = int(max_row_height / 2)
                        lines = lines[0:half] + [".."] + lines[-half:]
                cell_str = "\n".join(lines)
            if max_cell_length and max_cell_length > 0:
                clipped = []
                for line in cell_str.split("\n"):
                    if len(line) > max_cell_length:
                        line = line[0:max_cell_length] + " ..."
                    clipped.append(line)
                cell_str = "\n".join(clipped)
            return cell_str

        rich_table = RichTable(box=box.SIMPLE, show_header=show_table_header)
        for cn in self.retrieve_column_names():
            rich_table.add_column(cn)

        # number of '...' separator rows between head and tail sections
        num_split_rows = 2

        if rows_head is not None:
            if rows_head < 0:
                rows_head = 0

            if rows_head > self.retrieve_number_of_rows():
                # head already covers the whole table: no tail, no separator
                rows_head = self.retrieve_number_of_rows()
                rows_tail = None
                num_split_rows = 0

            if rows_tail is not None:
                if rows_head + rows_tail >= self.num_rows:  # type: ignore
                    rows_head = self.retrieve_number_of_rows()
                    rows_tail = None
                    num_split_rows = 0
        else:
            num_split_rows = 0

        if rows_head is not None:
            head = self.slice(0, rows_head)
            num_rows = rows_head
        else:
            head = self
            num_rows = self.retrieve_number_of_rows()

        table_dict = head.to_pydict()
        for i in range(0, num_rows):
            rich_table.add_row(
                *(
                    _render_cell(table_dict[cn][i])
                    for cn in self.retrieve_column_names()
                )
            )

        if num_split_rows:
            for _ in range(0, num_split_rows):
                rich_table.add_row(*("..." for _ in self.retrieve_column_names()))

        if rows_head:
            if rows_tail is not None:
                if rows_tail < 0:
                    rows_tail = 0

                tail = self.slice(self.retrieve_number_of_rows() - rows_tail)
                table_dict = tail.to_pydict()
                # BUG FIX: iterate the tail's own row count (rows_tail); the
                # previous code reused the head's `num_rows`, which raised
                # IndexError whenever rows_head > rows_tail
                for i in range(0, rows_tail):
                    rich_table.add_row(
                        *(
                            _render_cell(table_dict[cn][i])
                            for cn in self.retrieve_column_names()
                        )
                    )

        return rich_table
column_names: Iterable[str] property readonly
num_rows: int property readonly
pretty_print(self, rows_head=None, rows_tail=None, max_row_height=None, max_cell_length=None, show_table_header=True)
Source code in kiara/utils/output.py
def pretty_print(
    self,
    rows_head: Union[int, None] = None,
    rows_tail: Union[int, None] = None,
    max_row_height: Union[int, None] = None,
    max_cell_length: Union[int, None] = None,
    show_table_header: bool = True,
) -> RenderableType:
    """Render the table as a rich table for terminal output.

    Args:
        rows_head: if set, render only the first `rows_head` rows
        rows_tail: if set (together with rows_head), also render the last
            `rows_tail` rows, separated by '...' rows
        max_row_height: maximum number of text lines per cell
        max_cell_length: maximum number of characters per cell line
        show_table_header: whether to print the column-name header
    """

    def _render_cell(cell: Any) -> str:
        # Clamp a cell's string representation vertically (max_row_height)
        # and horizontally (max_cell_length). Shared by head and tail rows;
        # previously the tail section truncated with int(len(lines) / 2)
        # instead of int(max_row_height / 2).
        cell_str = str(cell)
        if max_row_height and max_row_height > 0 and "\n" in cell_str:
            lines = cell_str.split("\n")
            if len(lines) > max_row_height:
                if max_row_height == 1:
                    lines = lines[0:1]
                else:
                    half = int(max_row_height / 2)
                    lines = lines[0:half] + [".."] + lines[-half:]
            cell_str = "\n".join(lines)
        if max_cell_length and max_cell_length > 0:
            clipped = []
            for line in cell_str.split("\n"):
                if len(line) > max_cell_length:
                    line = line[0:max_cell_length] + " ..."
                clipped.append(line)
            cell_str = "\n".join(clipped)
        return cell_str

    rich_table = RichTable(box=box.SIMPLE, show_header=show_table_header)
    for cn in self.retrieve_column_names():
        rich_table.add_column(cn)

    # number of '...' separator rows between head and tail sections
    num_split_rows = 2

    if rows_head is not None:
        if rows_head < 0:
            rows_head = 0

        if rows_head > self.retrieve_number_of_rows():
            # head already covers the whole table: no tail, no separator
            rows_head = self.retrieve_number_of_rows()
            rows_tail = None
            num_split_rows = 0

        if rows_tail is not None:
            if rows_head + rows_tail >= self.num_rows:  # type: ignore
                rows_head = self.retrieve_number_of_rows()
                rows_tail = None
                num_split_rows = 0
    else:
        num_split_rows = 0

    if rows_head is not None:
        head = self.slice(0, rows_head)
        num_rows = rows_head
    else:
        head = self
        num_rows = self.retrieve_number_of_rows()

    table_dict = head.to_pydict()
    for i in range(0, num_rows):
        rich_table.add_row(
            *(_render_cell(table_dict[cn][i]) for cn in self.retrieve_column_names())
        )

    if num_split_rows:
        for _ in range(0, num_split_rows):
            rich_table.add_row(*("..." for _ in self.retrieve_column_names()))

    if rows_head:
        if rows_tail is not None:
            if rows_tail < 0:
                rows_tail = 0

            tail = self.slice(self.retrieve_number_of_rows() - rows_tail)
            table_dict = tail.to_pydict()
            # BUG FIX: iterate the tail's own row count (rows_tail); the
            # previous code reused the head's `num_rows`, which raised
            # IndexError whenever rows_head > rows_tail
            for i in range(0, rows_tail):
                rich_table.add_row(
                    *(
                        _render_cell(table_dict[cn][i])
                        for cn in self.retrieve_column_names()
                    )
                )

    return rich_table
retrieve_column_names(self)
Source code in kiara/utils/output.py
@abstractmethod
def retrieve_column_names(self) -> Iterable[str]:
    """Return the column names of the wrapped table data (subclass hook)."""
    pass
retrieve_number_of_rows(self)
Source code in kiara/utils/output.py
@abstractmethod
def retrieve_number_of_rows(self) -> int:
    """Return the total number of rows of the wrapped table data (subclass hook)."""
    pass
slice(self, offset=0, length=None)
Source code in kiara/utils/output.py
@abstractmethod
def slice(self, offset: int = 0, length: Union[int, None] = None) -> "TabularWrap":
    """Return a new TabularWrap holding `length` rows starting at `offset` (subclass hook)."""
    pass
to_pydict(self)
Source code in kiara/utils/output.py
@abstractmethod
def to_pydict(self) -> Mapping:
    """Return the table data as a mapping of column name to value list (subclass hook)."""
    pass

Functions

create_pipeline_steps_tree(pipeline_structure, pipeline_details)
Source code in kiara/utils/output.py
def create_pipeline_steps_tree(
    pipeline_structure: "PipelineStructure", pipeline_details: "PipelineDetails"
) -> Tree:
    """Build a rich Tree summarizing each processing stage, its steps and their status."""

    from kiara.models.module.pipeline import StepStatus

    steps_root = Tree("steps")

    for stage_idx, stage in enumerate(pipeline_structure.processing_stages, start=1):
        stage_branch = steps_root.add(f"stage: [i]{stage_idx}[/i]")
        for step_id in sorted(stage):
            step_branch = stage_branch.add(f"step: [i]{step_id}[/i]")
            details = pipeline_details.step_states[step_id]
            if details.status is StepStatus.INPUTS_READY:
                step_branch.add("status: [yellow]inputs ready[/yellow]")
            elif details.status is StepStatus.RESULTS_READY:
                step_branch.add("status: [green]results ready[/green]")
            else:
                # list the per-field reasons why the step's inputs are invalid
                problems = step_branch.add("status: [red]inputs invalid[/red]")
                for field, reason in details.invalid_details.items():
                    problems.add(f"[i]{field}[/i]: {reason}")

    return steps_root
create_recursive_table_from_model_object(model, render_config=None)
Source code in kiara/utils/output.py
def create_recursive_table_from_model_object(
    model: BaseModel,
    render_config: Union[Mapping[str, Any], None] = None,
):
    """Render a pydantic model instance (recursively) as a two-column rich table.

    Args:
        model: the model instance to render
        render_config: optional render options ('show_lines', 'show_header')

    Returns:
        a rich Table with one row per model field
    """

    if render_config is None:
        render_config = {}

    show_lines = render_config.get("show_lines", True)
    show_header = render_config.get("show_header", True)
    model_cls = model.__class__

    table = RichTable(box=box.SIMPLE, show_lines=show_lines, show_header=show_header)
    table.add_column("Field")
    table.add_column("Value")

    props = model_cls.schema().get("properties", {})

    for field_name in sorted(model_cls.__fields__.keys()):

        data = getattr(model, field_name)
        p = props.get(field_name, None)
        p_type = None
        desc = None
        if p is not None:
            # TODO: check 'anyOf' keys (union types end up without a plain 'type')
            p_type = p.get("type", None)
            # BUG FIX: only read the description when a schema entry exists;
            # previously `p.get(...)` was called unconditionally and raised
            # AttributeError when `p` was None
            desc = p.get("description", None)

        if not isinstance(data, BaseModel):
            data_renderable = extract_renderable(data, render_config=render_config)
            sub_model = None
        else:
            # nested models render as their own (header-less) sub-table
            sub_model = create_recursive_table_from_model_object(
                data, render_config={"show_lines": True, "show_header": False}
            )
            data_renderable = None

        group = []

        if data_renderable:
            group.append(data_renderable)
            group.append("")
        if desc:
            group.append(f"[i]{desc}[/i]")

        if sub_model:
            group.append(sub_model)

        # BUG FIX: the type used to be wrapped in '[i]..[/i]' twice (once when
        # p_type was extracted, once here), producing nested markup tags
        if p_type:
            field_name = f"[b i]{field_name}[/b i] ([i]{p_type}[/i])"
        else:
            field_name = f"[b i]{field_name}[/b i]"
        table.add_row(field_name, Group(*group))

    return table
create_renderable_from_values(values, config=None)

Create a renderable for this module configuration.

Source code in kiara/utils/output.py
def create_renderable_from_values(
    values: Mapping[str, "Value"], config: Union[Mapping[str, Any], None] = None
) -> RenderableType:
    """Create a renderable for this module configuration."""

    if config is None:
        config = {}

    render_format = config.get("render_format", "terminal")
    if render_format not in ["terminal"]:
        raise Exception(f"Invalid render format: {render_format}")

    # optional columns, toggled via the config mapping
    show_pedigree = config.get("show_pedigree", False)
    show_data = config.get("show_data", False)
    show_hash = config.get("show_hash", True)

    table = RichTable(show_lines=True, box=box.MINIMAL_DOUBLE_HEAD)
    table.add_column("value_id", "i")
    table.add_column("data_type")
    table.add_column("size")
    if show_hash:
        table.add_column("hash")
    if show_pedigree:
        table.add_column("pedigree")
    if show_data:
        table.add_column("data")

    # group entries of the same data type together
    sorted_entries = sorted(values.items(), key=lambda item: item[1].value_schema.type)
    for v_id, value in sorted_entries:
        cells: List[RenderableType] = [
            v_id,
            value.value_schema.type,
            str(value.value_size),
        ]
        if show_hash:
            cells.append(str(value.value_hash))
        if show_pedigree:
            if value.pedigree == ORPHAN:
                cells.append("-- n/a --")
            else:
                cells.append(value.pedigree.json(option=orjson.OPT_INDENT_2))
        if show_data:
            rendered_data = value._data_registry.pretty_print_data(
                value_id=value.value_id, target_type="terminal_renderable", **config
            )
            cells.append(rendered_data)
        table.add_row(*cells)

    return table
create_table_from_base_model_cls(model_cls)
Source code in kiara/utils/output.py
def create_table_from_base_model_cls(model_cls: Type[BaseModel]):
    """Render a pydantic model *class* as a rich table describing its fields.

    Args:
        model_cls: the model class whose fields should be documented

    Returns:
        a rich Table with one row per field (type, description, required, default)
    """

    table = RichTable(box=box.SIMPLE, show_lines=True)
    table.add_column("Field")
    table.add_column("Type")
    table.add_column("Description")
    table.add_column("Required")
    table.add_column("Default")

    props = model_cls.schema().get("properties", {})

    for field_name, field in sorted(model_cls.__fields__.items()):
        row = [field_name]
        p = props.get(field_name, None)
        p_type = None
        desc = ""
        if p is not None:
            # TODO: check 'anyOf' keys (union types end up without a plain 'type')
            p_type = p.get("type", None)
            # BUG FIX: read the description only when a schema entry exists;
            # previously `p.get(...)` raised AttributeError when `p` was None
            desc = p.get("description", "")

        if p_type is None:
            p_type = "-- check source --"
        row.append(p_type)
        row.append(desc)
        row.append("yes" if field.required else "no")

        default = field.default
        # default factories need to be called to get the actual value
        if callable(default):
            default = default()

        if default is None:
            default = ""
        else:
            try:
                default = json.dumps(default, indent=2)
            except Exception:
                # not JSON-serializable: fall back to the plain string form
                default = str(default)
        row.append(default)
        table.add_row(*row)

    return table
create_table_from_field_schemas(fields, _add_default=True, _add_required=True, _show_header=False, _constants=None)
Source code in kiara/utils/output.py
def create_table_from_field_schemas(
    fields: Mapping[str, "ValueSchema"],
    _add_default: bool = True,
    _add_required: bool = True,
    _show_header: bool = False,
    _constants: Union[Mapping[str, Any], None] = None,
) -> RichTable:
    """Render a mapping of field name -> ValueSchema as a rich table.

    Args:
        fields: the field schemas to render
        _add_default: include a 'Default' column
        _add_required: include a 'Required' column
        _show_header: whether to print the column headers
        _constants: optional constant values to display instead of defaults

    Returns:
        the assembled rich table
    """

    # sentinel values that all mean 'no default configured'
    no_default_markers = (None, SpecialValue.NO_VALUE, SpecialValue.NOT_SET)

    table = RichTable(box=box.SIMPLE, show_header=_show_header)
    table.add_column("field name", style="i", overflow="fold")
    table.add_column("type")
    table.add_column("description")

    if _add_required:
        table.add_column("Required")
    if _add_default:
        if _constants:
            table.add_column("Default / Constant")
        else:
            table.add_column("Default")

    for field_name, schema in fields.items():

        row: List[RenderableType] = [field_name, schema.type, schema.doc]

        has_default = schema.default not in no_default_markers

        if _add_required:
            # a field is effectively required only if it is marked required
            # AND has no default to fall back on
            if schema.is_required() and not has_default:
                # BUG FIX: closing markup tag was '[b]' instead of '[/b]'
                req_str = "[b]yes[/b]"
            else:
                req_str = "no"
            row.append(req_str)

        if _add_default:
            if _constants and field_name in _constants.keys():
                d = f"[b]{_constants[field_name]}[/b] (constant)"
            elif has_default:
                d = str(schema.default)
            else:
                d = "-- no default --"
            row.append(d)

        table.add_row(*row)

    return table
create_table_from_model_object(model, render_config=None, exclude_fields=None)
Source code in kiara/utils/output.py
def create_table_from_model_object(
    model: BaseModel,
    render_config: Union[Mapping[str, Any], None] = None,
    exclude_fields: Union[Set[str], None] = None,
):
    """Render a pydantic model instance as a rich table (one row per field).

    Args:
        model: the model instance to render
        render_config: options passed through to `extract_renderable`
        exclude_fields: field names to leave out of the table

    Returns:
        the assembled rich table
    """

    model_cls = model.__class__

    table = RichTable(box=box.SIMPLE, show_lines=True)
    table.add_column("Field")
    table.add_column("Type")
    table.add_column("Value")
    table.add_column("Description")

    props = model_cls.schema().get("properties", {})

    for field_name, field in sorted(model_cls.__fields__.items()):
        if exclude_fields and field_name in exclude_fields:
            continue
        row = [field_name]

        p = props.get(field_name, None)
        p_type = None
        desc = ""
        if p is not None:
            # TODO: check 'anyOf' keys (union types end up without a plain 'type')
            p_type = p.get("type", None)
            # BUG FIX: read the description only when a schema entry exists;
            # previously `p.get(...)` raised AttributeError when `p` was None
            desc = p.get("description", "")

        if p_type is None:
            p_type = "-- check source --"
        row.append(p_type)

        data = getattr(model, field_name)
        row.append(extract_renderable(data, render_config=render_config))

        row.append(desc)
        table.add_row(*row)

    return table
create_value_map_status_renderable(inputs, render_config=None, fields=None)
Source code in kiara/utils/output.py
def create_value_map_status_renderable(
    inputs: ValueMap,
    render_config: Union[Mapping[str, Any], None] = None,
    fields: Union[None, Iterable[str]] = None,
) -> RichTable:
    """Render a ValueMap as a rich table showing each field's validity status.

    Args:
        inputs: the value map to render
        render_config: toggles for the optional columns ('show_description',
            'show_type', 'show_required', 'show_default', 'show_value_ids')
        fields: explicit field order; defaults to sorted field names

    Returns:
        the assembled rich table
    """

    if render_config is None:
        render_config = {}

    show_description: bool = render_config.get("show_description", True)
    show_type: bool = render_config.get("show_type", True)
    show_required: bool = render_config.get("show_required", True)
    show_default: bool = render_config.get("show_default", True)
    show_value_ids: bool = render_config.get("show_value_ids", False)

    table = RichTable(box=box.SIMPLE, show_header=True)
    table.add_column("field name", style="i")
    table.add_column("status", style="b")
    if show_type:
        table.add_column("type")
    if show_description:
        table.add_column("description")

    if show_required:
        table.add_column("required")

    if show_default:
        table.add_column("default")

    if show_value_ids:
        table.add_column("value id", overflow="fold")

    invalid = inputs.check_invalid()

    if fields:
        field_order = fields
    else:
        field_order = sorted(inputs.keys())

    for field_name in field_order:

        value = inputs.get(field_name, None)
        if value is None:
            # requested field does not exist in the map: skip (but log)
            log.debug(
                "ignore.field", field_name=field_name, available_fields=inputs.keys()
            )
            continue

        row: List[RenderableType] = [field_name]

        if field_name in invalid.keys():
            row.append(f"[red]{invalid[field_name]}[/red]")
        else:
            row.append("[green]valid[/green]")

        value_schema = inputs.values_schema[field_name]

        if show_type:
            row.append(value_schema.type)

        if show_description:
            row.append(value_schema.doc.description)

        if show_required:
            # a field is effectively required only if it is marked required
            # AND has no default to fall back on
            req = value_schema.is_required()
            if not req:
                req_str = "no"
            else:
                if value_schema.default in [
                    None,
                    SpecialValue.NO_VALUE,
                    SpecialValue.NOT_SET,
                ]:
                    # BUG FIX: closing markup tag was '[b]' instead of '[/b]'
                    req_str = "[b]yes[/b]"
                else:
                    req_str = "no"
            row.append(req_str)

        if show_default:
            default = value_schema.default
            # default factories need to be called to get the actual value
            if callable(default):
                default_val = default()
            else:
                default_val = default

            if default_val in [None, SpecialValue.NOT_SET, SpecialValue.NO_VALUE]:
                default_str = ""
            else:
                default_str = str(default_val)

            row.append(default_str)

        if show_value_ids:
            row.append(str(inputs.get_value_obj(field_name=field_name).value_id))

        table.add_row(*row)

    return table
extract_renderable(item, render_config=None)

Try to automatically find and extract or create an object that is renderable by the 'rich' library.

Source code in kiara/utils/output.py
def extract_renderable(item: Any, render_config: Union[Mapping[str, Any], None] = None):
    """Try to automatically find and extract or create an object that is renderable by the 'rich' library.

    Dispatches on the runtime type of `item`. The order of the checks matters:
    plain strings are matched early (before the generic Iterable branch), and
    objects exposing a `create_renderable` method always take priority.
    """

    if render_config is None:
        render_config = {}
    else:
        # copy, so the setdefault below doesn't mutate the caller's mapping
        render_config = dict(render_config)

    inline_models_as_json = render_config.setdefault("inline_models_as_json", True)

    # objects that know how to render themselves take priority
    if hasattr(item, "create_renderable"):
        return item.create_renderable(**render_config)
    elif isinstance(item, (ConsoleRenderable, RichCast, str)):
        # already renderable as-is (this branch also catches plain strings)
        return item
    elif isinstance(item, BaseModel) and not inline_models_as_json:
        return create_table_from_model_object(item)
    elif isinstance(item, BaseModel):
        return item.json(indent=2)
    elif isinstance(item, Mapping) and not inline_models_as_json:
        # render mappings as a two-column key/value table, recursing on values
        table = RichTable(show_header=False, box=box.SIMPLE)
        table.add_column("Key", style="i")
        table.add_column("Value")
        for k, v in item.items():
            table.add_row(k, extract_renderable(v, render_config=render_config))
        return table
    elif isinstance(item, Mapping):
        # render mappings as indented JSON, expanding contained models first
        result = {}
        for k, v in item.items():
            if isinstance(v, BaseModel):
                v = v.dict()
            result[k] = v
        return orjson_dumps(
            result, option=orjson.OPT_INDENT_2 | orjson.OPT_NON_STR_KEYS
        )
    elif isinstance(item, Iterable):
        # render each element and stack them vertically
        _all = []
        for i in item:
            _all.append(extract_renderable(i))
        rg = Group(*_all)
        return rg
    elif isinstance(item, Enum):
        return item.value
    else:
        # last resort: the plain string representation
        return str(item)

pipelines

Functions

check_doc_sidecar(path, data)
Source code in kiara/utils/pipelines.py
def check_doc_sidecar(
    path: Union[Path, str], data: Mapping[str, Any]
) -> Mapping[str, Any]:
    """Fill in missing documentation from a '<path>.md' sidecar file, if one exists.

    Mutates and returns `data`; an already-present 'documentation' entry is
    never overwritten.
    """

    if isinstance(path, str):
        path = Path(os.path.expanduser(path))

    existing_doc = data["data"].get("documentation", None)
    if existing_doc is None:
        sidecar = Path(path.as_posix() + ".md")
        if sidecar.is_file():
            sidecar_text = sidecar.read_text()
            if sidecar_text:
                data["data"]["documentation"] = sidecar_text

    return data
create_step_value_address(value_address_config, default_field_name)
Source code in kiara/utils/pipelines.py
def create_step_value_address(
    value_address_config: Union[str, Mapping[str, Any]],
    default_field_name: str,
) -> "StepValueAddress":
    """Normalize a step-value address config into a StepValueAddress.

    Accepts an already-built StepValueAddress, a dotted string
    ('step_id', 'step_id.output_name' or 'step_id.output_name.sub_value'),
    or a mapping with 'step_id', 'value_name' and optional 'sub_value' keys.
    """

    if isinstance(value_address_config, StepValueAddress):
        # already resolved, nothing to do
        return value_address_config

    sub_value: Union[Mapping[str, Any], None] = None

    if isinstance(value_address_config, str):

        parts = value_address_config.split(".")
        if len(parts) == 1:
            # bare step id: fall back to the provided default field name
            step_id = value_address_config
            output_name = default_field_name
        elif len(parts) == 2:
            step_id, output_name = parts
        elif len(parts) == 3:
            step_id, output_name = parts[0], parts[1]
            sub_value = {"config": parts[2]}
        else:
            raise NotImplementedError()

    elif isinstance(value_address_config, Mapping):

        step_id = value_address_config["step_id"]
        output_name = value_address_config["value_name"]
        sub_value = value_address_config.get("sub_value", None)
    else:
        raise TypeError(
            f"Invalid type for creating step value address: {type(value_address_config)}"
        )

    if sub_value is not None and not isinstance(sub_value, Mapping):
        raise ValueError(
            f"Invalid type '{type(sub_value)}' for sub_value (step_id: {step_id}, value name: {output_name}): {sub_value}"
        )

    return StepValueAddress(
        step_id=step_id, value_name=output_name, sub_value=sub_value
    )
ensure_step_value_addresses(link, default_field_name)
Source code in kiara/utils/pipelines.py
def ensure_step_value_addresses(
    link: Union[str, Mapping, Iterable], default_field_name: str
) -> List["StepValueAddress"]:
    """Coerce a single link config, or an iterable of them, into a list of StepValueAddress objects."""

    # strings and mappings are themselves iterable, so they must be checked first
    if isinstance(link, (str, Mapping)):
        return [
            create_step_value_address(
                value_address_config=link, default_field_name=default_field_name
            )
        ]

    if isinstance(link, Iterable):
        return [
            create_step_value_address(
                value_address_config=item, default_field_name=default_field_name
            )
            for item in link
        ]

    raise TypeError(f"Can't parse input map, invalid type for output: {link}")
get_pipeline_details_from_path(path, module_type_name=None, base_module=None)

Load a pipeline description, save its content, and determine the pipeline base name.

Parameters:

Name Type Description Default
path Union[str, pathlib.Path]

the path to the pipeline file

required
module_type_name Optional[str]

if specified, overrides any auto-detected or assigned pipeline name

None
base_module Optional[str]

overrides the base module the assembled pipeline module will be located in the python hierarchy

None
Source code in kiara/utils/pipelines.py
def get_pipeline_details_from_path(
    path: Union[str, Path],
    module_type_name: Union[str, None] = None,
    base_module: Union[str, None] = None,
) -> Mapping[str, Any]:
    """Load a pipeline description file and assemble its registration metadata.

    Arguments:
        path: the path to the pipeline file
        module_type_name: if specified, overrides any auto-detected or assigned pipeline name
        base_module: overrides the base module the assembled pipeline module will be located in the python hierarchy

    Returns:
        a mapping with 'data', 'source' and 'source_type' keys (plus
        'base_module' when one was given)
    """

    if isinstance(path, str):
        path = Path(os.path.expanduser(path))

    if not path.is_file():
        raise Exception(
            f"Can't add pipeline description '{path.as_posix()}': not a file"
        )

    data = get_data_from_file(path)

    if not data:
        raise Exception(
            f"Can't register pipeline file '{path.as_posix()}': no content."
        )

    # BUG FIX (ordering): validate the structure *before* mutating it; the
    # previous code assigned into `data` first, which could fail with a less
    # helpful error for non-dict content
    if not isinstance(data, Mapping):
        raise Exception("Not a dictionary type.")

    if module_type_name:
        data[MODULE_TYPE_NAME_KEY] = module_type_name

    result = {"data": data, "source": path.as_posix(), "source_type": "file"}
    if base_module:
        result["base_module"] = base_module
    return result

string_vars

log
create_var_regex(delimiter_start=None, delimiter_end=None)
Source code in kiara/utils/string_vars.py
def create_var_regex(
    delimiter_start: Union[str, None] = None, delimiter_end: Union[str, None] = None
) -> Pattern:
    """Compile a regex matching template variables, e.g. '${ name }' by default.

    The delimiters are regex fragments; the variable name is captured in
    group 1 with surrounding whitespace stripped.
    """

    start = "\\$\\{" if delimiter_start is None else delimiter_start
    end = "\\}" if delimiter_end is None else delimiter_end

    return re.compile(start + "\\s*(.+?)\\s*" + end)
find_regex_matches_in_obj(source_obj, regex, current=None)
Source code in kiara/utils/string_vars.py
def find_regex_matches_in_obj(
    source_obj: Any, regex: Pattern, current: Union[Set[str], None] = None
) -> Set[str]:
    """Recursively collect all regex group matches from strings inside `source_obj`.

    Walks mappings (keys and values) and sequences; other (or falsy) objects
    contribute nothing. Results accumulate into `current` when provided.
    """

    matches = set() if current is None else current

    # falsy objects (None, "", empty containers, 0) can't contain matches
    if not source_obj:
        return matches

    if isinstance(source_obj, Mapping):
        for key, value in source_obj.items():
            find_regex_matches_in_obj(key, regex=regex, current=matches)
            find_regex_matches_in_obj(value, regex=regex, current=matches)
    elif isinstance(source_obj, str):
        matches.update(regex.findall(source_obj))
    elif isinstance(source_obj, Sequence):
        for element in source_obj:
            find_regex_matches_in_obj(element, regex=regex, current=matches)

    return matches
find_var_names_in_obj(template_obj, delimiter=None, delimiter_end=None)
Source code in kiara/utils/string_vars.py
def find_var_names_in_obj(
    template_obj: Any,
    delimiter: Union[Pattern, str, None] = None,
    delimiter_end: Union[str, None] = None,
) -> Set[str]:
    """Find the names of all template variables used anywhere inside `template_obj`.

    `delimiter` may be a pre-compiled pattern (in which case `delimiter_end`
    is ignored) or the start delimiter of a variable marker.
    """

    if isinstance(delimiter, Pattern):
        var_pattern = delimiter
    else:
        var_pattern = create_var_regex(
            delimiter_start=delimiter, delimiter_end=delimiter_end
        )

    return find_regex_matches_in_obj(template_obj, regex=var_pattern)
replace_var_names_in_obj(template_obj, repl_dict, delimiter=None, delimiter_end=None, ignore_missing_keys=False)
Source code in kiara/utils/string_vars.py
def replace_var_names_in_obj(
    template_obj: Any,
    repl_dict: typing.Mapping[str, Any],
    delimiter: Union[Pattern, str, None] = None,
    delimiter_end: Union[str, None] = None,
    ignore_missing_keys: bool = False,
) -> Any:
    """Recursively substitute placeholder variables in an object tree.

    Strings are substituted via `replace_var_names_in_string`; mappings have
    both keys and values processed; sequences are rebuilt as lists. Falsy
    inputs (including empty containers) and other types are returned
    unchanged.
    """

    if isinstance(delimiter, Pattern):
        pattern = delimiter
    else:
        pattern = create_var_regex(
            delimiter_start=delimiter, delimiter_end=delimiter_end
        )

    if not template_obj:
        return template_obj

    # str is checked before Sequence on purpose: a str is itself a Sequence
    if isinstance(template_obj, str):
        return replace_var_names_in_string(
            template_obj,
            repl_dict=repl_dict,
            regex=pattern,
            ignore_missing_keys=ignore_missing_keys,
        )

    if isinstance(template_obj, Mapping):
        replaced: Any = {}
        for key, value in template_obj.items():
            new_key = replace_var_names_in_obj(
                template_obj=key,
                repl_dict=repl_dict,
                delimiter=pattern,
                ignore_missing_keys=ignore_missing_keys,
            )
            replaced[new_key] = replace_var_names_in_obj(
                template_obj=value,
                repl_dict=repl_dict,
                delimiter=pattern,
                ignore_missing_keys=ignore_missing_keys,
            )
        return replaced

    if isinstance(template_obj, Sequence):
        return [
            replace_var_names_in_obj(
                item,
                repl_dict=repl_dict,
                delimiter=pattern,
                ignore_missing_keys=ignore_missing_keys,
            )
            for item in template_obj
        ]

    return template_obj
replace_var_names_in_string(template_string, repl_dict, regex, ignore_missing_keys=False)
Source code in kiara/utils/string_vars.py
def replace_var_names_in_string(
    template_string: str,
    repl_dict: typing.Mapping[str, Any],
    regex: Pattern,
    ignore_missing_keys: bool = False,
) -> str:
    """Replace every placeholder matched by `regex` in `template_string`.

    Args:
        template_string: the string containing placeholders
        repl_dict: mapping from variable name to replacement value
        regex: compiled pattern whose first capture group is the variable name
        ignore_missing_keys: if True, leave placeholders with unknown keys
            untouched instead of raising

    Returns:
        the string with all known placeholders substituted

    Raises:
        Exception: if a placeholder key is missing from `repl_dict` and
            `ignore_missing_keys` is False
    """

    def sub(match):

        key = match.groups()[0]

        if key not in repl_dict.keys():
            if not ignore_missing_keys:
                # BUG FIX: Exception takes no 'msg' keyword argument — the
                # original `raise Exception(msg=...)` raised a TypeError that
                # masked the real error message. Pass the message positionally.
                raise Exception(
                    f"Can't insert variable '{key}'. Key not in provided input values, available keys: {', '.join(repl_dict.keys())}",
                )
            else:
                # keep the full original placeholder text in place
                return match[0]
        else:
            # NOTE(review): re.sub requires the replacement to be a str; a
            # non-string value in repl_dict will raise a TypeError here.
            result = repl_dict[key]
            return result

    result = regex.sub(sub, template_string)

    return result

values

augment_values(values, schemas, constants=None)
Source code in kiara/utils/values.py
def augment_values(
    values: Mapping[str, Any],
    schemas: Mapping[str, ValueSchema],
    constants: Union[Mapping[str, ValueSchema], None] = None,
) -> Dict[str, Any]:
    """Merge user-provided values with schema defaults and constant values.

    Constants may not be overridden by user input (raises if attempted).
    Fields without a user value fall back to the schema default — called if
    callable, deep-copied otherwise — or to SpecialValue.NOT_SET. An explicit
    None value is normalized to SpecialValue.NO_VALUE.
    """

    # TODO: check if extra fields were provided

    if constants:
        for field in constants:
            if field in values.keys():
                raise Exception(f"Invalid input: value provided for constant '{field}'")

    result: Dict[str, Any] = {}

    if constants:
        for field, const_schema in constants.items():
            default = const_schema.default
            # a constant must carry a concrete default value
            assert default not in [None, SpecialValue.NO_VALUE, SpecialValue.NOT_SET]
            result[field] = default() if callable(default) else copy.deepcopy(default)

    for field, schema in schemas.items():

        if field in result.keys():
            raise Exception(
                f"Duplicate field '{field}', this is most likely a bug."
            )

        if field in values.keys():
            value = values[field]
            # explicit None is normalized to the NO_VALUE marker
            result[field] = SpecialValue.NO_VALUE if value is None else value
        elif schema.default != SpecialValue.NOT_SET:
            default = schema.default
            # deep-copy non-callable defaults so callers can't mutate the schema
            result[field] = default() if callable(default) else copy.deepcopy(default)
        else:
            result[field] = SpecialValue.NOT_SET

    return result
create_schema_dict(schema_config)
Source code in kiara/utils/values.py
def create_schema_dict(
    schema_config: Mapping[str, Union[ValueSchema, Mapping[str, Any]]],
) -> Mapping[str, ValueSchema]:
    """Normalize a schema configuration into a dict of ValueSchema objects.

    Accepts ready-made ValueSchema instances as well as mappings of
    constructor arguments (a default 'doc' is injected when missing). Field
    names are validated against the reserved-keyword list first.
    """

    invalid = check_valid_field_names(*schema_config.keys())
    if invalid:
        raise Exception(
            f"Can't assemble schema because it contains invalid input field name(s) '{', '.join(invalid)}'. Change the input schema to not contain any of the reserved keywords: {', '.join(INVALID_VALUE_NAMES)}"
        )

    schemas = {}
    for field_name, config in schema_config.items():

        if isinstance(config, ValueSchema):
            schemas[field_name] = config
            continue

        if isinstance(config, Mapping):
            data = dict(config)
            # fall back to the generic "no description" doc if none given
            data.setdefault("doc", DEFAULT_NO_DESC_VALUE)
            schemas[field_name] = ValueSchema(**data)
            continue

        type_desc = "None" if config is None else config.__class__
        raise Exception(
            f"Invalid return type '{type_desc}' for field '{field_name}' when trying to create schema."
        )

    return schemas
extract_raw_value(kiara, value_id)
Source code in kiara/utils/values.py
def extract_raw_value(kiara: "Kiara", value_id: uuid.UUID):
    """Render a registered value as a raw literal for embedding elsewhere.

    Values with a pedigree (i.e. not orphans) are referenced by their id
    instead of being inlined; strings are quoted, lists are unwrapped to
    their underlying list data, everything else is returned as-is.
    """
    value = kiara.data_registry.get_value(value_id=value_id)

    if value.pedigree != ORPHAN:
        # TODO: find alias?
        return f'"value:{value_id}"'

    schema_type = value.value_schema.type
    if schema_type == "string":
        return f'"{value.data}"'
    if schema_type == "list":
        return value.data.list_data
    return value.data
extract_raw_values(kiara, **value_ids)
Source code in kiara/utils/values.py
def extract_raw_values(kiara: "Kiara", **value_ids: uuid.UUID) -> Dict[str, Any]:
    """Extract raw representations for several values, keyed by field name."""

    return {
        field_name: extract_raw_value(kiara=kiara, value_id=value_id)
        for field_name, value_id in value_ids.items()
    }
overlay_constants_and_defaults(schemas, defaults, constants)
Source code in kiara/utils/values.py
def overlay_constants_and_defaults(
    schemas: Mapping[str, ValueSchema],
    defaults: Mapping[str, Any],
    constants: Mapping[str, Any],
):
    """Apply default and constant values onto the provided schemas.

    NOTE: the ValueSchema objects in `schemas` are mutated in place (their
    `default` and `is_constant` attributes are set). A field may appear in
    `defaults` or `constants`, but not both.

    Returns:
        a 2-tuple (input_schemas, constants) partitioning the schemas by
        their `is_constant` flag
    """

    for field_name, schema in schemas.items():

        default_value = defaults.get(field_name, None)
        constant_value = constants.get(field_name, None)

        if default_value is not None and constant_value is not None:
            raise Exception(
                f"Module configuration error. Value '{field_name}' set in both 'constants' and 'defaults', this is not allowed."
            )

        # TODO: perform validation for constants/defaults

        if default_value is not None:
            schema.default = default_value

        if constant_value is not None:
            schema.default = constant_value
            schema.is_constant = True

    input_schemas = {}
    constant_schemas = {}
    for field_name, schema in schemas.items():
        target = constant_schemas if schema.is_constant else input_schemas
        target[field_name] = schema

    return input_schemas, constant_schemas

yaml

StringYAML (YAML)
Source code in kiara/utils/yaml.py
class StringYAML(YAML):
    """YAML subclass whose dump() returns a string when no stream is given.

    The base YAML.dump always writes to a stream; this wrapper falls back to
    an in-memory StringIO buffer and returns its contents.
    """

    def dump(self, data, stream=None, **kw):
        return_string = stream is None
        if return_string:
            stream = StringIO()
        YAML.dump(self, data, stream, **kw)
        if return_string:
            return stream.getvalue()
dump(self, data, stream=None, **kw)
Source code in kiara/utils/yaml.py
def dump(self, data, stream=None, **kw):
    # Serialize `data` as YAML; when no stream is supplied, dump into an
    # in-memory buffer and return the resulting string instead of None.
    # "inefficient" flags the buffered path: the whole document is held in
    # memory before being returned.
    inefficient = False
    if stream is None:
        inefficient = True
        stream = StringIO()
    YAML.dump(self, data, stream, **kw)
    if inefficient:
        return stream.getvalue()