Skip to content

ragraph.analysis.heuristics

Heuristics

Heuristics are combinations of algorithms available in the other sub-packages or have an output that does not strictly fit one of the other categories. Not all heuristics have a common argument structure because of their more diverse nature.

Available heuristics

  • dijkstra shortest reach algorithm.
  • johnson circuit finding algorithm.
  • markov_gamma hierarchical clustering with bus detection heuristic.

dijkstra

dijkstra(
    graph: Graph,
    start_node: Node,
    nodes: list[Node] | None = None,
    weight: str | None = None,
    inherit: bool = False,
    aggregate: bool = False,
    **kwargs
) -> tuple[defaultdict[str, float], dict[str, str]]

Dijkstra's algorithm for finding the all shortest (weighted) path originating from a given node.

Parameters:

Name Type Description Default
graph Graph

Graph to find circuits in.

required
start_node Node

Node to start the search from.

required
nodes list[Node] | None

Nodes to consider during circuit search.

None
weight str | None

Weight label to use when traversing edges.

None
inherit bool

Whether to consider edges between children of the given nodes.

False
aggregate bool

Whether to take the aggregate of all edge weights between two nodes as the distance instead of evaluating individual edges.

False

Returns:

Type Description
defaultdict[str, float]
  • Distance dictionary from reached nodes to the minimum weight to get there.
dict[str, str]
  • Previous node dictionary to reconstruct paths with.
Source code in src/ragraph/analysis/heuristics/_dijkstra.py
def dijkstra(
    graph: Graph,
    start_node: Node,
    nodes: list[Node] | None = None,
    weight: str | None = None,
    inherit: bool = False,
    aggregate: bool = False,
    **kwargs,
) -> tuple[defaultdict[str, float], dict[str, str]]:
    """Dijkstra's algorithm for finding the all shortest (weighted) path originating from a given
    node.

    Arguments:
        graph: Graph to find circuits in.
        start_node: Node to start the search from.
        nodes: Nodes to consider during circuit search.
        weight: Weight label to use when traversing edges.
        inherit: Whether to consider edges between children of the given nodes.
        aggregate: Whether to take the aggregate of all edge weights between two nodes as the
            distance instead of evaluating individual edges.

    Returns:
        - Distance dictionary from reached nodes to the minimum weight to get there.
        - Previous node dictionary to reconstruct paths with.
    """
    if not nodes:
        nodes = graph.leafs

    # Initialize the visited set, queue, distance dictionary and previous node dictionary
    visited: set[str] = set()

    # Initialize the distance with infinity for all nodes  expect the start node
    distance: defaultdict[str, float] = defaultdict(lambda: float("inf"))
    distance[start_node.name] = 0.0
    previous: dict[str, str] = dict()

    queue = _initialize_queue(nodes, start_node)

    # Fill in the visit list based on whether we want to aggregate edge weights or not.
    visit: Callable[
        [
            Graph,
            SortedDict,
            dict[str, str],
            defaultdict[str, float],
            str,
            str,
            float,
            bool,
            str | None,
        ],
        None,
    ] = _evaluate_edge_aggregate if aggregate else _evaluate_individual_edges

    # Main loop of the algorithm: while there are nodes in the queue, pop the node with the
    # smallest distance (e.g. remove from the queue),
    # mark the node as visited, and update the distance to the start node with the offset.
    while queue:
        source: str
        offset: float
        source, offset = queue.popitem(index=0)
        visited.add(source)
        distance[source] = offset

        # For each target node that is a neighbour of the source  node,
        # if the target not has not been visited yet, evaluate the distance to the target node via
        # the visiting function.

        for target in graph.targets_of(
            source
        ):  # here, target is a node again, redirect to the name
            target = target.name
            if target == source or target in visited:
                continue
            visit(graph, queue, previous, distance, source, target, offset, inherit, weight)

    return distance, previous

johnson

