Skip to content

network_analysis

Top-level package for kiara_plugin.network_analysis.

KIARA_METADATA

find_data_types: Union[Type, Tuple, Callable]

find_model_classes: Union[Type, Tuple, Callable]

find_modules: Union[Type, Tuple, Callable]

find_pipelines: Union[Type, Tuple, Callable]

get_version()

Source code in network_analysis/__init__.py
def get_version():
    """Return the version of this package as a string.

    Resolution order: installed distribution metadata, then a bundled
    'version.txt' file next to this module, then the literal "unknown".
    Never raises - any lookup failure falls through to the next source.
    """
    import os

    __version__ = None

    try:
        from pkg_resources import get_distribution

        # Change here if project is renamed and does not equal the package name
        dist_name = __name__
        __version__ = get_distribution(dist_name).version
    except Exception:
        # not installed as a distribution (or pkg_resources unavailable):
        # fall back to the bundled version file
        try:
            version_file = os.path.join(os.path.dirname(__file__), "version.txt")

            if os.path.exists(version_file):
                with open(version_file, encoding="utf-8") as vf:
                    __version__ = vf.read()
        except Exception:
            # best effort only - the "unknown" default below covers failures
            pass

        # previously __version__ could be unbound here (NameError); the
        # explicit None initialisation above guarantees this check is safe
        if __version__ is None:
            __version__ = "unknown"

    return __version__

Modules

data_types

This module contains the value type classes that are used in the kiara_plugin.network_analysis package.

Classes

NetworkDataType (DatabaseType)

Data that can be assembled into a graph.

Internally, this is backed by a sqlite database, using https://github.com/dpapathanasiou/simple-graph .

Source code in network_analysis/data_types.py
class NetworkDataType(DatabaseType):
    """Data that can be assembled into a graph.

    Internally, this is backed by a sqlite database, using https://github.com/dpapathanasiou/simple-graph .
    """

    _data_type_name = "network_data"

    @classmethod
    def python_class(cls) -> Type:
        """Return the Python class backing values of this data type."""
        return NetworkData

    def parse_python_obj(self, data: Any) -> NetworkData:
        """Parse a value into a supported python type.

        A string is interpreted as the path to a sqlite database file; any
        other value is returned unchanged.
        """

        if isinstance(data, str):
            # TODO: check path exists
            return NetworkData(db_file_path=data)

        return data

    def _validate(cls, value: Any) -> None:
        """Validate that 'value' is a well-formed 'network_data' database.

        Checks that the backing database contains 'edges' and 'nodes' tables,
        and that those tables have the required columns.

        Raises:
            ValueError: if 'value' is not a NetworkData (or KiaraDatabase) instance
            Exception: if a required table or column is missing
        """

        if isinstance(value, KiaraDatabase):
            # 'upgrade' a generic database value to a NetworkData instance
            value = NetworkData(db_file_path=value.db_file_path)

        if not isinstance(value, NetworkData):
            raise ValueError(
                f"Invalid type '{type(value)}': must be of 'NetworkData' (or a sub-class)."
            )

        network_data: NetworkData = value

        # both required tables must be present
        table_names = network_data.table_names
        for tn in ["edges", "nodes"]:
            if tn not in table_names:
                raise Exception(
                    f"Invalid 'network_data' value: database does not contain table '{tn}'"
                )

        # the 'edges' table needs source and target columns
        edges_columns = network_data.edges_schema.columns
        if SOURCE_COLUMN_NAME not in edges_columns.keys():
            raise Exception(
                f"Invalid 'network_data' value: 'edges' table does not contain a '{SOURCE_COLUMN_NAME}' column. Available columns: {', '.join(edges_columns.keys())}."
            )
        if TARGET_COLUMN_NAME not in edges_columns.keys():
            raise Exception(
                f"Invalid 'network_data' value: 'edges' table does not contain a '{TARGET_COLUMN_NAME}' column. Available columns: {', '.join(edges_columns.keys())}."
            )

        # the 'nodes' table needs id and label columns
        nodes_columns = network_data.nodes_schema.columns
        if ID_COLUMN_NAME not in nodes_columns.keys():
            raise Exception(
                f"Invalid 'network_data' value: 'nodes' table does not contain a '{ID_COLUMN_NAME}' column. Available columns: {', '.join(nodes_columns.keys())}."
            )
        if LABEL_COLUMN_NAME not in nodes_columns.keys():
            raise Exception(
                f"Invalid 'network_data' value: 'nodes' table does not contain a '{LABEL_COLUMN_NAME}' column. Available columns: {', '.join(nodes_columns.keys())}."
            )

    def render_as__terminal_renderable(
        self, value: Value, render_config: Mapping[str, Any]
    ) -> Any:
        """Render the edges and nodes tables as a rich terminal renderable."""

        max_rows = render_config.get(
            "max_no_rows", DEFAULT_PRETTY_PRINT_CONFIG["max_no_rows"]
        )
        max_row_height = render_config.get(
            "max_row_height", DEFAULT_PRETTY_PRINT_CONFIG["max_row_height"]
        )
        max_cell_length = render_config.get(
            "max_cell_length", DEFAULT_PRETTY_PRINT_CONFIG["max_cell_length"]
        )

        # show head and tail halves of the allowed row budget
        half_lines: Optional[int] = None
        if max_rows:
            half_lines = int(max_rows / 2)

        db: NetworkData = value.data

        result: List[Any] = [""]
        atw = NetworkDataTabularWrap(db=db, table_type=NetworkDataTableType.EDGES)
        pretty = atw.pretty_print(
            rows_head=half_lines,
            rows_tail=half_lines,
            max_row_height=max_row_height,
            max_cell_length=max_cell_length,
        )
        result.append(f"[b]Table[/b]: [i]{NetworkDataTableType.EDGES.value}[/i]")
        result.append(pretty)

        atw = NetworkDataTabularWrap(db=db, table_type=NetworkDataTableType.NODES)
        pretty = atw.pretty_print(
            rows_head=half_lines,
            rows_tail=half_lines,
            max_row_height=max_row_height,
            max_cell_length=max_cell_length,
        )
        result.append(f"[b]Table[/b]: [i]{NetworkDataTableType.NODES.value}[/i]")
        result.append(pretty)
        return Group(*result)
Methods
parse_python_obj(self, data)

Parse a value into a supported python type.

This exists to make it easier to do trivial conversions (e.g. from a date string to a datetime object). If you choose to overwrite this method, make 100% sure that you don't change the meaning of the value, and try to avoid adding or removing information from the data (e.g. by changing the resolution of a date).

Parameters:

Name Type Description Default
data

the value

required

Returns:

Type Description
NetworkData

'None', if no parsing was done and the original value should be used, otherwise return the parsed Python object

Source code in network_analysis/data_types.py
def parse_python_obj(self, data: Any) -> NetworkData:
    """Coerce 'data' into a supported python type.

    A string is treated as the path to a sqlite database file and wrapped
    into a NetworkData instance; any other value is handed back unchanged.
    """

    if not isinstance(data, str):
        return data

    # TODO: check path exists
    return NetworkData(db_file_path=data)
python_class() classmethod
Source code in network_analysis/data_types.py
@classmethod
def python_class(cls) -> Type:
    """Return the Python class that backs values of this data type."""
    return NetworkData
render_as__terminal_renderable(self, value, render_config)
Source code in network_analysis/data_types.py
def render_as__terminal_renderable(
    self, value: Value, render_config: Mapping[str, Any]
) -> Any:
    """Render the edges and nodes tables of the value as a rich renderable group."""

    def _setting(key: str) -> Any:
        # render_config entry, falling back to the package-wide default
        return render_config.get(key, DEFAULT_PRETTY_PRINT_CONFIG[key])

    max_rows = _setting("max_no_rows")
    max_row_height = _setting("max_row_height")
    max_cell_length = _setting("max_cell_length")

    # split the row budget between head and tail of each table
    half_lines: Optional[int] = int(max_rows / 2) if max_rows else None

    db: NetworkData = value.data

    renderables: List[Any] = [""]
    for table_type in (NetworkDataTableType.EDGES, NetworkDataTableType.NODES):
        wrap = NetworkDataTabularWrap(db=db, table_type=table_type)
        renderables.append(f"[b]Table[/b]: [i]{table_type.value}[/i]")
        renderables.append(
            wrap.pretty_print(
                rows_head=half_lines,
                rows_tail=half_lines,
                max_row_height=max_row_height,
                max_cell_length=max_cell_length,
            )
        )

    return Group(*renderables)

defaults

Attributes

DEFAULT_NETWORK_DATA_CHUNK_SIZE
ID_COLUMN_NAME
KIARA_PLUGIN_NETWORK_BASE_FOLDER

Marker to indicate the base folder for the kiara network module package.

KIARA_PLUGIN_NETWORK_RESOURCES_FOLDER

Default resources folder for this package.

LABEL_COLUMN_NAME
SOURCE_COLUMN_NAME
TARGET_COLUMN_NAME
TEMPLATES_FOLDER

