"""
Graph-related utilities.
"""
import logging
import warnings
import numpy as np
import scipy.sparse
import scipy.sparse.csgraph
from hynet.types_ import hynet_int_
from hynet.utilities.base import (create_sparse_matrix,
create_sparse_zero_matrix)
_log = logging.getLogger(__name__)
[docs]def eliminate_parallel_edges(edges):
"""
Eliminates parallel edges from the graph and returns those edges.
The edges are considered undirected, i.e., and the source and destination
node are interchangeable. In the returned edges, the source is set to the
adjacent node with the higher node number.
"""
E = np.column_stack((edges[0], edges[1]))
if E.shape[0] != 0:
E.sort(axis=1)
E = np.unique(E, axis=0)
return E[:, 1], E[:, 0]
[docs]def get_adjacency_matrix(edges, num_nodes=None, weights=None):
"""
Return the adjacency matrix for an undirected graph specified by its edges.
Parameters
----------
edges : (numpy.ndarray[.hynet_int_], numpy.ndarray[.hynet_int_])
Edges of the graph in terms of node number tuples.
num_nodes : int, optional
Number of nodes in the graph. By default, this is set to the maximum
node number in ``edges`` plus one.
weights : numpy.ndarray[.hynet_float_], optional
Edge weights for a *weighted* adjacency matrix. For parallel edges,
the weights are accumulated.
Returns
-------
A : .hynet_sparse_
Adjacency matrix of the undirected graph.
"""
(e_src, e_dst) = edges
if not e_src.size:
num_nodes = 0 if num_nodes is None else num_nodes
return create_sparse_zero_matrix(num_nodes, num_nodes, dtype=hynet_int_)
if min([e_src.min(), e_dst.min()]) < 0:
raise ValueError('Negative node numbers are not supported.')
if np.any(e_src == e_dst):
raise ValueError('The graph must not contain any self-loops.')
max_node = max([e_src.max(), e_dst.max()])
if num_nodes is None:
num_nodes = max_node + 1
elif num_nodes <= max_node:
raise ValueError('The edges are referencing non-existent nodes.')
if weights is None:
(e_src, e_dst) = eliminate_parallel_edges((e_src, e_dst))
weights = np.ones(len(e_src), dtype=hynet_int_)
else:
if len(weights) != len(e_src):
raise ValueError('The number of weights and edges must match.')
if np.any(weights == 0):
_log.warning('Some edge weights are zero.')
A = create_sparse_matrix(e_src, e_dst, weights, num_nodes, num_nodes,
dtype=weights.dtype)
A += A.transpose()
return A
[docs]def get_laplacian_matrix(edges, num_nodes=None):
"""
Return the Laplacian matrix for an undirected graph specified by its edges.
Parameters
----------
edges : (numpy.ndarray[.hynet_int_], numpy.ndarray[.hynet_int_])
Edges of the graph in terms of node number tuples.
num_nodes : int, optional
Number of nodes in the graph. By default, this is set to the maximum
node number in ``edges`` plus one.
Returns
-------
L : .hynet_sparse_
Laplacian matrix of the undirected graph.
"""
A = get_adjacency_matrix(edges, num_nodes=num_nodes)
if A.shape == (0, 0):
return create_sparse_zero_matrix(0, 0, dtype=hynet_int_)
return scipy.sparse.csgraph.laplacian(A)
[docs]def get_minimum_spanning_tree(edges, weights):
"""
Return the minimum spanning tree in terms of an edges tuple.
Parameters
----------
edges : (numpy.ndarray[.hynet_int_], numpy.ndarray[.hynet_int_])
Edges of the graph in terms of node number tuples.
weights : numpy.ndarray[.hynet_float_], optional
Edge weights. For parallel edges, the weights are accumulated.
Returns
-------
mst_edges : (numpy.ndarray[.hynet_int_], numpy.ndarray[.hynet_int_])
Edges of the minimum spanning tree in terms of node number tuples.
"""
A = get_adjacency_matrix(edges, weights=weights)
if A.shape == (0, 0):
return np.ndarray(0, dtype=hynet_int_), np.ndarray(0, dtype=hynet_int_)
return scipy.sparse.csgraph.minimum_spanning_tree(A).nonzero()
[docs]def get_num_spanning_trees(edges):
"""
Return the number of spanning trees for the given undirected graph.
The number of spanning trees is determined using Kirchhoff's matrix tree
theorem, see e.g. [1]_.
*Caution:* The vertices must be numbered consecutively starting from zero.
If the graph is not connected, the number of spanning trees is zero.
*Remark:* As SciPy does not inherently support the computation of the
determinant of a sparse matrix, the Laplacian matrix is converted to a
dense (NumPy) matrix for the computation of a cofactor of the Laplacian.
Due to this, the performance is potentially poor for large graphs.
Parameters
----------
edges : (numpy.ndarray[.hynet_int_], numpy.ndarray[.hynet_int_])
Edges of the graph in terms of node number tuples.
Returns
-------
numpy.float64
Number of spanning trees of the undirected graph.
Raises
------
OverflowError : If the computation results in an overflow.
References
----------
.. [1] Russell Merris, "Laplacian Matrices of Graphs: A Survey", Linear
Algebra and its Applications, vol. 197-198, p. 143-176, 1994.
"""
L = get_laplacian_matrix(edges).toarray()
with warnings.catch_warnings():
warnings.filterwarnings('error') # Treat warnings as errors
try:
num_span_trees = np.around(np.linalg.det(L[:-1, :-1]))
except Warning as warning:
if 'overflow' in str(warning).lower():
raise OverflowError('Overflow encountered during the computation '
'of the number of spanning trees.') from None
else:
_log.warning(str(warning))
return num_span_trees
[docs]def traverse_graph(nodes, edges, callback, roots=None, auto_root=True):
"""
Traverse the graph and run a callback at every node.
The graph is traversed in a depth-first fashion and, at every visited node,
the provided callback function is called, which must exhibit the signature::
def callback(node, node_pre, cycle)
Therein, ``node`` is the currently visited node, ``node_pre`` is the
previously visited node (or ``numpy.nan`` if the current node is the
component's root, i.e., the traversal entered a new component), and
``cycle`` is ``True`` if the current node was already visited, i.e., the
edge traversed from the previous to the current node closes a cycle. The
callback function returns an abort flag, i.e., if the callback returns
``True``, the traversal is aborted.
**Remark:** This function is most efficient if the nodes are numbered
consecutively from zero to the number of nodes minus one. Negative node
numbers are not supported.
Parameters
----------
nodes : numpy.ndarray[.hynet_int_]
Node numbers of the graph.
edges : (numpy.ndarray[.hynet_int_], numpy.ndarray[.hynet_int_])
Edges of the graph in terms of node number tuples.
callback : function
Node visit callback function.
roots : numpy.ndarray[.hynet_int_], optional
Numbers of the root nodes of the graph.
auto_root : bool, optional
If True (default), components without a root node are assigned an
arbitrary node as its root.
"""
# timer = Timer()
# debug_message = ("Graph traversal with '"
# + callback.__name__ + "' ({:.3f} sec.)")
roots = np.ndarray(0) if roots is None else roots
if not nodes.size:
return
(e_src, e_dst) = eliminate_parallel_edges(edges)
if min(nodes) < 0:
raise RuntimeError('Negative node numbers are not supported.')
if not (np.all(np.isin(e_src, nodes)) and np.all(np.isin(e_dst, nodes))):
raise RuntimeError('The edges are referencing non-existent nodes.')
if not np.all(np.isin(roots, nodes)):
raise RuntimeError('The roots are referencing non-existent nodes.')
# REMARK: As this implementation is robust against duplicate entries in
# 'nodes', this data validity check is omitted.
N = max(nodes) + 1
stack = np.zeros(N, dtype=hynet_int_) # Stack of nodes to be visited
visited = np.zeros(N, dtype=bool) # Indicates if a node was visited
root_idx = 0 # Index of the next root to process
# Set gaps in the node numbering to 'visited'
visited[np.setdiff1d(range(N), nodes)] = True
while not np.all(visited):
# Ignore ambiguous root nodes
while root_idx < len(roots) and visited[roots[root_idx]]:
_log.warning("Ambiguous root {} is ignored.".format(roots[root_idx]))
root_idx += 1
# Set root node for next component to traverse
stack_top = 0
if root_idx < len(roots):
stack[stack_top] = roots[root_idx]
root_idx += 1
elif auto_root:
stack[stack_top] = np.where(~visited)[0][0]
else:
raise RuntimeError('Graph contains components without root nodes.')
if callback(stack[stack_top], np.nan, False):
# _log.debug(debug_message.format(timer.total()))
return
visited[stack[stack_top]] = True
# Perform depth-first traversal of this connected component
while stack_top >= 0:
# Pop a previously visited node from the stack
node_pre = stack[stack_top]
stack_top -= 1
# Visit all adjacent nodes
idx_src = e_src == node_pre
idx_dst = e_dst == node_pre
adjacent_nodes = np.concatenate((e_dst[idx_src], e_src[idx_dst]))
for node in adjacent_nodes:
if callback(node, node_pre, visited[node]):
# _log.debug(debug_message.format(timer.total()))
return
# Push previously unvisited adjacent nodes to the top of the stack
adjacent_nodes = adjacent_nodes[~visited[adjacent_nodes]]
new_top = stack_top + len(adjacent_nodes)
stack[(stack_top + 1):(new_top + 1)] = adjacent_nodes[::-1]
stack_top = new_top
visited[adjacent_nodes] = True
# Remove traversed edges
idx = idx_src | idx_dst
e_src[idx] = -1
e_dst[idx] = -1
# _log.debug(debug_message.format(timer.total()))
[docs]def is_acyclic_graph(nodes, edges):
"""Return ``True`` if the graph is acyclic and ``False`` otherwise."""
# Flag for acyclicity: For the closure to work, we have to avoid a
# direct assignment in the callback as this causes Python to create a
# local variable. Due to this, the flag is stored in a list.
acyclic = [True]
def detect_cycles(node, node_pre, cycle): # pylint: disable=unused-argument
"""Callback for the graph traversal to detect cycles."""
if cycle:
acyclic[0] = False
return True # Abort traversal
return False
traverse_graph(nodes, edges, detect_cycles)
return acyclic[0]
[docs]def get_graph_components(nodes, edges, roots=None):
"""
Return a list of connected components.
The returned list contains an element for every connected component, where
the element constitutes an array with all nodes of that component. The
first element of the array is the root node of the respective component.
"""
components = []
def identify_components(node, node_pre, cycle):
"""Callback for the graph traversal to identify components."""
if np.isnan(node_pre):
components.append([node])
elif not cycle:
components[-1].append(node)
traverse_graph(nodes, edges, identify_components, roots)
return [np.array(component) for component in components]
[docs]def is_acyclic_component(nodes, edges, root):
"""
Return ``True`` if the root's component is acyclic and ``False`` otherwise.
"""
# Flag for acyclicity: For the closure to work, we have to avoid a
# direct assignment in the callback as this causes Python to create a
# local variable. Due to this, the flag is stored in a list.
acyclic = [True]
def detect_component_cycles(node, node_pre, cycle):
"""
Callback for the graph traversal to detect cycles in the component.
"""
if node != root and np.isnan(node_pre):
# We left the root's component, so abort the traversal
return True
if cycle:
# We identified a cycle, so abort the traversal
acyclic[0] = False
return True
return False
traverse_graph(nodes, edges, detect_component_cycles, roots=[root])
return acyclic[0]