module_types
network_data.filters
Documentation
-- n/a --
Author(s)
Markus Binsteiner markus@frkl.io
Lena Jaskov helena.jaskov@uni.lu
Context
Tags network_analysis
Labels package: kiara_plugin.network_analysis
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
filter_n… string The name yes
of the
filter.
Python class
python_class_name TableFiltersModule
python_module_name kiara_plugin.network_analysis…
full_name kiara_plugin.network_analysis…
Processing source code ─────────────────────────────────────────────────────
class TableFiltersModule(FilterModule):
_module_type_name = "network_data.filters"
@classmethod
def retrieve_supported_type(cls) -> Union[Dict…
return "network_data"
def create_filter_inputs(self, filter_name: st…
if filter_name == "component":
return {
"component_id": {
"type": "string",
"doc": "The id of the componen…
"default": "0",
},
"component_column": {
"type": "string",
"doc": "The name of the colum …
"default": COMPONENT_ID_COLUMN…
},
}
return None
def filter__component(self, value: Value, filt…
"""Retrieve a single sub-component from a …
component_id = filter_inputs["component_id…
component_column = filter_inputs["componen…
network_data: NetworkData = value.data
if component_column not in network_data.no…
msg = f"Component column `{component_c…
for attr in network_data.nodes.column_…
msg += f" - {attr}\n"
if component_column == COMPONENT_ID_CO…
msg = f"{msg}\n\nTry to run the `n…
raise KiaraProcessingException(msg)
network_data.nodes.arrow_table.column(comp…
# filter_item = pa.scalar(component_id, ty…
query = f"select {NODE_ID_COLUMN_NAME} fro…
node_result = network_data.query_nodes(que…
network_data = NetworkData.from_filtered_n…
network_data=network_data,
nodes_list=node_result.column(NODE_ID_…
)
return network_data
─────────────────────────────────────────────────────
export.network_data
Documentation
Export network data items.
Author(s)
Markus Binsteiner markus@frkl.io
Lena Jaskov helena.jaskov@uni.lu
Context
Tags network_analysis
Labels package: kiara_plugin.network_analysis
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
source_t… string The type yes
of the
source
data that
is going
to be
exported.
target_p… string The name yes
of the
target
profile.
Used to
distingu…
different
target
formats
for the
same data
type.
Python class
python_class_name ExportNetworkDataModule
python_module_name kiara_plugin.network_analysis…
full_name kiara_plugin.network_analysis…
Processing source code ─────────────────────────────────────────────────────
class ExportNetworkDataModule(DataExportModule):
"""Export network data items."""
_module_type_name = "export.network_data"
def export__network_data__as__graphml_file(
self, value: NetworkData, base_path: str, …
):
"""Export network data as graphml file."""
import networkx as nx
target_path = os.path.join(base_path, f"{n…
# TODO: can't just assume digraph
graph: nx.Graph = value.as_networkx_graph(
nx.DiGraph, incl_node_attributes=True,…
)
nx.write_graphml(graph, target_path)
return {"files": target_path}
def export__network_data__as__gexf_file(
self, value: NetworkData, base_path: str, …
):
"""Export network data as gexf file."""
import networkx as nx
target_path = os.path.join(base_path, f"{n…
# TODO: can't just assume digraph
graph: nx.Graph = value.as_networkx_graph(
nx.DiGraph, incl_node_attributes=True,…
)
nx.write_gexf(graph, target_path)
return {"files": target_path}
def export__network_data__as__adjlist_file(
self, value: NetworkData, base_path: str, …
):
"""Export network data as adjacency list f…
import networkx as nx
target_path = os.path.join(base_path, f"{n…
# TODO: can't just assume digraph
graph: nx.Graph = value.as_networkx_graph(
nx.DiGraph, incl_node_attributes=True,…
)
nx.write_adjlist(graph, target_path)
return {"files": target_path}
def export__network_data__as__multiline_adjlis…
self, value: NetworkData, base_path: str, …
):
"""Export network data as multiline adjace…
import networkx as nx
target_path = os.path.join(base_path, f"{n…
# TODO: can't just assume digraph
graph: nx.Graph = value.as_networkx_graph(
nx.DiGraph, incl_node_attributes=True,…
)
nx.write_multiline_adjlist(graph, target_p…
return {"files": target_path}
def export__network_data__as__edgelist_file(
self, value: NetworkData, base_path: str, …
):
"""Export network data as edgelist file."""
import networkx as nx
target_path = os.path.join(base_path, f"{n…
# TODO: can't just assume digraph
graph: nx.Graph = value.as_networkx_graph(
nx.DiGraph, incl_node_attributes=True,…
)
nx.write_edgelist(graph, target_path)
return {"files": target_path}
def export__network_data__as__network_text_fil…
self, value: NetworkData, base_path: str, …
):
"""Export network data as network text fil…
import networkx as nx
target_path = os.path.join(base_path, f"{n…
# TODO: can't just assume digraph
graph: nx.Graph = value.as_networkx_graph(
nx.DiGraph, incl_node_attributes=True,…
)
nx.write_network_text(graph, target_path)
return {"files": target_path}
─────────────────────────────────────────────────────
create.network_data
Documentation
-- n/a --
Author(s)
Markus Binsteiner markus@frkl.io
Lena Jaskov helena.jaskov@uni.lu
Context
Tags network_analysis
Labels package: kiara_plugin.network_analysis
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descrip… Required Default
─────────────────────────────────────────────────────
constants object Value no
constan…
for this
module.
defaults object Value no
defaults
for this
module.
ignore_e… boolean Whether no false
to
ignore
convert
errors
and omit
the
failed
items.
source_t… string The yes
value
type of
the
source
value.
target_t… string The yes
value
type of
the
target.
Python class
python_class_name CreateNetworkDataModule
python_module_name kiara_plugin.network_analysis…
full_name kiara_plugin.network_analysis…
Processing source code ─────────────────────────────────────────────────────
class CreateNetworkDataModule(CreateFromModule):
_module_type_name = "create.network_data"
_config_cls = CreateNetworkDataModuleConfig
def create__network_data__from__file(self, sou…
"""Create a table from a file, trying to a…
Supported file formats (at the moment):
- gml
- gexf
- graphml (uses the standard xml library p…
- pajek
- leda
- graph6
- sparse6
"""
source_file: KiaraFile = source_value.data
# the name of the attribute kiara should u…
label_attr_name: Union[str, None] = None
# attributes to ignore when creating the n…
# mostly useful if we know that the file c…
# or for 'label', if we don't want to dupl…
ignore_node_attributes = None
if source_file.file_name.endswith(".gml"):
import networkx as nx
# we use 'lable="id"' here because net…
# we use the 'label' attribute for the…
graph = nx.read_gml(source_file.path, …
label_attr_name = "label"
ignore_node_attributes = ["label"]
elif source_file.file_name.endswith(".gexf…
import networkx as nx
graph = nx.read_gexf(source_file.path)
elif source_file.file_name.endswith(".grap…
import networkx as nx
graph = nx.read_graphml(source_file.pa…
elif source_file.file_name.endswith(".paje…
".net"
):
import networkx as nx
graph = nx.read_pajek(source_file.path)
elif source_file.file_name.endswith(".leda…
import networkx as nx
graph = nx.read_leda(source_file.path)
elif source_file.file_name.endswith(
".graph6"
) or source_file.file_name.endswith(".g6"):
import networkx as nx
graph = nx.read_graph6(source_file.pat…
elif source_file.file_name.endswith(
".sparse6"
) or source_file.file_name.endswith(".s6"):
import networkx as nx
graph = nx.read_sparse6(source_file.pa…
else:
supported_file_estensions = [
"gml",
"gexf",
"graphml",
"pajek",
"leda",
"graph6",
"g6",
"sparse6",
"s6",
]
msg = f"Can't create network data for …
raise KiaraProcessingException(msg)
return NetworkData.create_from_networkx_gr…
graph=graph,
label_attr_name=label_attr_name,
ignore_node_attributes=ignore_node_att…
)
─────────────────────────────────────────────────────
network_data.redefine_edges
Documentation
Redefine edges by merging duplicate edges and
applying aggregation functions to certain edge
attributes.
Author(s)
Markus Binsteiner markus@frkl.io
Lena Jaskov helena.jaskov@uni.lu
Context
Tags network_analysis
Labels package: kiara_plugin.network_analysis
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
Python class
python_class_name RedefineNetworkEdgesModule
python_module_name kiara_plugin.network_analysis…
full_name kiara_plugin.network_analysis…
Processing source code ─────────────────────────────────────────────────────
class RedefineNetworkEdgesModule(KiaraModule):
"""Redefine edges by merging duplicate edges a…
_module_type_name = "network_data.redefine_edg…
def create_inputs_schema(
self,
) -> ValueMapSchema:
result = {
"network_data": {
"type": "network_data",
"doc": "The network data to flatte…
},
"attribute_map_strategies": {
"type": "kiara_model_list",
"type_config": {"kiara_model_id": …
"doc": "A list of specs on how to …
"optional": True,
},
}
return result
def create_outputs_schema(
self,
) -> ValueMapSchema:
result: Dict[str, Dict[str, Any]] = {}
result["network_data"] = {
"type": "network_data",
"doc": "The network_data, with a new c…
}
return result
def process(self, inputs: ValueMap, outputs: V…
import duckdb
import pyarrow as pa
network_data_obj = inputs.get_value_obj("n…
network_data: NetworkData = network_data_o…
edges_table = network_data.edges.arrow_tab…
attr_map_strategies: Union[
None, KiaraModelList[AttributeMapStrat…
] = inputs.get_value_data("attribute_map_s…
if attr_map_strategies:
invalid_columns = set()
for strategy in attr_map_strategies.li…
if strategy.source_column_name == …
raise KiaraProcessingException(
msg=f"Can't redefine edges…
)
if strategy.source_column_name == …
raise KiaraProcessingException(
msg=f"Can't redefine edges…
)
if strategy.source_column_name not…
invalid_columns.add(strategy.s…
if invalid_columns:
msg = f"Can't redefine edges with …
msg = f"{msg}\n\nAvailable column …
for col_name in (
x for x in network_data.edges.…
):
msg = f"{msg}\n - {col_name}"
raise KiaraProcessingException(msg…
sql_tokens: List[str] = []
group_bys = [SOURCE_COLUMN_NAME, TARGET_CO…
if attr_map_strategies:
for strategy in attr_map_strategies.li…
if not strategy.transform_function:
column_type = edges_table.fiel…
if pa.types.is_integer(column_…
column_type
):
transform_function = "SUM"
else:
transform_function = "LIST"
else:
transform_function = strategy.…
transform_function = transform_fun…
if transform_function == "group_by…
group_bys.append(strategy.sour…
sql_token = None
elif transform_function == "string…
sql_token = f"STRING_AGG({stra…
else:
sql_token = f"{transform_funct…
if sql_token:
sql_tokens.append(sql_token)
query = f"""
SELECT
{', '.join(group_bys)},
{', '.join(sql_tokens)}
FROM edges_table
GROUP BY {', '.join(group_bys)}
"""
result = duckdb.sql(query)
new_edges_table = result.arrow()
network_data = NetworkData.create_network_…
nodes_table=network_data.nodes.arrow_t…
edges_table=new_edges_table,
augment_tables=True,
)
outputs.set_values(network_data=network_da…
─────────────────────────────────────────────────────
Documentation
Identify the connected components of this network
data, and label each node with the id of the
component it belongs to.
This module analyses network data and checks whether
it contains clusters, and if so, how many. If all
nodes are connected, all nodes will have '0' as
value in the component_id field.
Otherwise, each node will be assigned a
'component_id' value according to the component it
belongs to, with the largest component having '0'
as component_id, the second largest '1', and so on.
If two components have the same size, which of them
gets the higher component_id is not specified.
Author(s)
Markus Binsteiner markus@frkl.io
Caitlin Burge caitlin.burge@uni.lu
Lena Jaskov helena.jaskov@uni.lu
Context
Tags network_analysis
Labels package: kiara_plugin.network_analysis
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
Python class
python_class_name ExtractLargestComponentModule
python_module_name kiara_plugin.network_analysis…
full_name kiara_plugin.network_analysis…
Processing source code ─────────────────────────────────────────────────────
class ExtractLargestComponentModule(KiaraModule):
"""Extract the largest connected component fro…
This module analyses network data and checks i…
Otherwise, the nodes will be assigned 'compone…
"""
_module_type_name = "network_data.extract_comp…
def create_inputs_schema(
self,
) -> ValueMapSchema:
result = {
"network_data": {
"type": "network_data",
"doc": "The network data to analyz…
}
}
return result
def create_outputs_schema(
self,
) -> ValueMapSchema:
result: Dict[str, Dict[str, Any]] = {}
result["network_data"] = {
"type": "network_data",
"doc": "The network_data, with a new c…
}
result["number_of_components"] = {
"type": "integer",
"doc": "The number of components in th…
}
result["is_connected"] = {
"type": "boolean",
"doc": "Whether the graph is connected…
}
return result
def process(self, inputs: ValueMap, outputs: V…
import pyarrow as pa
import rustworkx as rx
network_value = inputs.get_value_obj("netw…
network_data: NetworkData = network_value.…
# TODO: maybe this can be done directly in…
# for memory usage
undir_graph = network_data.as_rustworkx_gr…
graph_type=rx.PyGraph,
multigraph=False,
omit_self_loops=False,
attach_node_id_map=True,
)
undir_components = rx.connected_components…
nodes_columns_metadata = {
COMPONENT_ID_COLUMN_NAME: {
ATTRIBUTE_PROPERTY_KEY: COMPONENT_…
}
}
if len(undir_components) == 1:
nodes = network_data.nodes.arrow_table
components_column = pa.array([0] * len…
nodes = nodes.append_column(COMPONENT_…
network_data = NetworkData.create_netw…
nodes_table=nodes,
edges_table=network_data.edges.arr…
augment_tables=False,
nodes_column_metadata=nodes_column…
)
outputs.set_values(
network_data=network_data,
number_of_components=1,
is_connected=True,
)
return
number_of_components = len(undir_component…
is_connected = False
node_id_map = undir_graph.attrs["node_id_m…
node_components = {}
for idx, component in enumerate(
sorted(undir_components, key=len, reve…
):
for node in component:
node_id = node_id_map[node]
node_components[node_id] = idx
if len(node_components) != network_data.nu…
raise KiaraException(
"Number of nodes in component map …
)
components_column = pa.array(
(node_components[node_id] for node_id …
type=pa.int64(),
)
nodes = network_data.nodes.arrow_table
nodes = nodes.append_column(COMPONENT_ID_C…
network_data = NetworkData.create_network_…
nodes_table=nodes,
edges_table=network_data.edges.arrow_t…
augment_tables=False,
nodes_column_metadata=nodes_columns_me…
)
outputs.set_values(
is_connected=is_connected,
number_of_components=number_of_compone…
network_data=network_data,
)
─────────────────────────────────────────────────────
Documentation
Mark the nodes that are cut-points (adds a boolean
cut-point column to the nodes table).
A cut-point is any node in a network whose removal
disconnects members of the network, creating one or
more new distinct components.
Uses the rustworkx.articulation_points function.
Author(s)
Markus Binsteiner markus@frkl.io
Caitlin Burge caitlin.burge@uni.lu
Lena Jaskov helena.jaskov@uni.lu
Context
Tags network_analysis
Labels package: kiara_plugin.network_analysis
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descript… Required Default
─────────────────────────────────────────────────────
constants object Value no
constants
for this
module.
defaults object Value no
defaults
for this
module.
Python class
python_class_name CutPointsList
python_module_name kiara_plugin.network_analysis…
full_name kiara_plugin.network_analysis…
Processing source code ─────────────────────────────────────────────────────
class CutPointsList(KiaraModule):
"""Create a list of nodes that are cut-points.
Cut-points are any node in a network whose rem…
Uses the [rustworkx.articulation_points](https…
"""
_module_type_name = "network_data.extract_cut_…
def create_inputs_schema(self):
return {
"network_data": {
"type": "network_data",
"doc": "The network graph being qu…
}
}
def create_outputs_schema(self):
return {
"network_data": {
"type": "network_data",
"doc": """The network_data, with a…
}
}
def process(self, inputs, outputs):
import pyarrow as pa
import rustworkx as rx
network_value = inputs.get_value_obj("netw…
network_data: NetworkData = network_value.…
# TODO: maybe this can be done directly in…
# for memory usage
undir_graph = network_data.as_rustworkx_gr…
graph_type=rx.PyGraph,
multigraph=False,
omit_self_loops=False,
attach_node_id_map=True,
)
node_id_map = undir_graph.attrs["node_id_m…
cut_points = rx.articulation_points(undir_…
translated_cut_points = [node_id_map[x] fo…
if not cut_points:
raise NotImplementedError()
cut_points_column = [
x in translated_cut_points for x in ra…
]
nodes = network_data.nodes.arrow_table
nodes = nodes.append_column(
IS_CUTPOINT_COLUMN_NAME, pa.array(cut_…
)
nodes_columns_metadata = {
IS_CUTPOINT_COLUMN_NAME: {
ATTRIBUTE_PROPERTY_KEY: CUT_POINTS…
}
}
network_data = NetworkData.create_network_…
nodes_table=nodes,
edges_table=network_data.edges.arrow_t…
augment_tables=False,
nodes_column_metadata=nodes_columns_me…
)
outputs.set_values(network_data=network_da…
─────────────────────────────────────────────────────
assemble.network_data
Documentation
Create a 'network_data' instance from one or two
tables.
This module needs at least one table as input,
providing the edges of the resulting network data
set. If no nodes table is provided, basic node
information will be automatically created by using
unique values from the edges source and target
columns.
If no source_column (and/or target_column) input
is provided, kiara will try to auto-detect the
most likely of the existing columns to use. If
that is not possible, an error will be raised.
Author(s)
Markus Binsteiner markus@frkl.io
Lena Jaskov helena.jaskov@uni.lu
Context
Tags network_analysis
Labels package: kiara_plugin.network_analysis
References source_repo:
https://github.com/DHARPA-Project/kia…
documentation:
https://DHARPA-Project.github.io/kiar…
Module config schema
Field Type Descrip… Required Default
─────────────────────────────────────────────────────
constan… object Value no
constan…
for this
module.
defaults object Value no
defaults
for this
module.
label_c… array Alias no [
strings "label…
to test "node_…
(in ]
order)
for
auto-de…
the node
label
column.
node_id… array Alias no [
strings "id",
to test "node_…
(in ]
order)
for
auto-de…
the node
id
column.
source_… array Alias no [
strings "sourc…
to test "sourc…
(in "sourc…
order) "from",
for "sende…
auto-de… ]
the
source
column.
target_… array Alias no [
strings "targe…
to test "targe…
(in "targe…
order) "to",
for "recei…
auto-de… ]
the
target
column.
Python class
python_class_name AssembleGraphFromTablesModule
python_module_name kiara_plugin.network_analysis…
full_name kiara_plugin.network_analysis…
Processing source code ─────────────────────────────────────────────────────
class AssembleGraphFromTablesModule(KiaraModule):
"""Create a 'network_data' instance from one o…
This module needs at least one table as input,…
If no further table is created, basic node inf…
If no `source_column_name` (and/or `target_col…
"""
_module_type_name = "assemble.network_data"
_config_cls = AssembleNetworkDataModuleConfig
def create_inputs_schema(
self,
) -> ValueMapSchema:
inputs: Mapping[str, Any] = {
"edges": {
"type": "table",
"doc": "A table that contains the …
"optional": False,
},
"source_column": {
"type": "string",
"doc": "The name of the source col…
"optional": True,
},
"target_column": {
"type": "string",
"doc": "The name of the target col…
"optional": True,
},
"edges_column_map": {
"type": "dict",
"doc": "An optional map of origina…
"optional": True,
},
"nodes": {
"type": "table",
"doc": "A table that contains the …
"optional": True,
},
"id_column": {
"type": "string",
"doc": "The name (before any poten…
"optional": True,
},
"label_column": {
"type": "string",
"doc": "The name of a column that …
"optional": True,
},
"nodes_column_map": {
"type": "dict",
"doc": "An optional map of origina…
"optional": True,
},
}
return inputs
def create_outputs_schema(
self,
) -> ValueMapSchema:
outputs: Mapping[str, Any] = {
"network_data": {"type": "network_data…
}
return outputs
def process(self, inputs: ValueMap, outputs: V…
import polars as pl
# process nodes
nodes = inputs.get_value_obj("nodes")
# the nodes column map can be used to rena…
nodes_column_map: Dict[str, str] = inputs.…
if nodes_column_map is None:
nodes_column_map = {}
# we need to process the nodes first, beca…
# id to the new, internal, integer-based o…
if nodes.is_set:
job_log.add_log("processing nodes tabl…
nodes_table: KiaraTable = nodes.data
assert nodes_table is not None
nodes_column_names = nodes_table.colum…
# the most important column is the id …
id_column_name = inputs.get_value_data…
if id_column_name is None:
# try to auto-detect the id column
column_names_to_test = self.get_co…
for col_name in nodes_column_names:
if col_name.lower() in column_…
id_column_name = col_name
break
job_log.add_log(f"auto-detected id…
if id_column_name is None:
raise KiaraProcessingException(
f"Could not auto-determine…
)
if id_column_name not in nodes_column_…
raise KiaraProcessingException(
f"Could not find id column '{i…
)
nodes_column_map[id_column_name] = NOD…
if id_column_name in nodes_column_map.…
if nodes_column_map[id_column_name…
raise KiaraProcessingException(
f"Existing mapping of id c…
)
else:
nodes_column_map[id_column_name] =…
# the label is optional, if not specif…
label_column_name = inputs.get_value_d…
if label_column_name is None:
job_log.add_log("auto-detecting la…
column_names_to_test = self.get_co…
for col_name in nodes_column_names:
if col_name.lower() in column_…
label_column_name = col_na…
job_log.add_log(
f"auto-detected label …
)
break
if label_column_name and label_column_…
raise KiaraProcessingException(
f"Could not find id column '{i…
)
nodes_arrow_dataframe = nodes_table.to…
else:
nodes_arrow_dataframe = None
label_column_name = None
# process edges
job_log.add_log("processing edges table")
edges = inputs.get_value_obj("edges")
edges_table: KiaraTable = edges.data
edges_source_column_name = inputs.get_valu…
edges_target_column_name = inputs.get_valu…
edges_arrow_dataframe = edges_table.to_pol…
edges_column_names = edges_arrow_dataframe…
if edges_source_column_name is None:
job_log.add_log("auto-detecting source…
column_names_to_test = self.get_config…
for item in edges_column_names:
if item.lower() in column_names_to…
edges_source_column_name = item
job_log.add_log(
f"auto-detected source col…
)
break
if edges_target_column_name is None:
job_log.add_log("auto-detecting target…
column_names_to_test = self.get_config…
for item in edges_column_names:
if item.lower() in column_names_to…
edges_target_column_name = item
job_log.add_log(
f"auto-detected target col…
)
break
if not edges_source_column_name or not edg…
if not edges_source_column_name and no…
if len(edges_column_names) == 2:
job_log.add_log(
"using first two columns a…
)
edges_source_column_name = edg…
edges_target_column_name = edg…
else:
raise KiaraProcessingException(
f"Could not auto-detect so…
)
if not edges_source_column_name:
raise KiaraProcessingException(
f"Could not auto-detect source…
)
if not edges_target_column_name:
raise KiaraProcessingException(
f"Could not auto-detect target…
)
edges_column_map: Dict[str, str] = inputs.…
if edges_column_map is None:
edges_column_map = {}
if edges_source_column_name in edges_colum…
if edges_column_map[edges_source_colum…
raise KiaraProcessingException(
f"Existing mapping of source c…
)
else:
edges_column_map[edges_source_column_n…
if edges_target_column_name in edges_colum…
if edges_column_map[edges_target_colum…
raise KiaraProcessingException(
msg="Edges and source column n…
)
if edges_column_map[edges_target_colum…
raise KiaraProcessingException(
f"Existing mapping of target c…
)
else:
edges_column_map[edges_target_column_n…
if edges_source_column_name not in edges_c…
raise KiaraProcessingException(
f"Edges table does not contain sou…
)
if edges_target_column_name not in edges_c…
raise KiaraProcessingException(
f"Edges table does not contain tar…
)
source_column_old = edges_arrow_dataframe.…
target_column_old = edges_arrow_dataframe.…
job_log.add_log("generating node id map an…
# fill out the node id map
unique_node_ids_old = (
pl.concat([source_column_old, target_c…
.unique()
.sort()
)
if nodes_arrow_dataframe is None:
new_node_ids = range(0, len(unique_nod…
node_id_map = dict(zip(unique_node_ids…
# node_id_map = {
# node_id: new_node_id
# for node_id, new_node_id in
# }
nodes_arrow_dataframe = pl.DataFrame(
{
NODE_ID_COLUMN_NAME: new_node_…
LABEL_COLUMN_NAME: (str(x) for…
"id": unique_node_ids_old,
}
)
else:
id_column_old = nodes_arrow_dataframe.…
unique_node_ids_nodes_table = id_colum…
if len(unique_node_ids_old) > len(uniq…
~(unique_node_ids_old.is_in(unique…
raise NotImplementedError("MISSING…
else:
new_node_ids = range(0, len(id_col…
node_id_map = dict(zip(id_column_o…
# node_id_map = {
# node_id: new_node_id
# for node_id, new_node_id in
# }
new_idx_series = pl.Series(
name=NODE_ID_COLUMN_NAME, valu…
)
nodes_arrow_dataframe.insert_at_id…
if not label_column_name:
label_column_name = NODE_ID_CO…
# we create a copy of the label co…
label_column = nodes_arrow_datafra…
label_column_name
).rename(LABEL_COLUMN_NAME)
if label_column.dtype != pl.Utf8:
label_column = label_column.ca…
if label_column.null_count() != 0:
raise KiaraProcessingException(
f"Label column '{label_col…
)
nodes_arrow_dataframe = nodes_arro…
1, label_column
)
# TODO: deal with different types if node …
try:
source_column_mapped = source_column_o…
node_id_map, default=None
).rename(SOURCE_COLUMN_NAME)
except Exception:
raise KiaraProcessingException(
"Could not map node ids onto edges…
)
if source_column_mapped.is_null().any():
raise KiaraProcessingException(
"The source column contains values…
)
try:
target_column_mapped = target_column_o…
node_id_map, default=None
).rename(TARGET_COLUMN_NAME)
except Exception:
raise KiaraProcessingException(
"Could not map node ids onto edges…
)
if target_column_mapped.is_null().any():
raise KiaraProcessingException(
"The target column contains values…
)
edges_arrow_dataframe.insert_at_idx(0, sou…
edges_arrow_dataframe.insert_at_idx(1, tar…
edges_arrow_dataframe = edges_arrow_datafr…
edges_arrow_dataframe = edges_arrow_datafr…
edges_arrow_table = edges_arrow_dataframe.…
# edges_table_augmented = augment_edges_ta…
# # TODO: also index the other columns?
# edges_data_schema = create_sqlite_schema…
# table=edges_arrow_dataframe,
# index_columns=[SOURCE_COLUMN_NAME, T…
# column_map=edges_column_map,
# )
nodes_arrow_table = nodes_arrow_dataframe.…
job_log.add_log("creating network data ins…
network_data = NetworkData.create_network_…
nodes_table=nodes_arrow_table, edges_t…
)
outputs.set_value("network_data", network_…
─────────────────────────────────────────────────────