Classes

NetworkDataTableType (Enum)

An enumeration.

Source code in network_analysis/defaults.py
class NetworkDataTableType(Enum):
    """The names of the two tables that back a 'network_data' sqlite database."""

    EDGES = "edges"
    NODES = "nodes"
EDGES
NODES

models

This module contains the metadata (and other) models that are used in the kiara_plugin.network_analysis package.

Those models are convenience wrappers that make it easier for kiara to find, create, manage and version metadata -- but also other type of models -- that is attached to data, as well as kiara modules.

Metadata models must be a sub-class of [kiara.metadata.MetadataModel][]. Other models usually sub-class a pydantic BaseModel or implement custom base classes.

Classes

GraphType (Enum)

All possible graph types.

Source code in network_analysis/models.py
class GraphType(Enum):
    """All possible graph types."""

    UNDIRECTED = "undirected"
    DIRECTED = "directed"
    UNDIRECTED_MULTI = "undirected-multi"
    DIRECTED_MULTI = "directed-multi"
DIRECTED
DIRECTED_MULTI
UNDIRECTED
UNDIRECTED_MULTI
GraphTypesEnum (Enum)

An enumeration.

Source code in network_analysis/models.py
class GraphTypesEnum(Enum):
    """Enumeration of graph type names (lower-case member variant)."""

    undirected = "undirected"
    directed = "directed"
    multi_directed = "multi_directed"
    multi_undirected = "multi_undirected"
directed
multi_directed
multi_undirected
undirected
NetworkData (KiaraDatabase) pydantic-model

A helper class to access and query network datasets.

This class provides different ways to access the underlying network data, most notably via sql and as networkx Graph object.

Internally, network data is stored in a sqlite database with the edges stored in a table called 'edges' and the nodes, well, in a table aptly called 'nodes'.

Source code in network_analysis/models.py
class NetworkData(KiaraDatabase):
    """A helper class to access and query network datasets.

    This class provides different ways to access the underlying network data, most notably via sql and as networkx Graph object.

    Internally, network data is stored in a sqlite database with the edges stored in a table called 'edges' and the nodes, well,
    in a table aptly called 'nodes'.

    """

    _kiara_model_id = "instance.network_data"

    @classmethod
    def create_from_networkx_graph(cls, graph: "nx.Graph") -> "NetworkData":
        """Create a `NetworkData` instance from a networkx Graph object."""

        edges_table = extract_edges_as_table(graph)
        edges_schema = create_sqlite_schema_data_from_arrow_table(edges_table)

        nodes_table = extract_nodes_as_table(graph)
        nodes_schema = create_sqlite_schema_data_from_arrow_table(nodes_table)

        # keep the db unlocked until all table data has been inserted
        network_data = NetworkData.create_in_temp_dir(
            edges_schema=edges_schema, nodes_schema=nodes_schema, keep_unlocked=True
        )
        insert_table_data_into_network_graph(
            network_data=network_data,
            edges_table=edges_table,
            nodes_table=nodes_table,
            chunk_size=DEFAULT_NETWORK_DATA_CHUNK_SIZE,
        )
        network_data._lock_db()

        return network_data

    @classmethod
    def create_in_temp_dir(
        cls,
        edges_schema: Union[None, SqliteTableSchema, Mapping] = None,
        nodes_schema: Union[None, SqliteTableSchema, Mapping] = None,
        keep_unlocked: bool = False,
    ):
        """Create a new, empty network data database in a temporary directory.

        The 'edges' and 'nodes' tables are created from the provided (or default)
        schemas. The temporary directory is removed at interpreter exit.

        Arguments:
            edges_schema: the schema for the edges table (optional)
            nodes_schema: the schema for the nodes table (optional)
            keep_unlocked: whether to leave the db unlocked (e.g. for bulk inserts)
        """

        temp_f = tempfile.mkdtemp()
        db_path = os.path.join(temp_f, "network_data.sqlite")

        def cleanup():
            # remove the whole temp directory; calling rmtree on the db file
            # path itself would fail (and be swallowed by 'ignore_errors'),
            # leaking the directory
            shutil.rmtree(temp_f, ignore_errors=True)

        atexit.register(cleanup)

        db = cls(
            db_file_path=db_path, edges_schema=edges_schema, nodes_schema=nodes_schema
        )
        db.create_if_not_exists()

        # temporarily unlock, so the two tables can be created
        db._unlock_db()
        engine = db.get_sqlalchemy_engine()
        db.edges_schema.create_table(
            table_name=NetworkDataTableType.EDGES.value, engine=engine
        )
        db.nodes_schema.create_table(
            table_name=NetworkDataTableType.NODES.value, engine=engine
        )
        if not keep_unlocked:
            db._lock_db()

        return db

    # schema information for the two backing tables
    edges_schema: SqliteTableSchema = Field(
        description="The schema information for the edges table."
    )
    nodes_schema: SqliteTableSchema = Field(
        description="The schema information for the nodes table."
    )

    @root_validator(pre=True)
    def pre_validate(cls, values):
        """Fill in and validate the edges/nodes schemas before model creation.

        Derives sensible default schemas where missing, and ensures the
        source/target/id column types are consistent with each other.
        """

        _edges_schema = values.get("edges_schema", None)
        _nodes_schema = values.get("nodes_schema", None)
        if _edges_schema is None:

            # default edges schema: source/target typed like the nodes id column
            suggested_id_type = "TEXT"
            if _nodes_schema is not None:
                if isinstance(_nodes_schema, Mapping):
                    suggested_id_type = _nodes_schema.get(ID_COLUMN_NAME, "TEXT")
                elif isinstance(_nodes_schema, SqliteTableSchema):
                    suggested_id_type = _nodes_schema.columns.get(
                        ID_COLUMN_NAME, "TEXT"
                    )

            edges_schema = SqliteTableSchema.construct(
                columns={
                    SOURCE_COLUMN_NAME: suggested_id_type,
                    TARGET_COLUMN_NAME: suggested_id_type,
                }
            )
        else:
            if isinstance(_edges_schema, Mapping):
                edges_schema = SqliteTableSchema(**_edges_schema)
            elif not isinstance(_edges_schema, SqliteTableSchema):
                raise ValueError(
                    f"Invalid data type for edges schema: {type(_edges_schema)}"
                )
            else:
                edges_schema = _edges_schema

        if (
            edges_schema.columns[SOURCE_COLUMN_NAME]
            != edges_schema.columns[TARGET_COLUMN_NAME]
        ):
            # NOTE: previously this message subscripted the schema object itself
            # (edges_schema[...]), which raised a TypeError instead of the
            # intended ValueError
            raise ValueError(
                f"Invalid edges schema, source and edges columns have different type: {edges_schema.columns[SOURCE_COLUMN_NAME]} != {edges_schema.columns[TARGET_COLUMN_NAME]}"
            )

        if _nodes_schema is None:

            _nodes_schema = SqliteTableSchema.construct(
                columns={
                    ID_COLUMN_NAME: edges_schema.columns[SOURCE_COLUMN_NAME],
                    LABEL_COLUMN_NAME: "TEXT",
                }
            )

        if isinstance(_nodes_schema, Mapping):
            nodes_schema = SqliteTableSchema(**_nodes_schema)
        elif isinstance(_nodes_schema, SqliteTableSchema):
            nodes_schema = _nodes_schema
        else:
            # report the nodes schema type here (was mistakenly _edges_schema)
            raise ValueError(
                f"Invalid data type for nodes schema: {type(_nodes_schema)}"
            )

        if ID_COLUMN_NAME not in nodes_schema.columns.keys():
            raise ValueError(
                f"Invalid nodes schema: missing '{ID_COLUMN_NAME}' column."
            )

        if LABEL_COLUMN_NAME not in nodes_schema.columns.keys():
            # a label column is always present, defaulting to TEXT
            nodes_schema.columns[LABEL_COLUMN_NAME] = "TEXT"
        else:
            if nodes_schema.columns[LABEL_COLUMN_NAME] != "TEXT":
                raise ValueError(
                    f"Invalid nodes schema, '{LABEL_COLUMN_NAME}' column must be of type 'TEXT', not '{nodes_schema.columns[LABEL_COLUMN_NAME]}'."
                )

        if (
            nodes_schema.columns[ID_COLUMN_NAME]
            != edges_schema.columns[SOURCE_COLUMN_NAME]
        ):
            raise ValueError(
                f"Invalid nodes schema, id column has different type to edges source/target columns: {nodes_schema.columns[ID_COLUMN_NAME]} != {edges_schema.columns[SOURCE_COLUMN_NAME]}"
            )

        values["edges_schema"] = edges_schema
        values["nodes_schema"] = nodes_schema

        return values

    # lazily reflected, cached sqlalchemy Table objects
    _nodes_table_obj: Optional[Table] = PrivateAttr(default=None)
    _edges_table_obj: Optional[Table] = PrivateAttr(default=None)

    # cache of networkx graph instances, keyed by graph class
    _nx_graph = PrivateAttr(default={})

    def _invalidate_other(self):
        # drop the cached table objects (e.g. after the underlying db changed)
        self._nodes_table_obj = None
        self._edges_table_obj = None

    def get_sqlalchemy_nodes_table(self) -> Table:
        """Return the sqlalchemy nodes table instance for this network data."""

        if self._nodes_table_obj is not None:
            return self._nodes_table_obj

        self._nodes_table_obj = Table(
            NetworkDataTableType.NODES.value,
            self.get_sqlalchemy_metadata(),
            autoload_with=self.get_sqlalchemy_engine(),
        )
        return self._nodes_table_obj

    def get_sqlalchemy_edges_table(self) -> Table:
        """Return the sqlalchemy edges table instance for this network data."""

        if self._edges_table_obj is not None:
            return self._edges_table_obj

        self._edges_table_obj = Table(
            NetworkDataTableType.EDGES.value,
            self.get_sqlalchemy_metadata(),
            autoload_with=self.get_sqlalchemy_engine(),
        )
        return self._edges_table_obj

    def insert_nodes(self, *nodes: Mapping[str, Any]):
        """Add nodes to a network data item.

        Arguments:
            nodes: a list of dicts with the nodes
        """

        engine = self.get_sqlalchemy_engine()
        nodes_table = self.get_sqlalchemy_nodes_table()

        with engine.connect() as conn:
            with conn.begin():
                conn.execute(nodes_table.insert(), nodes)

    def insert_edges(
        self,
        *edges: Mapping[str, Any],
        existing_node_ids: Optional[Iterable[int]] = None,
    ) -> Set[int]:
        """Add edges to a network data item.

        All the edges need to have their node-ids registered already; any node
        referenced by an edge but not listed in 'existing_node_ids' is
        auto-created, using its stringified id as label.

        Arguments:
            edges: a list of dicts with the edges
            existing_node_ids: a set of ids that can be assumed to already exist, this is mainly for performance reasons

        Returns:
            a unique set of all node ids contained in source and target columns
        """

        if existing_node_ids is None:
            # TODO: run query
            existing_node_ids = set()
        else:
            # normalize to a set for the difference computation below
            existing_node_ids = set(existing_node_ids)

        required_node_ids = set((edge[SOURCE_COLUMN_NAME] for edge in edges))
        required_node_ids.update(edge[TARGET_COLUMN_NAME] for edge in edges)

        node_ids = list(required_node_ids.difference(existing_node_ids))

        if node_ids:
            # register any not-yet-existing nodes before inserting the edges
            self.insert_nodes(
                *(
                    {ID_COLUMN_NAME: node_id, LABEL_COLUMN_NAME: str(node_id)}
                    for node_id in node_ids
                )
            )

        engine = self.get_sqlalchemy_engine()
        with engine.connect() as conn:
            with conn.begin():
                conn.execute(self.get_sqlalchemy_edges_table().insert(), edges)

        return required_node_ids

    def as_networkx_graph(self, graph_type: Type["nx.Graph"]) -> "nx.Graph":
        """Return the network data as a networkx graph object.

        The result is cached per graph class, so repeated calls with the same
        'graph_type' return the identical graph instance.

        Arguments:
            graph_type: the networkx Graph class to use
        """

        if graph_type in self._nx_graph.keys():
            return self._nx_graph[graph_type]

        graph = graph_type()

        engine = self.get_sqlalchemy_engine()
        nodes = self.get_sqlalchemy_nodes_table()
        edges = self.get_sqlalchemy_edges_table()

        with engine.connect() as conn:
            with conn.begin():
                result = conn.execute(nodes.select())
                for r in result:
                    row = dict(r)
                    node_id = row.pop(ID_COLUMN_NAME)
                    graph.add_node(node_id, **row)

                result = conn.execute(edges.select())
                for r in result:
                    row = dict(r)
                    source = row.pop(SOURCE_COLUMN_NAME)
                    target = row.pop(TARGET_COLUMN_NAME)
                    graph.add_edge(source, target, **row)

        self._nx_graph[graph_type] = graph
        return self._nx_graph[graph_type]
