25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
class CreateFromModule(KiaraModule):
    """Module base class that creates a value of one type from another.

    Conversions are implemented as methods named
    ``create__<target_type>__from__<source_type>``. The ``source_type`` and
    ``target_type`` config values decide which of those methods
    :meth:`process` calls, and also name the single required input field and
    the single output field.
    """

    _module_type_name: str = None  # type: ignore
    _config_cls = CreateFromModuleConfig

    @classmethod
    def retrieve_supported_create_combinations(cls) -> Iterable[Mapping[str, str]]:
        """List the source/target combinations implemented on this class.

        Every attribute whose name has the exact shape
        ``create__<target_type>__from__<source_type>`` contributes one entry.

        Returns:
            A list of dicts with the keys ``source_type``, ``target_type``
            and ``func`` (the attribute name).
        """
        result = []
        for attr in dir(cls):
            tokens = attr.split("__")
            if len(tokens) != 4:
                continue
            prefix, target_type, separator, source_type = tokens
            # Checking the fixed tokens explicitly rejects names such as
            # 'create__from__x__y', which would otherwise be reported with
            # the target type 'from'.
            if prefix != "create" or separator != "from":
                continue
            # An empty type name can never be dispatched to by 'process'.
            if not target_type or not source_type:
                continue
            result.append(
                {
                    "source_type": source_type,
                    "target_type": target_type,
                    "func": attr,
                }
            )
        return result

    def create_optional_inputs(
        self, source_type: str, target_type: str
    ) -> Union[Mapping[str, Mapping[str, Any]], None]:
        """Return schemas for extra, optional inputs of a conversion.

        This default implementation declares no extra inputs. The result is
        consumed by :meth:`create_inputs_schema`, which rejects any field
        that is marked as non-optional.

        Args:
            source_type: the configured source type.
            target_type: the configured target type.

        Returns:
            A mapping of field name to field schema, or ``None``.
        """
        return None

    def create_inputs_schema(
        self,
    ) -> Mapping[str, Union[ValueSchema, Mapping[str, Any]]]:
        """Build the inputs schema for the configured conversion.

        The schema always contains one field named after the source type.
        Fields returned by :meth:`create_optional_inputs` are added on top
        and are forced to ``optional=True``.

        Raises:
            Exception: if an optional field clashes with an existing field
                name, or is explicitly declared as non-optional.
        """
        source_type = self.get_config_value("source_type")
        # NOTE(review): 'target' and 'base_name' look like reserved names --
        # confirm. Be aware this check disappears when Python runs with -O.
        assert source_type not in ["target", "base_name"]
        target_type = self.get_config_value("target_type")

        optional_inputs = self.create_optional_inputs(
            source_type=source_type, target_type=target_type
        )

        schema: Dict[str, Any] = {
            source_type: {
                "type": source_type,
                "doc": f"The source value (of type '{source_type}').",
            },
        }
        if not optional_inputs:
            return schema

        for field, raw_field_schema in optional_inputs.items():
            # Copy, so the mapping handed in by the subclass is not mutated.
            field_schema = dict(raw_field_schema)
            if field in schema:
                raise Exception(
                    f"Can't create inputs schema for '{self.module_type_name}': duplicate field '{field}'."
                )
            # Defensive only: 'schema' already contains the source type key,
            # so the duplicate check above fires first for this case.
            if field == source_type:
                raise Exception(
                    f"Can't create inputs schema for '{self.module_type_name}': invalid field name '{field}'."
                )
            if not field_schema.get("optional", True):
                raise Exception(
                    f"Can't create inputs schema for '{self.module_type_name}': non-optional field '{field}' specified."
                )
            field_schema["optional"] = True
            schema[field] = field_schema
        return schema

    def create_outputs_schema(
        self,
    ) -> Mapping[str, Union[ValueSchema, Mapping[str, Any]]]:
        """Build the outputs schema: one field named after the target type."""
        target_type = self.get_config_value("target_type")
        return {
            target_type: {
                "type": target_type,
                "doc": f"The result value (of type '{target_type}').",
            }
        }

    def process(self, inputs: ValueMap, outputs: ValueMap, job_log: JobLog) -> None:
        """Run the configured conversion and store its result.

        Looks up ``create__<target_type>__from__<source_type>`` on this
        instance and calls it with ``source_value``. Two further keyword
        arguments are passed only if the method declares them:

        - ``optional``: a read-only value map holding every input except the
          source value.
        - ``job_log``: the job log handed to this method.

        The return value is written to the output field named after the
        target type.

        Raises:
            AttributeError: if no matching ``create__...`` method exists.
        """
        source_type = self.get_config_value("source_type")
        target_type = self.get_config_value("target_type")

        func_name = f"create__{target_type}__from__{source_type}"
        func = getattr(self, func_name)

        source_value = inputs.get_value_obj(source_type)
        accepted_params = inspect.signature(func).parameters

        args: Dict[str, Any] = {"source_value": source_value}

        if "optional" in accepted_params:
            optional_values: Dict[str, Value] = {}
            optional_schemas = {}
            for field, schema in self.inputs_schema.items():
                if field == source_type:
                    continue
                optional_values[field] = inputs.get_value_obj(field)
                optional_schemas[field] = schema
            args["optional"] = ValueMapReadOnly(
                value_items=optional_values, values_schema=optional_schemas
            )

        if "job_log" in accepted_params:
            args["job_log"] = job_log

        result = func(**args)
        outputs.set_value(target_type, result)
|