johnson(
    graph: Graph,
    nodes: list[Node] | None = None,
    names: bool = False,
    inherit: bool = True,
    **kwargs
) -> Generator[list[str] | list[Node], None, None]

Donald B. Johnson's nested circuit finding algorithm. A circuit is a cycle in a graph, circuits can overlap or contain duplicate parts.

Parameters:

Name Type Description Default
graph Graph

Graph to find circuits in.

required
nodes list[Node] | None

Nodes to consider during circuit search.

None
inherit bool

Whether to take into account (inherit) edges between children during SCC calculations.

True

Yields:

Type Description
list[str] | list[Node]

Node circuit lists.

Note

Cannot cluster a graph directly since circuits may overlap in all sorts of ways. Therefore, it gives you cluster-related information, but no clear hierarchy.

Reference

Johnson, D. B. (1975). Finding all the elementary circuits of a directed graph. In SIAM J. COMPUT (Vol. 4). https://doi.org/10.1137/0204007

Source code in src/ragraph/analysis/heuristics/_johnson.py
def johnson(
    graph: Graph,
    nodes: list[Node] | None = None,
    names: bool = False,
    inherit: bool = True,
    **kwargs,
) -> Generator[list[str] | list[Node], None, None]:
    """Donald B. Johnson's nested circuit finding algorithm. A circuit is a cycle in a
    graph, circuits can overlap or contain duplicate parts.

    Arguments:
        graph: Graph to find circuits in.
        nodes: Nodes to consider during circuit search.
        inherit: Whether to take into account (inherit) edges between children during
            SCC calculations.

    Yields:
        Node circuit lists.

    Note:
        Cannot cluster a graph directly since circuits may overlap in all sorts of ways.
        Therefore, it gives you cluster-related information, but no clear hierarchy.

    Reference:
        Johnson, D. B. (1975). Finding all the elementary circuits of a directed graph.
            In SIAM J. COMPUT (Vol. 4). https://doi.org/10.1137/0204007
    """
    if not nodes:
        nodes = graph.leafs

    # Keep original nodes around for node remapping, get a working copy for the algo.
    translation = {n.name: n for n in nodes}
    graph = graph.get_graph_slice(nodes, inherit=inherit)

    def _unblock(node: Node, blocked: set[Node], B: dict[Node, set[Node]]):
        stack = set([node])
        while stack:
            node = stack.pop()
            if node in blocked:
                blocked.remove(node)
                stack.update(B[node])
                B[node].clear()

    # List of Lists of strongly connected nodes
    sccs = tarjans_scc_algorithm(graph, nodes=graph.roots, inherit=inherit)
    while sccs:
        scc = sccs.pop()
        startnode = scc.pop()
        path = [startnode]
        blocked = set()
        closed = set()
        blocked.add(startnode)
        B: dict[Node, set[Node]] = defaultdict(set)
        stack = [(startnode, list(graph.targets_of(startnode)))]
        while stack:
            node, targets = stack[-1]
            if targets:
                target = targets.pop()
                if target == startnode:
                    if names:
                        yield [translation[n.name].name for n in path]  # Original names
                    else:
                        yield [translation[n.name] for n in path]  # Original nodes
                    closed.update(path)
                elif target not in blocked:
                    path.append(target)
                    stack.append((target, list(graph.targets_of(target))))
                    closed.discard(target)
                    blocked.add(target)
                    continue
            else:
                if node in closed:
                    _unblock(node, blocked, B)
                else:
                    for target in graph.targets_of(node):
                        if node not in B[target]:
                            B[target].add(node)
                stack.pop()
                path.pop()
        graph.del_node(node)  # type: ignore
        sccs.extend(tarjans_scc_algorithm(graph, scc, inherit))

markov_gamma

markov_gamma(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    leafs: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    alpha: int = default,
    beta: float = default,
    mu: float = default,
    gamma: float = default,
    local_buses: bool = default,
    max_iter: int = default,
    symmetrize: bool = default,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
) -> Union[List[Node], Tuple[Graph, List[Node]]]

