network_analysis
Top-level package for kiara_plugin.network_analysis.
KIARA_METADATA
¶
find_data_types: Union[Type, Tuple, Callable]
¶
find_model_classes: Union[Type, Tuple, Callable]
¶
find_modules: Union[Type, Tuple, Callable]
¶
find_pipelines: Union[Type, Tuple, Callable]
¶
get_version()
¶
Source code in network_analysis/__init__.py
def get_version():
    """Return the version string for this package.

    Resolution order:
      1. the installed distribution's metadata,
      2. a ``version.txt`` file next to this module,
      3. the literal string ``"unknown"``.

    Never raises: any failure falls through to the next source.
    """
    __version__ = None
    try:
        # imported lazily: pkg_resources is missing on newer Pythons without
        # setuptools, and that must not crash version lookup
        from pkg_resources import get_distribution

        # Change here if project is renamed and does not equal the package name
        dist_name = __name__
        __version__ = get_distribution(dist_name).version
    except Exception:
        # DistributionNotFound, ImportError, ... -> try the version.txt fallback
        try:
            version_file = os.path.join(os.path.dirname(__file__), "version.txt")
            if os.path.exists(version_file):
                with open(version_file, encoding="utf-8") as vf:
                    __version__ = vf.read()
            else:
                __version__ = "unknown"
        except Exception:
            # best-effort only; the final guard below yields "unknown"
            pass

    if __version__ is None:
        __version__ = "unknown"

    return __version__
Modules¶
data_types
¶
This module contains the value type classes that are used in the kiara_plugin.network_analysis
package.
Classes¶
NetworkDataType (DatabaseType)
¶
Data that can be assembled into a graph.
Internally, this is backed by a sqlite database, using https://github.com/dpapathanasiou/simple-graph .
Source code in network_analysis/data_types.py
class NetworkDataType(DatabaseType):
    """Data that can be assembled into a graph.

    Internally, this is backed by a sqlite database, using
    https://github.com/dpapathanasiou/simple-graph .
    """

    _data_type_name = "network_data"

    @classmethod
    def python_class(cls) -> Type:
        """Return the Python class values of this data type are instances of."""
        return NetworkData

    def parse_python_obj(self, data: Any) -> NetworkData:
        """Coerce a raw value into a `NetworkData` instance.

        A string is interpreted as the path to a sqlite db file; any other
        value is returned unchanged (full checking happens in `_validate`).
        """
        if isinstance(data, str):
            # TODO: check path exists
            return NetworkData(db_file_path=data)
        return data

    def _validate(cls, value: Any) -> None:
        """Ensure `value` is a `NetworkData` db with the required tables/columns.

        Raises:
            ValueError: if the value is not a `NetworkData` (or sub-class).
            Exception: if a required table or column is missing.
        """
        if isinstance(value, KiaraDatabase):
            # wrap a generic database value into the network-specific model
            value = NetworkData(db_file_path=value.db_file_path)

        if not isinstance(value, NetworkData):
            raise ValueError(
                f"Invalid type '{type(value)}': must be of 'NetworkData' (or a sub-class)."
            )

        network_data: NetworkData = value

        # required tables (checked once; the previous implementation repeated
        # the same checks twice)
        table_names = network_data.table_names
        for tn in ["edges", "nodes"]:
            if tn not in table_names:
                raise Exception(
                    f"Invalid 'network_data' value: database does not contain table '{tn}'"
                )

        # required columns of the 'edges' table
        edges_columns = network_data.edges_schema.columns
        for cn in [SOURCE_COLUMN_NAME, TARGET_COLUMN_NAME]:
            if cn not in edges_columns.keys():
                raise Exception(
                    f"Invalid 'network_data' value: 'edges' table does not contain a '{cn}' column. Available columns: {', '.join(edges_columns.keys())}."
                )

        # required columns of the 'nodes' table
        nodes_columns = network_data.nodes_schema.columns
        for cn in [ID_COLUMN_NAME, LABEL_COLUMN_NAME]:
            if cn not in nodes_columns.keys():
                raise Exception(
                    f"Invalid 'network_data' value: 'nodes' table does not contain a '{cn}' column. Available columns: {', '.join(nodes_columns.keys())}."
                )

    def render_as__terminal_renderable(
        self, value: Value, render_config: Mapping[str, Any]
    ) -> Any:
        """Render the edges and nodes tables as a rich terminal renderable."""
        max_rows = render_config.get(
            "max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
        )
        max_row_height = render_config.get(
            "max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
        )
        max_cell_length = render_config.get(
            "max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
        )

        half_lines: Optional[int] = None
        if max_rows:
            # show roughly half the row budget from the head, half from the tail
            half_lines = int(max_rows / 2)

        db: NetworkData = value.data
        result: List[Any] = [""]
        # both tables are rendered with identical settings
        for table_type in (NetworkDataTableType.EDGES, NetworkDataTableType.NODES):
            atw = NetworkDataTabularWrap(db=db, table_type=table_type)
            pretty = atw.pretty_print(
                rows_head=half_lines,
                rows_tail=half_lines,
                max_row_height=max_row_height,
                max_cell_length=max_cell_length,
            )
            result.append(f"[b]Table[/b]: [i]{table_type.value}[/i]")
            result.append(pretty)
        return Group(*result)
Methods¶
parse_python_obj(self, data)
¶Parse a value into a supported python type.
This exists to make it easier to do trivial conversions (e.g. from a date string to a datetime object). If you choose to overwrite this method, make 100% sure that you don't change the meaning of the value, and try to avoid adding or removing information from the data (e.g. by changing the resolution of a date).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
the value |
required |
Returns:
Type | Description |
---|---|
NetworkData |
'None', if no parsing was done and the original value should be used, otherwise return the parsed Python object |
Source code in network_analysis/data_types.py
def parse_python_obj(self, data: Any) -> NetworkData:
    """Coerce a string path into a `NetworkData` instance; pass anything else through."""
    if not isinstance(data, str):
        return data
    # TODO: check path exists
    return NetworkData(db_file_path=data)
python_class()
classmethod
¶Source code in network_analysis/data_types.py
@classmethod
def python_class(cls) -> Type:
    # the Python type that values of this data type are instances of
    return NetworkData
render_as__terminal_renderable(self, value, render_config)
¶Source code in network_analysis/data_types.py
def render_as__terminal_renderable(
    self, value: Value, render_config: Mapping[str, Any]
) -> Any:
    """Render the network data's edges and nodes tables for terminal output."""
    cfg = DEFAULT_PRETTY_PRINT_CONFIG
    max_rows = render_config.get("max_no_rows", cfg["max_no_rows"])
    max_row_height = render_config.get("max_row_height", cfg["max_row_height"])
    max_cell_length = render_config.get("max_cell_length", cfg["max_cell_length"])

    # split the row budget evenly between table head and tail
    half_lines: Optional[int] = None
    if max_rows:
        half_lines = int(max_rows / 2)

    db: NetworkData = value.data
    renderables: List[Any] = [""]
    # edges first, then nodes — both rendered with the same settings
    for table_type in (NetworkDataTableType.EDGES, NetworkDataTableType.NODES):
        wrap = NetworkDataTabularWrap(db=db, table_type=table_type)
        rendered = wrap.pretty_print(
            rows_head=half_lines,
            rows_tail=half_lines,
            max_row_height=max_row_height,
            max_cell_length=max_cell_length,
        )
        renderables.append(f"[b]Table[/b]: [i]{table_type.value}[/i]")
        renderables.append(rendered)
    return Group(*renderables)
defaults
¶
Attributes¶
DEFAULT_NETWORK_DATA_CHUNK_SIZE
¶
ID_COLUMN_NAME
¶
KIARA_PLUGIN_NETWORK_BASE_FOLDER
¶
Marker to indicate the base folder for the kiara network module package.
KIARA_PLUGIN_NETWORK_RESOURCES_FOLDER
¶
Default resources folder for this package.
LABEL_COLUMN_NAME
¶
SOURCE_COLUMN_NAME
¶
TARGET_COLUMN_NAME
¶
TEMPLATES_FOLDER
¶
Classes¶
models
¶
This module contains the metadata (and other) models that are used in the kiara_plugin.network_analysis
package.
Those models are convenience wrappers that make it easier for kiara to find, create, manage and version metadata -- but also other type of models -- that is attached to data, as well as kiara modules.
Metadata models must be a sub-class of [kiara.metadata.MetadataModel][]. Other models usually sub-class a pydantic BaseModel or implement custom base classes.
Classes¶
GraphType (Enum)
¶
All possible graph types.
Source code in network_analysis/models.py
class GraphType(Enum):
    """All possible graph types."""

    # the string values are the serialized graph-type names
    UNDIRECTED = "undirected"
    DIRECTED = "directed"
    UNDIRECTED_MULTI = "undirected-multi"
    DIRECTED_MULTI = "directed-multi"