Attributes
edges_schema: SqliteTableSchema pydantic-field required

The schema information for the edges table.

nodes_schema: SqliteTableSchema pydantic-field required

The schema information for the nodes table.

Methods
as_networkx_graph(self, graph_type)

Return the network data as a networkx graph object.

Parameters:

Name Type Description Default
graph_type Type[nx.Graph]

the networkx Graph class to use

required
Source code in network_analysis/models.py
def as_networkx_graph(self, graph_type: Type["nx.Graph"]) -> "nx.Graph":
    """Assemble (or return a cached) networkx graph from the backing database.

    The graph is cached per graph class, so repeated calls with the same
    'graph_type' return the identical instance.

    Arguments:
        graph_type: the networkx Graph class to use
    """

    cached = self._nx_graph.get(graph_type)
    if cached is not None:
        return cached

    graph = graph_type()

    engine = self.get_sqlalchemy_engine()
    nodes_table = self.get_sqlalchemy_nodes_table()
    edges_table = self.get_sqlalchemy_edges_table()

    with engine.connect() as conn:
        with conn.begin():
            # nodes first: id column becomes the node key, the rest attributes
            for record in conn.execute(nodes_table.select()):
                attrs = dict(record)
                graph.add_node(attrs.pop(ID_COLUMN_NAME), **attrs)

            # then edges: source/target columns become endpoints
            for record in conn.execute(edges_table.select()):
                attrs = dict(record)
                source = attrs.pop(SOURCE_COLUMN_NAME)
                target = attrs.pop(TARGET_COLUMN_NAME)
                graph.add_edge(source, target, **attrs)

    self._nx_graph[graph_type] = graph
    return graph
create_from_networkx_graph(graph) classmethod

Create a NetworkData instance from a networkx Graph object.

Source code in network_analysis/models.py
@classmethod
def create_from_networkx_graph(cls, graph: "nx.Graph") -> "NetworkData":
    """Build a `NetworkData` instance from a networkx Graph object."""

    edge_table = extract_edges_as_table(graph)
    node_table = extract_nodes_as_table(graph)

    # create the db unlocked, so the extracted table data can be inserted
    network_data = NetworkData.create_in_temp_dir(
        edges_schema=create_sqlite_schema_data_from_arrow_table(edge_table),
        nodes_schema=create_sqlite_schema_data_from_arrow_table(node_table),
        keep_unlocked=True,
    )
    insert_table_data_into_network_graph(
        network_data=network_data,
        edges_table=edge_table,
        nodes_table=node_table,
        chunk_size=DEFAULT_NETWORK_DATA_CHUNK_SIZE,
    )
    network_data._lock_db()

    return network_data
create_in_temp_dir(edges_schema=None, nodes_schema=None, keep_unlocked=False) classmethod
Source code in network_analysis/models.py
@classmethod
def create_in_temp_dir(
    cls,
    edges_schema: Union[None, SqliteTableSchema, Mapping] = None,
    nodes_schema: Union[None, SqliteTableSchema, Mapping] = None,
    keep_unlocked: bool = False,
):
    """Create a new, empty network data database in a temporary directory.

    The 'edges' and 'nodes' tables are created from the provided (or default)
    schemas. The temporary directory is removed at interpreter exit.

    Arguments:
        edges_schema: the schema for the edges table (optional)
        nodes_schema: the schema for the nodes table (optional)
        keep_unlocked: whether to leave the db unlocked (e.g. for bulk inserts)
    """

    temp_f = tempfile.mkdtemp()
    db_path = os.path.join(temp_f, "network_data.sqlite")

    def cleanup():
        # remove the whole temp directory; calling rmtree on the db file path
        # itself would fail (and be swallowed by 'ignore_errors'), leaking
        # the directory
        shutil.rmtree(temp_f, ignore_errors=True)

    atexit.register(cleanup)

    db = cls(
        db_file_path=db_path, edges_schema=edges_schema, nodes_schema=nodes_schema
    )
    db.create_if_not_exists()

    # temporarily unlock, so the two tables can be created
    db._unlock_db()
    engine = db.get_sqlalchemy_engine()
    db.edges_schema.create_table(
        table_name=NetworkDataTableType.EDGES.value, engine=engine
    )
    db.nodes_schema.create_table(
        table_name=NetworkDataTableType.NODES.value, engine=engine
    )
    if not keep_unlocked:
        db._lock_db()

    return db
get_sqlalchemy_edges_table(self)

Return the sqlalchemy edges table instance for this network data.

Source code in network_analysis/models.py
def get_sqlalchemy_edges_table(self) -> Table:
    """Return the (lazily reflected, cached) sqlalchemy 'edges' table for this network data."""

    if self._edges_table_obj is None:
        self._edges_table_obj = Table(
            NetworkDataTableType.EDGES.value,
            self.get_sqlalchemy_metadata(),
            autoload_with=self.get_sqlalchemy_engine(),
        )
    return self._edges_table_obj
get_sqlalchemy_nodes_table(self)

Return the sqlalchemy nodes table instance for this network data.