Cluster a given graph hierarchically with buses on a local level or globally.

Source code in src/ragraph/analysis/heuristics/_markov_gamma.py
@markov_gamma_analysis
def markov_gamma(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    leafs: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    alpha: int = markov_gamma_params["alpha"].default,  # type: ignore
    beta: float = markov_gamma_params["beta"].default,  # type: ignore
    mu: float = markov_gamma_params["mu"].default,  # type: ignore
    gamma: float = markov_gamma_params["gamma"].default,  # type: ignore
    local_buses: bool = markov_gamma_params["local_buses"].default,  # type: ignore
    max_iter: int = markov_gamma_params["max_iter"].default,  # type: ignore
    symmetrize: bool = markov_gamma_params["symmetrize"].default,  # type: ignore
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
) -> Union[List[Node], Tuple[Graph, List[Node]]]:
    """Cluster a given graph hierarchically with buses on a local level or globally."""
    assert leafs is not None
    graph, roots = hierarchical_markov(
        graph,
        root=root,
        leafs=leafs,
        inherit=inherit,
        loops=loops,
        alpha=alpha,
        beta=beta,
        mu=mu,
        edge_weights=edge_weights,
        inplace=True,
        max_iter=max_iter,
        symmetrize=symmetrize,
        names=False,
        safe=False,
    )
    check_roots = roots[:]
    checked_roots = []
    while check_roots:
        local_root = check_roots.pop()
        local_leafs = [leaf for leaf in leafs if leaf in local_root.descendants]

        # Detect bus in current root.
        bus_leafs, nonbus_leafs = gamma_bus_detection(
            graph,
            local_root,
            leafs=local_leafs,
            inherit=inherit,
            loops=loops,
            gamma=gamma,
            edge_weights=edge_weights,
            names=False,
            safe=False,
        )

        # Bus detected!
        if bus_leafs:
            # Unset local hierarchy.
            _utils.clear_local_hierarchy(graph, local_leafs, [local_root])
            local_root.children = None

            # Recalculate the nonbus hierarchy.
            graph, nonbus_roots = hierarchical_markov(
                graph,
                alpha=alpha,
                beta=beta,
                mu=mu,
                root=None,
                leafs=nonbus_leafs,
                inherit=inherit,
                loops=loops,
                edge_weights=edge_weights,
                inplace=True,
                max_iter=max_iter,
                symmetrize=symmetrize,
                names=False,
                safe=False,
            )
            _utils.set_children(graph, local_root, nonbus_roots)

            # Recalculate the bus hierarchy.
            graph, bus_roots = hierarchical_markov(
                graph,
                alpha=alpha,
                beta=beta,
                mu=mu,
                root=None,
                leafs=bus_leafs,
                inherit=inherit,
                loops=loops,
                edge_weights=edge_weights,
                inplace=True,
                max_iter=max_iter,
                symmetrize=symmetrize,
                names=False,
                safe=False,
            )
            local_root.children = bus_roots + local_root.children
            for bus in bus_roots:
                bus.is_bus = True

        # Finished this root.
        checked_roots.append(local_root)

        # If local: move down a level.
        if not check_roots and local_buses:
            check_roots = [child for local_root in checked_roots for child in local_root.children]
            checked_roots = []

    return graph, roots

_dijkstra

Dijkstra's algorithm

Finding the all shortest (weighted) paths originating from a given node.

_evaluate_edge_aggregate

_evaluate_edge_aggregate(
    graph: Graph,
    queue: SortedDict,
    previous: dict[str, str],
    distance: defaultdict[str, float],
    source: str,
    target: str,
    offset: float,
    inherit: bool,
    weight: str | None,
) -> None

Take the aggregated of the weight of all edges between two nodes as the distance.