GraphTypesEnum (Enum)
¶
NetworkData (KiaraDatabase)
pydantic-model
¶
A helper class to access and query network datasets.
This class provides different ways to access the underlying network data, most notably via sql and as networkx Graph object.
Internally, network data is stored in a sqlite database with the edges stored in a table called 'edges' and the nodes, well, in a table aptly called 'nodes'.
Source code in network_analysis/models.py
class NetworkData(KiaraDatabase):
    """A helper class to access and query network datasets.

    This class provides different ways to access the underlying network data,
    most notably via sql and as networkx Graph object.

    Internally, network data is stored in a sqlite database with the edges
    stored in a table called 'edges' and the nodes, well, in a table aptly
    called 'nodes'.
    """

    _kiara_model_id = "instance.network_data"

    @classmethod
    def create_from_networkx_graph(cls, graph: "nx.Graph") -> "NetworkData":
        """Create a `NetworkData` instance from a networkx Graph object."""
        edges_table = extract_edges_as_table(graph)
        edges_schema = create_sqlite_schema_data_from_arrow_table(edges_table)

        nodes_table = extract_nodes_as_table(graph)
        nodes_schema = create_sqlite_schema_data_from_arrow_table(nodes_table)

        # keep the db unlocked until the table data has been inserted
        network_data = NetworkData.create_in_temp_dir(
            edges_schema=edges_schema, nodes_schema=nodes_schema, keep_unlocked=True
        )
        insert_table_data_into_network_graph(
            network_data=network_data,
            edges_table=edges_table,
            nodes_table=nodes_table,
            chunk_size=DEFAULT_NETWORK_DATA_CHUNK_SIZE,
        )
        network_data._lock_db()

        return network_data

    @classmethod
    def create_in_temp_dir(
        cls,
        edges_schema: Union[None, SqliteTableSchema, Mapping] = None,
        nodes_schema: Union[None, SqliteTableSchema, Mapping] = None,
        keep_unlocked: bool = False,
    ):
        """Create an empty network-data sqlite db in a temporary directory.

        Arguments:
            edges_schema: schema for the edges table (derived automatically if 'None')
            nodes_schema: schema for the nodes table (derived automatically if 'None')
            keep_unlocked: leave the db unlocked (e.g. for subsequent bulk inserts)
        """
        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "network_data.sqlite")

        def cleanup():
            # remove the whole temp directory, not just the db file: rmtree on
            # a file path fails (silently, with ignore_errors) and would leak
            # the directory
            shutil.rmtree(temp_f, ignore_errors=True)

        atexit.register(cleanup)

        db = cls(
            db_file_path=db_path, edges_schema=edges_schema, nodes_schema=nodes_schema
        )
        db.create_if_not_exists()

        # the db must be writable while the tables are created
        db._unlock_db()
        engine = db.get_sqlalchemy_engine()
        db.edges_schema.create_table(
            table_name=NetworkDataTableType.EDGES.value, engine=engine
        )
        db.nodes_schema.create_table(
            table_name=NetworkDataTableType.NODES.value, engine=engine
        )
        if not keep_unlocked:
            db._lock_db()

        return db

    edges_schema: SqliteTableSchema = Field(
        description="The schema information for the edges table."
    )
    nodes_schema: SqliteTableSchema = Field(
        description="The schema information for the nodes table."
    )

    @root_validator(pre=True)
    def pre_validate(cls, values):
        """Fill in missing schemas and validate that edges/nodes schemas agree."""
        _edges_schema = values.get("edges_schema", None)
        _nodes_schema = values.get("nodes_schema", None)

        if _edges_schema is None:
            # derive source/target column type from the nodes id column if
            # available, otherwise default to TEXT
            suggested_id_type = "TEXT"
            if _nodes_schema is not None:
                if isinstance(_nodes_schema, Mapping):
                    suggested_id_type = _nodes_schema.get(ID_COLUMN_NAME, "TEXT")
                elif isinstance(_nodes_schema, SqliteTableSchema):
                    suggested_id_type = _nodes_schema.columns.get(
                        ID_COLUMN_NAME, "TEXT"
                    )
            edges_schema = SqliteTableSchema.construct(
                columns={
                    SOURCE_COLUMN_NAME: suggested_id_type,
                    TARGET_COLUMN_NAME: suggested_id_type,
                }
            )
        else:
            if isinstance(_edges_schema, Mapping):
                edges_schema = SqliteTableSchema(**_edges_schema)
            elif not isinstance(_edges_schema, SqliteTableSchema):
                raise ValueError(
                    f"Invalid data type for edges schema: {type(_edges_schema)}"
                )
            else:
                edges_schema = _edges_schema

        if (
            edges_schema.columns[SOURCE_COLUMN_NAME]
            != edges_schema.columns[TARGET_COLUMN_NAME]
        ):
            # fixed: access '.columns' here -- the schema object itself is not
            # subscriptable, which made building this error message itself fail
            raise ValueError(
                f"Invalid edges schema, source and edges columns have different type: {edges_schema.columns[SOURCE_COLUMN_NAME]} != {edges_schema.columns[TARGET_COLUMN_NAME]}"
            )

        if _nodes_schema is None:
            # id column type must match the edges source/target column type
            _nodes_schema = SqliteTableSchema.construct(
                columns={
                    ID_COLUMN_NAME: edges_schema.columns[SOURCE_COLUMN_NAME],
                    LABEL_COLUMN_NAME: "TEXT",
                }
            )

        if isinstance(_nodes_schema, Mapping):
            nodes_schema = SqliteTableSchema(**_nodes_schema)
        elif isinstance(_nodes_schema, SqliteTableSchema):
            nodes_schema = _nodes_schema
        else:
            # fixed: report the offending nodes schema (was: _edges_schema)
            raise ValueError(
                f"Invalid data type for nodes schema: {type(_nodes_schema)}"
            )

        if ID_COLUMN_NAME not in nodes_schema.columns.keys():
            raise ValueError(
                f"Invalid nodes schema: missing '{ID_COLUMN_NAME}' column."
            )
        if LABEL_COLUMN_NAME not in nodes_schema.columns.keys():
            # label column is optional in the input, added with a TEXT default
            nodes_schema.columns[LABEL_COLUMN_NAME] = "TEXT"
        else:
            if nodes_schema.columns[LABEL_COLUMN_NAME] != "TEXT":
                raise ValueError(
                    f"Invalid nodes schema, '{LABEL_COLUMN_NAME}' column must be of type 'TEXT', not '{nodes_schema.columns[LABEL_COLUMN_NAME]}'."
                )

        if (
            nodes_schema.columns[ID_COLUMN_NAME]
            != edges_schema.columns[SOURCE_COLUMN_NAME]
        ):
            raise ValueError(
                f"Invalid nodes schema, id column has different type to edges source/target columns: {nodes_schema.columns[ID_COLUMN_NAME]} != {edges_schema.columns[SOURCE_COLUMN_NAME]}"
            )

        values["edges_schema"] = edges_schema
        values["nodes_schema"] = nodes_schema

        return values

    # lazily reflected, cached sqlalchemy table objects
    _nodes_table_obj: Optional[Table] = PrivateAttr(default=None)
    _edges_table_obj: Optional[Table] = PrivateAttr(default=None)
    # cache of networkx graph instances, keyed by graph class
    _nx_graph = PrivateAttr(default_factory=dict)

    def _invalidate_other(self):
        # drop the cached table objects so they are re-reflected on next access
        self._nodes_table_obj = None
        self._edges_table_obj = None

    def get_sqlalchemy_nodes_table(self) -> Table:
        """Return the sqlalchemy nodes table instance for this network database."""
        if self._nodes_table_obj is not None:
            return self._nodes_table_obj

        self._nodes_table_obj = Table(
            NetworkDataTableType.NODES.value,
            self.get_sqlalchemy_metadata(),
            autoload_with=self.get_sqlalchemy_engine(),
        )
        return self._nodes_table_obj

    def get_sqlalchemy_edges_table(self) -> Table:
        """Return the sqlalchemy edges table instance for this network database."""
        if self._edges_table_obj is not None:
            return self._edges_table_obj

        self._edges_table_obj = Table(
            NetworkDataTableType.EDGES.value,
            self.get_sqlalchemy_metadata(),
            autoload_with=self.get_sqlalchemy_engine(),
        )
        return self._edges_table_obj

    def insert_nodes(self, *nodes: Mapping[str, Any]):
        """Add nodes to a network data item.

        Arguments:
            nodes: a list of dicts with the nodes
        """
        engine = self.get_sqlalchemy_engine()
        nodes_table = self.get_sqlalchemy_nodes_table()
        with engine.connect() as conn:
            with conn.begin():
                conn.execute(nodes_table.insert(), nodes)

    def insert_edges(
        self,
        *edges: Mapping[str, Any],
        existing_node_ids: Optional[Iterable[int]] = None,
    ) -> Set[int]:
        """Add edges to a network data item.

        All the edges need to have their node-ids registered already.

        Arguments:
            edges: a list of dicts with the edges
            existing_node_ids: a set of ids that can be assumed to already exist, this is mainly for performance reasons

        Returns:
            a unique set of all node ids contained in source and target columns
        """
        if existing_node_ids is None:
            # TODO: run query
            existing_node_ids = set()
        else:
            existing_node_ids = set(existing_node_ids)

        required_node_ids = set((edge[SOURCE_COLUMN_NAME] for edge in edges))
        required_node_ids.update(edge[TARGET_COLUMN_NAME] for edge in edges)

        node_ids = list(required_node_ids.difference(existing_node_ids))
        if node_ids:
            # auto-register missing nodes, using the stringified id as label
            self.insert_nodes(
                *(
                    {ID_COLUMN_NAME: node_id, LABEL_COLUMN_NAME: str(node_id)}
                    for node_id in node_ids
                )
            )

        engine = self.get_sqlalchemy_engine()
        with engine.connect() as conn:
            with conn.begin():
                conn.execute(self.get_sqlalchemy_edges_table().insert(), edges)

        return required_node_ids

    def as_networkx_graph(self, graph_type: Type["nx.Graph"]) -> "nx.Graph":
        """Return the network data as a networkx graph object.

        Arguments:
            graph_type: the networkx Graph class to use
        """
        if graph_type in self._nx_graph.keys():
            # one cached graph per requested graph class
            return self._nx_graph[graph_type]

        graph = graph_type()

        engine = self.get_sqlalchemy_engine()
        nodes = self.get_sqlalchemy_nodes_table()
        edges = self.get_sqlalchemy_edges_table()

        with engine.connect() as conn:
            with conn.begin():
                result = conn.execute(nodes.select())
                for r in result:
                    row = dict(r)
                    node_id = row.pop(ID_COLUMN_NAME)
                    graph.add_node(node_id, **row)

                result = conn.execute(edges.select())
                for r in result:
                    row = dict(r)
                    source = row.pop(SOURCE_COLUMN_NAME)
                    target = row.pop(TARGET_COLUMN_NAME)
                    graph.add_edge(source, target, **row)

        self._nx_graph[graph_type] = graph
        return self._nx_graph[graph_type]