Source code in network_analysis/models.py
def get_sqlalchemy_nodes_table(self) -> Table:
    """Return the (lazily reflected, cached) sqlalchemy 'nodes' table for this network data."""

    if self._nodes_table_obj is None:
        self._nodes_table_obj = Table(
            NetworkDataTableType.NODES.value,
            self.get_sqlalchemy_metadata(),
            autoload_with=self.get_sqlalchemy_engine(),
        )
    return self._nodes_table_obj
insert_edges(self, *edges, existing_node_ids=None)

Add edges to a network data item.

All the edges need to have their node-ids registered already.

Parameters:

Name Type Description Default
edges Mapping[str, Any]

a list of dicts with the edges

()
existing_node_ids Iterable[int]

a set of ids that can be assumed to already exist, this is mainly for performance reasons

None

Returns:

Type Description
Set[int]

a unique set of all node ids contained in source and target columns

Source code in network_analysis/models.py
def insert_edges(
    self,
    *edges: Mapping[str, Any],
    existing_node_ids: Optional[Iterable[int]] = None,
) -> Set[int]:
    """Add edges to a network data item.

    All the edges need to have their node-ids registered already; any node id
    referenced by an edge but not listed in 'existing_node_ids' is auto-created
    below, using its stringified id as label.

    Arguments:
        edges: a list of dicts with the edges
        existing_node_ids: a set of ids that can be assumed to already exist, this is mainly for performance reasons

    Returns:
        a unique set of all node ids contained in source and target columns
    """

    if existing_node_ids is None:
        # TODO: run query
        existing_node_ids = set()
    else:
        # normalize to a set for the difference computation below
        existing_node_ids = set(existing_node_ids)

    # collect every node id referenced by the given edges
    required_node_ids = set((edge[SOURCE_COLUMN_NAME] for edge in edges))
    required_node_ids.update(edge[TARGET_COLUMN_NAME] for edge in edges)

    node_ids = list(required_node_ids.difference(existing_node_ids))

    if node_ids:
        # register not-yet-existing nodes before the edges are inserted
        self.insert_nodes(
            *(
                {ID_COLUMN_NAME: node_id, LABEL_COLUMN_NAME: str(node_id)}
                for node_id in node_ids
            )
        )

    engine = self.get_sqlalchemy_engine()
    with engine.connect() as conn:
        with conn.begin():
            conn.execute(self.get_sqlalchemy_edges_table().insert(), edges)

    return required_node_ids
insert_nodes(self, *nodes)

Add nodes to a network data item.

Parameters:

Name Type Description Default
nodes Mapping[str, Any]

a list of dicts with the nodes

()
Source code in network_analysis/models.py
def insert_nodes(self, *nodes: Mapping[str, Any]):
    """Insert node rows into the 'nodes' table of this network data item.

    Arguments:
        nodes: one dict per node, keyed by column name
    """

    db_engine = self.get_sqlalchemy_engine()
    target_table = self.get_sqlalchemy_nodes_table()

    # insert all rows within a single transaction
    with db_engine.connect() as connection:
        with connection.begin():
            connection.execute(target_table.insert(), nodes)
pre_validate(values) classmethod
Source code in network_analysis/models.py
@root_validator(pre=True)
def pre_validate(cls, values):
    """Fill in and validate the edges/nodes schemas before model creation.

    Derives sensible default schemas where missing, and ensures the
    source/target/id column types are consistent with each other.
    """

    _edges_schema = values.get("edges_schema", None)
    _nodes_schema = values.get("nodes_schema", None)
    if _edges_schema is None:

        # default edges schema: source/target typed like the nodes id column
        suggested_id_type = "TEXT"
        if _nodes_schema is not None:
            if isinstance(_nodes_schema, Mapping):
                suggested_id_type = _nodes_schema.get(ID_COLUMN_NAME, "TEXT")
            elif isinstance(_nodes_schema, SqliteTableSchema):
                suggested_id_type = _nodes_schema.columns.get(
                    ID_COLUMN_NAME, "TEXT"
                )

        edges_schema = SqliteTableSchema.construct(
            columns={
                SOURCE_COLUMN_NAME: suggested_id_type,
                TARGET_COLUMN_NAME: suggested_id_type,
            }
        )
    else:
        if isinstance(_edges_schema, Mapping):
            edges_schema = SqliteTableSchema(**_edges_schema)
        elif not isinstance(_edges_schema, SqliteTableSchema):
            raise ValueError(
                f"Invalid data type for edges schema: {type(_edges_schema)}"
            )
        else:
            edges_schema = _edges_schema

    if (
        edges_schema.columns[SOURCE_COLUMN_NAME]
        != edges_schema.columns[TARGET_COLUMN_NAME]
    ):
        # NOTE: previously this message subscripted the schema object itself
        # (edges_schema[...]), which raised a TypeError instead of the
        # intended ValueError
        raise ValueError(
            f"Invalid edges schema, source and edges columns have different type: {edges_schema.columns[SOURCE_COLUMN_NAME]} != {edges_schema.columns[TARGET_COLUMN_NAME]}"
        )

    if _nodes_schema is None:

        _nodes_schema = SqliteTableSchema.construct(
            columns={
                ID_COLUMN_NAME: edges_schema.columns[SOURCE_COLUMN_NAME],
                LABEL_COLUMN_NAME: "TEXT",
            }
        )

    if isinstance(_nodes_schema, Mapping):
        nodes_schema = SqliteTableSchema(**_nodes_schema)
    elif isinstance(_nodes_schema, SqliteTableSchema):
        nodes_schema = _nodes_schema
    else:
        # report the nodes schema type here (was mistakenly _edges_schema)
        raise ValueError(
            f"Invalid data type for nodes schema: {type(_nodes_schema)}"
        )

    if ID_COLUMN_NAME not in nodes_schema.columns.keys():
        raise ValueError(
            f"Invalid nodes schema: missing '{ID_COLUMN_NAME}' column."
        )

    if LABEL_COLUMN_NAME not in nodes_schema.columns.keys():
        # a label column is always present, defaulting to TEXT
        nodes_schema.columns[LABEL_COLUMN_NAME] = "TEXT"
    else:
        if nodes_schema.columns[LABEL_COLUMN_NAME] != "TEXT":
            raise ValueError(
                f"Invalid nodes schema, '{LABEL_COLUMN_NAME}' column must be of type 'TEXT', not '{nodes_schema.columns[LABEL_COLUMN_NAME]}'."
            )

    if (
        nodes_schema.columns[ID_COLUMN_NAME]
        != edges_schema.columns[SOURCE_COLUMN_NAME]
    ):
        raise ValueError(
            f"Invalid nodes schema, id column has different type to edges source/target columns: {nodes_schema.columns[ID_COLUMN_NAME]} != {edges_schema.columns[SOURCE_COLUMN_NAME]}"
        )

    values["edges_schema"] = edges_schema
    values["nodes_schema"] = nodes_schema

    return values
NetworkGraphProperties (ValueMetadata) pydantic-model

Graph properties (node count, edge counts by graph type) of a network data value.

Source code in network_analysis/models.py
class NetworkGraphProperties(ValueMetadata):
    """Graph properties (node count, edge counts by graph type) of a 'network_data' value."""

    _metadata_key = "graph_properties"

    number_of_nodes: int = Field(description="Number of nodes in the network graph.")
    properties_by_graph_type: List[PropertiesByGraphType] = Field(
        description="Properties of the network data, by graph type."
    )

    @classmethod
    def retrieve_supported_data_types(cls) -> Iterable[str]:
        """Return the data type names this metadata model applies to."""
        return ["network_data"]

    @classmethod
    def create_value_metadata(cls, value: Value) -> "NetworkGraphProperties":
        """Compute the graph property metadata for a 'network_data' value."""

        from sqlalchemy import text

        network_data: NetworkData = value.data

        with network_data.get_sqlalchemy_engine().connect() as con:
            result = con.execute(text("SELECT count(*) from nodes"))
            num_nodes = result.fetchone()[0]
            result = con.execute(text("SELECT count(*) from edges"))
            num_edges = result.fetchone()[0]
            # distinct (source, target) pairs: edge count when interpreted as directed
            result = con.execute(
                text("SELECT COUNT(*) FROM (SELECT DISTINCT source, target FROM edges)")
            )
            num_edges_directed = result.fetchone()[0]
            # count each undirected pair once, regardless of edge direction
            query = "SELECT COUNT(*) FROM edges WHERE rowid in (SELECT DISTINCT MIN(rowid) FROM (SELECT rowid, source, target from edges UNION ALL SELECT rowid, target, source from edges) GROUP BY source, target)"

            result = con.execute(text(query))
            num_edges_undirected = result.fetchone()[0]

        directed = PropertiesByGraphType(
            graph_type=GraphType.DIRECTED, number_of_edges=num_edges_directed
        )
        undirected = PropertiesByGraphType(
            graph_type=GraphType.UNDIRECTED, number_of_edges=num_edges_undirected
        )
        # multi-graphs keep every edge row, including parallel edges
        directed_multi = PropertiesByGraphType(
            graph_type=GraphType.DIRECTED_MULTI, number_of_edges=num_edges
        )
        undirected_multi = PropertiesByGraphType(
            graph_type=GraphType.UNDIRECTED_MULTI, number_of_edges=num_edges
        )

        return cls(
            number_of_nodes=num_nodes,
            properties_by_graph_type=[
                directed,
                undirected,
                directed_multi,
                undirected_multi,
            ],
        )
