ragraph.analysis.cluster

Clustering algorithms

Clustering algorithms detect (nested) clusters of nodes that are relatively tightly connected by the edges of a graph. Both hierarchical and flat clustering algorithms are provided.

Apart from algorithm-specific parameters, all algorithms share the same basic parameters:

  • graph: Graph to cluster containing the relevant nodes and edges.
  • leafs: Optional list of leaf nodes to cluster. If not provided, all of the graph's leaf nodes are selected.
  • inplace: Boolean toggle that decides whether to create the new cluster nodes in the provided graph, or to return a deep copy of the graph containing only the leaf nodes, their edges, and the newly created cluster nodes.

Available algorithms

MarkovFlow

MarkovFlow(matrix: ndarray, mu: float, scale: bool)

Results of Markov steady-state flow analysis.

Parameters:

Name    Type     Description                                          Default
matrix  ndarray  Adjacency matrix (non-negative, IR/FAD convention).  required
mu      float    Evaporation constant (>1.0).                         required

Note

Solves the flow balance equation A_s @ f + f_in = f, for which:

  • A_s: normalized adjacency matrix with added evaporation sink.
  • f_in: injection vector (1.0 for every node except sink).
  • f = inv(I - A_s) @ f_in (flow vector)
  • Q = inv(I - A_s) (sensitivity matrix)
  • f = Q @ f_in
  • F = A_s * f.T (edge flow matrix)
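
The flow balance in the note can be checked by hand on a tiny network. The sketch below is an illustration only, not the library's implementation: it assumes a two-node chain 0 -> 1 with mu = 2.0, hard-codes the corresponding sink matrix, and replaces the matrix inverse with a simple fixed-point iteration (the helper name solve_flow is hypothetical). Plain Python lists are used so that it runs without NumPy.

```python
from typing import List

# Sink matrix A_s for a two-node chain 0 -> 1 with mu = 2.0, so 1/mu = 0.5 of
# the flow continues along edges and 0.5 evaporates. Columns are sources, rows
# are targets; the last row/column is the evaporation sink. Node 1 has no
# outgoing edges, so all of its flow goes to the sink.
A_s: List[List[float]] = [
    [0.0, 0.0, 0.0],
    [0.5, 0.0, 0.0],
    [0.5, 1.0, 0.0],
]
f_in: List[float] = [1.0, 1.0, 0.0]  # Inject 1.0 at every node except the sink.


def solve_flow(A: List[List[float]], inflow: List[float], tol: float = 1e-12) -> List[float]:
    """Iterate f <- A @ f + f_in until the flow vector stops changing."""
    f = list(inflow)
    for _ in range(10_000):
        new = [sum(a * x for a, x in zip(row, f)) + b for row, b in zip(A, inflow)]
        if max(abs(n - o) for n, o in zip(new, f)) < tol:
            return new
        f = new
    raise RuntimeError("flow iteration did not converge")


f = solve_flow(A_s, f_in)

# Edge flow matrix: column j of A_s scaled by the flow through node j.
F = [[A_s[i][j] * f[j] for j in range(3)] for i in range(3)]
```

Node 0 only carries its own injection, node 1 carries its injection plus half of node 0's flow, and the sink collects exactly the total injected flow, which is a quick sanity check on any sink matrix.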

Source code in ragraph/analysis/cluster/_markov.py
def __init__(self, matrix: np.ndarray, mu: float, scale: bool):
    self.matrix = matrix * (matrix > 0)
    self.mu = mu
    self.scale = scale

dim property

dim: int

Adjacency matrix dimension.

flow_matrix property

flow_matrix: ndarray

Flow over every edge if the nodal flow equals self.flow_vector.

flow_vector property

flow_vector: ndarray

Flow through every node if the system is injected with self.in_vector.

in_vector property

in_vector: ndarray

Default inflow vector: 1.0 inflow at every node except the sink.

sensitivity_matrix property

sensitivity_matrix: ndarray

Sensitivity matrix (Q) with respect to input flow. f = Q @ f_in.

sink_matrix property

sink_matrix: ndarray

Column normalized adjacency matrix with an added evaporation sink.

MarkovRelative

MarkovRelative(adj: ndarray, mu: float)

Markov relative influence and dependency calculations.

Parameters:

Name  Type     Description                                          Default
adj   ndarray  Adjacency matrix (non-negative, IR/FAD convention).  required
mu    float    Evaporation constant (>1.0).                         required

Note

Solves specific cases of the Markov flow analysis where each node is injected with a flow of 1 to calculate the relative influence. The network is also reversed and injected again, which results in relative dependency.

Source code in ragraph/analysis/cluster/_markov.py
def __init__(self, adj: np.ndarray, mu: float):
    self.adj = adj
    self.mu = mu

dependency_matrix property

dependency_matrix: ndarray

Relative Dependency Matrix (RDM). Percentage of dependency of j on i.

influence_matrix property

influence_matrix: ndarray

Relative influence matrix (RIM). Percentage of influence originating from j on i.

calculate_tpm

calculate_tpm(matrix: ndarray, mu: float) -> ndarray

Calculate Transfer Probability Matrix (TPM), which in turn is the sum of the Relative Influence and Relative Dependency Matrices (RIM, RDM).

Parameters:

Name    Type     Description        Default
matrix  ndarray  Adjacency matrix.  required
mu      float    Decay constant.    required

Returns:

Type     Description
ndarray  Transfer Probability Matrix (TPM).

Source code in ragraph/analysis/cluster/_markov.py
def calculate_tpm(matrix: np.ndarray, mu: float) -> np.ndarray:
    """Calculate Transfer Probability Matrix (TPM), which in turn is the sum of the
    Relative Influence and Relative Dependency Matrices (RIM, RDM).

    Arguments:
        matrix: Adjacency matrix.
        mu: Decay constant.

    Returns:
        Transfer Probability Matrix (TPM).
    """
    relative = MarkovRelative(matrix, mu)

    # Relative Influence Matrix (RIM). Percentage of influence originating from j on i.
    rim = relative.influence_matrix

    # Relative Dependency Matrix (RDM). Percentage of dependency of j on i.
    rdm = relative.dependency_matrix

    # Create Transfer Probability Matrix (TPM).
    tpm = rim + rdm

    # Set diagonal to maximum nonzero column entry. This enforces a suitable aperiodic
    # stochastic matrix.
    np.fill_diagonal(tpm, 0.0)
    colmax = tpm.max(axis=0)
    colmax[colmax == 0.0] = 1.0
    np.fill_diagonal(tpm, colmax)

    # Normalize each column for a column sum of 1.
    tpm = tpm / np.sum(tpm, 0)

    return tpm
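
The last two steps of calculate_tpm (the diagonal fill and the column normalization) are easy to follow on a small matrix. The sketch below is a dependency-free illustration under stated assumptions: the input values stand in for an already computed RIM + RDM sum and are made up, and finalize_tpm is a hypothetical helper name rather than a function of the library.

```python
from typing import List


def finalize_tpm(tpm: List[List[float]]) -> List[List[float]]:
    """Fill the diagonal with each column's largest off-diagonal entry, then normalize."""
    n = len(tpm)
    out = [row[:] for row in tpm]  # Work on a copy.

    # Clear the diagonal first so that it does not take part in the column maximum.
    for j in range(n):
        out[j][j] = 0.0

    # A column without any off-diagonal entry gets 1.0 on its diagonal.
    for j in range(n):
        colmax = max(out[i][j] for i in range(n))
        out[j][j] = colmax if colmax > 0.0 else 1.0

    # Normalize every column to a column sum of 1.
    colsum = [sum(out[i][j] for i in range(n)) for j in range(n)]
    return [[out[i][j] / colsum[j] for j in range(n)] for i in range(n)]


# Made-up stand-in for rim + rdm; the original diagonal values are ignored.
raw = [
    [0.9, 0.2, 0.0],
    [0.4, 0.9, 0.0],
    [0.1, 0.0, 0.5],
]
tpm = finalize_tpm(raw)
column_sums = [sum(tpm[i][j] for i in range(3)) for j in range(3)]
```

The isolated third node ends up with a single 1.0 on its diagonal, so it keeps all probability mass to itself and later maps to its own cluster.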

create_clusters

create_clusters(
    graph: Graph, nodes: List[Node], cluster_ids: ndarray
) -> List[Node]

Assign nodes in graph to new cluster nodes using a numbered array.

Parameters:

Name         Type        Description                                          Default
graph        Graph       Graph to add clusters into.                          required
nodes        List[Node]  Nodes that were clustered.                           required
cluster_ids  ndarray     1D array with numbered cluster assignment per node.  required

Returns:

Type        Description
List[Node]  Current root nodes in the graph.

Source code in ragraph/analysis/cluster/_markov.py
def create_clusters(graph: Graph, nodes: List[Node], cluster_ids: np.ndarray) -> List[Node]:
    """Assign nodes in graph to new cluster nodes using a numbered array.

    Arguments:
        graph: Graph to add clusters into.
        nodes: Nodes that were clustered.
        cluster_ids: 1D array with numbered cluster assignment per node.

    Returns:
        Current root nodes in the graph.
    """
    assert (
        len(nodes) == cluster_ids.size
    ), "Node count should match cluster IDs length. Found %d vs %d." % (
        len(nodes),
        cluster_ids.size,
    )

    # Transform cluster_ids to ranks of their unique values.
    # This makes them a perfect index sequence starting at 0.
    unique_ids, id_ranks = np.unique(cluster_ids, return_inverse=True)

    # Prepare children lists and fill them.
    children_lists: List[List[Node]] = [[] for i in unique_ids]
    for i, node in enumerate(nodes):
        rank = id_ranks[i]
        children_lists[rank].append(node)

    # Create cluster nodes and keep track of cluster roots.
    new_cluster_roots = []
    unchanged_cluster_roots = []
    for children in children_lists:
        if len(children) > 1:
            parent = create_parent(graph, children)
            new_cluster_roots.append(parent)
        # Node is its own parent, don't change anything.
        else:
            unchanged_cluster_roots.append(children[0])

    return new_cluster_roots + unchanged_cluster_roots
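
The grouping logic can be tried without a Graph object. The following sketch is an assumption-laden stand-in: node names are plain strings, group_by_cluster_id is a hypothetical helper, and the ranks are derived with sorted(set(...)) instead of np.unique(..., return_inverse=True). It only shows which nodes would receive a new parent and which stay as they are.

```python
from typing import Dict, List, Sequence, Tuple


def group_by_cluster_id(
    nodes: Sequence[str], cluster_ids: Sequence[int]
) -> Tuple[List[List[str]], List[str]]:
    """Split nodes into multi-node clusters and unchanged single nodes."""
    if len(nodes) != len(cluster_ids):
        raise ValueError("Node count should match cluster IDs length.")

    # Map arbitrary cluster IDs to consecutive ranks starting at 0.
    rank_of: Dict[int, int] = {cid: rank for rank, cid in enumerate(sorted(set(cluster_ids)))}

    children_lists: List[List[str]] = [[] for _ in rank_of]
    for node, cid in zip(nodes, cluster_ids):
        children_lists[rank_of[cid]].append(node)

    # Only groups with more than one member would get a new parent node.
    new_clusters = [children for children in children_lists if len(children) > 1]
    unchanged = [children[0] for children in children_lists if len(children) == 1]
    return new_clusters, unchanged


clusters, singles = group_by_cluster_id(["a", "b", "c", "d"], [2, 2, 0, 3])
```

Here "a" and "b" share cluster ID 2 and would be grouped under a new parent, while "c" and "d" remain roots on their own.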

get_sink_matrix

get_sink_matrix(matrix: ndarray, mu: float) -> ndarray

Calculate a normalized flow distribution matrix with an evaporation sink node.

Parameters:

Name    Type     Description            Default
matrix  ndarray  Adjacency matrix.      required
mu      float    Evaporation constant.  required

Returns:

Type     Description
ndarray  Normalized flow distribution matrix with an added evaporation sink node at the bottom right.

Note

Matrix structure of:

    [a00 a01 ... a0n 0]
    [a10 a11 ... a1n 0]
    [... ... ... ... .]
    [an0 an1 ... ann 0]
    [ e   e  ...  e  0]

Where all columns are normalized to [0.0, 1.0].

Source code in ragraph/analysis/cluster/_markov.py
def get_sink_matrix(matrix: np.ndarray, mu: float) -> np.ndarray:
    """Calculate a normalized flow distribution matrix with an evaporation sink node.

    Arguments:
        matrix: Adjacency matrix.
        mu: Evaporation constant.

    Returns:
        Normalized flow distribution matrix with an added evaporation sink node at the
            bottom right.

    Note:
        Matrix structure of:
        [a00 a01 ... a0n 0]
        [a10 a11 ... a1n 0]
        [... ... ... ... .]
        [an0 an1 ... ann 0]
        [ e   e  ...  e  0]
        Where all columns are normalized [0.0, 1.0]
    """
    cont = 1 / mu
    evap = 1 - cont
    dim = len(matrix)
    S = matrix.sum(axis=0)  # Column sums.
    E = evap * np.ones((1, dim))  # Evaporation row.

    # Fix up nodes without outputs: their flow evaporates entirely.
    no_outputs = S == 0
    S[no_outputs] = 1.0
    E[:, no_outputs] = 1.0

    A_n = cont * matrix / S  # Normalized flow matrix [0,cont].

    # Add evaporation sink to adjacency matrix.
    # A_s = [[A_n, 0], [E, 0]]
    A_s = np.block([[A_n, np.zeros((dim, 1))], [E, np.zeros((1, 1))]])
    return A_s
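
For readers who want to trace the construction by hand, here is a minimal list-based sketch of the same idea. It is not the library function: sink_matrix is a hypothetical name, the input is a nested list instead of an ndarray, and the example network (a single edge 0 -> 1 with mu = 2.0) is an assumption chosen for easy arithmetic.

```python
from typing import List


def sink_matrix(matrix: List[List[float]], mu: float) -> List[List[float]]:
    """Column-normalize a square matrix and append an evaporation sink row/column."""
    cont = 1.0 / mu    # Fraction of flow that continues along edges.
    evap = 1.0 - cont  # Fraction of flow that evaporates at every step.
    n = len(matrix)
    colsum = [sum(matrix[i][j] for i in range(n)) for j in range(n)]

    result = [[0.0] * (n + 1) for _ in range(n + 1)]
    for j in range(n):
        if colsum[j] == 0:
            # Node without outputs: everything flows into the sink.
            result[n][j] = 1.0
            continue
        for i in range(n):
            result[i][j] = cont * matrix[i][j] / colsum[j]
        result[n][j] = evap
    return result


# Single edge from node 0 to node 1 (columns are sources, rows are targets).
A_s = sink_matrix([[0.0, 0.0], [1.0, 0.0]], mu=2.0)
```

Every non-sink column of the result sums to 1, and the sink column stays zero because nothing flows back out of the sink.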

hierarchical_markov

hierarchical_markov(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    leafs: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    alpha: int = markov_params["alpha"].default,
    beta: float = markov_params["beta"].default,
    mu: float = markov_params["mu"].default,
    max_iter: int = markov_params["max_iter"].default,
    symmetrize: bool = markov_params["symmetrize"].default,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
) -> Tuple[Graph, Union[List[Node], List[str]]]

docstring stub

Source code in ragraph/analysis/cluster/_markov.py
@hierarchical_markov_analysis
def hierarchical_markov(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    leafs: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    alpha: int = markov_params["alpha"].default,  # type: ignore
    beta: float = markov_params["beta"].default,  # type: ignore
    mu: float = markov_params["mu"].default,  # type: ignore
    max_iter: int = markov_params["max_iter"].default,  # type: ignore
    symmetrize: bool = markov_params["symmetrize"].default,  # type: ignore
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
) -> Tuple[Graph, Union[List[Node], List[str]]]:
    """docstring stub"""
    cluster_roots: List[Node] = [] if leafs is None else leafs  # type: ignore
    children: List[Node] = []

    while set(cluster_roots) != set(children) and len(cluster_roots) > 1:
        # Previous cluster_roots become children for new cluster level.
        children = cluster_roots
        graph, cluster_roots = markov(
            graph,
            root=None,
            leafs=children,
            inherit=inherit,
            loops=loops,
            edge_weights=edge_weights,
            alpha=alpha,
            beta=beta,
            mu=mu,
            inplace=True,  # Recursive calls may work in the given graph.
            max_iter=max_iter,
        )

    return graph, cluster_roots
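
The loop above keeps feeding the roots of one level back into the flat algorithm until the set of roots stops changing or a single root remains. The sketch below isolates that control flow. It is an illustration only: cluster_until_stable and pair_up are hypothetical names, and pair_up is a toy stand-in for a flat clustering pass that has nothing to do with Markov clustering.

```python
from typing import Callable, Hashable, List


def cluster_until_stable(
    leafs: List[Hashable],
    flat_cluster: Callable[[List[Hashable]], List[Hashable]],
) -> List[Hashable]:
    """Repeat a flat clustering pass until the roots no longer change."""
    roots: List[Hashable] = list(leafs)
    children: List[Hashable] = []
    while set(roots) != set(children) and len(roots) > 1:
        # Previous roots become the children of the next level.
        children = roots
        roots = flat_cluster(children)
    return roots


def pair_up(items: List[Hashable]) -> List[Hashable]:
    """Toy flat pass: merge neighbours two by two, keep a leftover item as is."""
    return [
        tuple(items[i : i + 2]) if i + 1 < len(items) else items[i]
        for i in range(0, len(items), 2)
    ]


hierarchy = cluster_until_stable(["a", "b", "c", "d", "e"], pair_up)
```

Three passes are needed for five leaves here; the nested tuples play the role of the parent nodes that each pass would create in the graph.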

markov

markov(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    leafs: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    alpha: int = markov_params["alpha"].default,
    beta: float = markov_params["beta"].default,
    mu: float = markov_params["mu"].default,
    max_iter: int = markov_params["max_iter"].default,
    symmetrize: bool = markov_params["symmetrize"],
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
) -> Tuple[Graph, Union[List[Node], List[str]]]

docstring stub

Source code in ragraph/analysis/cluster/_markov.py
@markov_analysis
def markov(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    leafs: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    alpha: int = markov_params["alpha"].default,  # type: ignore
    beta: float = markov_params["beta"].default,  # type: ignore
    mu: float = markov_params["mu"].default,  # type: ignore
    max_iter: int = markov_params["max_iter"].default,  # type: ignore
    symmetrize: bool = markov_params["symmetrize"],  # type: ignore
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
) -> Tuple[Graph, Union[List[Node], List[str]]]:
    """docstring stub"""
    assert leafs is not None
    mat = graph.get_adjacency_matrix(leafs, inherit=inherit, loops=loops, only=edge_weights)
    assert isinstance(mat, np.ndarray)
    if symmetrize:
        mat = mat + mat.T  # Add transpose to get a guaranteed symmetrical matrix.
    tpm = calculate_tpm(mat, mu)
    if alpha > 1:  # Otherwise algorithm does nothing, then column max -> cluster ID.
        i = 0

        # TPM pruning threshold and equality tolerances.
        rtol = max(max_iter**-2, 1e-15)
        atol = max(max_iter**-3, 1e-15)
        bin_mat = mat > 0
        wmax = bin_mat.sum(axis=0).flatten().max()
        threshold = (mu * max(1.0, wmax)) ** -(alpha + 1)

        # TPM expansion/inflation loop
        tpm = prune_matrix(tpm, threshold)
        last_tpm = np.zeros_like(tpm)
        while not np.allclose(tpm, last_tpm, rtol=rtol, atol=atol) and i < max_iter:
            last_tpm = tpm.copy()

            tpm = np.linalg.matrix_power(tpm, alpha)  # Expansion step
            tpm = np.power(tpm, beta)  # Inflation step
            tpm = prune_matrix(tpm, threshold)  # Threshold step

            i += 1

    # Cluster IDs are row numbers of max values in columns.
    cluster_ids = tpm.argmax(0)
    cluster_roots = create_clusters(graph, leafs, cluster_ids)  # type: ignore

    return graph, cluster_roots
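
The expansion/inflation loop can be reproduced on a toy matrix. The sketch below is a simplified, dependency-free illustration: it assumes a ready-made column-stochastic TPM, leaves out the threshold pruning entirely (it only re-normalizes columns after inflation), uses a fixed tolerance, and all function names are hypothetical.

```python
from typing import List

Matrix = List[List[float]]


def matmul(a: Matrix, b: Matrix) -> Matrix:
    """Plain square matrix product."""
    n = len(a)
    return [[sum(a[i][k] * b[k][j] for k in range(n)) for j in range(n)] for i in range(n)]


def normalize_columns(m: Matrix) -> Matrix:
    """Scale every column to a column sum of 1 (columns are assumed non-zero)."""
    n = len(m)
    colsum = [sum(m[i][j] for i in range(n)) for j in range(n)]
    return [[m[i][j] / colsum[j] for j in range(n)] for i in range(n)]


def markov_cluster_ids(tpm: Matrix, alpha: int = 2, beta: float = 2.0, max_iter: int = 100) -> List[int]:
    """Expand (matrix power) and inflate (element-wise power) until convergence."""
    n = len(tpm)
    tpm = normalize_columns(tpm)
    for _ in range(max_iter):
        last = tpm
        expanded = tpm
        for _step in range(alpha - 1):  # Expansion: tpm to the power alpha.
            expanded = matmul(expanded, tpm)
        inflated = [[value**beta for value in row] for row in expanded]  # Inflation.
        tpm = normalize_columns(inflated)
        if all(abs(tpm[i][j] - last[i][j]) < 1e-12 for i in range(n) for j in range(n)):
            break
    # Cluster ID of a column is the row index of its (first) largest entry.
    return [max(range(n), key=lambda i: tpm[i][j]) for j in range(n)]


# Two disconnected pairs of nodes: {0, 1} and {2, 3}.
pairs = [
    [0.5, 0.5, 0.0, 0.0],
    [0.5, 0.5, 0.0, 0.0],
    [0.0, 0.0, 0.5, 0.5],
    [0.0, 0.0, 0.5, 0.5],
]
ids = markov_cluster_ids(pairs)
```

Nodes that share a cluster ID would then be grouped under one parent by create_clusters.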

prune_matrix

prune_matrix(matrix: ndarray, threshold: float) -> ndarray

Return a column normalized matrix for which all values below the threshold have been pruned (set to 0.0).

Parameters:

Name       Type     Description                               Default
matrix     ndarray  Matrix to prune.                          required
threshold  float    Cut-off threshold for normalized values.  required

Returns:

Type     Description
ndarray  Pruned matrix.

Source code in ragraph/analysis/cluster/_markov.py
def prune_matrix(matrix: np.ndarray, threshold: float) -> np.ndarray:
    """Return a column normalized matrix for which all values below the threshold have
    been pruned (set to 0.0).

    Arguments:
        matrix: Matrix to prune.
        threshold: Cut-off threshold for normalized values.

    Returns:
        Pruned matrix.
    """
    colsum = matrix.sum(0)
    to_prune = np.logical_and(matrix < threshold * colsum, matrix > 0)

    while to_prune.any():
        matrix[to_prune] = 0
        colsum = matrix.sum(0)
        to_prune = np.logical_and(matrix < threshold * colsum, matrix > 0)

    matrix = matrix / colsum
    return matrix
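
A small numeric example makes the pruning rule concrete: an entry is dropped when it is positive but smaller than threshold times its column sum. The sketch below is a list-based illustration with made-up values; prune_columns is a hypothetical name, and unlike the function above, which zeroes entries of the array it is given, the sketch works on a copy.

```python
from typing import List


def prune_columns(matrix: List[List[float]], threshold: float) -> List[List[float]]:
    """Zero entries below threshold * column sum, then normalize the columns."""
    n_rows, n_cols = len(matrix), len(matrix[0])
    m = [row[:] for row in matrix]  # Leave the caller's matrix untouched.
    while True:
        colsum = [sum(m[i][j] for i in range(n_rows)) for j in range(n_cols)]
        too_small = [
            (i, j)
            for i in range(n_rows)
            for j in range(n_cols)
            if 0.0 < m[i][j] < threshold * colsum[j]
        ]
        if not too_small:
            break
        for i, j in too_small:
            m[i][j] = 0.0
    return [[m[i][j] / colsum[j] for j in range(n_cols)] for i in range(n_rows)]


original = [
    [0.90, 0.50, 0.02],
    [0.06, 0.50, 0.00],
    [0.04, 0.00, 0.98],
]
pruned = prune_columns(original, threshold=0.05)
```

With a threshold of 0.05, the entries 0.04 and 0.02 are removed, 0.06 survives, and the remaining values in each column are rescaled to sum to 1.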

tarjans_scc

tarjans_scc(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    leafs: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
) -> Tuple[Graph, Union[List[Node], List[str]]]

docstring stub

Source code in ragraph/analysis/cluster/_tarjan.py
@tarjans_scc_analysis
def tarjans_scc(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    leafs: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
) -> Tuple[Graph, Union[List[Node], List[str]]]:
    """docstring stub"""
    assert leafs is not None
    clusters = tarjans_scc_algorithm(graph, leafs, inherit)  # type: ignore

    cluster_roots = []
    for children in clusters:
        if len(children) > 1:
            parent = create_parent(graph, children)
            cluster_roots.append(parent)
        else:
            cluster_roots.append(children[0])

    return graph, cluster_roots

_markov

Markov clustering module

Reference

Wilschut, T., Etman, L. F. P., Rooda, J. E., & Adan, I. J. B. F. (2017). Multilevel Flow-Based Markov Clustering for Design Structure Matrices. Journal of Mechanical Design, 139(12), 121402. DOI: 10.1115/1.4037626

_tarjan

Tarjan's strongly connected components algorithm.

Reference

Tarjan, R. (1972). Depth-First Search and Linear Graph Algorithms. SIAM Journal on Computing, 1(2), 146-160. https://doi.org/10.1137/0201010

tarjans_scc_algorithm

tarjans_scc_algorithm(
    graph: Graph, nodes: List[Node], inherit: bool
) -> List[List[Node]]

Tarjan's strongly connected components algorithm.

Parameters:

Name     Type        Description                                                                         Default
graph    Graph       Graph to detect SCC's in (cycles).                                                  required
nodes    List[Node]  List of nodes (components) to detect SCC's for.                                     required
inherit  bool        Whether to take into account (inherit) edges between children during calculations.  required

Returns:

Type              Description
List[List[Node]]  Lists of strongly connected components.

Reference

Tarjan, R. (1972). Depth-First Search and Linear Graph Algorithms. SIAM Journal on Computing, 1(2), 146-160. https://doi.org/10.1137/0201010

Source code in ragraph/analysis/cluster/_tarjan.py
def tarjans_scc_algorithm(graph: Graph, nodes: List[Node], inherit: bool) -> List[List[Node]]:
    """Tarjan's strongly connected components algorithm.

    Arguments:
        graph: Graph to detect SCC's in (cycles).
        nodes: List of nodes (components) to detect SCC's for.
        inherit: Whether to take into account (inherit) edges between children during
            calculations.

    Returns:
        Lists of strongly connected components.

    Reference:
        Tarjan, R. (1972). Depth-First Search and Linear Graph Algorithms.
            SIAM Journal on Computing, 1(2), 146-160. https://doi.org/10.1137/0201010
    """
    out = []
    index = 0
    indexes: Dict[Node, int] = {n: -1 for n in nodes}
    lowlinks: Dict[Node, int] = {n: -1 for n in nodes}
    onstacks: Dict[Node, bool] = {n: False for n in nodes}
    stack = []

    targets_of: Dict[Node, List[Node]] = {
        source: [
            target
            for target in nodes
            if any(graph.edges_between(source, target, inherit=inherit, loops=False))
        ]
        for source in nodes
    }

    def strongconnect(n: Node):
        nonlocal index
        indexes[n] = index
        lowlinks[n] = index
        index = index + 1
        stack.append(n)
        onstacks[n] = True

        for m in targets_of[n]:
            if indexes[m] == -1:
                strongconnect(m)
                lowlinks[n] = min(lowlinks[n], lowlinks[m])  # type: ignore
            elif onstacks[m]:
                lowlinks[n] = min(lowlinks[n], indexes[m])  # type: ignore

        if lowlinks[n] == indexes[n]:
            scc = []
            k = None
            while n != k:
                k = stack.pop()
                onstacks[k] = False
                scc.append(k)
            out.append(scc)

    for node in nodes:
        if indexes[node] == -1:
            strongconnect(node)

    return out
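
The same algorithm can be exercised without a Graph by passing the adjacency as a dictionary. The sketch below is a standalone illustration, not the library function: strongly_connected_components is a hypothetical name, nodes are plain strings, and the targets_of mapping is given directly instead of being derived from graph.edges_between.

```python
from typing import Dict, List


def strongly_connected_components(targets_of: Dict[str, List[str]]) -> List[List[str]]:
    """Tarjan's algorithm on a mapping of node -> list of target nodes."""
    index: Dict[str, int] = {}
    lowlink: Dict[str, int] = {}
    on_stack: Dict[str, bool] = {}
    stack: List[str] = []
    out: List[List[str]] = []

    def strongconnect(n: str) -> None:
        # Assign the next free index; the lowlink starts at the same value.
        index[n] = lowlink[n] = len(index)
        stack.append(n)
        on_stack[n] = True

        for m in targets_of[n]:
            if m not in index:
                strongconnect(m)
                lowlink[n] = min(lowlink[n], lowlink[m])
            elif on_stack[m]:
                lowlink[n] = min(lowlink[n], index[m])

        # n is the root of a component: pop the stack down to and including n.
        if lowlink[n] == index[n]:
            scc: List[str] = []
            while True:
                k = stack.pop()
                on_stack[k] = False
                scc.append(k)
                if k == n:
                    break
            out.append(scc)

    for node in targets_of:
        if node not in index:
            strongconnect(node)
    return out


# Cycle a -> b -> c -> a with an extra edge c -> d.
sccs = strongly_connected_components({"a": ["b"], "b": ["c"], "c": ["a", "d"], "d": []})
```

The cycle forms one component and "d" forms its own; components are emitted in reverse topological order, so "d" comes first. Because the sketch is recursive, very deep graphs may hit Python's recursion limit.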