Attributes¶
edges_schema: SqliteTableSchema
pydantic-field
required
¶The schema information for the edges table.
nodes_schema: SqliteTableSchema
pydantic-field
required
¶The schema information for the nodes table.
Methods¶
as_networkx_graph(self, graph_type)
¶Return the network data as a networkx graph object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
graph_type |
Type[nx.Graph] |
the networkx Graph class to use |
required |
Source code in network_analysis/models.py
def as_networkx_graph(self, graph_type: Type["nx.Graph"]) -> "nx.Graph":
    """Return the network data as a networkx graph object.

    Arguments:
        graph_type: the networkx Graph class to use
    """
    # serve from the per-graph-class cache when possible
    cached = self._nx_graph.get(graph_type)
    if cached is not None:
        return cached

    graph = graph_type()
    engine = self.get_sqlalchemy_engine()
    nodes_table = self.get_sqlalchemy_nodes_table()
    edges_table = self.get_sqlalchemy_edges_table()

    with engine.connect() as conn:
        with conn.begin():
            # nodes first, so edge endpoints refer to known nodes
            for node_row in conn.execute(nodes_table.select()):
                attrs = dict(node_row)
                node_id = attrs.pop(ID_COLUMN_NAME)
                graph.add_node(node_id, **attrs)

            for edge_row in conn.execute(edges_table.select()):
                attrs = dict(edge_row)
                source = attrs.pop(SOURCE_COLUMN_NAME)
                target = attrs.pop(TARGET_COLUMN_NAME)
                graph.add_edge(source, target, **attrs)

    self._nx_graph[graph_type] = graph
    return graph
create_from_networkx_graph(graph)
classmethod
¶Create a NetworkData
instance from a networkx Graph object.
Source code in network_analysis/models.py
@classmethod
def create_from_networkx_graph(cls, graph: "nx.Graph") -> "NetworkData":
    """Create a `NetworkData` instance from a networkx Graph object."""
    # extract graph content into arrow tables and derive sqlite schemas
    edges_table = extract_edges_as_table(graph)
    nodes_table = extract_nodes_as_table(graph)
    edges_schema = create_sqlite_schema_data_from_arrow_table(edges_table)
    nodes_schema = create_sqlite_schema_data_from_arrow_table(nodes_table)

    # create an (unlocked) empty db, fill it with the table data, then lock it
    network_data = NetworkData.create_in_temp_dir(
        edges_schema=edges_schema, nodes_schema=nodes_schema, keep_unlocked=True
    )
    insert_table_data_into_network_graph(
        network_data=network_data,
        edges_table=edges_table,
        nodes_table=nodes_table,
        chunk_size=DEFAULT_NETWORK_DATA_CHUNK_SIZE,
    )
    network_data._lock_db()
    return network_data
create_in_temp_dir(edges_schema=None, nodes_schema=None, keep_unlocked=False)
classmethod
¶Source code in network_analysis/models.py
@classmethod
def create_in_temp_dir(
    cls,
    edges_schema: Union[None, SqliteTableSchema, Mapping] = None,
    nodes_schema: Union[None, SqliteTableSchema, Mapping] = None,
    keep_unlocked: bool = False,
):
    """Create an empty network-data sqlite db in a temporary directory.

    Arguments:
        edges_schema: schema for the edges table (derived automatically if 'None')
        nodes_schema: schema for the nodes table (derived automatically if 'None')
        keep_unlocked: leave the db unlocked (e.g. for subsequent bulk inserts)
    """
    temp_f = tempfile.mkdtemp()
    db_path = os.path.join(temp_f, "network_data.sqlite")

    def cleanup():
        # remove the whole temp directory, not just the db file: rmtree on a
        # file path fails (silently, with ignore_errors) and would leak the
        # directory
        shutil.rmtree(temp_f, ignore_errors=True)

    atexit.register(cleanup)

    db = cls(
        db_file_path=db_path, edges_schema=edges_schema, nodes_schema=nodes_schema
    )
    db.create_if_not_exists()

    # the db must be writable while the tables are created
    db._unlock_db()
    engine = db.get_sqlalchemy_engine()
    db.edges_schema.create_table(
        table_name=NetworkDataTableType.EDGES.value, engine=engine
    )
    db.nodes_schema.create_table(
        table_name=NetworkDataTableType.NODES.value, engine=engine
    )
    if not keep_unlocked:
        db._lock_db()

    return db
get_sqlalchemy_edges_table(self)
¶Return the sqlalchemy edges table instance for this network database.
Source code in network_analysis/models.py
def get_sqlalchemy_edges_table(self) -> Table:
    """Return the (lazily reflected, cached) sqlalchemy edges table for this network database."""
    if self._edges_table_obj is None:
        # reflect the table from the live database on first access
        self._edges_table_obj = Table(
            NetworkDataTableType.EDGES.value,
            self.get_sqlalchemy_metadata(),
            autoload_with=self.get_sqlalchemy_engine(),
        )
    return self._edges_table_obj
get_sqlalchemy_nodes_table(self)
¶Return the sqlalchemy nodes table instance for this network database.
Source code in network_analysis/models.py
def get_sqlalchemy_nodes_table(self) -> Table:
    """Return the (lazily reflected, cached) sqlalchemy nodes table for this network database."""
    if self._nodes_table_obj is None:
        # reflect the table from the live database on first access
        self._nodes_table_obj = Table(
            NetworkDataTableType.NODES.value,
            self.get_sqlalchemy_metadata(),
            autoload_with=self.get_sqlalchemy_engine(),
        )
    return self._nodes_table_obj