Attributes
number_of_nodes: int pydantic-field required

Number of nodes in the network graph.

properties_by_graph_type: List[kiara_plugin.network_analysis.models.PropertiesByGraphType] pydantic-field required

Properties of the network data, by graph type.

create_value_metadata(value) classmethod
Source code in network_analysis/models.py
@classmethod
def create_value_metadata(cls, value: Value) -> "NetworkGraphProperties":
    """Compute graph-property metadata for a `network_data` value.

    Counts the nodes and edge rows in the backing sqlite database, and
    derives how many edges the data would have under each of the four graph
    interpretations (directed/undirected, simple/multi).
    """

    from sqlalchemy import text

    network_data: NetworkData = value.data

    count_nodes_sql = "SELECT count(*) from nodes"
    count_edges_sql = "SELECT count(*) from edges"
    # distinct (source, target) pairs == edge count of a simple directed graph
    count_directed_sql = (
        "SELECT COUNT(*) FROM (SELECT DISTINCT source, target FROM edges)"
    )
    # collapse (a, b) and (b, a) into one pair == edge count of a simple undirected graph
    count_undirected_sql = "SELECT COUNT(*) FROM edges WHERE rowid in (SELECT DISTINCT MIN(rowid) FROM (SELECT rowid, source, target from edges UNION ALL SELECT rowid, target, source from edges) GROUP BY source, target)"

    with network_data.get_sqlalchemy_engine().connect() as con:
        num_nodes = con.execute(text(count_nodes_sql)).fetchone()[0]
        num_edge_rows = con.execute(text(count_edges_sql)).fetchone()[0]
        num_edges_directed = con.execute(text(count_directed_sql)).fetchone()[0]
        num_edges_undirected = con.execute(text(count_undirected_sql)).fetchone()[0]

    # multi-graphs keep every edge row; simple graphs deduplicate
    by_graph_type = [
        PropertiesByGraphType(
            graph_type=GraphType.DIRECTED, number_of_edges=num_edges_directed
        ),
        PropertiesByGraphType(
            graph_type=GraphType.UNDIRECTED, number_of_edges=num_edges_undirected
        ),
        PropertiesByGraphType(
            graph_type=GraphType.DIRECTED_MULTI, number_of_edges=num_edge_rows
        ),
        PropertiesByGraphType(
            graph_type=GraphType.UNDIRECTED_MULTI, number_of_edges=num_edge_rows
        ),
    ]

    return cls(number_of_nodes=num_nodes, properties_by_graph_type=by_graph_type)
retrieve_supported_data_types() classmethod
Source code in network_analysis/models.py
@classmethod
def retrieve_supported_data_types(cls) -> Iterable[str]:
    """This metadata model applies only to `network_data` values."""
    supported = ["network_data"]
    return supported
PropertiesByGraphType (BaseModel) pydantic-model

Properties of graph data, if interpreted as a specific graph type.

Source code in network_analysis/models.py
class PropertiesByGraphType(BaseModel):
    """Properties of graph data, if interpreted as a specific graph type.

    The same underlying edge table yields different edge counts depending on
    whether it is read as a directed/undirected, simple/multi graph; one
    instance of this model records the count for one such interpretation.
    """

    # which interpretation this entry describes (directed, undirected, ...)
    graph_type: GraphType = Field(description="The graph type name.")
    # edge count under that interpretation
    number_of_edges: int = Field(description="The number of edges.")
Attributes
graph_type: GraphType pydantic-field required

The graph type name.

number_of_edges: int pydantic-field required

The number of edges.

modules special

Classes

CreateGraphFromTablesModule (KiaraModule)

Create a graph object from one or two tables.

Source code in network_analysis/modules/__init__.py
class CreateGraphFromTablesModule(KiaraModule):
    """Create a graph object from one or two tables.

    The edges table is required; a nodes table is optional. The column-map
    inputs allow renaming of original columns, while the source/target/id/
    label columns are normalized to the package-wide reserved names.
    """

    _module_type_name = "create.network_data.from.tables"

    def create_inputs_schema(
        self,
    ) -> ValueSetSchema:
        """Return the schema for this module's inputs."""

        inputs: Mapping[str, Any] = {
            "edges": {
                "type": "table",
                "doc": "A table that contains the edges data.",
                "optional": False,
            },
            "source_column_name": {
                "type": "string",
                "doc": "The name of the source column name in the edges table.",
                "default": SOURCE_COLUMN_NAME,
            },
            "target_column_name": {
                "type": "string",
                "doc": "The name of the target column name in the edges table.",
                "default": TARGET_COLUMN_NAME,
            },
            "edges_column_map": {
                "type": "dict",
                "doc": "An optional map of original column name to desired.",
                "optional": True,
            },
            "nodes": {
                "type": "table",
                "doc": "A table that contains the nodes data.",
                "optional": True,
            },
            "id_column_name": {
                "type": "string",
                "doc": "The name (before any potential column mapping) of the node-table column that contains the node identifier (used in the edges table).",
                "default": ID_COLUMN_NAME,
            },
            "label_column_name": {
                "type": "string",
                "doc": "The name of a column that contains the node label (before any potential column name mapping). If not specified, the value of the id value will be used as label.",
                "optional": True,
            },
            "nodes_column_map": {
                "type": "dict",
                "doc": "An optional map of original column name to desired.",
                "optional": True,
            },
        }
        return inputs

    def create_outputs_schema(
        self,
    ) -> ValueSetSchema:
        """Return the schema for this module's outputs."""

        outputs: Mapping[str, Any] = {
            "network_data": {"type": "network_data", "doc": "The network/graph data."}
        }
        return outputs

    def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
        """Assemble a `network_data` value from the provided table(s).

        Raises:
            KiaraProcessingException: if a required column is missing or a
                column map conflicts with an explicit column-name input.
        """

        edges = inputs.get_value_obj("edges")
        edges_table: KiaraTable = edges.data
        edges_source_column_name = inputs.get_value_data("source_column_name")
        edges_target_column_name = inputs.get_value_data("target_column_name")

        edges_columns = edges_table.column_names
        if edges_source_column_name not in edges_columns:
            raise KiaraProcessingException(
                f"Edges table does not contain source column '{edges_source_column_name}'. Choose one of: {', '.join(edges_columns)}."
            )
        if edges_target_column_name not in edges_columns:
            # fixed: this message reported the source column name instead of
            # the target column name
            raise KiaraProcessingException(
                f"Edges table does not contain target column '{edges_target_column_name}'. Choose one of: {', '.join(edges_columns)}."
            )

        nodes = inputs.get_value_obj("nodes")

        id_column_name = inputs.get_value_data("id_column_name")
        label_column_name = inputs.get_value_data("label_column_name")
        nodes_column_map: Dict[str, str] = inputs.get_value_data("nodes_column_map")
        if nodes_column_map is None:
            nodes_column_map = {}

        edges_column_map: Dict[str, str] = inputs.get_value_data("edges_column_map")
        if edges_column_map is None:
            edges_column_map = {}
        if edges_source_column_name in edges_column_map.keys():
            raise KiaraProcessingException(
                "The value of the 'source_column_name' argument is not allowed in the edges column map."
            )
        if edges_target_column_name in edges_column_map.keys():
            # fixed: this message referred to 'source_column_name'
            raise KiaraProcessingException(
                "The value of the 'target_column_name' argument is not allowed in the edges column map."
            )

        edges_column_map[edges_source_column_name] = SOURCE_COLUMN_NAME
        edges_column_map[edges_target_column_name] = TARGET_COLUMN_NAME

        edges_data_schema = create_sqlite_schema_data_from_arrow_table(
            table=edges_table.arrow_table,
            index_columns=[SOURCE_COLUMN_NAME, TARGET_COLUMN_NAME],
            column_map=edges_column_map,
        )

        nodes_table: Optional[KiaraTable] = None
        if nodes.is_set:
            if (
                id_column_name in nodes_column_map.keys()
                and nodes_column_map[id_column_name] != ID_COLUMN_NAME
            ):
                raise KiaraProcessingException(
                    "The value of the 'id_column_name' argument is not allowed in the node column map."
                )

            nodes_column_map[id_column_name] = ID_COLUMN_NAME

            nodes_table = nodes.data

            if label_column_name is None:
                label_column_name = LABEL_COLUMN_NAME

            # prefer an existing column whose name case-insensitively matches
            # the reserved label column name
            for cn in nodes_table.column_names:
                if cn.lower() == LABEL_COLUMN_NAME.lower():
                    label_column_name = cn
                    break

            if LABEL_COLUMN_NAME in nodes_table.column_names:
                if label_column_name != LABEL_COLUMN_NAME:
                    raise KiaraProcessingException(
                        f"Can't create database for graph data: original data contains column called 'label', which is a protected column name. If this column can be used as a label, remove your '{label_column_name}' input value for the 'label_column_name' input and re-run this module."
                    )

            if label_column_name in nodes_table.column_names:
                if label_column_name in nodes_column_map.keys():
                    raise KiaraProcessingException(
                        "The value of the 'label_column_name' argument is not allowed in the node column map."
                    )
            # NOTE(review): the original accumulated an unused 'extra_schema'
            # list here when no label column exists; removed as dead code

            nodes_column_map[label_column_name] = LABEL_COLUMN_NAME

            # every column except the id may contain NULLs
            nullable_columns = list(nodes_table.column_names)
            if ID_COLUMN_NAME in nullable_columns:
                nullable_columns.remove(ID_COLUMN_NAME)

            nodes_data_schema = create_sqlite_schema_data_from_arrow_table(
                table=nodes_table.arrow_table,
                index_columns=[ID_COLUMN_NAME],
                column_map=nodes_column_map,
                # fixed: the computed 'nullable_columns' list was discarded and
                # an empty list passed instead
                nullable_columns=nullable_columns,
                unique_columns=[ID_COLUMN_NAME],
            )

        else:
            nodes_data_schema = None

        network_data = NetworkData.create_in_temp_dir(
            edges_schema=edges_data_schema,
            nodes_schema=nodes_data_schema,
            keep_unlocked=True,
        )

        insert_table_data_into_network_graph(
            network_data=network_data,
            edges_table=edges_table.arrow_table,
            edges_column_map=edges_column_map,
            nodes_table=None if nodes_table is None else nodes_table.arrow_table,
            nodes_column_map=nodes_column_map,
            chunk_size=DEFAULT_NETWORK_DATA_CHUNK_SIZE,
        )

        # write-protect the database before handing it off
        network_data._lock_db()

        outputs.set_value("network_data", network_data)
