ragraph.analysis.sequence

Sequencing algorithms

Sequencing algorithms search for sequences of nodes according to some objective or logic. A simple example is sorting nodes by name; more elaborate algorithms sort according to Markov transition probabilities or search for sequences without feedback (edges pointing back to earlier nodes). A minimal usage sketch follows the list of algorithms below.

Available algorithms

  • markov: sequencing based on Markov transition probabilities between nodes.
  • scc_tearing: efficient algorithm that quickly handles the "trivial" part of sequencing by putting the Strongly Connected Components (the largest possible cycles) in a non-feedback order and then iteratively "tears" each cycle according to a tearing indicator.
  • name: simply sorts nodes by their name.
  • tarjans_dfs: Tarjan's Depth First Search algorithm, an efficient sorting algorithm for Directed Acyclic Graphs (DAGs).
  • axis: typical Multi-Domain Matrix axis sorting method.
  • genetic: genetic algorithm that optimizes a sequence with respect to one of the sequencing metrics (feedback distance by default).
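
The snippet below is a minimal usage sketch of this module: it builds a tiny graph with a feedback loop and runs the markov algorithm on it. The core ragraph constructors (Graph, Node, Edge) and their keyword arguments are assumed to behave as usual, and the node names are made up for illustration.

from ragraph.edge import Edge
from ragraph.graph import Graph
from ragraph.node import Node

from ragraph.analysis import sequence

# Three nodes with a feedback loop: a -> b -> c and c -> a.
a, b, c = Node("a"), Node("b"), Node("c")
graph = Graph(nodes=[a, b, c], edges=[Edge(a, b), Edge(b, c), Edge(c, a)])

# Each algorithm returns the (possibly updated) graph and the calculated sequence.
graph, seq = sequence.markov(graph, nodes=[a, b, c])
print([n.name for n in seq])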

Metrics

There are several metrics from literature available to grade sequences as well. These are documented in the metrics section below.

Utilities

Finally, there are some utilities available in utils, such as branch-sorting (recursively sorting all branches in a hierarchical tree instead of sorting all leaf nodes as a whole):

  • branchsort: recursively sort children within a branch or hierarchy using any of the algorithms above.

_axis

Sort nodes in a typical axis-ready format.

axis

axis(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    kinds: Optional[List[str]] = None,
    sort_by_bus: bool = True,
    sort_by_width: bool = True,
    edge_weights: Optional[List[str]] = None,
    inherit: bool = True,
    loops: bool = False,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
)

docstring stub

Source code in ragraph/analysis/sequence/_axis.py
@axis_sequencing_analysis
def axis(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    kinds: Optional[List[str]] = None,
    sort_by_bus: bool = True,
    sort_by_width: bool = True,
    edge_weights: Optional[List[str]] = None,
    inherit: bool = True,
    loops: bool = False,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
):
    """docstring stub"""
    seq = get_axis_sequence(nodes, kinds, sort_by_bus, sort_by_width)  # type: ignore
    return graph, seq

get_axis_sequence

get_axis_sequence(
    nodes: Iterable[Node],
    kinds: Optional[List[str]] = None,
    sort_by_bus: bool = True,
    sort_by_width: bool = True,
    root_cache: Optional[Dict[Node, Node]] = None,
    child_cache: Optional[Dict[Node, List[Node]]] = None,
    width_cache: Optional[Dict[Node, int]] = None,
) -> List[Node]

Get nodes in a plot-axis-ready order.

Parameters:

  • nodes (Iterable[Node], required): Axis nodes to order.
  • kinds (Optional[List[str]], default None): Optional order of node kinds.
  • sort_by_bus (bool, default True): Whether to put bus nodes first.
  • sort_by_width (bool, default True): Whether to sort nodes by width in terms of leaf nodes.
  • root_cache (Optional[Dict[Node, Node]], default None): Node to resulting root node cache. Any supplied dictionary will be updated in-place.
  • child_cache (Optional[Dict[Node, List[Node]]], default None): Node to child nodes that have been seen cache.
  • width_cache (Optional[Dict[Node, int]], default None): Node to node width cache.

Returns:

  • List[Node]: Ordered nodes.

Note

Axis ready means ordered by kind and buses nodes first where possible. Any supplied cache dictionary will be updated in-place.

Warning

When supplying node kinds it's your own responsibility to make sure that each node's kind actually is included in that node kind order.

Source code in ragraph/analysis/sequence/_axis.py
def get_axis_sequence(
    nodes: Iterable[Node],
    kinds: Optional[List[str]] = None,
    sort_by_bus: bool = True,
    sort_by_width: bool = True,
    root_cache: Optional[Dict[Node, Node]] = None,
    child_cache: Optional[Dict[Node, List[Node]]] = None,
    width_cache: Optional[Dict[Node, int]] = None,
) -> List[Node]:
    """Get nodes in a plot-axis-ready order.

    Arguments:
        nodes: Axis nodes to order.
        kinds: Optional order of node kinds.
        sort_by_bus: Whether to put bus nodes first.
        sort_by_width: Whether to sort nodes by width in terms of leaf nodes.
        root_cache: Node to resulting root node cache. Any supplied dictionary
            will be updated in-place.
        child_cache: Node to child nodes that have been seen cache.
        width_cache: Node to node width cache.

    Returns:
        Ordered nodes.

    Note:
        Axis ready means ordered by kind and buses nodes first where possible.
        Any supplied cache dictionary will be updated in-place.

    Warning:
        When supplying node kinds it's your own responsibility to make sure
        that each node's kind actually is included in that node kind order.
    """
    root_cache = dict() if root_cache is None else root_cache
    child_cache = dict() if child_cache is None else child_cache
    width_cache = dict() if width_cache is None else width_cache

    kinds = get_kinds(nodes) if kinds is None else kinds
    kind_order = {kind: position for position, kind in enumerate(kinds)}

    roots = get_roots(nodes, root_cache, child_cache)
    roots.sort(key=lambda x: get_root_key(x, kind_order, sort_by_width, width_cache))

    # Sort by is_bus status with buses first, width second, if applicable.
    if sort_by_bus or sort_by_width:
        for parent in child_cache.keys():
            child_cache[parent].sort(
                key=lambda x: get_sibling_key(x, sort_by_bus, sort_by_width, width_cache)
            )

    # Complement width_cache for roots for completeness sake.
    if sort_by_width:
        for root in roots:
            width_cache[root] = sum(
                width_cache.get(child, 1) for child in child_cache.get(root, [])
            )

    leafs = [leaf for root in roots for leaf in get_leafs(root, child_cache)]
    return leafs
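
As a rough illustration of this ordering, the sketch below builds a small two-kind hierarchy and asks for an axis-ready leaf order. It assumes the Node keyword arguments kind, parent, and is_bus behave as in the ragraph core classes and that get_axis_sequence is importable from this module path; the node names are illustrative.

from ragraph.node import Node

from ragraph.analysis.sequence._axis import get_axis_sequence

# One "component" branch with a bus child, plus a loose "requirement" node.
sys = Node("sys", kind="component")
bus = Node("bus", kind="component", parent=sys, is_bus=True)
part = Node("part", kind="component", parent=sys)
req = Node("req", kind="requirement")

# Kinds keep their given order and bus nodes come first within their branch.
leafs = get_axis_sequence([part, bus, req], kinds=["component", "requirement"])
print([n.name for n in leafs])  # expected: ['bus', 'part', 'req']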

get_kinds

get_kinds(nodes: Iterable[Node]) -> List[str]

Get node kinds in order of occurrence.

Parameters:

  • nodes (Iterable[Node], required): Nodes to iterate over.

Returns:

  • List[str]: List of Node kinds.

Source code in ragraph/analysis/sequence/_axis.py
def get_kinds(nodes: Iterable[Node]) -> List[str]:
    """Get node kinds in order of occurrence.

    Arguments:
        nodes: Nodes to iterate over.

    Returns:
        List of Node kinds.
    """
    ordered = []
    seen = set()
    for node in nodes:
        kind = node.kind
        if kind not in seen:
            ordered.append(kind)
            seen.add(kind)
    return ordered

get_leafs

get_leafs(
    parent: Node, child_cache: Dict[Node, List[Node]]
) -> List[Node]

Get leaf nodes from a linked-list style child_cache (e.g. parent to children).

Parameters:

  • parent (Node, required): Parent node.
  • child_cache (Dict[Node, List[Node]], required): Linked-list style parent-children dictionary.

Returns:

  • List[Node]: Displayed leaf nodes as they occur under this parent.

Source code in ragraph/analysis/sequence/_axis.py
def get_leafs(parent: Node, child_cache: Dict[Node, List[Node]]) -> List[Node]:
    """Get leaf nodes from a linked-list style child_cache (e.g. parent to children).

    Arguments:
        parent: Parent node.
        child_cache: Linked-list style parent-children dictionary.

    Returns:
        Displayed leaf nodes as they occur under this parent.
    """
    # Handle the case where this node is actually a leaf node.
    if parent not in child_cache:
        return [parent]

    # Fetch leaf nodes from deeper into the cached hierarchy.
    leafs = []
    for child in child_cache[parent]:
        leafs.extend(get_leafs(child, child_cache))
    return leafs

get_root

get_root(
    node: Node,
    root_cache: Optional[Dict[Node, Node]] = None,
    child_cache: Optional[Dict[Node, List[Node]]] = None,
) -> Node

Get root node of a node.

Parameters:

  • node (Node, required): Node to find the corresponding root node for.
  • root_cache (Optional[Dict[Node, Node]], default None): Node to resulting root node cache. Any supplied dictionary will be updated in-place.
  • child_cache (Optional[Dict[Node, List[Node]]], default None): Node to child nodes that have been seen cache. Any supplied dictionary will be updated in-place.

Returns:

  • Node: Root node.

Note

Supply cache dictionaries to store intermediate results.

Source code in ragraph/analysis/sequence/_axis.py
def get_root(
    node: Node,
    root_cache: Optional[Dict[Node, Node]] = None,
    child_cache: Optional[Dict[Node, List[Node]]] = None,
) -> Node:
    """Get root node of a node.

    Arguments:
        node: Nodes to find corresponding root nodes for.
        root_cache: Node to resulting root node cache. Any supplied dictionary
            will be updated in-place.
        child_cache: Node to child nodes that have been seen cache. Any supplied
            dictionary will be updated in-place.

    Returns:
        Root node.

    Note:
        Supply cache dictionaries to store intermediate results.
    """

    # Cache from any seen node to the resulting root node.
    root_cache = dict() if root_cache is None else root_cache

    # Cache from any parent we came across to the included children in the search.
    child_cache = dict() if child_cache is None else child_cache

    # Cached, return cache value.
    if node in root_cache:
        return root_cache[node]

    # Is a root, return.
    if node.parent is None:
        return node

    # Find and update caches accordingly.
    if node.parent not in child_cache:
        child_cache[node.parent] = [node]
    else:
        child_cache[node.parent].append(node)
    root = get_root(node.parent, root_cache, child_cache)
    root_cache[node] = root

    return root

get_root_key

get_root_key(
    node: Node,
    kind_order: Dict[str, int],
    sort_by_width: bool,
    width_cache: Optional[Dict[Node, int]] = None,
) -> Union[int, Tuple[int, int]]

Calculate a sorting score for root nodes.

Parameters:

  • node (Node, required): Root node to obtain a score for.
  • kind_order (Dict[str, int], required): Node kind to position dictionary.
  • sort_by_width (bool, required): Whether to sort nodes by width in terms of leaf nodes.
  • width_cache (Optional[Dict[Node, int]], default None): Node to node width cache.

Returns:

  • Union[int, Tuple[int, int]]: Root node sorting key.

Note

Sort by kinds first, width in terms of actual (not display) leaf nodes second.

Source code in ragraph/analysis/sequence/_axis.py
def get_root_key(
    node: Node,
    kind_order: Dict[str, int],
    sort_by_width: bool,
    width_cache: Optional[Dict[Node, int]] = None,
) -> Union[int, Tuple[int, int]]:
    """Calculate a sorting score for root nodes.

    Arguments:
        node: Root node to obtain a score for.
        kind_order: Node kind to position dictionary.
        sort_by_width: Whether to sort nodes by width in terms of leaf nodes.
        width_cache: Node to node width cache.

    Returns:
        Root node sorting key.

    Note:
        Sort by kinds first, width in terms of actual (not display) leaf nodes second.
    """
    if sort_by_width:
        return (
            kind_order[node.kind],
            -get_width(node, width_cache),
        )
    else:
        return kind_order[node.kind]

get_roots

get_roots(
    nodes: Iterable[Node],
    root_cache: Optional[Dict[Node, Node]] = None,
    child_cache: Optional[Dict[Node, List[Node]]] = None,
) -> List[Node]

Get list of root nodes corresponding to some collection of nodes.

Parameters:

  • nodes (Iterable[Node], required): Collection of nodes to find corresponding root nodes for.
  • root_cache (Optional[Dict[Node, Node]], default None): Node to resulting root node cache. Any supplied dictionary will be updated in-place.
  • child_cache (Optional[Dict[Node, List[Node]]], default None): Node to child nodes that have been seen cache. Any supplied dictionary will be updated in-place.

Returns:

  • List[Node]: Root nodes.

Note

Supply cache dictionaries to store intermediate results.

Source code in ragraph/analysis/sequence/_axis.py
def get_roots(
    nodes: Iterable[Node],
    root_cache: Optional[Dict[Node, Node]] = None,
    child_cache: Optional[Dict[Node, List[Node]]] = None,
) -> List[Node]:
    """Get list of root nodes corresponding to some collection of nodes.

    Arguments:
        nodes: Collection of nodes to find corresponding root nodes for.
        root_cache: Node to resulting root node cache. Any supplied dictionary
            will be updated in-place.
        child_cache: Node to child nodes that have been seen cache. Any supplied
            dictionary will be updated in-place.

    Returns:
        Root nodes.

    Note:
        Supply cache dictionaries to store intermediate results.
    """
    # Cache from any seen node to the resulting root node.
    root_cache = dict() if root_cache is None else root_cache

    # Cache from any parent we came across to the included children in the search.
    child_cache = dict() if child_cache is None else child_cache

    seen = set()  # Seen root nodes.
    ordered = []  # Seen root nodes in order of appearance.
    for node in nodes:
        root = get_root(node, root_cache, child_cache)
        if root not in seen:
            seen.add(root)
            ordered.append(root)
    return ordered

get_sibling_key

get_sibling_key(
    node: Node,
    sort_by_bus: bool,
    sort_by_width: bool,
    width_cache: Optional[Dict[Node, int]] = None,
) -> Union[int, Tuple[int, int]]

Calculate a sorting score for sibling nodes. Lower scores go first.

Parameters:

  • node (Node, required): Node to score.
  • sort_by_bus (bool, required): Whether to put bus nodes first.
  • sort_by_width (bool, required): Whether to sort nodes by width in terms of leaf nodes.
  • width_cache (Optional[Dict[Node, int]], default None): Dictionary of nodes to displayed children.

Returns:

  • Union[int, Tuple[int, int]]: Sibling node sorting score.

Note

Bus nodes first, width in terms of actual (not display) leaf nodes second.

Source code in ragraph/analysis/sequence/_axis.py
def get_sibling_key(
    node: Node,
    sort_by_bus: bool,
    sort_by_width: bool,
    width_cache: Optional[Dict[Node, int]] = None,
) -> Union[int, Tuple[int, int]]:
    """Calculate a sorting score for sibling nodes. Lower scores go first.

    Arguments:
        node: Node to score.
        sort_by_bus: Whether to put bus nodes first.
        sort_by_width: Whether to sort nodes by width in terms of leaf nodes.
        width_cache: Dictionary of nodes to displayed children.

    Returns:
        Sibling node sorting score.

    Note:
        Bus nodes first, width in terms of actual (not display) leaf nodes second."""
    if sort_by_bus and sort_by_width:
        return (not node.is_bus, -get_width(node, width_cache))
    elif sort_by_bus:
        return not node.is_bus
    elif sort_by_width:
        return -get_width(node, width_cache)
    else:
        return 0

get_width

get_width(
    node: Node,
    width_cache: Optional[Dict[Node, int]] = None,
) -> int

Get width of a node and update width cache in-place.

Parameters:

  • node (Node, required): Node to get the width of.
  • width_cache (Optional[Dict[Node, int]], default None): Node to node width cache.

Returns:

  • int: Node width.

Source code in ragraph/analysis/sequence/_axis.py
def get_width(node: Node, width_cache: Optional[Dict[Node, int]] = None) -> int:
    """Get width of a node and update width cache in-place.

    Arguments:
        node: Node to get the width of.
        width_cache: Node to node width cache.

    Returns:
        Node width.
    """
    width_cache = dict() if width_cache is None else width_cache
    if node not in width_cache:
        if node.children:
            width_cache[node] = sum(get_width(child, width_cache) for child in node.children)
        else:
            width_cache[node] = 1
    return width_cache[node]

_genetic

Genetic algorithms for sequencing purposes.

genetic

genetic(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[str], List[Node]]] = None,
    evaluator: Optional[str] = default,
    n_chromosomes: Optional[int] = default,
    n_generations: Optional[int] = default,
    p_crossover: Optional[float] = default,
    p_mutation: Optional[float] = default,
    p_swap: Optional[float] = default,
    inherit: bool = True,
    edge_weights: Optional[List[str]] = None,
    loops: bool = False,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
) -> Tuple[Graph, List[Node]]

docstring stub

Source code in ragraph/analysis/sequence/_genetic.py
@genetic_analysis
def genetic(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[str], List[Node]]] = None,
    evaluator: Optional[str] = params["evaluator"].default,
    n_chromosomes: Optional[int] = params["n_chromosomes"].default,
    n_generations: Optional[int] = params["n_generations"].default,
    p_crossover: Optional[float] = params["p_crossover"].default,
    p_mutation: Optional[float] = params["p_mutation"].default,
    p_swap: Optional[float] = params["p_swap"].default,
    inherit: bool = True,
    edge_weights: Optional[List[str]] = None,
    loops: bool = False,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
) -> Tuple[Graph, List[Node]]:
    """docstring stub"""
    lineage = genetic_sequencing(
        graph,
        nodes,  # type: ignore
        n_chromosomes,  # type: ignore
        n_generations,  # type: ignore
        1,
        p_crossover,  # type: ignore
        p_mutation,  # type: ignore
        p_swap,  # type: ignore
        n_records=1,
        inherit=inherit,
        edge_weights=edge_weights,
    )

    best = lineage.hall_of_fame.chromosomes[0].genes

    seq = [nodes[i] for i in best]  # type: ignore
    return graph, seq
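
A sketch of calling the genetic sequencer directly is shown below. The population settings are arbitrary illustration values (not recommended defaults), the graph is made up, and the core ragraph constructors are assumed.

from ragraph.edge import Edge
from ragraph.graph import Graph
from ragraph.node import Node

from ragraph.analysis import sequence

nodes = [Node(name) for name in "abcd"]
edges = [Edge(nodes[i], nodes[j]) for i, j in [(0, 1), (1, 2), (2, 3), (3, 1)]]
graph = Graph(nodes=nodes, edges=edges)

# A small population and few generations keep this illustration cheap.
graph, seq = sequence.genetic(
    graph,
    nodes=nodes,
    n_chromosomes=20,
    n_generations=10,
    p_crossover=0.5,
    p_mutation=0.2,
    p_swap=0.1,
)
print([n.name for n in seq])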

genetic_sequencing

genetic_sequencing(
    graph: Graph,
    nodes: List[Node],
    n_chromosomes: int,
    n_generations: int,
    n_hall_of_fame: int,
    p_crossover: float,
    p_mutation: float,
    p_swap: float,
    evaluator: str = "feedback_distance",
    n_records: Optional[int] = None,
    inherit: bool = True,
    edge_weights: Optional[List[str]] = None,
) -> Lineage

Genetic sequencing of nodes in a graph.

Parameters:

  • graph (Graph, required): Graph holding data.
  • nodes (List[Node], required): Nodes to sequence.
  • evaluator (str, default 'feedback_distance'): Evaluation method to use. One of "feedback_distance", "feedback_marks", or "lower_left_distance".
  • n_chromosomes (int, required): Number of chromosomes in each generation.
  • n_generations (int, required): Number of generations to simulate.
  • n_hall_of_fame (int, required): Hall of Fame size of best performing chromosomes.
  • p_crossover (float, required): Probability for a pair to be subjected to crossover.
  • p_mutation (float, required): Probability for each chromosome to be subjected to mutation.
  • p_swap (float, required): Probability for each gene to be swapped with another during mutation.
  • n_records (Optional[int], default None): Number of generation records to keep.
  • inherit (bool, default True): Whether to inherit edges between children when getting the adjacency matrix.
  • edge_weights (Optional[List[str]], default None): Edge weights to consider when getting the adjacency matrix.

Returns:

  • Lineage: Lineage object containing generations of chromosomes, generation records and a hall of fame of best performing chromosomes.

Source code in ragraph/analysis/sequence/_genetic.py
def genetic_sequencing(
    graph: Graph,
    nodes: List[Node],
    n_chromosomes: int,
    n_generations: int,
    n_hall_of_fame: int,
    p_crossover: float,
    p_mutation: float,
    p_swap: float,
    evaluator: str = "feedback_distance",
    n_records: Optional[int] = None,
    inherit: bool = True,
    edge_weights: Optional[List[str]] = None,
) -> Lineage:
    """Genetic sequencing of nodes in a graph.

    Arguments:
        graph: Graph holding data.
        nodes: Nodes to sequence.
        evaluator: Evaluation method to use. One of "feedback_distance",
            "feedback_marks", or "lower_left_distance".
        n_chromosomes: Number of chromosomes in each generation.
        n_generations: Number of generations to simulate.
        n_hall_of_fame: Hall of Fame size of best performing chromosomes.
        p_crossover: Probability for a pair to be subjected to crossover.
        p_mutation: Probability for each chromosome to be subjected to mutation.
        p_swap: Probability for each gene to be swapped with another during mutation.
        n_records: Number of generation records to keep.
        inherit: Whether to inherit edges between children when getting the adjacency
            matrix.
        edge_weights: Edge weights to consider when getting the adjacency matrix.

    Returns:
        Lineage object containing generations of chromosomes, generation records and
        a hall of fame of best performing chromosomes.
    """
    matrix = graph.get_adjacency_matrix(nodes, inherit=inherit, only=edge_weights)
    vec = [x for row in matrix for x in row]

    ekinds = dict(
        feedback_distance=EvaluatorKinds.FeedbackDistance,
        feedback_marks=EvaluatorKinds.FeedbackMarks,
        lower_left_distance=EvaluatorKinds.LowerLeftDistance,
    )
    evaluator = EvaluatorMatrix(ekinds[evaluator], vec, 1)

    settings = SequencingSettings(
        n_genes=len(nodes),
        p_crossover=p_crossover,
        p_mutation=p_mutation,
        n_chromosomes=n_chromosomes,
        n_generations=n_generations,
        n_records=n_records,
        n_hall_of_fame=n_hall_of_fame,
    )

    lineage = sequence_sga(
        settings,
        GeneratorKinds.RandomSequence,
        evaluator,
        RecorderKinds.FitnessStatistics,
        SelectorKinds.Roulette,
        CrossoverKinds.IPX,
        MutatorSwap(p_swap),
        ConvergenceKinds.Never,
    )

    return lineage

_markov

Sorting algorithm based on the node dependency/influence in a Markov chain.

markov

markov(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    edge_weights: Optional[List[str]] = None,
    inf: float = default,
    dep: float = default,
    mu: float = default,
    scale: bool = default,
    inherit: bool = True,
    loops: bool = False,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
) -> Tuple[Graph, List[Node]]

docstring stub

Source code in ragraph/analysis/sequence/_markov.py
@markov_sequencing_analysis
def markov(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    edge_weights: Optional[List[str]] = None,
    inf: float = markov_params["inf"].default,  # type: ignore
    dep: float = markov_params["dep"].default,  # type: ignore
    mu: float = markov_params["mu"].default,  # type: ignore
    scale: bool = markov_params["scale"].default,  # type: ignore
    inherit: bool = True,
    loops: bool = False,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
) -> Tuple[Graph, List[Node]]:
    """docstring stub"""
    assert nodes is not None
    adj = graph.get_adjacency_matrix(nodes=nodes, inherit=inherit, loops=loops, only=edge_weights)
    assert isinstance(adj, np.ndarray)

    # Obtain node penalty scores.
    penalties = net_markov_flow_adjacency(adj, inf=inf, dep=dep, mu=mu, scale=scale)

    # Calculate final sequence.
    idxs = np.argsort(penalties)

    # Reorder nodes
    seq = [nodes[i] for i in idxs]

    return graph, seq

_name

Sequence nodes by name.

name

name(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[str], List[Node]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
) -> Tuple[Graph, List[Node]]

Sequence nodes by node name.

Parameters:

  • graph (Graph, required): Graph to sequence nodes of.
  • nodes (Optional[Union[List[str], List[Node]]], default None): Nodes to sequence.

Returns:

  • Tuple[Graph, List[Node]]: Sequenced nodes.

Source code in ragraph/analysis/sequence/_name.py
@sort_by_name_analysis
def name(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[str], List[Node]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
) -> Tuple[Graph, List[Node]]:
    """Sequence nodes by node name.

    Arguments:
        graph: Graph to sequence nodes of.
        nodes: Nodes to sequence.

    Returns:
        Sequenced nodes.
    """
    return graph, sorted(nodes, key=lambda n: n.name)

_scc_tearing

A sequencing algorithm for cyclic graphs that tears cycles based on a metric (penalty) function. By default, it uses the same metric that the Markov sequencing algorithm uses to sort all nodes in a single pass.

scc_tearing

scc_tearing(
    graph: Graph,
    decision: Callable[[Graph, List[Node]], Node],
    decision_args: Optional[Dict[str, Any]] = None,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
) -> Tuple[Graph, List[Node]]

docstring stub

Source code in ragraph/analysis/sequence/_scc_tearing.py
@scc_tearing_analysis
def scc_tearing(
    graph: Graph,
    decision: Callable[[Graph, List[Node]], Node],
    decision_args: Optional[Dict[str, Any]] = None,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
) -> Tuple[Graph, List[Node]]:
    """docstring stub"""
    sequence = scc_tearing_algorithm(
        graph, nodes, inherit, loops, decision, decision_args  # type: ignore
    )
    seq = list(sequence)

    return graph, seq
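
Below is a hedged sketch of tearing a cyclic graph. It passes the markov_decision utility (documented under utils below) as the tearing decision, which is assumed here to be an accepted decision callable; the graph, the node names, and the core ragraph constructors are illustrative assumptions.

from ragraph.edge import Edge
from ragraph.graph import Graph
from ragraph.node import Node

from ragraph.analysis import sequence
from ragraph.analysis.sequence.utils import markov_decision

a, b, c, d = Node("a"), Node("b"), Node("c"), Node("d")
graph = Graph(
    nodes=[a, b, c, d],
    edges=[Edge(a, b), Edge(b, c), Edge(c, b), Edge(c, d)],  # b and c form a cycle.
)

# The strongly connected component {b, c} is torn using the Markov-based indicator.
graph, seq = sequence.scc_tearing(graph, decision=markov_decision, nodes=[a, b, c, d])
print([n.name for n in seq])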

_tarjan

Tarjan's Depth First Search Algorithm.

tarjans_dfs

tarjans_dfs(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[str], List[Node]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
) -> Tuple[Graph, List[Node]]

docstring stub

Source code in ragraph/analysis/sequence/_tarjan.py
@tarjans_dfs_analysis
def tarjans_dfs(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[str], List[Node]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
) -> Tuple[Graph, List[Node]]:
    """docstring stub"""
    assert nodes
    out = []
    unmarked = [n for n in nodes if isinstance(n, Node)]
    state = {n: "todo" for n in nodes}

    targets_of: Dict[Node, List[Node]] = {
        source: [
            target
            for target in unmarked
            if any(graph.edges_between(source, target, inherit=inherit, loops=False))
        ]
        for source in unmarked
    }

    def visit(n: Node):
        s = state[n]
        if s == "done":
            return
        if s == "temp":
            raise ValueError("Not a Directed Acyclic Graph (DAG).")
        state[n] = "temp"
        for m in targets_of[n]:
            visit(m)
        state[n] = "done"
        out.append(n)

    while unmarked:
        node = unmarked.pop()
        if state[node] == "done":
            continue
        visit(node)

    return graph, out[::-1]
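
A sketch of running this topological sort on a small DAG; on a cyclic graph the ValueError above is raised instead. The graph is made up and the core ragraph constructors are assumed.

from ragraph.edge import Edge
from ragraph.graph import Graph
from ragraph.node import Node

from ragraph.analysis import sequence

a, b, c = Node("a"), Node("b"), Node("c")
graph = Graph(nodes=[a, b, c], edges=[Edge(a, b), Edge(a, c), Edge(b, c)])

# Returns a feedback-free order for this Directed Acyclic Graph.
graph, seq = sequence.tarjans_dfs(graph, nodes=[a, b, c])
print([n.name for n in seq])  # expected: ['a', 'b', 'c']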

metrics

Sequence metrics.

feedback_crossover

feedback_crossover(
    mat: ndarray, fb: float = 0.9, co: float = 0.1
) -> Tuple[float, Tuple[ndarray, ndarray]]

Jointly measure feedback marks and crossovers in an adjacency matrix.

Parameters:

  • mat (ndarray, required): Adjacency matrix to analyze.
  • fb (float, default 0.9): Weight of feedback marks.
  • co (float, default 0.1): Weight of feedback crossovers.

Returns:

  • float: Resulting metric.
  • Tuple[ndarray, ndarray]: Cell contribution matrices (feedback, crossover).

Note

Crossovers are where the "axis" from feedback marks towards the diagonal cross, except when this crossing is a feedback mark in itself. Multiple lines from "above" in the matrix are summed, those from the right are NOT (treated as binary).

References

McCulley, C., and Bloebaum, C. L., 1996, “A Genetic Tool for Optimal Design Sequencing in Complex Engineering Systems,” Struct. Optim., 12(2), pp. 186-201.

Source code in ragraph/analysis/sequence/metrics.py
def feedback_crossover(
    mat: np.ndarray, fb: float = 0.9, co: float = 0.1
) -> Tuple[float, Tuple[np.ndarray, np.ndarray]]:
    """Jointly measure feedback marks and crossovers in an adjacency matrix.

    Arguments:
        mat: Adjacency matrix to analyze.
        fb: Weight of feedback marks.
        co: Weight of feedback crossovers.

    Returns:
        Resulting metric.
        Cell contribution matrices (feedback, crossover).

    Note:
        Crossovers are where the "axis" from feedback marks towards the diagonal cross,
        except when this crossing is a feedback mark in itself. Multiple lines from
        "above" in the matrix are summed, those from the right are **NOT** (treated as
        binary).

    References:
        McCulley, C., and Bloebaum, C. L., 1996, “A Genetic Tool for Optimal Design
            Sequencing in Complex Engineering Systems,” Struct. Optim., 12(2), pp.
            186-201.
    """
    # Treat binary matrix
    mat = (mat > 0) * 1.0

    # contrib_fb is upper triangularized
    total_fb, contrib_fb = feedback_marks(mat)

    # vertical cum sum (number of accumulated marks from the top until diagonal)
    vcumsum = np.triu(np.cumsum(contrib_fb, axis=0), 1)

    # horizontal "line" multiplier, 1s toward the left from the rightmost 1 except the
    # marks themselves:
    # rows of [0,0,1,0,0,1,0,0,0]
    # become  [1,1,0,1,1,0,0,0,0]
    hline = np.cumsum(contrib_fb[:, ::-1], axis=1)[:, ::-1]
    hline = (hline > 0) * 1.0 - contrib_fb
    # no need to np.triu() here, since vcumsum already is upper triangularized.

    # Contribution matrix at crossover points and total score.
    contrib_co = vcumsum * hline
    total_co = contrib_co.sum()

    return fb * total_fb + co * total_co, (contrib_fb, contrib_co)

feedback_distance

feedback_distance(mat: ndarray) -> Tuple[float, ndarray]

Measure the feedback length, e.g. distance from the adjacency matrix diagonal.

Parameters:

  • mat (ndarray, required): Adjacency matrix to analyze.

Returns:

  • float: Resulting metric.
  • ndarray: Cell contribution matrix.

References

Gebala, D. A., & Eppinger, S. D. (1991). Methods for analyzing design procedures. ASME Design Technical Conferences (Design Theory and Methodology), 227-233. Miami.

Source code in ragraph/analysis/sequence/metrics.py
def feedback_distance(mat: np.ndarray) -> Tuple[float, np.ndarray]:
    """Measure the feedback length, e.g. distance from the adjacency matrix diagonal.

    Arguments:
        mat: Adjacency matrix to analyze.

    Returns:
        Resulting metric.
        Cell contribution matrix.

    References:
        Gebala, D. A., & Eppinger, S. D. (1991). Methods for analyzing design
            procedures. ASME Design Technical Conferences (Design Theory and
            Methodology), 227-233. Miami.
    """
    dim = len(mat)

    dimtile = np.tile(np.arange(dim), (dim, 1))
    ddist = dimtile - dimtile.T

    contrib = np.triu(mat, 1) * ddist
    return contrib.sum(), contrib
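
A small worked example: the matrix below has a single mark above the diagonal at row 0, column 2, which lies two cells away from the diagonal, so the metric evaluates to 2.

import numpy as np

from ragraph.analysis.sequence.metrics import feedback_distance

mat = np.array(
    [
        [0.0, 0.0, 1.0],
        [1.0, 0.0, 0.0],
        [0.0, 1.0, 0.0],
    ]
)

total, contrib = feedback_distance(mat)
print(total)  # 2.0: one feedback mark at distance 2 from the diagonal.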

feedback_lower_left

feedback_lower_left(
    mat: ndarray, fb: float = 100.0, ff: float = 1.0
) -> Tuple[float, ndarray]

Jointly measure lower left distance to the adjacency matrix diagonal and feedback marks. Feedback and feedforward are penalized differently, but both via a quadratic lower-left distance factor.

Parameters:

  • mat (ndarray, required): Adjacency matrix to analyze.
  • fb (float, default 100.0): Feedback adjacency multiplier.
  • ff (float, default 1.0): Feedforward adjacency multiplier.

Returns:

  • float: Resulting metric.
  • ndarray: Cell contribution matrix.

Note

Feedback marks above the diagonal are weighted 100 times more than those below the diagonal. The multiplier is offset by (n-1)^2.

References

Scott, J. A. (1999). A strategy for modelling the design-development phase of a product. University of Newcastle. See Equation 6.1 and Figure 6.4.

Source code in ragraph/analysis/sequence/metrics.py
def feedback_lower_left(
    mat: np.ndarray, fb: float = 100.0, ff: float = 1.0
) -> Tuple[float, np.ndarray]:
    """Jointly measure lower left distance to the adjacency matrix diagonal and
    feedback marks. Feedback and feedforward are penalized differently, but both via a
    quadratic lower-left distance factor.

    Arguments:
        mat: Adjacency matrix to analyze.
        fb: Feedback adjacency multiplier.
        ff: Feedforward adjacency multiplier.

    Returns:
        Resulting metric.
        Cell contribution matrix.

    Note:
        Feedback marks above the diagonal are weighted 100 times more than those below
        the diagonal. The multiplier is offset by (n-1)^2.

    References:
        Scott, J. A. (1999). A strategy for modelling the design-development phase of a
            product. University of Newcastle. See Equation 6.1 and Figure 6.4.
    """
    dim = len(mat)

    dimtile = np.tile(np.arange(1, dim + 1), (dim, 1))
    lldist = dimtile - dimtile.T + dim
    omega = np.square(lldist)

    # Lower triangle
    omgl = ff * omega
    tril = np.tril(mat, -1)
    resl = omgl * tril

    # Upper triangle
    omgu = fb * omega
    triu = np.triu(mat, 1)
    resu = omgu * triu

    contrib = resl + resu
    return contrib.sum(), contrib
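
Continuing the same 3x3 example: with the default multipliers the feedback mark at row 0, column 2 contributes 100 * (2 - 0 + 3)^2 = 2500, and the two feedforward marks contribute (0 - 1 + 3)^2 + (1 - 2 + 3)^2 = 8, giving 2508 in total.

import numpy as np

from ragraph.analysis.sequence.metrics import feedback_lower_left

mat = np.array(
    [
        [0.0, 0.0, 1.0],
        [1.0, 0.0, 0.0],
        [0.0, 1.0, 0.0],
    ]
)

total, contrib = feedback_lower_left(mat)
print(total)  # 2508.0 = 100 * 5**2 + 1 * (2**2 + 2**2)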

feedback_marks

feedback_marks(mat: ndarray) -> Tuple[float, ndarray]

Measure the sum of feedback marks in an adjacency matrix.

Parameters:

  • mat (ndarray, required): Adjacency matrix to analyze.

Returns:

  • float: Resulting metric.
  • ndarray: Cell contribution matrix.

References

Steward, D. V., 1981, Systems Analysis and Management: Structure, Strategy and Design, Petrocelli Books, New York. Kusiak, A., & Wang, J. (1993). Decomposition of the Design Process. Journal of Mechanical Design, 115(4), 687. https://doi.org/10.1115/1.2919255

Source code in ragraph/analysis/sequence/metrics.py
def feedback_marks(mat: np.ndarray) -> Tuple[float, np.ndarray]:
    """Measure the sum of feedback marks in an adjacency matrix.

    Arguments:
        mat: Adjacency matrix to analyze.

    Returns:
        Resulting metric.
        Cell contribution matrix.

    References:
        Steward, D. V., 1981, Systems Analysis and Management: Structure, Strategy and
            Design, Petrocelli Books, New York.
        Kusiak, A., & Wang, J. (1993). Decomposition of the Design Process. Journal of
            Mechanical Design, 115(4), 687. https://doi.org/10.1115/1.2919255
    """
    contrib = np.triu(mat, 1)
    return contrib.sum(), contrib

lower_left

lower_left(mat: ndarray) -> Tuple[float, ndarray]

Measure the distance to the lower left of the adjacency matrix.

Parameters:

  • mat (ndarray, required): Adjacency matrix to analyze.

Returns:

  • float: Resulting metric.
  • ndarray: Cell contribution matrix.

References

Todd, D. S. (1997). Multiple criteria genetic algorithms in engineering design and operation. University of Newcastle.

Source code in ragraph/analysis/sequence/metrics.py
def lower_left(mat: np.ndarray) -> Tuple[float, np.ndarray]:
    """Measure the distance to the lower left of the adjacency matrix.

    Arguments:
        mat: Adjacency matrix to analyze.

    Returns:
        Resulting metric.
        Cell contribution matrix.

    References:
        Todd, D. S. (1997). Multiple criteria genetic algorithms in engineering design
            and operation. University of Newcastle.
    """
    dim = len(mat)

    dimtile = np.tile(np.arange(dim), (dim, 1))
    lldist = dimtile - dimtile.T + dim - 1

    contrib = mat * lldist
    return contrib.sum(), contrib

net_markov_flow_adjacency

net_markov_flow_adjacency(
    mat: ndarray,
    inf: float = 1.0,
    dep: float = 1.0,
    mu: float = 2.0,
    scale: bool = True,
) -> ndarray

Calculate a flow balance as if the matrix would constitute a (weighted) Markov chain.

Parameters:

  • mat (ndarray, required): Adjacency matrix to analyze.
  • inf (float, default 1.0): The weight to subtract outgoing flow by.
  • dep (float, default 1.0): The weight to add incoming flow by.
  • mu (float, default 2.0): Evaporation constant when calculating flow through nodes.
  • scale (bool, default True): Whether to scale the inflow vector according to the adjacency matrix column sums.

Returns:

  • ndarray: Nodal flow balance as an array.

Source code in ragraph/analysis/sequence/metrics.py
def net_markov_flow_adjacency(
    mat: np.ndarray,
    inf: float = 1.0,
    dep: float = 1.0,
    mu: float = 2.0,
    scale: bool = True,
) -> np.ndarray:
    """Calculate a flow balance as if the matrix would constitute a (weighted)
    Markov chain.

    Arguments:
        mat: Adjacency matrix to analyze.
        inf: The weight to subtract outgoing flow by.
        dep: The weight to add incoming flow by.
        mu: Evaporation constant when calculating flow through nodes.
        scale: Whether to scale the inflow vector according to the adjacency matrix
            column sums.

    Returns:
        Nodal flow balance as an array.
    """
    from ragraph.analysis.cluster._markov import MarkovFlow

    flow = MarkovFlow(mat, mu, scale).flow_matrix[:-1, :-1]  # Exclude sink.
    inf_vector = flow.sum(axis=0)
    dep_vector = flow.sum(axis=1)
    return dep * dep_vector - inf * inf_vector
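
A brief sketch of how the markov sequencing function uses this balance: the returned array is passed to np.argsort, so nodes with a lower (more negative) balance end up earlier in the sequence. The matrix below is an arbitrary illustration and the printed numbers depend on the underlying MarkovFlow computation.

import numpy as np

from ragraph.analysis.sequence.metrics import net_markov_flow_adjacency

mat = np.array(
    [
        [0.0, 0.0, 0.0],
        [1.0, 0.0, 0.0],
        [1.0, 1.0, 0.0],
    ]
)

penalties = net_markov_flow_adjacency(mat, inf=1.0, dep=1.0, mu=2.0)
order = np.argsort(penalties)  # lower balance first, as in the markov algorithm.
print(penalties, order)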

net_markov_flow_graph

net_markov_flow_graph(
    graph: Graph,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    inf: float = 1.0,
    dep: float = 1.0,
    mu: float = 2.0,
) -> ndarray

Calculate a flow balance as if the graph would constitute a (weighted) Markov chain.

Parameters:

  • graph (Graph, required): Graph to analyze nodes of.
  • nodes (Optional[Union[List[Node], List[str]]], default None): Set of node (names) to calculate the flow with.
  • inherit (bool, default True): Whether to take into account edges between descendants of nodes.
  • loops (bool, default False): Whether to take into account self-loop edges.
  • inf (float, default 1.0): The weight to subtract outgoing flow by.
  • dep (float, default 1.0): The weight to add incoming flow by.
  • mu (float, default 2.0): Evaporation constant when calculating flow through nodes.

Returns:

  • ndarray: Nodal flow balance as an array.

Note

Uses edge inheritance to calculate an adjacency matrix.

Source code in ragraph/analysis/sequence/metrics.py
def net_markov_flow_graph(
    graph: Graph,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    inf: float = 1.0,
    dep: float = 1.0,
    mu: float = 2.0,
) -> np.ndarray:
    """Calculate a flow balance as if the graph would constitute a (weighted)
    Markov chain.

    Arguments:
        graph: Graph to analyze nodes of.
        nodes: Set of node (names) to calculate the flow with.
        inherit: Whether to take into account edges between descendants of nodes.
        loops: Whether to take into account self-loop edges.
        inf: The weight to subtract outgoing flow by.
        dep: The weight to add incoming flow by.
        mu: Evaporation constant when calculating flow through nodes.

    Returns:
        Nodal flow balance as an array.

    Note:
        Uses edge inheritance to calculate an adjacency matrix.
    """
    mat = graph.get_adjacency_matrix(nodes, inherit=inherit, loops=loops)
    return net_markov_flow_adjacency(mat, inf=inf, dep=dep, mu=mu)

utils

Sequencing utils.

branchsort

branchsort(
    graph: Graph,
    algo: Callable,
    algo_args: Optional[Dict[str, Any]] = None,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    leafs: Optional[Union[List[Node], List[str]]] = None,
    edge_weights: Optional[List[str]] = None,
    inherit: bool = True,
    loops: bool = False,
    inplace: bool = True,
    recurse: bool = True,
    names: bool = False,
    safe: bool = True,
) -> Tuple[Graph, List[Node], List[Node]]

docstring stub

Source code in ragraph/analysis/sequence/utils.py
@branchsort_analysis
def branchsort(
    graph: Graph,
    algo: Callable,
    algo_args: Optional[Dict[str, Any]] = None,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    leafs: Optional[Union[List[Node], List[str]]] = None,
    edge_weights: Optional[List[str]] = None,
    inherit: bool = True,
    loops: bool = False,
    inplace: bool = True,
    recurse: bool = True,
    names: bool = False,
    safe: bool = True,
) -> Tuple[Graph, List[Node], List[Node]]:
    """docstring stub"""
    # Sort roots
    assert isinstance(algo_args, dict)
    graph, node_sequence = algo(
        graph=graph,
        root=None,
        nodes=nodes,
        inherit=inherit,
        loops=loops,
        edge_weights=edge_weights,
        inplace=inplace,
        **algo_args,
    )
    assert node_sequence

    # Start with empty leaf sequence.
    leaf_sequence: List[Node] = []

    # Traverse branches
    for node in node_sequence:
        branchsort_analysis.log(f"Traversing '{node.name}'...")

        # Nothing to do for leaf nodes or nodes that should be treated as leafs.
        if node.is_leaf or node in leafs:  # type: ignore
            branchsort_analysis.log(f"Skipped leaf node '{node.name}'.")
            leaf_sequence.append(node)
            continue

        # Recursive case, get sorted children as the sorted "roots" of recursive call.
        if recurse:
            branchsort_analysis.log(f"Recursing into '{node.name}'...")
            _, _child_sequence, _leaf_sequence = branchsort(
                algo=algo,
                algo_args=algo_args,
                graph=graph,
                root=node,
                leafs=leafs,
                inherit=inherit,
                loops=loops,
                edge_weights=edge_weights,
                inplace=inplace,
                recurse=recurse,
            )
            leaf_sequence.extend(_leaf_sequence)
            branchsort_analysis.log(
                f"Leaf sequence extended with {[n.name for n in _leaf_sequence]}."
            )

        # Non-recursive case, get sorted children in this call.
        else:
            branchsort_analysis.log("Non-recursive call to algorithm...")
            _, _leaf_sequence = algo(
                graph=graph,
                root=node,
                inherit=inherit,
                loops=loops,
                edge_weights=edge_weights,
                inplace=inplace,
                **algo_args,
            )
            leaf_sequence.extend(_leaf_sequence)
            branchsort_analysis.log(
                f"Leaf sequence extended with {[n.name for n in _leaf_sequence]}."
            )

    return graph, node_sequence, leaf_sequence
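
Below is a sketch of recursively sorting a two-level hierarchy with the markov algorithm. The parent wiring via the Node parent keyword and the empty algo_args dictionary are assumptions, and the node names are made up.

from ragraph.edge import Edge
from ragraph.graph import Graph
from ragraph.node import Node

from ragraph.analysis import sequence

# Two parents ("x" and "y") with two leaf children each.
x, y = Node("x"), Node("y")
x1, x2 = Node("x1", parent=x), Node("x2", parent=x)
y1, y2 = Node("y1", parent=y), Node("y2", parent=y)
graph = Graph(
    nodes=[x, y, x1, x2, y1, y2],
    edges=[Edge(x1, x2), Edge(x2, y1), Edge(y1, y2)],
)

# Sort the roots first, then recurse into each branch with the same algorithm.
graph, node_seq, leaf_seq = sequence.branchsort(
    graph, algo=sequence.markov, algo_args={}, nodes=[x, y]
)
print([n.name for n in leaf_seq])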

markov_decision

markov_decision(
    graph: Graph,
    options: List[Node],
    inherit: bool = True,
    loops: bool = False,
    inf: float = 1.0,
    dep: float = 1.0,
    mu: float = 1.5,
    context: Optional[List[Node]] = None,
) -> int

Make a decision based on a Markov flow analysis of the adjacency matrix. The node with the lowest net Markov flow is picked.

Parameters:

  • graph (Graph, required): Graph data.
  • options (List[Node], required): Nodes to decide between.
  • inf (float, default 1.0): The weight to subtract outgoing flow by.
  • dep (float, default 1.0): The weight to add incoming flow by.
  • mu (float, default 1.5): Evaporation constant when calculating flow through nodes.
  • context (Optional[List[Node]], default None): Optional superset of nodes with respect to the options argument that constitutes the "complete" Markov chain that should be taken into account.

Returns:

  • int: Index of node to pick.

Source code in ragraph/analysis/sequence/utils.py
def markov_decision(
    graph: Graph,
    options: List[Node],
    inherit: bool = True,
    loops: bool = False,
    inf: float = 1.0,
    dep: float = 1.0,
    mu: float = 1.5,
    context: Optional[List[Node]] = None,
) -> int:
    """Make a decision based on a Markov flow analysis of the adjacency matrix. The node
    with the lowest net markov flow is picked.

    Arguments:
        graph: Graph data.
        options: Nodes to decide between.
        inf: The weight to subtract outgoing flow by.
        dep: The weight to add incoming flow by.
        mu: Evaporation constant when calculating flow through nodes.
        context: Optional superset of nodes with respect to the options argument that
            constitutes the "complete" Markov chain that should be taken into account.

    Returns:
        Index of node to pick.
    """
    if context:
        adj = graph.get_adjacency_matrix(context, inherit=inherit, loops=loops)
        mapping = [context.index(option) for option in options]
        penalties = net_markov_flow_adjacency(adj, inf=inf, dep=dep, mu=mu)[mapping]
    else:
        adj = graph.get_adjacency_matrix(options, inherit=inherit, loops=loops)
        penalties = net_markov_flow_adjacency(adj, inf=inf, dep=dep, mu=mu)
    idx = np.argmin(penalties)
    logger.debug(
        f"Markov decision:\nNode {options[idx].name} has the lowest of penalties of "
        + f"{[n.name for n in options]}.\n{penalties}"
    )
    return np.argmin(penalties)