insert_edges(self, *edges, existing_node_ids=None)
¶Add edges to a network data item.
All the edges need to have their node-ids registered already.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
edges |
Mapping[str, Any] |
a list of dicts with the edges |
() |
existing_node_ids |
Iterable[int] |
a set of ids that can be assumed to already exist, this is mainly for performance reasons |
None |
Returns:
Type | Description |
---|---|
Set[int] |
a unique set of all node ids contained in source and target columns |
Source code in network_analysis/models.py
def insert_edges(
    self,
    *edges: Mapping[str, Any],
    existing_node_ids: Optional[Iterable[int]] = None,
) -> Set[int]:
    """Add edges to a network data item.

    All the edges need to have their node-ids registered already.

    Arguments:
        edges: a list of dicts with the edges
        existing_node_ids: a set of ids that can be assumed to already exist, this is mainly for performance reasons

    Returns:
        a unique set of all node ids contained in source and target columns
    """
    if existing_node_ids is None:
        # TODO: run query
        existing_node_ids = set()
    else:
        existing_node_ids = set(existing_node_ids)

    required_node_ids = set((edge[SOURCE_COLUMN_NAME] for edge in edges))
    required_node_ids.update(edge[TARGET_COLUMN_NAME] for edge in edges)

    node_ids = list(required_node_ids.difference(existing_node_ids))
    if node_ids:
        # auto-register missing nodes, using the stringified id as label
        self.insert_nodes(
            *(
                {ID_COLUMN_NAME: node_id, LABEL_COLUMN_NAME: str(node_id)}
                for node_id in node_ids
            )
        )

    engine = self.get_sqlalchemy_engine()
    with engine.connect() as conn:
        with conn.begin():
            conn.execute(self.get_sqlalchemy_edges_table().insert(), edges)

    return required_node_ids
insert_nodes(self, *nodes)
¶Add nodes to a network data item.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
nodes |
Mapping[str, Any] |
a list of dicts with the nodes |
() |
Source code in network_analysis/models.py
def insert_nodes(self, *nodes: Mapping[str, Any]):
    """Add nodes to a network data item.

    Arguments:
        nodes: a list of dicts with the nodes
    """
    nodes_table = self.get_sqlalchemy_nodes_table()
    # insert all rows inside a single transaction
    with self.get_sqlalchemy_engine().connect() as conn:
        with conn.begin():
            conn.execute(nodes_table.insert(), nodes)
pre_validate(values)
classmethod
¶Source code in network_analysis/models.py
@root_validator(pre=True)
def pre_validate(cls, values):
    """Fill in missing schemas and validate that edges/nodes schemas agree."""
    _edges_schema = values.get("edges_schema", None)
    _nodes_schema = values.get("nodes_schema", None)

    if _edges_schema is None:
        # derive source/target column type from the nodes id column if
        # available, otherwise default to TEXT
        suggested_id_type = "TEXT"
        if _nodes_schema is not None:
            if isinstance(_nodes_schema, Mapping):
                suggested_id_type = _nodes_schema.get(ID_COLUMN_NAME, "TEXT")
            elif isinstance(_nodes_schema, SqliteTableSchema):
                suggested_id_type = _nodes_schema.columns.get(
                    ID_COLUMN_NAME, "TEXT"
                )
        edges_schema = SqliteTableSchema.construct(
            columns={
                SOURCE_COLUMN_NAME: suggested_id_type,
                TARGET_COLUMN_NAME: suggested_id_type,
            }
        )
    else:
        if isinstance(_edges_schema, Mapping):
            edges_schema = SqliteTableSchema(**_edges_schema)
        elif not isinstance(_edges_schema, SqliteTableSchema):
            raise ValueError(
                f"Invalid data type for edges schema: {type(_edges_schema)}"
            )
        else:
            edges_schema = _edges_schema

    if (
        edges_schema.columns[SOURCE_COLUMN_NAME]
        != edges_schema.columns[TARGET_COLUMN_NAME]
    ):
        # fixed: access '.columns' here -- the schema object itself is not
        # subscriptable, which made building this error message itself fail
        raise ValueError(
            f"Invalid edges schema, source and edges columns have different type: {edges_schema.columns[SOURCE_COLUMN_NAME]} != {edges_schema.columns[TARGET_COLUMN_NAME]}"
        )

    if _nodes_schema is None:
        # id column type must match the edges source/target column type
        _nodes_schema = SqliteTableSchema.construct(
            columns={
                ID_COLUMN_NAME: edges_schema.columns[SOURCE_COLUMN_NAME],
                LABEL_COLUMN_NAME: "TEXT",
            }
        )

    if isinstance(_nodes_schema, Mapping):
        nodes_schema = SqliteTableSchema(**_nodes_schema)
    elif isinstance(_nodes_schema, SqliteTableSchema):
        nodes_schema = _nodes_schema
    else:
        # fixed: report the offending nodes schema (was: _edges_schema)
        raise ValueError(
            f"Invalid data type for nodes schema: {type(_nodes_schema)}"
        )

    if ID_COLUMN_NAME not in nodes_schema.columns.keys():
        raise ValueError(
            f"Invalid nodes schema: missing '{ID_COLUMN_NAME}' column."
        )
    if LABEL_COLUMN_NAME not in nodes_schema.columns.keys():
        # label column is optional in the input, added with a TEXT default
        nodes_schema.columns[LABEL_COLUMN_NAME] = "TEXT"
    else:
        if nodes_schema.columns[LABEL_COLUMN_NAME] != "TEXT":
            raise ValueError(
                f"Invalid nodes schema, '{LABEL_COLUMN_NAME}' column must be of type 'TEXT', not '{nodes_schema.columns[LABEL_COLUMN_NAME]}'."
            )

    if (
        nodes_schema.columns[ID_COLUMN_NAME]
        != edges_schema.columns[SOURCE_COLUMN_NAME]
    ):
        raise ValueError(
            f"Invalid nodes schema, id column has different type to edges source/target columns: {nodes_schema.columns[ID_COLUMN_NAME]} != {edges_schema.columns[SOURCE_COLUMN_NAME]}"
        )

    values["edges_schema"] = edges_schema
    values["nodes_schema"] = nodes_schema

    return values
NetworkGraphProperties (ValueMetadata)
pydantic-model
¶
Network graph properties (node count and edge counts per graph type).
Source code in network_analysis/models.py
class NetworkGraphProperties(ValueMetadata):
    """Network graph properties: node count and edge counts per graph type."""

    _metadata_key = "graph_properties"

    number_of_nodes: int = Field(description="Number of nodes in the network graph.")
    properties_by_graph_type: List[PropertiesByGraphType] = Field(
        description="Properties of the network data, by graph type."
    )

    @classmethod
    def retrieve_supported_data_types(cls) -> Iterable[str]:
        """Return the data type names this metadata model can be computed for."""
        return ["network_data"]

    @classmethod
    def create_value_metadata(cls, value: Value) -> "NetworkGraphProperties":
        """Compute graph properties for a 'network_data' value.

        The edge count is computed for each of the four graph types: parallel
        edges collapse for the non-multi types, and edge direction is ignored
        for the undirected types.
        """
        from sqlalchemy import text

        network_data: NetworkData = value.data

        with network_data.get_sqlalchemy_engine().connect() as con:
            result = con.execute(text("SELECT count(*) from nodes"))
            num_nodes = result.fetchone()[0]

            result = con.execute(text("SELECT count(*) from edges"))
            num_edges = result.fetchone()[0]

            # directed, non-multi: distinct (source, target) pairs
            result = con.execute(
                text("SELECT COUNT(*) FROM (SELECT DISTINCT source, target FROM edges)")
            )
            num_edges_directed = result.fetchone()[0]

            # undirected, non-multi: distinct pairs with direction ignored
            query = "SELECT COUNT(*) FROM edges WHERE rowid in (SELECT DISTINCT MIN(rowid) FROM (SELECT rowid, source, target from edges UNION ALL SELECT rowid, target, source from edges) GROUP BY source, target)"
            result = con.execute(text(query))
            num_edges_undirected = result.fetchone()[0]

        directed = PropertiesByGraphType(
            graph_type=GraphType.DIRECTED, number_of_edges=num_edges_directed
        )
        undirected = PropertiesByGraphType(
            graph_type=GraphType.UNDIRECTED, number_of_edges=num_edges_undirected
        )
        directed_multi = PropertiesByGraphType(
            graph_type=GraphType.DIRECTED_MULTI, number_of_edges=num_edges
        )
        undirected_multi = PropertiesByGraphType(
            graph_type=GraphType.UNDIRECTED_MULTI, number_of_edges=num_edges
        )

        return cls(
            number_of_nodes=num_nodes,
            properties_by_graph_type=[
                directed,
                undirected,
                directed_multi,
                undirected_multi,
            ],
        )
