29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 | @click.command()
@click.argument("module_or_operation", nargs=1, metavar="MODULE_OR_OPERATION")
@click.argument("inputs", nargs=-1, required=False)
@click.option(
    "--module-config",
    "-c",
    multiple=True,
    required=False,
    help="(Optional) module configuration, only valid when run target is a module name.",
)
@click.option(
    "--explain",
    "-e",
    is_flag=True,
    help="Display information about the selected operation and exit.",
)
@click.option(
    "--output",
    "-o",
    multiple=True,
    help="The output format and configuration.",
)
@click.option(
    "--save",
    "-s",
    multiple=True,
    required=False,
    help="Save one or several of the outputs of this run. If the argument contains a '=', the format is [output_name]=[alias], if not, the values will be saved as '[alias]-[output_name]'.",
)
@click.option("--help", "-h", is_flag=True, help="Show this message and exit.")
@click.pass_context
@handle_exception()
def run(
    ctx,
    module_or_operation: str,
    module_config: Iterable[str],
    inputs: Iterable[str],
    output: Iterable[str],
    explain: bool,
    save: Iterable[str],
    help: bool,
):
    """Run a kiara operation."""
    # NOTE: the docstring above is user-facing (click shows a command's
    # docstring as its help text), so developer notes live in comments instead.
    #
    # Flow of this command:
    #   1. turn the raw '--module-config', '--save' and '--output' tokens into
    #      structured values,
    #   2. refuse to continue if a file output target already exists and
    #      'force' was not set in the output config,
    #   3. resolve the operation, compute the save aliases, hand the inputs
    #      (plus the explain/help flags) to the input validation step,
    #   4. execute the job.
    #
    # `help` shadows the builtin because it has to match the '--help' option
    # name declared above.

    # --- 1. parse the raw option values ------------------------------------
    # '--module-config' tokens are collapsed into a single dict; no tokens
    # means an empty configuration.
    module_config = dict_from_cli_args(*module_config) if module_config else {}
    save_results = _validate_save_option(save)
    output_details = OutputDetails.from_data(output)

    silent = output_details.format == "silent"
    force_overwrite = output_details.config.get("force", False)

    # --- 2. protect existing output files ----------------------------------
    output_target = output_details.target
    if output_target != "terminal":
        if output_target == "file":
            # Generic 'file' target: name the file after the run target and
            # the output format, inside the current working directory.
            target_file = Path.cwd() / f"{module_or_operation}.{output_details.format}"
        else:
            # Any other target is treated as a path: expand '~' and resolve it.
            expanded_target = os.path.expanduser(output_target)
            target_file = Path(os.path.realpath(expanded_target))

        if target_file.exists() and not force_overwrite:
            print()
            print(
                f"Can't run workflow, the target files already exist, and '--output force=true' not specified: {target_file}"
            )
            sys.exit(1)

    # --- 3. resolve the operation and its inputs ---------------------------
    kiara: Kiara = ctx.obj["kiara"]
    cmd_arg = ctx.params["module_or_operation"]
    cmd_help = f"[yellow bold]Usage: [/yellow bold][bold]kiara run [OPTIONS] [i]{cmd_arg}[/i] [INPUTS][/bold]"

    operation = validate_operation_in_terminal(
        kiara=kiara,
        module_or_operation=module_or_operation,
        module_config=module_config,
    )

    # Aliases are derived from the raw '--save' tokens (not from
    # `save_results`), and before the inputs are validated.
    aliases = calculate_aliases(kiara_op=operation, alias_tokens=save)

    set_and_validate_inputs(
        kiara_op=operation,
        inputs=inputs,
        explain=explain,
        print_help=help,
        click_context=ctx,
        cmd_help=cmd_help,
    )

    # --- 4. run --------------------------------------------------------------
    execute_job(
        kiara_op=operation,
        silent=silent,
        save_results=save_results,
        aliases=aliases,
    )
|