Source code in src/ragraph/analysis/heuristics/_dijkstra.py
def _evaluate_edge_aggregate(
    graph: Graph,
    queue: SortedDict,
    previous: dict[str, str],
    distance: defaultdict[str, float],
    source: str,
    target: str,
    offset: float,
    inherit: bool,
    weight: str | None,
) -> None:
    """Take the aggregated of the weight of all edges between two nodes as the distance."""
    """Arguments:
    graph: Graph to find circuits in.
    queue: Sorted Dictionary of nodes to visit
    previous: Previous node dictionary to reconstruct paths with.
    distance: Dictionary with names of reached nodes to the minimum weight to get there from the
        start node.
    source: The name of the node currently being visited.
    target: The name of the neighboring node being evaluated for distance update.
    offset: The distance from the start node to the source node.
    inherit: Whether to consider edges between children of the given nodes.
    weight: Weight label to use when traversing edges.
    """

    cur_dist: float = distance[target]

    get_weight: Callable[[Edge], float] = (
        _weight if weight is None else partial(_specific_weight, weight=weight)
    )

    alt_dist = offset + sum(
        get_weight(e) for e in graph.edges_between(source, target, inherit=inherit, loops=False)
    )
    if alt_dist < cur_dist:
        previous[target] = source
        distance[target] = alt_dist
        queue[target] = alt_dist

_evaluate_individual_edges

_evaluate_individual_edges(
    graph: Graph,
    queue: SortedDict,
    previous: dict[str, str],
    distance: defaultdict[str, float],
    source: str,
    target: str,
    offset: float,
    inherit: bool,
    weight: str | None,
) -> None

Evaluate each edge between two nodes as a separate bridge that can be crossed.

Source code in src/ragraph/analysis/heuristics/_dijkstra.py
def _evaluate_individual_edges(
    graph: Graph,
    queue: SortedDict,
    previous: dict[str, str],
    distance: defaultdict[str, float],
    source: str,
    target: str,
    offset: float,
    inherit: bool,
    weight: str | None,
) -> None:
    """Evaluate each edge between two nodes as a separate bridge that can be crossed."""

    """Arguments:
        graph: Graph to find circuits in.
        queue: Sorted Dictionary of nodes to visit
        previous: Previous dictionary to reconstruct paths with.
        distance: Dictionary with names of reached nodes to the minimum weight to get there from
            the start node.
        source: The name of the node currently being visited.
        target: The name of the neighboring node being evaluated for distance update.
        offset: The distance from the start node to the source node.
        inherit: Whether to consider edges between children of the given nodes.
        weight: Weight label to use when traversing edges.
        """

    cur_dist: float = distance[target]
    get_weight: Callable[[Edge], float] = (
        _weight if weight is None else partial(_specific_weight, weight=weight)
    )

    for e in graph.edges_between(source, target, inherit=inherit, loops=False):
        alt_dist = offset + get_weight(e)
        if alt_dist < cur_dist:
            previous[target] = source
            distance[target] = alt_dist
            queue[target] = alt_dist

_initialize_queue

_initialize_queue(
    nodes: list[Node], start_node: Node
) -> SortedDict

Initialize the queue with the start node having a distance of 0.0 and all other nodes having a distance of infinity.

Parameters:

Name Type Description Default
nodes list[Node]

List of nodes to consider during circuit search.

required
start_node Node

Node to start the search from.

required

Returns:

Type Description
SortedDict

SortedDict with the start node having a distance of 0.0 and all other nodes having a

SortedDict

distance of infinity.

Source code in src/ragraph/analysis/heuristics/_dijkstra.py
def _initialize_queue(nodes: list[Node], start_node: Node) -> SortedDict:
    """Initialize the queue with the start node having a distance of 0.0 and all other nodes having
    a distance of infinity.

    Arguments:
        nodes: List of nodes to consider during circuit search.
        start_node: Node to start the search from.

    Returns:
        SortedDict with the start node having a distance of 0.0 and all other nodes having a
        distance of infinity.
    """

    queue = SortedDict()
    queue[start_node.name] = 0.0
    for node in nodes:
        if node.name == start_node.name:
            continue
        queue[node.name] = float("inf")

    return queue