Methods
create_inputs_schema(self)

Return the schema for this type's inputs.

Source code in network_analysis/modules/__init__.py
def create_inputs_schema(
    self,
) -> ValueSetSchema:
    """Describe the inputs this module accepts.

    `edges` is the only required input; all other inputs either carry a
    default or are optional.
    """

    edge_related: Mapping[str, Any] = {
        "edges": {
            "type": "table",
            "doc": "A table that contains the edges data.",
            "optional": False,
        },
        "source_column_name": {
            "type": "string",
            "doc": "The name of the source column name in the edges table.",
            "default": SOURCE_COLUMN_NAME,
        },
        "target_column_name": {
            "type": "string",
            "doc": "The name of the target column name in the edges table.",
            "default": TARGET_COLUMN_NAME,
        },
        "edges_column_map": {
            "type": "dict",
            "doc": "An optional map of original column name to desired.",
            "optional": True,
        },
    }
    node_related: Mapping[str, Any] = {
        "nodes": {
            "type": "table",
            "doc": "A table that contains the nodes data.",
            "optional": True,
        },
        "id_column_name": {
            "type": "string",
            "doc": "The name (before any potential column mapping) of the node-table column that contains the node identifier (used in the edges table).",
            "default": ID_COLUMN_NAME,
        },
        "label_column_name": {
            "type": "string",
            "doc": "The name of a column that contains the node label (before any potential column name mapping). If not specified, the value of the id value will be used as label.",
            "optional": True,
        },
        "nodes_column_map": {
            "type": "dict",
            "doc": "An optional map of original column name to desired.",
            "optional": True,
        },
    }

    inputs: Mapping[str, Any] = {**edge_related, **node_related}
    return inputs
create_outputs_schema(self)

Return the schema for this type's outputs.

Source code in network_analysis/modules/__init__.py
def create_outputs_schema(
    self,
) -> ValueSetSchema:
    """Describe the single `network_data` output of this module."""

    network_data_spec = {"type": "network_data", "doc": "The network/graph data."}
    outputs: Mapping[str, Any] = {"network_data": network_data_spec}
    return outputs
process(self, inputs, outputs)
Source code in network_analysis/modules/__init__.py
def process(self, inputs: ValueMap, outputs: ValueMap) -> None:
    """Assemble a `network_data` value from the provided table(s).

    Raises:
        KiaraProcessingException: if a required column is missing or a
            column map conflicts with an explicit column-name input.
    """

    edges = inputs.get_value_obj("edges")
    edges_table: KiaraTable = edges.data
    edges_source_column_name = inputs.get_value_data("source_column_name")
    edges_target_column_name = inputs.get_value_data("target_column_name")

    edges_columns = edges_table.column_names
    if edges_source_column_name not in edges_columns:
        raise KiaraProcessingException(
            f"Edges table does not contain source column '{edges_source_column_name}'. Choose one of: {', '.join(edges_columns)}."
        )
    if edges_target_column_name not in edges_columns:
        # fixed: this message reported the source column name instead of
        # the target column name
        raise KiaraProcessingException(
            f"Edges table does not contain target column '{edges_target_column_name}'. Choose one of: {', '.join(edges_columns)}."
        )

    nodes = inputs.get_value_obj("nodes")

    id_column_name = inputs.get_value_data("id_column_name")
    label_column_name = inputs.get_value_data("label_column_name")
    nodes_column_map: Dict[str, str] = inputs.get_value_data("nodes_column_map")
    if nodes_column_map is None:
        nodes_column_map = {}

    edges_column_map: Dict[str, str] = inputs.get_value_data("edges_column_map")
    if edges_column_map is None:
        edges_column_map = {}
    if edges_source_column_name in edges_column_map.keys():
        raise KiaraProcessingException(
            "The value of the 'source_column_name' argument is not allowed in the edges column map."
        )
    if edges_target_column_name in edges_column_map.keys():
        # fixed: this message referred to 'source_column_name'
        raise KiaraProcessingException(
            "The value of the 'target_column_name' argument is not allowed in the edges column map."
        )

    edges_column_map[edges_source_column_name] = SOURCE_COLUMN_NAME
    edges_column_map[edges_target_column_name] = TARGET_COLUMN_NAME

    edges_data_schema = create_sqlite_schema_data_from_arrow_table(
        table=edges_table.arrow_table,
        index_columns=[SOURCE_COLUMN_NAME, TARGET_COLUMN_NAME],
        column_map=edges_column_map,
    )

    nodes_table: Optional[KiaraTable] = None
    if nodes.is_set:
        if (
            id_column_name in nodes_column_map.keys()
            and nodes_column_map[id_column_name] != ID_COLUMN_NAME
        ):
            raise KiaraProcessingException(
                "The value of the 'id_column_name' argument is not allowed in the node column map."
            )

        nodes_column_map[id_column_name] = ID_COLUMN_NAME

        nodes_table = nodes.data

        if label_column_name is None:
            label_column_name = LABEL_COLUMN_NAME

        # prefer an existing column whose name case-insensitively matches
        # the reserved label column name
        for cn in nodes_table.column_names:
            if cn.lower() == LABEL_COLUMN_NAME.lower():
                label_column_name = cn
                break

        if LABEL_COLUMN_NAME in nodes_table.column_names:
            if label_column_name != LABEL_COLUMN_NAME:
                raise KiaraProcessingException(
                    f"Can't create database for graph data: original data contains column called 'label', which is a protected column name. If this column can be used as a label, remove your '{label_column_name}' input value for the 'label_column_name' input and re-run this module."
                )

        if label_column_name in nodes_table.column_names:
            if label_column_name in nodes_column_map.keys():
                raise KiaraProcessingException(
                    "The value of the 'label_column_name' argument is not allowed in the node column map."
                )
        # NOTE(review): the original accumulated an unused 'extra_schema'
        # list here when no label column exists; removed as dead code

        nodes_column_map[label_column_name] = LABEL_COLUMN_NAME

        # every column except the id may contain NULLs
        nullable_columns = list(nodes_table.column_names)
        if ID_COLUMN_NAME in nullable_columns:
            nullable_columns.remove(ID_COLUMN_NAME)

        nodes_data_schema = create_sqlite_schema_data_from_arrow_table(
            table=nodes_table.arrow_table,
            index_columns=[ID_COLUMN_NAME],
            column_map=nodes_column_map,
            # fixed: the computed 'nullable_columns' list was discarded and
            # an empty list passed instead
            nullable_columns=nullable_columns,
            unique_columns=[ID_COLUMN_NAME],
        )

    else:
        nodes_data_schema = None

    network_data = NetworkData.create_in_temp_dir(
        edges_schema=edges_data_schema,
        nodes_schema=nodes_data_schema,
        keep_unlocked=True,
    )

    insert_table_data_into_network_graph(
        network_data=network_data,
        edges_table=edges_table.arrow_table,
        edges_column_map=edges_column_map,
        nodes_table=None if nodes_table is None else nodes_table.arrow_table,
        nodes_column_map=nodes_column_map,
        chunk_size=DEFAULT_NETWORK_DATA_CHUNK_SIZE,
    )

    # write-protect the database before handing it off
    network_data._lock_db()

    outputs.set_value("network_data", network_data)