Attributes¶
number_of_nodes: int
pydantic-field
required
¶Number of nodes in the network graph.
properties_by_graph_type: List[kiara_plugin.network_analysis.models.PropertiesByGraphType]
pydantic-field
required
¶Properties of the network data, by graph type.
create_value_metadata(value)
classmethod
¶Source code in network_analysis/models.py
@classmethod
def create_value_metadata(cls, value: Value) -> "NetworkGraphProperties":
    """Compute graph properties for a 'network_data' value.

    The edge count is computed for each of the four graph types: parallel
    edges collapse for the non-multi types, and edge direction is ignored
    for the undirected types.
    """
    from sqlalchemy import text

    network_data: NetworkData = value.data

    with network_data.get_sqlalchemy_engine().connect() as con:
        result = con.execute(text("SELECT count(*) from nodes"))
        num_nodes = result.fetchone()[0]

        result = con.execute(text("SELECT count(*) from edges"))
        num_edges = result.fetchone()[0]

        # directed, non-multi: distinct (source, target) pairs
        result = con.execute(
            text("SELECT COUNT(*) FROM (SELECT DISTINCT source, target FROM edges)")
        )
        num_edges_directed = result.fetchone()[0]

        # undirected, non-multi: distinct pairs with direction ignored
        query = "SELECT COUNT(*) FROM edges WHERE rowid in (SELECT DISTINCT MIN(rowid) FROM (SELECT rowid, source, target from edges UNION ALL SELECT rowid, target, source from edges) GROUP BY source, target)"
        result = con.execute(text(query))
        num_edges_undirected = result.fetchone()[0]

    directed = PropertiesByGraphType(
        graph_type=GraphType.DIRECTED, number_of_edges=num_edges_directed
    )
    undirected = PropertiesByGraphType(
        graph_type=GraphType.UNDIRECTED, number_of_edges=num_edges_undirected
    )
    directed_multi = PropertiesByGraphType(
        graph_type=GraphType.DIRECTED_MULTI, number_of_edges=num_edges
    )
    undirected_multi = PropertiesByGraphType(
        graph_type=GraphType.UNDIRECTED_MULTI, number_of_edges=num_edges
    )

    return cls(
        number_of_nodes=num_nodes,
        properties_by_graph_type=[
            directed,
            undirected,
            directed_multi,
            undirected_multi,
        ],
    )
retrieve_supported_data_types()
classmethod
¶Source code in network_analysis/models.py
@classmethod
def retrieve_supported_data_types(cls) -> Iterable[str]:
    """Return the names of the data types this metadata model supports."""
    return ["network_data"]
PropertiesByGraphType (BaseModel)
pydantic-model
¶
Properties of graph data, if interpreted as a specific graph type.
Source code in network_analysis/models.py
class PropertiesByGraphType(BaseModel):
    """Properties of graph data, if interpreted as a specific graph type."""

    # the graph type these properties were computed for
    graph_type: GraphType = Field(description="The graph type name.")
    # edge count under that graph-type interpretation
    number_of_edges: int = Field(description="The number of edges.")
modules
special
¶
Classes¶
CreateGraphFromTablesModule (KiaraModule)
¶
Create a graph object from one or two tables.
Source code in network_analysis/modules/__init__.py
class CreateGraphFromTablesModule(KiaraModule):
    """Create a graph object from one or two tables.

    The 'edges' table is required; a 'nodes' table is optional. Column maps allow
    renaming of input columns before the data is written into the backing
    sqlite database.
    """

    _module_type_name = "create.network_data.from.tables"

    def create_inputs_schema(
        self,
    ) -> ValueSetSchema:
        """Return the schema for this type's inputs."""

        inputs: Mapping[str, Any] = {
            "edges": {
                "type": "table",
                "doc": "A table that contains the edges data.",
                "optional": False,
            },
            "source_column_name": {
                "type": "string",
                "doc": "The name of the source column name in the edges table.",
                "default": SOURCE_COLUMN_NAME,
            },
            "target_column_name": {
                "type": "string",
                "doc": "The name of the target column name in the edges table.",
                "default": TARGET_COLUMN_NAME,
            },
            "edges_column_map": {
                "type": "dict",
                "doc": "An optional map of original column name to desired.",
                "optional": True,
            },
            "nodes": {
                "type": "table",
                "doc": "A table that contains the nodes data.",
                "optional": True,
            },
            "id_column_name": {
                "type": "string",
                "doc": "The name (before any potential column mapping) of the node-table column that contains the node identifier (used in the edges table).",
                "default": ID_COLUMN_NAME,
            },
            "label_column_name": {
                "type": "string",
                "doc": "The name of a column that contains the node label (before any potential column name mapping). If not specified, the value of the id value will be used as label.",
                "optional": True,
            },
            "nodes_column_map": {
                "type": "dict",
                "doc": "An optional map of original column name to desired.",
                "optional": True,
            },
        }
        return inputs

    def create_outputs_schema(
        self,
    ) -> ValueSetSchema:
        """Return the schema for this type's outputs."""

        outputs: Mapping[str, Any] = {
            "network_data": {"type": "network_data", "doc": "The network/graph data."}
        }
        return outputs

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
        """Assemble a 'network_data' value from an edges table and an optional nodes table.

        Raises:
            KiaraProcessingException: if a configured column name is missing from an
                input table, or a reserved column name is used in a column map.
        """
        # BUG FIX: removed a stray 'pass' statement that used to sit at the top
        # of this method.
        edges = inputs.get_value_obj("edges")
        edges_table: KiaraTable = edges.data
        edges_source_column_name = inputs.get_value_data("source_column_name")
        edges_target_column_name = inputs.get_value_data("target_column_name")

        edges_columns = edges_table.column_names
        if edges_source_column_name not in edges_columns:
            raise KiaraProcessingException(
                f"Edges table does not contain source column '{edges_source_column_name}'. Choose one of: {', '.join(edges_columns)}."
            )
        if edges_target_column_name not in edges_columns:
            # BUG FIX: this message previously interpolated the *source* column name.
            raise KiaraProcessingException(
                f"Edges table does not contain target column '{edges_target_column_name}'. Choose one of: {', '.join(edges_columns)}."
            )

        nodes = inputs.get_value_obj("nodes")
        id_column_name = inputs.get_value_data("id_column_name")
        label_column_name = inputs.get_value_data("label_column_name")

        nodes_column_map: Dict[str, str] = inputs.get_value_data("nodes_column_map")
        if nodes_column_map is None:
            nodes_column_map = {}
        edges_column_map: Dict[str, str] = inputs.get_value_data("edges_column_map")
        if edges_column_map is None:
            edges_column_map = {}

        if edges_source_column_name in edges_column_map.keys():
            raise KiaraProcessingException(
                "The value of the 'source_column_name' argument is not allowed in the edges column map."
            )
        if edges_target_column_name in edges_column_map.keys():
            # BUG FIX: this message previously referred to 'source_column_name'.
            raise KiaraProcessingException(
                "The value of the 'target_column_name' argument is not allowed in the edges column map."
            )

        edges_column_map[edges_source_column_name] = SOURCE_COLUMN_NAME
        edges_column_map[edges_target_column_name] = TARGET_COLUMN_NAME

        edges_data_schema = create_sqlite_schema_data_from_arrow_table(
            table=edges_table.arrow_table,
            index_columns=[SOURCE_COLUMN_NAME, TARGET_COLUMN_NAME],
            column_map=edges_column_map,
        )

        nodes_table: Optional[KiaraTable] = None
        if nodes.is_set:
            if (
                id_column_name in nodes_column_map.keys()
                and nodes_column_map[id_column_name] != ID_COLUMN_NAME
            ):
                raise KiaraProcessingException(
                    "The value of the 'id_column_name' argument is not allowed in the node column map."
                )

            nodes_column_map[id_column_name] = ID_COLUMN_NAME
            nodes_table = nodes.data

            if label_column_name is None:
                # no explicit label column: fall back to a case-insensitive
                # match for a 'label' column in the nodes table
                label_column_name = LABEL_COLUMN_NAME
                for cn in nodes_table.column_names:
                    if cn.lower() == LABEL_COLUMN_NAME.lower():
                        label_column_name = cn
                        break

            if LABEL_COLUMN_NAME in nodes_table.column_names:
                if label_column_name != LABEL_COLUMN_NAME:
                    raise KiaraProcessingException(
                        f"Can't create database for graph data: original data contains column called 'label', which is a protected column name. If this column can be used as a label, remove your '{label_column_name}' input value for the 'label_column_name' input and re-run this module."
                    )

            if label_column_name in nodes_table.column_names:
                if label_column_name in nodes_column_map.keys():
                    raise KiaraProcessingException(
                        "The value of the 'label_column_name' argument is not allowed in the node column map."
                    )
            nodes_column_map[label_column_name] = LABEL_COLUMN_NAME

            # NOTE(review): earlier revisions computed 'extra_schema' and a
            # 'nullable_columns' list here, but neither was ever passed on
            # (nullable_columns was hard-coded to [] below); the dead code has
            # been removed without changing behavior. Confirm whether the
            # nullable-columns list was meant to be forwarded.
            nodes_data_schema = create_sqlite_schema_data_from_arrow_table(
                table=nodes_table.arrow_table,
                index_columns=[ID_COLUMN_NAME],
                column_map=nodes_column_map,
                nullable_columns=[],
                unique_columns=[ID_COLUMN_NAME],
            )
        else:
            nodes_data_schema = None

        network_data = NetworkData.create_in_temp_dir(
            edges_schema=edges_data_schema,
            nodes_schema=nodes_data_schema,
            keep_unlocked=True,
        )
        insert_table_data_into_network_graph(
            network_data=network_data,
            edges_table=edges_table.arrow_table,
            edges_column_map=edges_column_map,
            nodes_table=None if nodes_table is None else nodes_table.arrow_table,
            nodes_column_map=nodes_column_map,
            chunk_size=DEFAULT_NETWORK_DATA_CHUNK_SIZE,
        )
        # database is locked (made read-only) once fully populated
        network_data._lock_db()

        outputs.set_value("network_data", network_data)