_specific_weight

_specific_weight(edge: Edge, weight: str) -> float

Get a specific weight from the edge's weights dictionary.

Source code in src/ragraph/analysis/heuristics/_dijkstra.py
def _specific_weight(edge: Edge, weight: str) -> float:
    """Get a specific weight from the edge's weights dictionary."""
    return edge.weights[weight]

_weight

_weight(edge: Edge) -> float

Get the default weight from the edge's weights dictionary.

Source code in src/ragraph/analysis/heuristics/_dijkstra.py
def _weight(edge: Edge) -> float:
    """Get the default weight from the edge's weights dictionary."""
    return edge.weight

dijkstra

dijkstra(
    graph: Graph,
    start_node: Node,
    nodes: list[Node] | None = None,
    weight: str | None = None,
    inherit: bool = False,
    aggregate: bool = False,
    **kwargs
) -> tuple[defaultdict[str, float], dict[str, str]]

Dijkstra's algorithm for finding the all shortest (weighted) path originating from a given node.

Parameters:

Name Type Description Default
graph Graph

Graph to find circuits in.

required
start_node Node

Node to start the search from.

required
nodes list[Node] | None

Nodes to consider during circuit search.

None
weight str | None

Weight label to use when traversing edges.

None
inherit bool

Whether to consider edges between children of the given nodes.

False
aggregate bool

Whether to take the aggregate of all edge weights between two nodes as the distance instead of evaluating individual edges.

False

Returns:

Type Description
defaultdict[str, float]
  • Distance dictionary from reached nodes to the minimum weight to get there.
dict[str, str]
  • Previous node dictionary to reconstruct paths with.
Source code in src/ragraph/analysis/heuristics/_dijkstra.py
def dijkstra(
    graph: Graph,
    start_node: Node,
    nodes: list[Node] | None = None,
    weight: str | None = None,
    inherit: bool = False,
    aggregate: bool = False,
    **kwargs,
) -> tuple[defaultdict[str, float], dict[str, str]]:
    """Dijkstra's algorithm for finding the all shortest (weighted) path originating from a given
    node.

    Arguments:
        graph: Graph to find circuits in.
        start_node: Node to start the search from.
        nodes: Nodes to consider during circuit search.
        weight: Weight label to use when traversing edges.
        inherit: Whether to consider edges between children of the given nodes.
        aggregate: Whether to take the aggregate of all edge weights between two nodes as the
            distance instead of evaluating individual edges.

    Returns:
        - Distance dictionary from reached nodes to the minimum weight to get there.
        - Previous node dictionary to reconstruct paths with.
    """
    if not nodes:
        nodes = graph.leafs

    # Initialize the visited set, queue, distance dictionary and previous node dictionary
    visited: set[str] = set()

    # Initialize the distance with infinity for all nodes  expect the start node
    distance: defaultdict[str, float] = defaultdict(lambda: float("inf"))
    distance[start_node.name] = 0.0
    previous: dict[str, str] = dict()

    queue = _initialize_queue(nodes, start_node)

    # Fill in the visit list based on whether we want to aggregate edge weights or not.
    visit: Callable[
        [
            Graph,
            SortedDict,
            dict[str, str],
            defaultdict[str, float],
            str,
            str,
            float,
            bool,
            str | None,
        ],
        None,
    ] = _evaluate_edge_aggregate if aggregate else _evaluate_individual_edges

    # Main loop of the algorithm: while there are nodes in the queue, pop the node with the
    # smallest distance (e.g. remove from the queue),
    # mark the node as visited, and update the distance to the start node with the offset.
    while queue:
        source: str
        offset: float
        source, offset = queue.popitem(index=0)
        visited.add(source)
        distance[source] = offset

        # For each target node that is a neighbour of the source  node,
        # if the target not has not been visited yet, evaluate the distance to the target node via
        # the visiting function.

        for target in graph.targets_of(
            source
        ):  # here, target is a node again, redirect to the name
            target = target.name
            if target == source or target in visited:
                continue
            visit(graph, queue, previous, distance, source, target, offset, inherit, weight)

    return distance, previous