ExportNetworkDataModule (DataExportModule)

Export network data items.

Source code in network_analysis/modules/__init__.py
class ExportNetworkDataModule(DataExportModule):
    """Export network data items.

    Each exporter method writes one on-disk representation of the network
    data and returns the created path(s) under the "files" key.
    """

    _module_type_name = "export.network_data"

    # NOTE(review): this method does not follow the
    # 'export__network_data__as__<format>' naming pattern of the sibling
    # exporters; kept as-is because the name is part of the interface.
    def export_as__graphml_file(self, value: NetworkData, base_path: str, name: str):
        """Write the network data as a graphml file."""

        import networkx as nx

        target_path = os.path.join(base_path, f"{name}.graphml")

        # TODO: can't just assume digraph
        graph: nx.Graph = value.as_networkx_graph(nx.DiGraph)
        nx.write_graphml(graph, target_path)

        return {"files": target_path}

    def export__network_data__as__sqlite_db(
        self, value: NetworkData, base_path: str, name: str
    ):
        """Copy the backing sqlite database file as-is."""

        target_path = os.path.abspath(os.path.join(base_path, f"{name}.sqlite"))
        shutil.copy2(value.db_file_path, target_path)

        return {"files": target_path}

    def export__network_data__as__sql_dump(
        self, value: NetworkData, base_path: str, name: str
    ):
        """Write a plain-text SQL dump of the backing database."""

        import sqlite_utils

        db = sqlite_utils.Database(value.db_file_path)
        target_path = Path(os.path.join(base_path, f"{name}.sql"))
        try:
            with target_path.open("wt") as f:
                for line in db.conn.iterdump():
                    f.write(line + "\n")
        finally:
            # fixed: close the sqlite connection instead of leaking it
            db.conn.close()

        return {"files": target_path}

    def export__network_data__as__csv_files(
        self, value: NetworkData, base_path: str, name: str
    ):
        """Write one csv file per table of the backing database."""

        import sqlite3

        files = []

        for table_name in value.table_names:
            target_path = os.path.join(base_path, f"{name}__{table_name}.csv")
            os.makedirs(os.path.dirname(target_path), exist_ok=True)

            # fixed: the original leaked the sqlite connection and, on error,
            # the output file handle
            con = sqlite3.connect(value.db_file_path)
            try:
                # newline="" is the documented way to open files for csv.writer
                with open(target_path, "wt", newline="") as outfile:
                    outcsv = csv.writer(outfile)

                    cursor = con.execute(f"select * from {table_name}")
                    # dump column titles
                    outcsv.writerow(x[0] for x in cursor.description)
                    # dump rows
                    outcsv.writerows(cursor.fetchall())
            finally:
                con.close()

            files.append(target_path)

        return {"files": files}
export__network_data__as__csv_files(self, value, base_path, name)
Source code in network_analysis/modules/__init__.py
def export__network_data__as__csv_files(
    self, value: NetworkData, base_path: str, name: str
):
    """Write one csv file per table of the backing sqlite database.

    Returns the list of created file paths under the "files" key.
    """

    import sqlite3

    files = []

    for table_name in value.table_names:
        target_path = os.path.join(base_path, f"{name}__{table_name}.csv")
        os.makedirs(os.path.dirname(target_path), exist_ok=True)

        # fixed: the original leaked the sqlite connection and, on error, the
        # output file handle; use try/finally and a context manager instead
        con = sqlite3.connect(value.db_file_path)
        try:
            # newline="" is the documented way to open files for csv.writer
            with open(target_path, "wt", newline="") as outfile:
                outcsv = csv.writer(outfile)

                cursor = con.execute(f"select * from {table_name}")
                # dump column titles
                outcsv.writerow(x[0] for x in cursor.description)
                # dump rows
                outcsv.writerows(cursor.fetchall())
        finally:
            con.close()

        files.append(target_path)

    return {"files": files}
export__network_data__as__sql_dump(self, value, base_path, name)
Source code in network_analysis/modules/__init__.py
def export__network_data__as__sql_dump(
    self, value: NetworkData, base_path: str, name: str
):
    """Write a plain-text SQL dump of the backing sqlite database.

    Returns the created `<name>.sql` path under the "files" key.
    """

    import sqlite_utils

    db = sqlite_utils.Database(value.db_file_path)
    target_path = Path(os.path.join(base_path, f"{name}.sql"))
    try:
        with target_path.open("wt") as f:
            for line in db.conn.iterdump():
                f.write(line + "\n")
    finally:
        # fixed: close the sqlite connection instead of leaking it
        db.conn.close()

    return {"files": target_path}
export__network_data__as__sqlite_db(self, value, base_path, name)
Source code in network_analysis/modules/__init__.py
def export__network_data__as__sqlite_db(
    self, value: NetworkData, base_path: str, name: str
):
    """Copy the network data's backing sqlite file into `base_path`.

    The copy is named `<name>.sqlite`; its absolute path is returned under
    the "files" key.
    """

    destination = os.path.join(base_path, f"{name}.sqlite")
    destination = os.path.abspath(destination)
    shutil.copy2(value.db_file_path, destination)

    return {"files": destination}
export_as__graphml_file(self, value, base_path, name)
Source code in network_analysis/modules/__init__.py
def export_as__graphml_file(self, value: NetworkData, base_path: str, name: str):
    """Write the network data as a graphml file.

    NOTE(review): the name does not follow the 'export__network_data__as__...'
    pattern of the sibling exporters; kept unchanged since it is interface.
    """

    import networkx as nx

    out_file = os.path.join(base_path, f"{name}.graphml")

    # TODO: can't just assume digraph
    nx.write_graphml(value.as_networkx_graph(nx.DiGraph), out_file)

    return {"files": out_file}

pipelines special

Default (empty) module that is used as a base path for pipelines contained in this package.

utils

NetworkDataTabularWrap (TabularWrap)
Source code in network_analysis/utils.py
class NetworkDataTabularWrap(TabularWrap):
    """Tabular view onto one table (nodes or edges) of a NetworkData database."""

    def __init__(self, db: "NetworkData", table_type: NetworkDataTableType):
        self._db: NetworkData = db
        self._table_type: NetworkDataTableType = table_type
        super().__init__()

    @property
    def _table_name(self):
        # the enum value doubles as the sqlite table name
        return self._table_type.value

    def retrieve_number_of_rows(self) -> int:
        """Count the rows of the wrapped table."""

        from sqlalchemy import text

        with self._db.get_sqlalchemy_engine().connect() as con:
            result = con.execute(text(f"SELECT count(*) from {self._table_name}"))
            num_rows = result.fetchone()[0]

        return num_rows

    def retrieve_column_names(self) -> typing.Iterable[str]:
        """Return the column names of the wrapped table, in table order."""

        from sqlalchemy import inspect

        engine = self._db.get_sqlalchemy_engine()
        inspector = inspect(engine)
        columns = inspector.get_columns(self._table_type.value)
        result = [column["name"] for column in columns]
        return result

    def _query_as_dict(self, query: str) -> typing.Dict[str, typing.List[typing.Any]]:
        """Run `query` and collect the result into a column-name -> values dict.

        Rows are unpacked positionally, so the query's column order must match
        `self.column_names` (true for the 'SELECT *' queries used here).
        This helper removes the duplicated collection logic the original had
        in both `slice` and `to_pydict`.
        """

        from sqlalchemy import text

        with self._db.get_sqlalchemy_engine().connect() as con:
            result = con.execute(text(query))
            result_dict: typing.Dict[str, typing.List[typing.Any]] = {
                cn: [] for cn in self.column_names
            }
            for r in result:
                for i, cn in enumerate(self.column_names):
                    result_dict[cn].append(r[i])

        return result_dict

    def slice(
        self, offset: int = 0, length: typing.Optional[int] = None
    ) -> "TabularWrap":
        """Return a slice of the table as a DictTabularWrap.

        A falsy `length` (None or 0) selects all rows.
        """

        query = f"SELECT * FROM {self._table_name}"
        if length:
            query = f"{query} LIMIT {length}"
        else:
            # sqlite requires a LIMIT clause for OFFSET to be valid
            query = f"{query} LIMIT {self.num_rows}"
        if offset > 0:
            query = f"{query} OFFSET {offset}"

        return DictTabularWrap(self._query_as_dict(query))

    def to_pydict(self) -> typing.Mapping:
        """Return the whole table as a column-name -> values dict."""

        return self._query_as_dict(f"SELECT * FROM {self._table_name}")
retrieve_column_names(self)
Source code in network_analysis/utils.py
def retrieve_column_names(self) -> typing.Iterable[str]:
    """Return the column names of the wrapped table, in table order."""

    from sqlalchemy import inspect

    inspector = inspect(self._db.get_sqlalchemy_engine())
    column_infos = inspector.get_columns(self._table_type.value)
    return [info["name"] for info in column_infos]