Methods¶
create_inputs_schema(self)
¶Return the schema for this type's inputs.
Source code in network_analysis/modules/__init__.py
def create_inputs_schema(
    self,
) -> ValueSetSchema:
    """Return the schema for this type's inputs."""

    # edge-related inputs
    edge_inputs = {
        "edges": {
            "type": "table",
            "doc": "A table that contains the edges data.",
            "optional": False,
        },
        "source_column_name": {
            "type": "string",
            "doc": "The name of the source column name in the edges table.",
            "default": SOURCE_COLUMN_NAME,
        },
        "target_column_name": {
            "type": "string",
            "doc": "The name of the target column name in the edges table.",
            "default": TARGET_COLUMN_NAME,
        },
        "edges_column_map": {
            "type": "dict",
            "doc": "An optional map of original column name to desired.",
            "optional": True,
        },
    }
    # node-related inputs
    node_inputs = {
        "nodes": {
            "type": "table",
            "doc": "A table that contains the nodes data.",
            "optional": True,
        },
        "id_column_name": {
            "type": "string",
            "doc": "The name (before any potential column mapping) of the node-table column that contains the node identifier (used in the edges table).",
            "default": ID_COLUMN_NAME,
        },
        "label_column_name": {
            "type": "string",
            "doc": "The name of a column that contains the node label (before any potential column name mapping). If not specified, the value of the id value will be used as label.",
            "optional": True,
        },
        "nodes_column_map": {
            "type": "dict",
            "doc": "An optional map of original column name to desired.",
            "optional": True,
        },
    }

    schema: Mapping[str, Any] = {**edge_inputs, **node_inputs}
    return schema
create_outputs_schema(self)
¶Return the schema for this type's outputs.
Source code in network_analysis/modules/__init__.py
def create_outputs_schema(
    self,
) -> ValueSetSchema:
    """Return the schema for this type's outputs: a single 'network_data' value."""

    result: Mapping[str, Any] = {
        "network_data": {
            "type": "network_data",
            "doc": "The network/graph data.",
        }
    }
    return result
process(self, inputs, outputs)
¶Source code in network_analysis/modules/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    """Assemble a 'network_data' value from an edges table and an optional nodes table.

    Raises:
        KiaraProcessingException: if a configured column name is missing from an
            input table, or a reserved column name is used in a column map.
    """
    # BUG FIX: removed a stray 'pass' statement that used to sit at the top of
    # this method.
    edges = inputs.get_value_obj("edges")
    edges_table: KiaraTable = edges.data
    edges_source_column_name = inputs.get_value_data("source_column_name")
    edges_target_column_name = inputs.get_value_data("target_column_name")

    edges_columns = edges_table.column_names
    if edges_source_column_name not in edges_columns:
        raise KiaraProcessingException(
            f"Edges table does not contain source column '{edges_source_column_name}'. Choose one of: {', '.join(edges_columns)}."
        )
    if edges_target_column_name not in edges_columns:
        # BUG FIX: this message previously interpolated the *source* column name.
        raise KiaraProcessingException(
            f"Edges table does not contain target column '{edges_target_column_name}'. Choose one of: {', '.join(edges_columns)}."
        )

    nodes = inputs.get_value_obj("nodes")
    id_column_name = inputs.get_value_data("id_column_name")
    label_column_name = inputs.get_value_data("label_column_name")

    nodes_column_map: Dict[str, str] = inputs.get_value_data("nodes_column_map")
    if nodes_column_map is None:
        nodes_column_map = {}
    edges_column_map: Dict[str, str] = inputs.get_value_data("edges_column_map")
    if edges_column_map is None:
        edges_column_map = {}

    if edges_source_column_name in edges_column_map.keys():
        raise KiaraProcessingException(
            "The value of the 'source_column_name' argument is not allowed in the edges column map."
        )
    if edges_target_column_name in edges_column_map.keys():
        # BUG FIX: this message previously referred to 'source_column_name'.
        raise KiaraProcessingException(
            "The value of the 'target_column_name' argument is not allowed in the edges column map."
        )

    edges_column_map[edges_source_column_name] = SOURCE_COLUMN_NAME
    edges_column_map[edges_target_column_name] = TARGET_COLUMN_NAME

    edges_data_schema = create_sqlite_schema_data_from_arrow_table(
        table=edges_table.arrow_table,
        index_columns=[SOURCE_COLUMN_NAME, TARGET_COLUMN_NAME],
        column_map=edges_column_map,
    )

    nodes_table: Optional[KiaraTable] = None
    if nodes.is_set:
        if (
            id_column_name in nodes_column_map.keys()
            and nodes_column_map[id_column_name] != ID_COLUMN_NAME
        ):
            raise KiaraProcessingException(
                "The value of the 'id_column_name' argument is not allowed in the node column map."
            )

        nodes_column_map[id_column_name] = ID_COLUMN_NAME
        nodes_table = nodes.data

        if label_column_name is None:
            # no explicit label column: fall back to a case-insensitive match
            # for a 'label' column in the nodes table
            label_column_name = LABEL_COLUMN_NAME
            for cn in nodes_table.column_names:
                if cn.lower() == LABEL_COLUMN_NAME.lower():
                    label_column_name = cn
                    break

        if LABEL_COLUMN_NAME in nodes_table.column_names:
            if label_column_name != LABEL_COLUMN_NAME:
                raise KiaraProcessingException(
                    f"Can't create database for graph data: original data contains column called 'label', which is a protected column name. If this column can be used as a label, remove your '{label_column_name}' input value for the 'label_column_name' input and re-run this module."
                )

        if label_column_name in nodes_table.column_names:
            if label_column_name in nodes_column_map.keys():
                raise KiaraProcessingException(
                    "The value of the 'label_column_name' argument is not allowed in the node column map."
                )
        nodes_column_map[label_column_name] = LABEL_COLUMN_NAME

        # NOTE(review): earlier revisions computed 'extra_schema' and a
        # 'nullable_columns' list here, but neither was ever passed on
        # (nullable_columns was hard-coded to [] below); the dead code has been
        # removed without changing behavior.
        nodes_data_schema = create_sqlite_schema_data_from_arrow_table(
            table=nodes_table.arrow_table,
            index_columns=[ID_COLUMN_NAME],
            column_map=nodes_column_map,
            nullable_columns=[],
            unique_columns=[ID_COLUMN_NAME],
        )
    else:
        nodes_data_schema = None

    network_data = NetworkData.create_in_temp_dir(
        edges_schema=edges_data_schema,
        nodes_schema=nodes_data_schema,
        keep_unlocked=True,
    )
    insert_table_data_into_network_graph(
        network_data=network_data,
        edges_table=edges_table.arrow_table,
        edges_column_map=edges_column_map,
        nodes_table=None if nodes_table is None else nodes_table.arrow_table,
        nodes_column_map=nodes_column_map,
        chunk_size=DEFAULT_NETWORK_DATA_CHUNK_SIZE,
    )
    # database is locked (made read-only) once fully populated
    network_data._lock_db()

    outputs.set_value("network_data", network_data)