_johnson

Donald B. Johnson's nested circuit finding algorithm

johnson

johnson(
    graph: Graph,
    nodes: list[Node] | None = None,
    names: bool = False,
    inherit: bool = True,
    **kwargs
) -> Generator[list[str] | list[Node], None, None]

Donald B. Johnson's nested circuit finding algorithm. A circuit is a cycle in a graph, circuits can overlap or contain duplicate parts.

Parameters:

Name Type Description Default
graph Graph

Graph to find circuits in.

required
nodes list[Node] | None

Nodes to consider during circuit search.

None
inherit bool

Whether to take into account (inherit) edges between children during SCC calculations.

True

Yields:

Type Description
list[str] | list[Node]

Node circuit lists.

Note

Cannot cluster a graph directly since circuits may overlap in all sorts of ways. Therefore, it gives you cluster-related information, but no clear hierarchy.

Reference

Johnson, D. B. (1975). Finding all the elementary circuits of a directed graph. In SIAM J. COMPUT (Vol. 4). https://doi.org/10.1137/0204007

Source code in src/ragraph/analysis/heuristics/_johnson.py
def johnson(
    graph: Graph,
    nodes: list[Node] | None = None,
    names: bool = False,
    inherit: bool = True,
    **kwargs,
) -> Generator[list[str] | list[Node], None, None]:
    """Donald B. Johnson's nested circuit finding algorithm. A circuit is a cycle in a
    graph, circuits can overlap or contain duplicate parts.

    Arguments:
        graph: Graph to find circuits in.
        nodes: Nodes to consider during circuit search.
        inherit: Whether to take into account (inherit) edges between children during
            SCC calculations.

    Yields:
        Node circuit lists.

    Note:
        Cannot cluster a graph directly since circuits may overlap in all sorts of ways.
        Therefore, it gives you cluster-related information, but no clear hierarchy.

    Reference:
        Johnson, D. B. (1975). Finding all the elementary circuits of a directed graph.
            In SIAM J. COMPUT (Vol. 4). https://doi.org/10.1137/0204007
    """
    if not nodes:
        nodes = graph.leafs

    # Keep original nodes around for node remapping, get a working copy for the algo.
    translation = {n.name: n for n in nodes}
    graph = graph.get_graph_slice(nodes, inherit=inherit)

    def _unblock(node: Node, blocked: set[Node], B: dict[Node, set[Node]]):
        stack = set([node])
        while stack:
            node = stack.pop()
            if node in blocked:
                blocked.remove(node)
                stack.update(B[node])
                B[node].clear()

    # List of Lists of strongly connected nodes
    sccs = tarjans_scc_algorithm(graph, nodes=graph.roots, inherit=inherit)
    while sccs:
        scc = sccs.pop()
        startnode = scc.pop()
        path = [startnode]
        blocked = set()
        closed = set()
        blocked.add(startnode)
        B: dict[Node, set[Node]] = defaultdict(set)
        stack = [(startnode, list(graph.targets_of(startnode)))]
        while stack:
            node, targets = stack[-1]
            if targets:
                target = targets.pop()
                if target == startnode:
                    if names:
                        yield [translation[n.name].name for n in path]  # Original names
                    else:
                        yield [translation[n.name] for n in path]  # Original nodes
                    closed.update(path)
                elif target not in blocked:
                    path.append(target)
                    stack.append((target, list(graph.targets_of(target))))
                    closed.discard(target)
                    blocked.add(target)
                    continue
            else:
                if node in closed:
                    _unblock(node, blocked, B)
                else:
                    for target in graph.targets_of(node):
                        if node not in B[target]:
                            B[target].add(node)
                stack.pop()
                path.pop()
        graph.del_node(node)  # type: ignore
        sccs.extend(tarjans_scc_algorithm(graph, scc, inherit))