retrieve_number_of_rows(self)
Source code in network_analysis/utils.py
def retrieve_number_of_rows(self) -> int:
    """Count the rows of the wrapped table."""

    from sqlalchemy import text

    count_query = text(f"SELECT count(*) from {self._table_name}")
    engine = self._db.get_sqlalchemy_engine()
    with engine.connect() as con:
        first_row = con.execute(count_query).fetchone()

    return first_row[0]
slice(self, offset=0, length=None)
Source code in network_analysis/utils.py
def slice(
    self, offset: int = 0, length: typing.Optional[int] = None
) -> "TabularWrap":
    """Return a slice of the table as a DictTabularWrap.

    A falsy `length` (None or 0) selects all rows; sqlite requires an
    explicit LIMIT before OFFSET, so the total row count is used then.
    """

    from sqlalchemy import text

    limit = length if length else self.num_rows
    query = f"SELECT * FROM {self._table_name} LIMIT {limit}"
    if offset > 0:
        query = f"{query} OFFSET {offset}"

    with self._db.get_sqlalchemy_engine().connect() as con:
        rows = con.execute(text(query))
        collected: typing.Dict[str, typing.List[typing.Any]] = {
            cn: [] for cn in self.column_names
        }
        for row in rows:
            for idx, cn in enumerate(self.column_names):
                collected[cn].append(row[idx])

    return DictTabularWrap(collected)
to_pydict(self)
Source code in network_analysis/utils.py
def to_pydict(self) -> typing.Mapping:
    """Return the whole table as a column-name -> list-of-values mapping."""

    from sqlalchemy import text

    with self._db.get_sqlalchemy_engine().connect() as con:
        rows = con.execute(text(f"SELECT * FROM {self._table_name}"))
        collected: typing.Dict[str, typing.List[typing.Any]] = {
            cn: [] for cn in self.column_names
        }
        for row in rows:
            for idx, cn in enumerate(self.column_names):
                collected[cn].append(row[idx])

    return collected
convert_graphml_type_to_sqlite(data_type)
Source code in network_analysis/utils.py
def convert_graphml_type_to_sqlite(data_type: str) -> str:
    """Map a GraphML attribute type name to the matching SQLite column type.

    Raises KeyError for an unsupported type name (same as the original
    dict lookup).
    """

    if data_type in ("boolean", "int", "long"):
        return "INTEGER"
    if data_type in ("float", "double"):
        return "REAL"
    if data_type == "string":
        return "TEXT"
    raise KeyError(data_type)
extract_edges_as_table(graph)
Source code in network_analysis/utils.py
def extract_edges_as_table(graph: "nx.Graph"):
    """Convert the edges of a networkx graph into a pyarrow table.

    Columns: source, target, plus one column per edge attribute (missing
    attribute values are filled with NaN).

    Raises:
        nx.NetworkXError: if an edge attribute clashes with the reserved
            source/target column names.
    """

    # adapted from networkx code
    # License: 3-clause BSD license
    # Copyright (C) 2004-2022, NetworkX Developers

    import networkx as nx
    import pyarrow as pa

    edgelist = graph.edges(data=True)
    source_nodes = [s for s, _, _ in edgelist]
    target_nodes = [t for _, t, _ in edgelist]

    all_attrs: typing.Set[str] = set().union(*(d.keys() for _, _, d in edgelist))  # type: ignore

    if SOURCE_COLUMN_NAME in all_attrs:
        raise nx.NetworkXError(
            f"Source name {SOURCE_COLUMN_NAME} is an edge attribute name"
        )
    # fixed: the original tested SOURCE_COLUMN_NAME twice (copy-paste);
    # this check must guard the target column name
    if TARGET_COLUMN_NAME in all_attrs:
        raise nx.NetworkXError(
            f"Target name {TARGET_COLUMN_NAME} is an edge attribute name"
        )

    nan = float("nan")
    edge_attr = {k: [d.get(k, nan) for _, _, d in edgelist] for k in all_attrs}

    edge_lists = {
        SOURCE_COLUMN_NAME: source_nodes,
        TARGET_COLUMN_NAME: target_nodes,
    }

    edge_lists.update(edge_attr)
    edges_table = pa.Table.from_pydict(mapping=edge_lists)

    return edges_table
extract_nodes_as_table(graph)
Source code in network_analysis/utils.py
def extract_nodes_as_table(graph: "nx.Graph"):
    """Convert the nodes of a networkx graph into a pyarrow table.

    Columns: the node id column plus one column per node attribute (missing
    attribute values are filled with NaN).

    Raises:
        nx.NetworkXError: if a node attribute clashes with a reserved
            column name.
    """

    # adapted from networkx code
    # License: 3-clause BSD license
    # Copyright (C) 2004-2022, NetworkX Developers

    import networkx as nx
    import pyarrow as pa

    nodelist = graph.nodes(data=True)

    node_ids = [n for n, _ in nodelist]

    all_attrs: typing.Set[str] = set().union(*(d.keys() for _, d in nodelist))  # type: ignore

    if ID_COLUMN_NAME in all_attrs:
        # fixed grammar: "an node" -> "a node"
        raise nx.NetworkXError(
            f"Id column name {ID_COLUMN_NAME} is a node attribute name"
        )
    # NOTE(review): this check looks copy-pasted from extract_edges_as_table
    # (it guards the *edges* source column in a node context, and the original
    # message talked about a "Target name" / "edge attribute"). The condition
    # is kept as-is; only the message now states what is actually checked.
    # TODO confirm whether LABEL_COLUMN_NAME was intended here.
    if SOURCE_COLUMN_NAME in all_attrs:
        raise nx.NetworkXError(
            f"Reserved column name {SOURCE_COLUMN_NAME} is a node attribute name"
        )

    nan = float("nan")
    node_attr = {k: [d.get(k, nan) for _, d in nodelist] for k in all_attrs}

    node_attr[ID_COLUMN_NAME] = node_ids
    nodes_table = pa.Table.from_pydict(mapping=node_attr)

    return nodes_table
insert_table_data_into_network_graph(network_data, edges_table, edges_column_map=None, nodes_table=None, nodes_column_map=None, chunk_size=1024)
Source code in network_analysis/utils.py
def insert_table_data_into_network_graph(
    network_data: "NetworkData",
    edges_table: "pa.Table",
    edges_column_map: typing.Optional[typing.Mapping[str, str]] = None,
    nodes_table: typing.Optional["pa.Table"] = None,
    nodes_column_map: typing.Optional[typing.Mapping[str, str]] = None,
    chunk_size: int = DEFAULT_NETWORK_DATA_CHUNK_SIZE,
):
    """Insert node and edge rows from arrow tables into a NetworkData db.

    Rows are inserted in batches of `chunk_size`. The optional column maps
    rename columns on the fly; if no label column exists, the stringified id
    is used as label. The ids of already-inserted nodes are passed to
    `network_data.insert_edges` via `existing_node_ids`.
    """

    added_node_ids = set()

    if edges_column_map is None:
        edges_column_map = {}
    if nodes_column_map is None:
        nodes_column_map = {}

    if nodes_table is not None:
        for batch in nodes_table.to_batches(chunk_size):
            batch_dict = batch.to_pydict()

            if nodes_column_map:
                for k, v in nodes_column_map.items():
                    if k in batch_dict.keys():
                        if k == ID_COLUMN_NAME and v == LABEL_COLUMN_NAME:
                            # id doubles as label: keep the id column in place
                            _data = batch_dict.get(k)
                        else:
                            _data = batch_dict.pop(k)
                            if v in batch_dict.keys():
                                # fixed: was a plain string, '{v}' never interpolated
                                raise Exception(
                                    f"Duplicate nodes column name after mapping: {v}"
                                )
                        batch_dict[v] = _data
            if LABEL_COLUMN_NAME not in batch_dict.keys():
                # fall back to the stringified node id as label
                batch_dict[LABEL_COLUMN_NAME] = (
                    str(x) for x in batch_dict[ID_COLUMN_NAME]
                )

            ids = batch_dict[ID_COLUMN_NAME]
            data = [dict(zip(batch_dict, t)) for t in zip(*batch_dict.values())]
            network_data.insert_nodes(*data)

            added_node_ids.update(ids)

    for batch in edges_table.to_batches(chunk_size):

        batch_dict = batch.to_pydict()

        for k, v in edges_column_map.items():
            if k in batch_dict.keys():
                _data = batch_dict.pop(k)
                if v in batch_dict.keys():
                    # fixed: was a plain string, '{v}' never interpolated
                    raise Exception(f"Duplicate edges column name after mapping: {v}")
                batch_dict[v] = _data

        data = [dict(zip(batch_dict, t)) for t in zip(*batch_dict.values())]

        all_node_ids = network_data.insert_edges(
            *data,
            existing_node_ids=added_node_ids,
        )
        added_node_ids.update(all_node_ids)