module_types
import.file
Documentation
A generic module that imports a file from one of
several possible sources.
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags onboarding
Labels package: kiara_plugin.onboarding
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descrip… Required Default
─────────────────────────────────────────────────────
attach_m… boolean Whether no
to
attach
metadat…
constants object Value no
constan…
for this
module.
defaults object Value no
defaults
for this
module.
onboard_… string The name no
of the
type of
onboard…
Python class
python_class_name OnboardFileModule
python_module_name kiara_plugin.onboarding.modul…
full_name kiara_plugin.onboarding.modul…
Processing source code ─────────────────────────────────────────────────────
# Kiara module class registered under the module type name "import.file";
# its module config class is OnboardFileConfig.
# NOTE(review): this listing is a width-truncated rendering ("…" marks text
# that was cut off) with indentation stripped, so it is not runnable as shown.
class OnboardFileModule(KiaraModule):
"""A generic module that imports a file from o…
_module_type_name = "import.file"
_config_cls = OnboardFileConfig
# Build the inputs schema for this module.
# Always declares a required string input "source" and an optional string
# input "file_name"; the remaining inputs depend on the module config.
def create_inputs_schema(
self,
) -> ValueMapSchema:
result = {
"source": {
"type": "string",
"doc": "The source uri of the file…
"optional": False,
},
"file_name": {
"type": "string",
"doc": "The file name to use for t…
"optional": True,
},
}
# "attach_metadata" is added as a boolean input (default True) depending on
# the config value of the same name; the exact condition is cut off here.
if self.get_config_value("attach_metadata"…
result["attach_metadata"] = {
"type": "boolean",
"doc": "Whether to attach onboardi…
"default": True,
}
onboard_model_cls = self.get_onboard_model…
# No onboard model class resolved from the config: expose an optional
# "onboard_type" input built from the names of the registered onboard models.
if not onboard_model_cls:
available = (
ModelRegistry.instance()
.get_models_of_type(OnboardDataMod…
.item_infos.keys()
)
# Without any registered onboard model the schema cannot be built.
if not available:
raise KiaraException(msg="No onboa…
# Drop the first len(ONBOARDING_MODEL_NAME_PREFIX) characters of every name.
idx = len(ONBOARDING_MODEL_NAME_PREFIX)
allowed = sorted((x[idx:] for x in ava…
result["onboard_type"] = {
"type": "string",
"type_config": {"allowed_strings":…
"doc": "The type of onboarding to …
", ".join(allowed)
),
"optional": True,
}
# NOTE(review): this branch rebinds `result` to a fresh dict that only holds
# "onboard_config", discarding every input added above (including "source").
# Confirm whether `result["onboard_config"] = ...` was intended.
elif onboard_model_cls.get_config_fields():
result = {
"onboard_config": {
"type": "kiara_model",
"type_config": {
"kiara_model_id": self.get…
},
}
}
return result
# Build the outputs schema: a single output named "file" of type "file".
def create_outputs_schema(
self,
) -> ValueMapSchema:
result = {"file": {"type": "file", "doc": …
return result
# Resolve the onboard model class named in the module config.
# Returns None when the config value read below is empty/unset; otherwise the
# class is looked up in the ModelRegistry singleton.
# NOTE(review): `lru_cache` on an instance method keys the cache on `self` and
# keeps the cached instance alive (ruff B019); with maxsize=1 the single slot is
# shared by all instances. Consider a per-instance cache instead.
@lru_cache(maxsize=1)
def get_onboard_model_cls(self) -> Union[None,…
onboard_type: Union[str, None] = self.get_…
if not onboard_type:
return None
model_registry = ModelRegistry.instance()
model_cls = model_registry.get_model_cls(o…
return model_cls # type: ignore
# Evaluate every registered OnboardDataModel type against `uri`.
# Returns a mapping keyed by the model's Python class; the stored value is the
# result of a per-class call that is cut off below (presumably
# `accepts_uri(uri)`, whose first item is used as a match flag by process()).
def find_matching_onboard_models(
self, uri: str
) -> Mapping[Type[OnboardDataModel], Tuple[boo…
model_registry = ModelRegistry.instance()
onboard_models = model_registry.get_models…
OnboardDataModel
).item_infos.values()
result = {}
onboard_model: Type[OnboardDataModel]
for onboard_model in onboard_models: # ty…
python_cls: Type[OnboardDataModel] = o…
result[python_cls] = python_cls.accept…
return result
# Resolve the onboard model to use, check that it accepts the source uri,
# retrieve the file through it and store the result in the "file" output.
# Raises KiaraProcessingException when no (or more than one) model matches,
# when the chosen model rejects the uri, or when retrieval yields nothing
# usable; raises NotImplementedError for models that declare config fields.
def process(self, inputs: ValueMap, outputs: V…
onboard_type = self.get_config_value("onbo…
source: str = inputs.get_value_data("sourc…
file_name: Union[str, None] = inputs.get_v…
# No onboard type fixed in the module config: use the user input, and if
# that is empty too, try to auto-detect a matching model.
if not onboard_type:
user_input_onboard_type = inputs.get_v…
if not user_input_onboard_type:
# Auto-detection over all registered onboard models (the filter
# expression is cut off; presumably it keeps entries whose first
# result item is truthy).
model_clsses = self.find_matching_…
matches = [k for k, v in model_cls…
if not matches:
raise KiaraProcessingException(
msg=f"Can't onboard file f…
)
# More than one candidate: list the matching types in the error message.
elif len(matches) > 1:
msg = "Valid onboarding types …
for k, v in model_clsses.items…
# Only models whose first result item is truthy are listed.
if not v[0]:
continue
msg += f" - {k._kiara_mod…
raise KiaraProcessingException(
msg=f"Can't onboard file f…
)
model_cls: Type[OnboardDataModel] …
else:
# User-supplied type: build the full registry name starting with
# ONBOARDING_MODEL_NAME_PREFIX, look it up and validate the uri.
full_onboard_type = (
f"{ONBOARDING_MODEL_NAME_PREFI…
)
model_registry = ModelRegistry.ins…
model_cls = model_registry.get_mod…
valid, msg = model_cls.accepts_uri…
if not valid:
raise KiaraProcessingException…
else:
# Onboard type fixed in the module config.
model_cls = self.get_onboard_model_cls…
if not model_cls:
raise KiaraProcessingException(msg…
valid, msg = model_cls.accepts_uri(sou…
if not valid:
raise KiaraProcessingException(msg…
# Only models without config fields can be instantiated here; configurable
# models are not supported by this code path.
if not model_cls.get_config_fields():
model = model_cls()
else:
raise NotImplementedError()
# A config value other than None takes precedence over the user input.
attach_metadata = self.get_config_value("a…
if attach_metadata is None:
attach_metadata = inputs.get_value_dat…
result = model.retrieve(
uri=source, file_name=file_name, attac…
)
if not result:
raise KiaraProcessingException(msg=f"C…
# A string result is loaded via KiaraFile.load_file; a KiaraFile is used
# as-is; any other type is rejected.
if isinstance(result, str):
data = KiaraFile.load_file(result, fil…
elif not isinstance(result, KiaraFile):
raise KiaraProcessingException(
"Can't onboard file: onboard model…
)
else:
data = result
outputs.set_value("file", data)
─────────────────────────────────────────────────────
import.file_bundle
Documentation
A generic module that imports a file bundle from one
of several possible sources.
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags onboarding
Labels package: kiara_plugin.onboarding
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
attach_… boolean Whether no
to attach
onboardi…
metadata.
constan… object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
exclude… array File no
types to
exclude.
include… array File no
types to
include.
onboard… string The name no
of the
type of
onboardi…
sub_path string The sub no
path to
use.
Python class
python_class_name OnboardFileBundleModule
python_module_name kiara_plugin.onboarding.modul…
full_name kiara_plugin.onboarding.modul…
Processing source code ─────────────────────────────────────────────────────
# Kiara module class registered under the module type name
# "import.file_bundle"; its module config class is OnboardFileBundleConfig.
# NOTE(review): this listing is a width-truncated rendering ("…" marks text
# that was cut off) with indentation stripped, so it is not runnable as shown.
class OnboardFileBundleModule(KiaraModule):
"""A generic module that imports a file from o…
_module_type_name = "import.file_bundle"
_config_cls = OnboardFileBundleConfig
# Build the inputs schema for this module.
# Always declares a required string input "source"; the remaining inputs
# depend on the module config.
def create_inputs_schema(
self,
) -> ValueMapSchema:
result = {
"source": {
"type": "string",
"doc": "The source uri of the file…
"optional": False,
}
}
# Each of the following inputs is added depending on the config value of the
# same name; the conditions are partially cut off (the "sub_path" one
# appears to be an `is None` check).
if self.get_config_value("attach_metadata"…
result["attach_metadata"] = {
"type": "boolean",
"doc": "Whether to attach onboardi…
"default": True,
}
if self.get_config_value("sub_path") is No…
result["sub_path"] = {
"type": "string",
"doc": "The sub path to use. If no…
"optional": True,
}
if self.get_config_value("include_file_typ…
result["include_file_types"] = {
"type": "list",
"doc": "A list of file extensions …
"optional": True,
}
if self.get_config_value("exclude_file_typ…
result["exclude_file_types"] = {
"type": "list",
"doc": "A list of file extensions …
"optional": True,
}
onboard_model_cls = self.get_onboard_model…
# No onboard model class resolved from the config: expose an optional
# "onboard_type" input built from the names of the registered onboard models.
if not onboard_model_cls:
available = (
ModelRegistry.instance()
.get_models_of_type(OnboardDataMod…
.item_infos.keys()
)
# Without any registered onboard model the schema cannot be built.
if not available:
raise KiaraException(msg="No onboa…
# Drop the first len(ONBOARDING_MODEL_NAME_PREFIX) characters of every name.
idx = len(ONBOARDING_MODEL_NAME_PREFIX)
allowed = sorted((x[idx:] for x in ava…
result["onboard_type"] = {
"type": "string",
"type_config": {"allowed_strings":…
"doc": "The type of onboarding to …
", ".join(allowed)
),
"optional": True,
}
# NOTE(review): this branch rebinds `result` to a fresh dict that only holds
# "onboard_config", discarding every input added above (including "source").
# Confirm whether `result["onboard_config"] = ...` was intended.
elif onboard_model_cls.get_config_fields():
result = {
"onboard_config": {
"type": "kiara_model",
"type_config": {
"kiara_model_id": self.get…
},
}
}
return result
# Build the outputs schema: a single output named "file_bundle" of type
# "file_bundle".
def create_outputs_schema(
self,
) -> ValueMapSchema:
result = {
"file_bundle": {
"type": "file_bundle",
"doc": "The file_bundle that was o…
}
}
return result
# Resolve the onboard model class named in the module config.
# Returns None when the config value read below is empty/unset; otherwise the
# class is looked up in the ModelRegistry singleton.
# NOTE(review): `lru_cache` on an instance method keys the cache on `self` and
# keeps the cached instance alive (ruff B019); with maxsize=1 the single slot is
# shared by all instances. Consider a per-instance cache instead.
@lru_cache(maxsize=1)
def get_onboard_model_cls(self) -> Union[None,…
onboard_type: Union[str, None] = self.get_…
if not onboard_type:
return None
model_registry = ModelRegistry.instance()
model_cls = model_registry.get_model_cls(o…
return model_cls # type: ignore
# Evaluate every registered OnboardDataModel type against `uri`.
# Returns a mapping keyed by the model's Python class; the stored value is the
# result of a per-class call that is cut off below (the method name starts
# with "accept"), whose first item is used as a match flag by process().
def find_matching_onboard_models(
self, uri: str
) -> Mapping[Type[OnboardDataModel], Tuple[boo…
model_registry = ModelRegistry.instance()
onboard_models = model_registry.get_models…
OnboardDataModel
).item_infos.values()
result = {}
onboard_model: Type[OnboardDataModel]
for onboard_model in onboard_models: # ty…
python_cls: Type[OnboardDataModel] = o…
result[python_cls] = python_cls.accept…
return result
# Resolve the onboard model to use, check that it accepts the source uri,
# retrieve a file bundle through it and store it in the "file_bundle" output.
# If the model's bundle retrieval raises NotImplementedError, a single file is
# retrieved instead and converted into a bundle.
# Raises KiaraProcessingException when no (or more than one) model matches,
# when the chosen model rejects the uri, or when retrieval yields nothing
# usable; raises NotImplementedError for models that declare config fields.
def process(self, inputs: ValueMap, outputs: V…
onboard_type = self.get_config_value("onbo…
source: str = inputs.get_value_data("sourc…
# No onboard type fixed in the module config: use the user input, and if
# that is empty too, try to auto-detect a matching model.
if not onboard_type:
user_input_onboard_type = inputs.get_v…
if not user_input_onboard_type:
# Auto-detection over all registered onboard models (the filter
# expression is cut off; presumably it keeps entries whose first
# result item is truthy).
model_clsses = self.find_matching_…
matches = [k for k, v in model_cls…
if not matches:
raise KiaraProcessingException(
msg=f"Can't onboard file f…
)
# More than one candidate: list the matching types in the error message.
elif len(matches) > 1:
msg = "Valid onboarding types …
for k, v in model_clsses.items…
# Only models whose first result item is truthy are listed.
if not v[0]:
continue
msg += f" - {k._kiara_mod…
raise KiaraProcessingException(
msg=f"Can't onboard file f…
)
model_cls: Type[OnboardDataModel] …
else:
# User-supplied type: build the full registry name starting with
# ONBOARDING_MODEL_NAME_PREFIX, look it up and validate the uri.
full_onboard_type = (
f"{ONBOARDING_MODEL_NAME_PREFI…
)
model_registry = ModelRegistry.ins…
model_cls = model_registry.get_mod…
valid, msg = model_cls.accepts_bun…
if not valid:
raise KiaraProcessingException…
else:
# Onboard type fixed in the module config.
model_cls = self.get_onboard_model_cls…
if not model_cls:
raise KiaraProcessingException(msg…
valid, msg = model_cls.accepts_bundle_…
if not valid:
raise KiaraProcessingException(msg…
# Only models without config fields can be instantiated here; configurable
# models are not supported by this code path.
if not model_cls.get_config_fields():
model = model_cls()
else:
raise NotImplementedError()
# For each of the following settings a config value other than None takes
# precedence over the user input.
sub_path = self.get_config_value("sub_path…
if sub_path is None:
sub_path = inputs.get_value_data("sub_…
include = self.get_config_value("include_f…
if include is None:
include = inputs.get_value_data("inclu…
exclude = self.get_config_value("exclude_f…
if exclude is None:
exclude = inputs.get_value_data("exclu…
import_config = FolderImportConfig(
sub_path=sub_path, include_files=inclu…
)
attach_metadata = self.get_config_value("a…
if attach_metadata is None:
attach_metadata = inputs.get_value_dat…
# First attempt: retrieve a bundle directly (the call itself is cut off). A
# NotImplementedError raised inside the try block switches to the
# single-file fallback below.
try:
result: Union[None, KiaraFileBundle] =…
uri=source, import_config=import_c…
)
if not result:
raise KiaraProcessingException(msg…
# A string result is turned into a bundle via a KiaraFileBundle import call.
if isinstance(result, str):
result = KiaraFileBundle.import_fo…
except NotImplementedError:
result = None
# Fallback: retrieve a single file and build the bundle from it.
if not result:
result_file = model.retrieve(
uri=source, file_name=None, attach…
)
if not result_file:
raise KiaraProcessingException(msg…
# NOTE(review): this tests `result`, which is always falsy inside this
# branch, so a string `result_file` is never loaded here and instead hits
# the exception below. `isinstance(result_file, str)` looks intended.
if isinstance(result, str):
imported_bundle_file = KiaraFile.l…
elif not isinstance(result_file, Kiara…
raise KiaraProcessingException(
"Can't onboard file: onboard m…
)
else:
imported_bundle_file = result_file
imported_bundle = KiaraFileBundle.from…
imported_bundle_file, import_confi…
)
else:
imported_bundle = result
outputs.set_value("file_bundle", imported_…
─────────────────────────────────────────────────────
onboard.zenodo_record
Documentation
Download a dataset from zenodo.org.
Author(s)
Markus Binsteiner markus@frkl.io
Context
Tags onboarding
Labels package: kiara_plugin.onboarding
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constan… object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
metadat… string The no "metada…
filename
for the
zenodo
metadata.
Python class
python_class_name ZenodoDownload
python_module_name kiara_plugin.onboarding.modul…
full_name kiara_plugin.onboarding.modul…
Processing source code ─────────────────────────────────────────────────────
# Kiara module class registered under the module type name
# "onboard.zenodo_record"; its module config class is ZenodoDownloadConfig.
# NOTE(review): this listing is a width-truncated rendering ("…" marks text
# that was cut off) with indentation stripped, so it is not runnable as shown.
class ZenodoDownload(KiaraModule):
"""Download a dataset from zenodo.org."""
_module_type_name = "onboard.zenodo_record"
_config_cls = ZenodoDownloadConfig
# Build the inputs schema: a string input "doi" and a boolean input
# "include_metadata" that defaults to True.
def create_inputs_schema(
self,
) -> ValueMapSchema:
# Read from the module config; presumably interpolated into the f-string
# doc of "include_metadata" below (the string is cut off).
metadata_filename = self.get_config_value(…
return {
"doi": {"type": "string", "doc": "The …
"include_metadata": {
"type": "boolean",
"doc": f"Whether to write the reco…
"default": True,
},
}
# Build the outputs schema: a single output named "file_bundle" of type
# "file_bundle" (no doc string is declared for it).
def create_outputs_schema(
self,
) -> ValueMapSchema:
return {
"file_bundle": {
"type": "file_bundle",
}
}
# Stream one file of a record into `target_path` and verify its MD5 checksum.
# `file_data` must provide "links" -> "self" (download url), "key" (file name)
# and "checksum". Returns the path of the written file.
# Raises KiaraProcessingException if the target file already exists or if the
# computed MD5 digest differs from the expected checksum.
def download_file(self, file_data: Mapping[str…
import httpx
url = file_data["links"]["self"]
file_name = file_data["key"]
# Drops the first four characters of the checksum string — presumably an
# "md5:" prefix; verify against the Zenodo API response.
checksum = file_data["checksum"][4:]
target_file = target_path / file_name
if target_file.exists():
raise KiaraProcessingException(
f"Can't download file, target path…
)
hash_md5 = hashlib.md5() # noqa
# The file is opened in append mode; it was verified not to exist above.
# NOTE(review): no HTTP status check is visible in this listing — confirm
# whether error responses should be rejected before their body is written.
with open(target_file, "ab") as file2:
with httpx.Client() as client:
with client.stream("GET", url) as …
for chunk in resp.iter_bytes():
# Hash and write chunk by chunk, so the file is never held in memory as a whole.
hash_md5.update(chunk)
file2.write(chunk)
# NOTE(review): on a mismatch the downloaded file is left on disk (no cleanup
# is visible here); the message below also contains the typo "downloda".
if checksum != hash_md5.hexdigest():
raise KiaraProcessingException(
f"Can't downloda file '{file_name}…
)
return target_file
# Look up the record for the given DOI, download all of its files into a
# fresh temporary folder, optionally write a metadata file next to them, and
# store the folder as a file bundle in the "file_bundle" output.
def process(self, inputs: ValueMap, outputs: V…
import pyzenodo3
include_metadata = inputs.get_value_data("…
doi = inputs.get_value_data("doi")
zen = pyzenodo3.Zenodo()
record = zen.find_record_by_doi(doi)
# Remove whatever exists at the temp path and recreate it as an empty directory.
path = KiaraFileBundle.create_tmp_dir()
shutil.rmtree(path, ignore_errors=True)
path.mkdir()
# Every entry under record.data["files"] is downloaded and checksum-verified.
for file_data in record.data["files"]:
self.download_file(file_data, path)
# The metadata file name comes from the module config; the file is written
# as orjson-serialized bytes (the serialized object is cut off here).
if include_metadata:
metadata_filename = self.get_config_va…
metadata_file = path / metadata_filena…
metadata_file.write_bytes(orjson.dumps…
bundle = KiaraFileBundle.import_folder(pat…
outputs.set_value("file_bundle", bundle)
─────────────────────────────────────────────────────