Skip to content

module_types

import.file

                                                                                
 Documentation                                                                  
                          A generic module that imports a file from one of      
                          several possible sources.                             
                                                                                
 Author(s)                                                                      
                          Markus Binsteiner   markus@frkl.io                    
                                                                                
 Context                                                                        
                          Tags         onboarding                               
                          Labels       package: kiara_plugin.onboarding         
                          References   source_repo:                             
                                       https://github.com/DHARPA-Project/kia…   
                                       documentation:                           
                                       https://DHARPA-Project.github.io/kiar…   
                                                                                
 Module config schema                                                           
                          Field       Type      Descrip…   Required   Default   
                         ─────────────────────────────────────────────────────  
                          attach_m…   boolean   Whether    no                   
                                                to                              
                                                attach                          
                                                metadat…                        
                                                                                
                          constants   object    Value      no                   
                                                constan…                        
                                                for this                        
                                                module.                         
                                                                                
                          defaults    object    Value      no                   
                                                defaults                        
                                                for this                        
                                                module.                         
                                                                                
                          onboard_…   string    The name   no                   
                                                of the                          
                                                type of                         
                                                onboard…                        
                                                                                
 Python class                                                                   
                          python_class_name    OnboardFileModule                
                          python_module_name   kiara_plugin.onboarding.modul…   
                          full_name            kiara_plugin.onboarding.modul…   
                                                                                
 Processing source code  ─────────────────────────────────────────────────────  
                          class OnboardFileModule(KiaraModule):                 
                              """A generic module that imports a file from o…   
                                                                                
                              _module_type_name = "import.file"                 
                              _config_cls = OnboardFileConfig                   
                                                                                
                              def create_inputs_schema(                         
                                  self,                                         
                              ) -> ValueMapSchema:                              
                                                                                
                                  result = {                                    
                                      "source": {                               
                                          "type": "string",                     
                                          "doc": "The source uri of the file…   
                                          "optional": False,                    
                                      },                                        
                                      "file_name": {                            
                                          "type": "string",                     
                                          "doc": "The file name to use for t…   
                                          "optional": True,                     
                                      },                                        
                                  }                                             
                                                                                
                                  if self.get_config_value("attach_metadata"   
                                      result["attach_metadata"] = {             
                                          "type": "boolean",                    
                                          "doc": "Whether to attach onboardi…   
                                          "default": True,                      
                                      }                                         
                                                                                
                                  onboard_model_cls = self.get_onboard_model…   
                                  if not onboard_model_cls:                     
                                                                                
                                      available = (                             
                                          ModelRegistry.instance()              
                                          .get_models_of_type(OnboardDataMod…   
                                          .item_infos.keys()                    
                                      )                                         
                                                                                
                                      if not available:                         
                                          raise KiaraException(msg="No onboa…   
                                                                                
                                      idx = len(ONBOARDING_MODEL_NAME_PREFIX)   
                                      allowed = sorted((x[idx:] for x in ava…   
                                                                                
                                      result["onboard_type"] = {                
                                          "type": "string",                     
                                          "type_config": {"allowed_strings":…   
                                          "doc": "The type of onboarding to …   
                                              ", ".join(allowed)                
                                          ),                                    
                                          "optional": True,                     
                                      }                                         
                                  elif onboard_model_cls.get_config_fields():   
                                      result = {                                
                                          "onboard_config": {                   
                                              "type": "kiara_model",            
                                              "type_config": {                  
                                                  "kiara_model_id": self.get…   
                                              },                                
                                          }                                     
                                      }                                         
                                                                                
                                  return result                                 
                                                                                
                              def create_outputs_schema(                        
                                  self,                                         
                              ) -> ValueMapSchema:                              
                                                                                
                                  result = {"file": {"type": "file", "doc":    
                                  return result                                 
                                                                                
                              @lru_cache(maxsize=1)                             
                              def get_onboard_model_cls(self) -> Union[None,…   
                                                                                
                                  onboard_type: Union[str, None] = self.get_…   
                                  if not onboard_type:                          
                                      return None                               
                                                                                
                                  model_registry = ModelRegistry.instance()     
                                  model_cls = model_registry.get_model_cls(o…   
                                  return model_cls  # type: ignore              
                                                                                
                              def find_matching_onboard_models(                 
                                  self, uri: str                                
                              ) -> Mapping[Type[OnboardDataModel], Tuple[boo…   
                                                                                
                                  model_registry = ModelRegistry.instance()     
                                  onboard_models = model_registry.get_models…   
                                      OnboardDataModel                          
                                  ).item_infos.values()                         
                                                                                
                                  result = {}                                   
                                  onboard_model: Type[OnboardDataModel]         
                                  for onboard_model in onboard_models:  # ty…   
                                                                                
                                      python_cls: Type[OnboardDataModel] = o…   
                                      result[python_cls] = python_cls.accept…   
                                                                                
                                  return result                                 
                                                                                
                              def process(self, inputs: ValueMap, outputs: V…   
                                                                                
                                  onboard_type = self.get_config_value("onbo…   
                                                                                
                                  source: str = inputs.get_value_data("sourc…   
                                  file_name: Union[str, None] = inputs.get_v…   
                                                                                
                                  if not onboard_type:                          
                                                                                
                                      user_input_onboard_type = inputs.get_v…   
                                                                                
                                      if not user_input_onboard_type:           
                                          model_clsses = self.find_matching_…   
                                          matches = [k for k, v in model_cls…   
                                          if not matches:                       
                                              raise KiaraProcessingException(   
                                                  msg=f"Can't onboard file f…   
                                              )                                 
                                          elif len(matches) > 1:                
                                              msg = "Valid onboarding types …   
                                              for k, v in model_clsses.items…   
                                                  if not v[0]:                  
                                                      continue                  
                                                  msg += f"  - {k._kiara_mod…   
                                              raise KiaraProcessingException(   
                                                  msg=f"Can't onboard file f…   
                                              )                                 
                                                                                
                                          model_cls: Type[OnboardDataModel]    
                                      else:                                     
                                          full_onboard_type = (                 
                                              f"{ONBOARDING_MODEL_NAME_PREFI…   
                                          )                                     
                                          model_registry = ModelRegistry.ins…   
                                          model_cls = model_registry.get_mod…   
                                          valid, msg = model_cls.accepts_uri…   
                                          if not valid:                         
                                              raise KiaraProcessingException…   
                                  else:                                         
                                      model_cls = self.get_onboard_model_cls…   
                                      if not model_cls:                         
                                          raise KiaraProcessingException(msg   
                                                                                
                                      valid, msg = model_cls.accepts_uri(sou…   
                                      if not valid:                             
                                          raise KiaraProcessingException(msg   
                                                                                
                                  if not model_cls.get_config_fields():         
                                      model = model_cls()                       
                                  else:                                         
                                      raise NotImplementedError()               
                                                                                
                                  attach_metadata = self.get_config_value("a…   
                                  if attach_metadata is None:                   
                                      attach_metadata = inputs.get_value_dat…   
                                                                                
                                  result = model.retrieve(                      
                                      uri=source, file_name=file_name, attac…   
                                  )                                             
                                  if not result:                                
                                      raise KiaraProcessingException(msg=f"C…   
                                                                                
                                  if isinstance(result, str):                   
                                      data = KiaraFile.load_file(result, fil…   
                                  elif not isinstance(result, KiaraFile):       
                                      raise KiaraProcessingException(           
                                          "Can't onboard file: onboard model…   
                                      )                                         
                                  else:                                         
                                      data = result                             
                                                                                
                                  outputs.set_value("file", data)               
                                                                                
                         ─────────────────────────────────────────────────────  
                                                                                

import.file_bundle

                                                                                
 Documentation                                                                  
                          A generic module that imports a file bundle from one  
                          of several possible sources.                          
                                                                                
 Author(s)                                                                      
                          Markus Binsteiner   markus@frkl.io                    
                                                                                
 Context                                                                        
                          Tags         onboarding                               
                          Labels       package: kiara_plugin.onboarding         
                          References   source_repo:                             
                                       https://github.com/DHARPA-Project/kia…   
                                       documentation:                           
                                       https://DHARPA-Project.github.io/kiar…   
                                                                                
 Module config schema                                                           
                          Field      Type      Descript…   Required   Default   
                         ─────────────────────────────────────────────────────  
                          attach_…   boolean   Whether     no                   
                                               to attach                        
                                               onboardi…                        
                                               metadata.                        
                                                                                
                          constan…   object    Value       no                   
                                               constants                        
                                               for this                         
                                               module.                          
                                                                                
                          defaults   object    Value       no                   
                                               defaults                         
                                               for this                         
                                               module.                          
                                                                                
                          exclude…   array     File        no                   
                                               types to                         
                                               exclude.                         
                                                                                
                          include…   array     File        no                   
                                               types to                         
                                               include.                         
                                                                                
                          onboard…   string    The name    no                   
                                               of the                           
                                               type of                          
                                               onboardi…                        
                                                                                
                          sub_path   string    The sub     no                   
                                               path to                          
                                               use.                             
                                                                                
 Python class                                                                   
                          python_class_name    OnboardFileBundleModule          
                          python_module_name   kiara_plugin.onboarding.modul…   
                          full_name            kiara_plugin.onboarding.modul…   
                                                                                
 Processing source code  ─────────────────────────────────────────────────────  
                          class OnboardFileBundleModule(KiaraModule):           
                              """A generic module that imports a file from o…   
                                                                                
                              _module_type_name = "import.file_bundle"          
                              _config_cls = OnboardFileBundleConfig             
                                                                                
                              def create_inputs_schema(                         
                                  self,                                         
                              ) -> ValueMapSchema:                              
                                                                                
                                  result = {                                    
                                      "source": {                               
                                          "type": "string",                     
                                          "doc": "The source uri of the file…   
                                          "optional": False,                    
                                      }                                         
                                  }                                             
                                                                                
                                  if self.get_config_value("attach_metadata"   
                                      result["attach_metadata"] = {             
                                          "type": "boolean",                    
                                          "doc": "Whether to attach onboardi…   
                                          "default": True,                      
                                      }                                         
                                  if self.get_config_value("sub_path") is No…   
                                      result["sub_path"] = {                    
                                          "type": "string",                     
                                          "doc": "The sub path to use. If no…   
                                          "optional": True,                     
                                      }                                         
                                  if self.get_config_value("include_file_typ…   
                                      result["include_file_types"] = {          
                                          "type": "list",                       
                                          "doc": "A list of file extensions …   
                                          "optional": True,                     
                                      }                                         
                                                                                
                                  if self.get_config_value("exclude_file_typ…   
                                      result["exclude_file_types"] = {          
                                          "type": "list",                       
                                          "doc": "A list of file extensions …   
                                          "optional": True,                     
                                      }                                         
                                                                                
                                  onboard_model_cls = self.get_onboard_model…   
                                  if not onboard_model_cls:                     
                                                                                
                                      available = (                             
                                          ModelRegistry.instance()              
                                          .get_models_of_type(OnboardDataMod…   
                                          .item_infos.keys()                    
                                      )                                         
                                                                                
                                      if not available:                         
                                          raise KiaraException(msg="No onboa…   
                                                                                
                                      idx = len(ONBOARDING_MODEL_NAME_PREFIX)   
                                      allowed = sorted((x[idx:] for x in ava…   
                                                                                
                                      result["onboard_type"] = {                
                                          "type": "string",                     
                                          "type_config": {"allowed_strings":…   
                                          "doc": "The type of onboarding to …   
                                              ", ".join(allowed)                
                                          ),                                    
                                          "optional": True,                     
                                      }                                         
                                  elif onboard_model_cls.get_config_fields():   
                                      result = {                                
                                          "onboard_config": {                   
                                              "type": "kiara_model",            
                                              "type_config": {                  
                                                  "kiara_model_id": self.get…   
                                              },                                
                                          }                                     
                                      }                                         
                                                                                
                                  return result                                 
                                                                                
                              def create_outputs_schema(                        
                                  self,                                         
                              ) -> ValueMapSchema:                              
                                                                                
                                  result = {                                    
                                      "file_bundle": {                          
                                          "type": "file_bundle",                
                                          "doc": "The file_bundle that was o…   
                                      }                                         
                                  }                                             
                                  return result                                 
                                                                                
                              @lru_cache(maxsize=1)                             
                              def get_onboard_model_cls(self) -> Union[None,…   
                                                                                
                                  onboard_type: Union[str, None] = self.get_…   
                                  if not onboard_type:                          
                                      return None                               
                                                                                
                                  model_registry = ModelRegistry.instance()     
                                  model_cls = model_registry.get_model_cls(o…   
                                  return model_cls  # type: ignore              
                                                                                
                              def find_matching_onboard_models(                 
                                  self, uri: str                                
                              ) -> Mapping[Type[OnboardDataModel], Tuple[boo…   
                                                                                
                                  model_registry = ModelRegistry.instance()     
                                  onboard_models = model_registry.get_models…   
                                      OnboardDataModel                          
                                  ).item_infos.values()                         
                                                                                
                                  result = {}                                   
                                  onboard_model: Type[OnboardDataModel]         
                                  for onboard_model in onboard_models:  # ty…   
                                                                                
                                      python_cls: Type[OnboardDataModel] = o…   
                                      result[python_cls] = python_cls.accept…   
                                                                                
                                  return result                                 
                                                                                
                              def process(self, inputs: ValueMap, outputs: V…   
                                                                                
                                  onboard_type = self.get_config_value("onbo…   
                                                                                
                                  source: str = inputs.get_value_data("sourc…   
                                                                                
                                  if not onboard_type:                          
                                                                                
                                      user_input_onboard_type = inputs.get_v…   
                                      if not user_input_onboard_type:           
                                          model_clsses = self.find_matching_…   
                                          matches = [k for k, v in model_cls…   
                                          if not matches:                       
                                              raise KiaraProcessingException(   
                                                  msg=f"Can't onboard file f…   
                                              )                                 
                                          elif len(matches) > 1:                
                                              msg = "Valid onboarding types …   
                                              for k, v in model_clsses.items…   
                                                  if not v[0]:                  
                                                      continue                  
                                                  msg += f"  - {k._kiara_mod…   
                                              raise KiaraProcessingException(   
                                                  msg=f"Can't onboard file f…   
                                              )                                 
                                                                                
                                          model_cls: Type[OnboardDataModel]    
                                      else:                                     
                                          full_onboard_type = (                 
                                              f"{ONBOARDING_MODEL_NAME_PREFI…   
                                          )                                     
                                          model_registry = ModelRegistry.ins…   
                                          model_cls = model_registry.get_mod…   
                                          valid, msg = model_cls.accepts_bun…   
                                          if not valid:                         
                                              raise KiaraProcessingException…   
                                  else:                                         
                                      model_cls = self.get_onboard_model_cls…   
                                      if not model_cls:                         
                                          raise KiaraProcessingException(msg   
                                      valid, msg = model_cls.accepts_bundle_…   
                                      if not valid:                             
                                          raise KiaraProcessingException(msg   
                                                                                
                                  if not model_cls.get_config_fields():         
                                      model = model_cls()                       
                                  else:                                         
                                      raise NotImplementedError()               
                                                                                
                                  sub_path = self.get_config_value("sub_path…   
                                  if sub_path is None:                          
                                      sub_path = inputs.get_value_data("sub_…   
                                                                                
                                  include = self.get_config_value("include_f…   
                                  if include is None:                           
                                      include = inputs.get_value_data("inclu…   
                                  exclude = self.get_config_value("exclude_f…   
                                  if exclude is None:                           
                                      exclude = inputs.get_value_data("exclu…   
                                                                                
                                  import_config = FolderImportConfig(           
                                      sub_path=sub_path, include_files=inclu…   
                                  )                                             
                                  attach_metadata = self.get_config_value("a…   
                                  if attach_metadata is None:                   
                                      attach_metadata = inputs.get_value_dat…   
                                                                                
                                  try:                                          
                                      result: Union[None, KiaraFileBundle] =   
                                          uri=source, import_config=import_c…   
                                      )                                         
                                                                                
                                      if not result:                            
                                          raise KiaraProcessingException(msg   
                                                                                
                                      if isinstance(result, str):               
                                          result = KiaraFileBundle.import_fo…   
                                                                                
                                  except NotImplementedError:                   
                                      result = None                             
                                                                                
                                  if not result:                                
                                      result_file = model.retrieve(             
                                          uri=source, file_name=None, attach…   
                                      )                                         
                                      if not result_file:                       
                                          raise KiaraProcessingException(msg   
                                                                                
                                      if isinstance(result, str):               
                                          imported_bundle_file = KiaraFile.l…   
                                      elif not isinstance(result_file, Kiara…   
                                          raise KiaraProcessingException(       
                                              "Can't onboard file: onboard m…   
                                          )                                     
                                      else:                                     
                                          imported_bundle_file = result_file    
                                                                                
                                      imported_bundle = KiaraFileBundle.from…   
                                          imported_bundle_file, import_confi…   
                                      )                                         
                                  else:                                         
                                      imported_bundle = result                  
                                                                                
                                  outputs.set_value("file_bundle", imported_…   
                                                                                
                         ─────────────────────────────────────────────────────  
                                                                                

onboard.zenodo_record

                                                                                
 Documentation                                                                  
                          Download a dataset from zenodo.org.                   
                                                                                
 Author(s)                                                                      
                          Markus Binsteiner   markus@frkl.io                    
                                                                                
 Context                                                                        
                          Tags         onboarding                               
                          Labels       package: kiara_plugin.onboarding         
                          References   source_repo:                             
                                       https://github.com/DHARPA-Project/kia…   
                                       documentation:                           
                                       https://DHARPA-Project.github.io/kiar…   
                                                                                
 Module config schema                                                           
                          Field      Type     Descript…   Required   Default    
                         ─────────────────────────────────────────────────────  
                          constan…   object   Value       no                    
                                              constants                         
                                              for this                          
                                              module.                           
                                                                                
                          defaults   object   Value       no                    
                                              defaults                          
                                              for this                          
                                              module.                           
                                                                                
                          metadat…   string   The         no         "metada…   
                                              filename                          
                                              for the                           
                                              zenodo                            
                                              metadata.                         
                                                                                
 Python class                                                                   
                          python_class_name    ZenodoDownload                   
                          python_module_name   kiara_plugin.onboarding.modul…   
                          full_name            kiara_plugin.onboarding.modul…   
                                                                                
 Processing source code  ─────────────────────────────────────────────────────  
                          class ZenodoDownload(KiaraModule):                    
                              """Download a dataset from zenodo.org."""         
                                                                                
                              _module_type_name = "onboard.zenodo_record"       
                              _config_cls = ZenodoDownloadConfig                
                                                                                
                              def create_inputs_schema(                         
                                  self,                                         
                              ) -> ValueMapSchema:                              
                                                                                
                                  metadata_filename = self.get_config_value(   
                                  return {                                      
                                      "doi": {"type": "string", "doc": "The …   
                                      "include_metadata": {                     
                                          "type": "boolean",                    
                                          "doc": f"Whether to write the reco…   
                                          "default": True,                      
                                      },                                        
                                  }                                             
                                                                                
                              def create_outputs_schema(                        
                                  self,                                         
                              ) -> ValueMapSchema:                              
                                                                                
                                  return {                                      
                                      "file_bundle": {                          
                                          "type": "file_bundle",                
                                      }                                         
                                  }                                             
                                                                                
                              def download_file(self, file_data: Mapping[str…   
                                                                                
                                  import httpx                                  
                                                                                
                                  url = file_data["links"]["self"]              
                                  file_name = file_data["key"]                  
                                  checksum = file_data["checksum"][4:]          
                                                                                
                                  target_file = target_path / file_name         
                                                                                
                                  if target_file.exists():                      
                                      raise KiaraProcessingException(           
                                          f"Can't download file, target path…   
                                      )                                         
                                                                                
                                  hash_md5 = hashlib.md5()  # noqa              
                                                                                
                                  with open(target_file, "ab") as file2:        
                                      with httpx.Client() as client:            
                                          with client.stream("GET", url) as   
                                              for chunk in resp.iter_bytes():   
                                                  hash_md5.update(chunk)        
                                                  file2.write(chunk)            
                                                                                
                                  if checksum != hash_md5.hexdigest():          
                                      raise KiaraProcessingException(           
                                          f"Can't download file '{file_name}…   
                                      )                                         
                                                                                
                                  return target_file                            
                                                                                
                              def process(self, inputs: ValueMap, outputs: V…   
                                                                                
                                  import pyzenodo3                              
                                                                                
                                  include_metadata = inputs.get_value_data("…   
                                                                                
                                  doi = inputs.get_value_data("doi")            
                                  zen = pyzenodo3.Zenodo()                      
                                                                                
                                  record = zen.find_record_by_doi(doi)          
                                                                                
                                  path = KiaraFileBundle.create_tmp_dir()       
                                  shutil.rmtree(path, ignore_errors=True)       
                                  path.mkdir()                                  
                                  for file_data in record.data["files"]:        
                                      self.download_file(file_data, path)       
                                                                                
                                  if include_metadata:                          
                                      metadata_filename = self.get_config_va…   
                                      metadata_file = path / metadata_filena…   
                                      metadata_file.write_bytes(orjson.dumps…   
                                                                                
                                  bundle = KiaraFileBundle.import_folder(pat…   
                                  outputs.set_value("file_bundle", bundle)      
                                                                                
                         ─────────────────────────────────────────────────────