ExportNetworkDataModule (DataExportModule)
¶
Export network data items.
Source code in network_analysis/modules/__init__.py
class ExportNetworkDataModule(DataExportModule):
    """Export network data items."""

    _module_type_name = "export.network_data"

    def export_as__graphml_file(self, value: NetworkData, base_path: str, name: str):
        """Export the network data as a graphml file."""
        import networkx as nx

        target_path = os.path.join(base_path, f"{name}.graphml")

        # TODO: can't just assume digraph
        graph: nx.Graph = value.as_networkx_graph(nx.DiGraph)
        nx.write_graphml(graph, target_path)

        return {"files": target_path}

    def export__network_data__as__sqlite_db(
        self, value: NetworkData, base_path: str, name: str
    ):
        """Copy the backing sqlite database file to the export location."""
        target_path = os.path.abspath(os.path.join(base_path, f"{name}.sqlite"))
        shutil.copy2(value.db_file_path, target_path)

        return {"files": target_path}

    def export__network_data__as__sql_dump(
        self, value: NetworkData, base_path: str, name: str
    ):
        """Dump the backing sqlite database as a plain-text SQL script."""
        import sqlite_utils

        db = sqlite_utils.Database(value.db_file_path)
        target_path = Path(os.path.join(base_path, f"{name}.sql"))
        with target_path.open("wt") as f:
            for line in db.conn.iterdump():
                f.write(line + "\n")

        return {"files": target_path}

    def export__network_data__as__csv_files(
        self, value: NetworkData, base_path: str, name: str
    ):
        """Export each table of the backing database as a separate csv file.

        BUG FIX: the original left the csv file handle and the sqlite connection
        unclosed on error; both are now managed explicitly. The file is opened
        with newline='' as required by the csv module.
        """
        import sqlite3

        files = []
        for table_name in value.table_names:
            target_path = os.path.join(base_path, f"{name}__{table_name}.csv")
            os.makedirs(os.path.dirname(target_path), exist_ok=True)

            # adapted from: https://stackoverflow.com/questions/2952366/dump-csv-from-sqlalchemy
            con = sqlite3.connect(value.db_file_path)
            try:
                cursor = con.execute(f"select * from {table_name}")
                with open(target_path, "wt", newline="") as outfile:
                    outcsv = csv.writer(outfile)
                    # dump column titles (optional)
                    outcsv.writerow(x[0] for x in cursor.description)
                    # dump rows
                    outcsv.writerows(cursor.fetchall())
            finally:
                con.close()

            files.append(target_path)

        return {"files": files}
export__network_data__as__csv_files(self, value, base_path, name)
¶Source code in network_analysis/modules/__init__.py
def export__network_data__as__csv_files(
    self, value: NetworkData, base_path: str, name: str
):
    """Export each table of the backing database as a separate csv file.

    BUG FIX: the original left the csv file handle and the sqlite connection
    unclosed on error; both are now managed explicitly. The file is opened with
    newline='' as required by the csv module.
    """
    import sqlite3

    files = []
    for table_name in value.table_names:
        target_path = os.path.join(base_path, f"{name}__{table_name}.csv")
        os.makedirs(os.path.dirname(target_path), exist_ok=True)

        # adapted from: https://stackoverflow.com/questions/2952366/dump-csv-from-sqlalchemy
        con = sqlite3.connect(value.db_file_path)
        try:
            cursor = con.execute(f"select * from {table_name}")
            with open(target_path, "wt", newline="") as outfile:
                outcsv = csv.writer(outfile)
                # dump column titles (optional)
                outcsv.writerow(x[0] for x in cursor.description)
                # dump rows
                outcsv.writerows(cursor.fetchall())
        finally:
            con.close()

        files.append(target_path)

    return {"files": files}
export__network_data__as__sql_dump(self, value, base_path, name)
¶Source code in network_analysis/modules/__init__.py
def export__network_data__as__sql_dump(
    self, value: NetworkData, base_path: str, name: str
):
    """Dump the backing sqlite database as a plain-text SQL script."""
    import sqlite_utils

    database = sqlite_utils.Database(value.db_file_path)
    target_path = Path(os.path.join(base_path, f"{name}.sql"))

    with target_path.open("wt") as dump_file:
        dump_file.writelines(f"{line}\n" for line in database.conn.iterdump())

    return {"files": target_path}
export__network_data__as__sqlite_db(self, value, base_path, name)
¶Source code in network_analysis/modules/__init__.py
def export__network_data__as__sqlite_db(
    self, value: NetworkData, base_path: str, name: str
):
    """Copy the backing sqlite database file to the export location."""
    destination = os.path.join(base_path, f"{name}.sqlite")
    target_path = os.path.abspath(destination)

    # copy2 preserves file metadata alongside the contents
    shutil.copy2(value.db_file_path, target_path)

    return {"files": target_path}
export_as__graphml_file(self, value, base_path, name)
¶Source code in network_analysis/modules/__init__.py
def export_as__graphml_file(self, value: NetworkData, base_path: str, name: str):
    """Export the network data as a graphml file under 'base_path'."""
    import networkx as nx

    target_path = os.path.join(base_path, f"{name}.graphml")
    # TODO: can't just assume digraph
    graph: nx.Graph = value.as_networkx_graph(nx.DiGraph)
    nx.write_graphml(graph, target_path)
    return {"files": target_path}
pipelines
special
¶
Default (empty) module that is used as a base path for pipelines contained in this package.
utils
¶
NetworkDataTabularWrap (TabularWrap)
¶
Source code in network_analysis/utils.py
class NetworkDataTabularWrap(TabularWrap):
    """Tabular view onto a single table of a 'NetworkData' sqlite database."""

    def __init__(self, db: "NetworkData", table_type: NetworkDataTableType):
        self._db: NetworkData = db
        self._table_type: NetworkDataTableType = table_type
        super().__init__()

    @property
    def _table_name(self):
        # table names in the database match the enum member values
        return self._table_type.value

    def _fetch_as_pydict(
        self, query: str
    ) -> typing.Dict[str, typing.List[typing.Any]]:
        """Run 'query' and collect the result set into a column-name -> values dict.

        Extracted helper: 'slice' and 'to_pydict' previously duplicated this
        collection logic verbatim.
        """
        from sqlalchemy import text

        result_dict: typing.Dict[str, typing.List[typing.Any]] = {
            cn: [] for cn in self.column_names
        }
        with self._db.get_sqlalchemy_engine().connect() as con:
            result = con.execute(text(query))
            for r in result:
                for i, cn in enumerate(self.column_names):
                    result_dict[cn].append(r[i])
        return result_dict

    def retrieve_number_of_rows(self) -> int:
        """Count the rows of the wrapped table."""
        from sqlalchemy import text

        with self._db.get_sqlalchemy_engine().connect() as con:
            result = con.execute(text(f"SELECT count(*) from {self._table_name}"))
            num_rows = result.fetchone()[0]

        return num_rows

    def retrieve_column_names(self) -> typing.Iterable[str]:
        """Look up the wrapped table's column names via sqlalchemy inspection."""
        from sqlalchemy import inspect

        engine = self._db.get_sqlalchemy_engine()
        inspector = inspect(engine)
        columns = inspector.get_columns(self._table_type.value)
        result = [column["name"] for column in columns]
        return result

    def slice(
        self, offset: int = 0, length: typing.Optional[int] = None
    ) -> "TabularWrap":
        """Return the requested row window as an in-memory tabular wrap.

        NOTE: sqlite requires a LIMIT clause before OFFSET, which is why the
        full row count is used as limit when no length is given.
        """
        query = f"SELECT * FROM {self._table_name}"
        if length:
            query = f"{query} LIMIT {length}"
        else:
            query = f"{query} LIMIT {self.num_rows}"
        if offset > 0:
            query = f"{query} OFFSET {offset}"

        return DictTabularWrap(self._fetch_as_pydict(query))

    def to_pydict(self) -> typing.Mapping:
        """Return the whole table as a column-name -> values dict."""
        return self._fetch_as_pydict(f"SELECT * FROM {self._table_name}")
retrieve_column_names(self)
¶Source code in network_analysis/utils.py
def retrieve_column_names(self) -> typing.Iterable[str]:
    """Look up the wrapped table's column names via engine inspection."""
    from sqlalchemy import inspect

    inspector = inspect(self._db.get_sqlalchemy_engine())
    table_columns = inspector.get_columns(self._table_type.value)

    names = []
    for col in table_columns:
        names.append(col["name"])
    return names
retrieve_number_of_rows(self)
¶Source code in network_analysis/utils.py
def retrieve_number_of_rows(self) -> int:
    """Count all rows of the wrapped table."""
    from sqlalchemy import text

    count_query = text(f"SELECT count(*) from {self._table_name}")
    with self._db.get_sqlalchemy_engine().connect() as con:
        first_row = con.execute(count_query).fetchone()
    return first_row[0]