_markov_gamma

Hierarchical Markov Clustering (HMC) with Gamma bus detection.

markov_gamma

markov_gamma(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    leafs: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    alpha: int = default,
    beta: float = default,
    mu: float = default,
    gamma: float = default,
    local_buses: bool = default,
    max_iter: int = default,
    symmetrize: bool = default,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
) -> Union[List[Node], Tuple[Graph, List[Node]]]

Cluster a given graph hierarchically with buses on a local level or globally.

Source code in src/ragraph/analysis/heuristics/_markov_gamma.py
@markov_gamma_analysis
def markov_gamma(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    leafs: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    alpha: int = markov_gamma_params["alpha"].default,  # type: ignore
    beta: float = markov_gamma_params["beta"].default,  # type: ignore
    mu: float = markov_gamma_params["mu"].default,  # type: ignore
    gamma: float = markov_gamma_params["gamma"].default,  # type: ignore
    local_buses: bool = markov_gamma_params["local_buses"].default,  # type: ignore
    max_iter: int = markov_gamma_params["max_iter"].default,  # type: ignore
    symmetrize: bool = markov_gamma_params["symmetrize"].default,  # type: ignore
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
) -> Union[List[Node], Tuple[Graph, List[Node]]]:
    """Cluster a given graph hierarchically with buses on a local level or globally."""
    assert leafs is not None
    graph, roots = hierarchical_markov(
        graph,
        root=root,
        leafs=leafs,
        inherit=inherit,
        loops=loops,
        alpha=alpha,
        beta=beta,
        mu=mu,
        edge_weights=edge_weights,
        inplace=True,
        max_iter=max_iter,
        symmetrize=symmetrize,
        names=False,
        safe=False,
    )
    check_roots = roots[:]
    checked_roots = []
    while check_roots:
        local_root = check_roots.pop()
        local_leafs = [leaf for leaf in leafs if leaf in local_root.descendants]

        # Detect bus in current root.
        bus_leafs, nonbus_leafs = gamma_bus_detection(
            graph,
            local_root,
            leafs=local_leafs,
            inherit=inherit,
            loops=loops,
            gamma=gamma,
            edge_weights=edge_weights,
            names=False,
            safe=False,
        )

        # Bus detected!
        if bus_leafs:
            # Unset local hierarchy.
            _utils.clear_local_hierarchy(graph, local_leafs, [local_root])
            local_root.children = None

            # Recalculate the nonbus hierarchy.
            graph, nonbus_roots = hierarchical_markov(
                graph,
                alpha=alpha,
                beta=beta,
                mu=mu,
                root=None,
                leafs=nonbus_leafs,
                inherit=inherit,
                loops=loops,
                edge_weights=edge_weights,
                inplace=True,
                max_iter=max_iter,
                symmetrize=symmetrize,
                names=False,
                safe=False,
            )
            _utils.set_children(graph, local_root, nonbus_roots)

            # Recalculate the bus hierarchy.
            graph, bus_roots = hierarchical_markov(
                graph,
                alpha=alpha,
                beta=beta,
                mu=mu,
                root=None,
                leafs=bus_leafs,
                inherit=inherit,
                loops=loops,
                edge_weights=edge_weights,
                inplace=True,
                max_iter=max_iter,
                symmetrize=symmetrize,
                names=False,
                safe=False,
            )
            local_root.children = bus_roots + local_root.children
            for bus in bus_roots:
                bus.is_bus = True

        # Finished this root.
        checked_roots.append(local_root)

        # If local: move down a level.
        if not check_roots and local_buses:
            check_roots = [child for local_root in checked_roots for child in local_root.children]
            checked_roots = []

    return graph, roots