slice(self, offset=0, length=None)
¶Source code in network_analysis/utils.py
def slice(
    self, offset: int = 0, length: typing.Optional[int] = None
) -> "TabularWrap":
    """Return the requested row window as a DictTabularWrap.

    NOTE: sqlite requires a LIMIT clause before OFFSET, so the full row count
    is used as limit when no length is given.
    """
    from sqlalchemy import text

    parts = [f"SELECT * FROM {self._table_name}"]
    limit = length if length else self.num_rows
    parts.append(f"LIMIT {limit}")
    if offset > 0:
        parts.append(f"OFFSET {offset}")
    query = " ".join(parts)

    columns = list(self.column_names)
    sliced: typing.Dict[str, typing.List[typing.Any]] = {cn: [] for cn in columns}
    with self._db.get_sqlalchemy_engine().connect() as con:
        for row in con.execute(text(query)):
            for idx, cn in enumerate(columns):
                sliced[cn].append(row[idx])

    return DictTabularWrap(sliced)
to_pydict(self)
¶Source code in network_analysis/utils.py
def to_pydict(self) -> typing.Mapping:
    """Materialize the whole wrapped table into a column-name -> values dict."""
    from sqlalchemy import text

    columns = list(self.column_names)
    table_data: typing.Dict[str, typing.List[typing.Any]] = {
        cn: [] for cn in columns
    }
    with self._db.get_sqlalchemy_engine().connect() as con:
        for row in con.execute(text(f"SELECT * FROM {self._table_name}")):
            for idx, cn in enumerate(columns):
                table_data[cn].append(row[idx])

    return table_data
convert_graphml_type_to_sqlite(data_type)
¶
Source code in network_analysis/utils.py
def convert_graphml_type_to_sqlite(data_type: str) -> str:
    """Map a GraphML attribute type name to the matching SQLite column type.

    Raises a KeyError (carrying the offending name) for unknown type names,
    like the lookup-table implementation it replaces.
    """
    if data_type in ("boolean", "int", "long"):
        return "INTEGER"
    if data_type in ("float", "double"):
        return "REAL"
    if data_type == "string":
        return "TEXT"
    raise KeyError(data_type)
extract_edges_as_table(graph)
¶
Source code in network_analysis/utils.py
def extract_edges_as_table(graph: "nx.Graph"):
    """Extract the edges (with attributes) of a networkx graph into a pyarrow table.

    Raises nx.NetworkXError if an edge attribute clashes with the reserved
    source/target column names.
    """
    # adapted from networkx code
    # License: 3-clause BSD license
    # Copyright (C) 2004-2022, NetworkX Developers
    import networkx as nx
    import pyarrow as pa

    edgelist = graph.edges(data=True)
    source_nodes = [s for s, _, _ in edgelist]
    target_nodes = [t for _, t, _ in edgelist]

    all_attrs: typing.Set[str] = set().union(*(d.keys() for _, _, d in edgelist))  # type: ignore
    if SOURCE_COLUMN_NAME in all_attrs:
        raise nx.NetworkXError(
            f"Source name {SOURCE_COLUMN_NAME} is an edge attribute name"
        )
    # BUG FIX: this check previously re-tested SOURCE_COLUMN_NAME, so a clashing
    # target attribute was never detected.
    if TARGET_COLUMN_NAME in all_attrs:
        raise nx.NetworkXError(
            f"Target name {TARGET_COLUMN_NAME} is an edge attribute name"
        )

    # missing attribute values are filled with NaN, mirroring networkx
    nan = float("nan")
    edge_attr = {k: [d.get(k, nan) for _, _, d in edgelist] for k in all_attrs}
    edge_lists = {
        SOURCE_COLUMN_NAME: source_nodes,
        TARGET_COLUMN_NAME: target_nodes,
    }
    edge_lists.update(edge_attr)
    edges_table = pa.Table.from_pydict(mapping=edge_lists)
    return edges_table
extract_nodes_as_table(graph)
¶
Source code in network_analysis/utils.py
def extract_nodes_as_table(graph: "nx.Graph"):
    """Extract the nodes (with attributes) of a networkx graph into a pyarrow table.

    Raises nx.NetworkXError if a node attribute clashes with a reserved
    column name.
    """
    # adapted from networkx code
    # License: 3-clause BSD license
    # Copyright (C) 2004-2022, NetworkX Developers
    import networkx as nx
    import pyarrow as pa

    nodelist = graph.nodes(data=True)
    node_ids = [n for n, _ in nodelist]

    all_attrs: typing.Set[str] = set().union(*(d.keys() for _, d in nodelist))  # type: ignore
    if ID_COLUMN_NAME in all_attrs:
        raise nx.NetworkXError(
            f"Id column name {ID_COLUMN_NAME} is a node attribute name"
        )
    # NOTE(review): this guards against a node attribute clashing with the
    # reserved 'source' column name. BUG FIX: the message previously (and
    # wrongly) called the offender an "edge attribute".
    if SOURCE_COLUMN_NAME in all_attrs:
        raise nx.NetworkXError(
            f"Column name {SOURCE_COLUMN_NAME} is a node attribute name"
        )

    # missing attribute values are filled with NaN, mirroring networkx
    nan = float("nan")
    node_attr = {k: [d.get(k, nan) for _, d in nodelist] for k in all_attrs}
    node_attr[ID_COLUMN_NAME] = node_ids
    nodes_table = pa.Table.from_pydict(mapping=node_attr)
    return nodes_table
insert_table_data_into_network_graph(network_data, edges_table, edges_column_map=None, nodes_table=None, nodes_column_map=None, chunk_size=1024)
¶
Source code in network_analysis/utils.py
def insert_table_data_into_network_graph(
    network_data: "NetworkData",
    edges_table: "pa.Table",
    edges_column_map: typing.Optional[typing.Mapping[str, str]] = None,
    nodes_table: typing.Optional["pa.Table"] = None,
    nodes_column_map: typing.Optional[typing.Mapping[str, str]] = None,
    chunk_size: int = DEFAULT_NETWORK_DATA_CHUNK_SIZE,
):
    """Insert tabular node/edge data into a 'NetworkData' graph database.

    Arguments:
        network_data: the target database wrapper.
        edges_table: the (pyarrow) edges table.
        edges_column_map: optional map of original -> desired edge column names.
        nodes_table: optional (pyarrow) nodes table; inserted before the edges.
        nodes_column_map: optional map of original -> desired node column names.
        chunk_size: number of rows per insert batch.
    """
    added_node_ids = set()

    if edges_column_map is None:
        edges_column_map = {}
    if nodes_column_map is None:
        nodes_column_map = {}

    if nodes_table is not None:
        for batch in nodes_table.to_batches(chunk_size):
            batch_dict = batch.to_pydict()

            if nodes_column_map:
                for k, v in nodes_column_map.items():
                    if k in batch_dict.keys():
                        if k == ID_COLUMN_NAME and v == LABEL_COLUMN_NAME:
                            # keep the id column; additionally use it as label
                            _data = batch_dict.get(k)
                        else:
                            _data = batch_dict.pop(k)
                        if v in batch_dict.keys():
                            # BUG FIX: message was a plain string; '{v}' was
                            # emitted literally instead of the column name.
                            raise Exception(
                                f"Duplicate nodes column name after mapping: {v}"
                            )
                        batch_dict[v] = _data

            if LABEL_COLUMN_NAME not in batch_dict.keys():
                # default label: stringified node id; the generator is consumed
                # exactly once by the zip below
                batch_dict[LABEL_COLUMN_NAME] = (
                    str(x) for x in batch_dict[ID_COLUMN_NAME]
                )

            ids = batch_dict[ID_COLUMN_NAME]
            data = [dict(zip(batch_dict, t)) for t in zip(*batch_dict.values())]
            network_data.insert_nodes(*data)

            added_node_ids.update(ids)

    for batch in edges_table.to_batches(chunk_size):
        batch_dict = batch.to_pydict()

        for k, v in edges_column_map.items():
            if k in batch_dict.keys():
                _data = batch_dict.pop(k)
                if v in batch_dict.keys():
                    # BUG FIX: message was a plain string; '{v}' was emitted
                    # literally instead of the column name.
                    raise Exception(f"Duplicate edges column name after mapping: {v}")
                batch_dict[v] = _data

        data = [dict(zip(batch_dict, t)) for t in zip(*batch_dict.values())]
        all_node_ids = network_data.insert_edges(
            *data,
            existing_node_ids=added_node_ids,
        )
        added_node_ids.update(all_node_ids)