# ragraph.analysis.sequence

## Sequencing algorithms

Sequencing algorithms search for sequences of nodes according to some objective or logic. A simple example is sorting by name; more elaborate examples sort according to Markov transition probabilities or search for sequences without feedback (edges to earlier nodes).

### Available algorithms

- `markov`: sequencing based on Markov transition probabilities between nodes.
- `scc_tearing`: efficiently sorts the "trivial" part of the sequence by putting the Strongly Connected Components (largest possible cycles) in a non-feedback order, then iteratively "tears" them according to a tearing indicator.
- `name`: sorts nodes by their name.
- `tarjans_dfs`: Tarjan's Depth First Search algorithm. An efficient sorting algorithm for Directed Acyclic Graphs (DAGs).
- `axis`: typical Multi-Domain Matrix axis sorting method.
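
To make the notion of feedback (edges to earlier nodes) concrete, here is a minimal sketch that counts feedback edges for a given sequence. It uses a plain dictionary of edges rather than the library's `Graph`; the helper `count_feedback` and the toy data are hypothetical and only illustrate the idea.

```python
from typing import Dict, List, Set


def count_feedback(sequence: List[str], edges: Dict[str, Set[str]]) -> int:
    """Count edges whose target comes earlier in the sequence than their source."""
    position = {name: index for index, name in enumerate(sequence)}
    return sum(
        1
        for source, targets in edges.items()
        for target in targets
        if position[target] < position[source]
    )


# Toy cycle a -> b -> c -> a: every ordering has at least one feedback edge.
edges = {"a": {"b"}, "b": {"c"}, "c": {"a"}}
print(count_feedback(sorted(edges), edges))  # 1
print(count_feedback(["c", "b", "a"], edges))  # 2
```

Different sequences of the same nodes can score very differently, which is why the choice of algorithm matters.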

### Metrics

Several metrics from the literature are also available to grade sequences. They are documented under metrics.

## Utilities

Finally, there are some utilities, such as branch-sorting (recursively sorting each branch of a hierarchical tree instead of sorting all leaf nodes as a whole), available in utils.

## axis

    axis(
        graph: Graph,
        root: Optional[Union[str, Node]] = None,
        nodes: Optional[Union[List[Node], List[str]]] = None,
        kinds: Optional[List[str]] = None,
        sort_by_bus: bool = True,
        sort_by_width: bool = True,
        edge_weights: Optional[List[str]] = None,
        inherit: bool = True,
        loops: bool = False,
        inplace: bool = True,
        names: bool = False,
        safe: bool = True,
        **kwargs
    )

docstring stub

Source code in ragraph/analysis/sequence/_axis.py

    @axis_sequencing_analysis
    def axis(
        graph: Graph,
        root: Optional[Union[str, Node]] = None,
        nodes: Optional[Union[List[Node], List[str]]] = None,
        kinds: Optional[List[str]] = None,
        sort_by_bus: bool = True,
        sort_by_width: bool = True,
        edge_weights: Optional[List[str]] = None,
        inherit: bool = True,
        loops: bool = False,
        inplace: bool = True,
        names: bool = False,
        safe: bool = True,
        **kwargs,
    ):
        """docstring stub"""
        seq = get_axis_sequence(nodes, kinds, sort_by_bus, sort_by_width)  # type: ignore
        return graph, seq

## branchsort

    branchsort(
        graph: Graph,
        algo: Callable,
        algo_args: Optional[Dict[str, Any]] = None,
        root: Optional[Union[str, Node]] = None,
        nodes: Optional[Union[List[Node], List[str]]] = None,
        leafs: Optional[Union[List[Node], List[str]]] = None,
        edge_weights: Optional[List[str]] = None,
        inherit: bool = True,
        loops: bool = False,
        inplace: bool = True,
        recurse: bool = True,
        names: bool = False,
        safe: bool = True,
    ) -> Tuple[Graph, List[Node], List[Node]]

docstring stub

Source code in ragraph/analysis/sequence/utils.py

    @branchsort_analysis
    def branchsort(
        graph: Graph,
        algo: Callable,
        algo_args: Optional[Dict[str, Any]] = None,
        root: Optional[Union[str, Node]] = None,
        nodes: Optional[Union[List[Node], List[str]]] = None,
        leafs: Optional[Union[List[Node], List[str]]] = None,
        edge_weights: Optional[List[str]] = None,
        inherit: bool = True,
        loops: bool = False,
        inplace: bool = True,
        recurse: bool = True,
        names: bool = False,
        safe: bool = True,
    ) -> Tuple[Graph, List[Node], List[Node]]:
        """docstring stub"""
        # Sort roots
        assert isinstance(algo_args, dict)
        graph, node_sequence = algo(
            graph=graph,
            root=None,
            nodes=nodes,
            inherit=inherit,
            loops=loops,
            edge_weights=edge_weights,
            inplace=inplace,
            **algo_args,
        )
        assert node_sequence

        # Start with empty leaf sequence.
        leaf_sequence: List[Node] = []

        # Traverse branches
        for node in node_sequence:
            branchsort_analysis.log(f"Traversing '{node.name}'...")

            # Nothing to do for leaf nodes or nodes that should be treated as leafs.
            if node.is_leaf or node in leafs:  # type: ignore
                branchsort_analysis.log(f"Skipped leaf node '{node.name}'.")
                leaf_sequence.append(node)
                continue

            # Recursive case, get sorted children as the sorted "roots" of recursive call.
            if recurse:
                branchsort_analysis.log(f"Recursing into '{node.name}'...")
                _, _child_sequence, _leaf_sequence = branchsort(
                    algo=algo,
                    algo_args=algo_args,
                    graph=graph,
                    root=node,
                    leafs=leafs,
                    inherit=inherit,
                    loops=loops,
                    edge_weights=edge_weights,
                    inplace=inplace,
                    recurse=recurse,
                )
                leaf_sequence.extend(_leaf_sequence)
                branchsort_analysis.log(
                    f"Leaf sequence extended with {[n.name for n in _leaf_sequence]}."
                )

            # Non-recursive case, get sorted children in this call.
            else:
                branchsort_analysis.log("Non-recursive call to algorithm...")
                _, _leaf_sequence = algo(
                    graph=graph,
                    root=node,
                    inherit=inherit,
                    loops=loops,
                    edge_weights=edge_weights,
                    inplace=inplace,
                    **algo_args,
                )
                leaf_sequence.extend(_leaf_sequence)
                branchsort_analysis.log(
                    f"Leaf sequence extended with {[n.name for n in _leaf_sequence]}."
                )

        return graph, node_sequence, leaf_sequence
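
The recursion in `branchsort` can be sketched on a toy tree without the library. The `TreeNode` class, the `branch_sort` helper and the `by_name` algorithm below are hypothetical stand-ins, not part of ragraph; they only show how sorting per branch differs from sorting all leaf nodes as a whole.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class TreeNode:
    """Hypothetical stand-in for a hierarchical node."""

    name: str
    children: List["TreeNode"] = field(default_factory=list)


def by_name(nodes: List[TreeNode]) -> List[TreeNode]:
    """Toy sequencing algorithm: order siblings by name."""
    return sorted(nodes, key=lambda n: n.name)


def branch_sort(
    nodes: List[TreeNode], algo: Callable[[List[TreeNode]], List[TreeNode]]
) -> List[TreeNode]:
    """Sequence siblings first, then recurse into each branch in that order."""
    leafs: List[TreeNode] = []
    for node in algo(nodes):
        if node.children:
            leafs.extend(branch_sort(node.children, algo))
        else:
            leafs.append(node)
    return leafs


tree = [
    TreeNode("z", [TreeNode("b"), TreeNode("a")]),
    TreeNode("y", [TreeNode("d"), TreeNode("c")]),
]
print([n.name for n in branch_sort(tree, by_name)])  # ['c', 'd', 'a', 'b']
```

Sorting all four leaf nodes as one flat list would give `a, b, c, d`; branch-sorting keeps the leaves of `y` together and ahead of those of `z`.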

## genetic

    genetic(
        graph: Graph,
        root: Optional[Union[str, Node]] = None,
        nodes: Optional[Union[List[str], List[Node]]] = None,
        evaluator: Optional[str] = params["evaluator"].default,
        n_chromosomes: Optional[int] = params["n_chromosomes"].default,
        n_generations: Optional[int] = params["n_generations"].default,
        p_crossover: Optional[float] = params["p_crossover"].default,
        p_mutation: Optional[float] = params["p_mutation"].default,
        p_swap: Optional[float] = params["p_swap"].default,
        inherit: bool = True,
        edge_weights: Optional[List[str]] = None,
        loops: bool = False,
        inplace: bool = True,
        names: bool = False,
        safe: bool = True,
        **kwargs
    ) -> Tuple[Graph, List[Node]]

docstring stub

Source code in ragraph/analysis/sequence/_genetic.py

    @genetic_analysis
    def genetic(
        graph: Graph,
        root: Optional[Union[str, Node]] = None,
        nodes: Optional[Union[List[str], List[Node]]] = None,
        evaluator: Optional[str] = params["evaluator"].default,
        n_chromosomes: Optional[int] = params["n_chromosomes"].default,
        n_generations: Optional[int] = params["n_generations"].default,
        p_crossover: Optional[float] = params["p_crossover"].default,
        p_mutation: Optional[float] = params["p_mutation"].default,
        p_swap: Optional[float] = params["p_swap"].default,
        inherit: bool = True,
        edge_weights: Optional[List[str]] = None,
        loops: bool = False,
        inplace: bool = True,
        names: bool = False,
        safe: bool = True,
        **kwargs,
    ) -> Tuple[Graph, List[Node]]:
        """docstring stub"""
        lineage = genetic_sequencing(
            graph,
            nodes,  # type: ignore
            n_chromosomes,  # type: ignore
            n_generations,  # type: ignore
            1,
            p_crossover,  # type: ignore
            p_mutation,  # type: ignore
            p_swap,  # type: ignore
            n_records=1,
            inherit=inherit,
            edge_weights=edge_weights,
        )
        best = lineage.hall_of_fame.chromosomes[0].genes
        seq = [nodes[i] for i in best]  # type: ignore
        return graph, seq

## markov

    markov(
        graph: Graph,
        root: Optional[Union[str, Node]] = None,
        nodes: Optional[Union[List[Node], List[str]]] = None,
        edge_weights: Optional[List[str]] = None,
        inf: float = markov_params["inf"].default,
        dep: float = markov_params["dep"].default,
        mu: float = markov_params["mu"].default,
        scale: bool = markov_params["scale"].default,
        inherit: bool = True,
        loops: bool = False,
        inplace: bool = True,
        names: bool = False,
        safe: bool = True,
        **kwargs
    ) -> Tuple[Graph, List[Node]]

docstring stub

Source code in ragraph/analysis/sequence/_markov.py

    @markov_sequencing_analysis
    def markov(
        graph: Graph,
        root: Optional[Union[str, Node]] = None,
        nodes: Optional[Union[List[Node], List[str]]] = None,
        edge_weights: Optional[List[str]] = None,
        inf: float = markov_params["inf"].default,  # type: ignore
        dep: float = markov_params["dep"].default,  # type: ignore
        mu: float = markov_params["mu"].default,  # type: ignore
        scale: bool = markov_params["scale"].default,  # type: ignore
        inherit: bool = True,
        loops: bool = False,
        inplace: bool = True,
        names: bool = False,
        safe: bool = True,
        **kwargs,
    ) -> Tuple[Graph, List[Node]]:
        """docstring stub"""
        assert nodes is not None
        adj = graph.get_adjacency_matrix(nodes=nodes, inherit=inherit, loops=loops, only=edge_weights)
        assert isinstance(adj, np.ndarray)

        # Obtain node penalty scores.
        penalties = net_markov_flow_adjacency(adj, inf=inf, dep=dep, mu=mu, scale=scale)

        # Calculate final sequence.
        idxs = np.argsort(penalties)

        # Reorder nodes
        seq = [nodes[i] for i in idxs]

        return graph, seq
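
The final step of `markov` orders nodes by ascending penalty score. The sketch below reproduces only that reordering step in pure Python; the penalty values are made up, and the `argsort` helper is a hypothetical stand-in for `np.argsort`. How the penalties themselves are computed is not shown here.

```python
from typing import List, Sequence


def argsort(values: Sequence[float]) -> List[int]:
    """Indices that would sort values in ascending order (ties keep input order)."""
    return sorted(range(len(values)), key=lambda i: values[i])


nodes = ["a", "b", "c", "d"]
penalties = [0.7, -0.2, 0.4, -0.9]  # Made-up scores, one per node.

# Lowest penalty goes first.
seq = [nodes[i] for i in argsort(penalties)]
print(seq)  # ['d', 'b', 'c', 'a']
```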

## name

    name(
        graph: Graph,
        root: Optional[Union[str, Node]] = None,
        nodes: Optional[Union[List[str], List[Node]]] = None,
        inherit: bool = True,
        loops: bool = False,
        edge_weights: Optional[List[str]] = None,
        inplace: bool = True,
        names: bool = False,
        safe: bool = True,
        **kwargs
    ) -> Tuple[Graph, List[Node]]

Sequence nodes by node name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `graph` | `Graph` | Graph to sequence nodes of. | required |
| `nodes` | `Optional[Union[List[str], List[Node]]]` | Nodes to sequence. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Tuple[Graph, List[Node]]` | Sequenced nodes. |

Source code in ragraph/analysis/sequence/_name.py

    @sort_by_name_analysis
    def name(
        graph: Graph,
        root: Optional[Union[str, Node]] = None,
        nodes: Optional[Union[List[str], List[Node]]] = None,
        inherit: bool = True,
        loops: bool = False,
        edge_weights: Optional[List[str]] = None,
        inplace: bool = True,
        names: bool = False,
        safe: bool = True,
        **kwargs,
    ) -> Tuple[Graph, List[Node]]:
        """Sequence nodes by node name.

        Arguments:
            graph: Graph to sequence nodes of.
            nodes: Nodes to sequence.

        Returns:
            Sequenced nodes.
        """
        return graph, sorted(nodes, key=lambda n: n.name)
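
Because `name` relies on Python's built-in `sorted` with the node name as key, the ordering follows plain string comparison. The sketch below uses a hypothetical `Item` tuple instead of the library's `Node` to show one consequence: uppercase names sort before lowercase ones.

```python
from typing import NamedTuple


class Item(NamedTuple):
    """Hypothetical stand-in exposing only a `name` attribute."""

    name: str


items = [Item("b"), Item("A"), Item("a"), Item("B")]

# Same key function as in `name`: compare the raw name strings.
ordered = sorted(items, key=lambda n: n.name)
print([n.name for n in ordered])  # ['A', 'B', 'a', 'b']
```

If a case-insensitive order is preferred, a key such as `n.name.lower()` would be needed; that is a variation on the library's behavior, not what `name` does.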

## scc_tearing

    scc_tearing(
        graph: Graph,
        decision: Callable[[Graph, List[Node]], Node],
        decision_args: Optional[Dict[str, Any]] = None,
        root: Optional[Union[str, Node]] = None,
        nodes: Optional[Union[List[Node], List[str]]] = None,
        inherit: bool = True,
        loops: bool = False,
        edge_weights: Optional[List[str]] = None,
        inplace: bool = True,
        names: bool = False,
        safe: bool = True,
        **kwargs
    ) -> Tuple[Graph, List[Node]]

docstring stub

Source code in ragraph/analysis/sequence/_scc_tearing.py

    @scc_tearing_analysis
    def scc_tearing(
        graph: Graph,
        decision: Callable[[Graph, List[Node]], Node],
        decision_args: Optional[Dict[str, Any]] = None,
        root: Optional[Union[str, Node]] = None,
        nodes: Optional[Union[List[Node], List[str]]] = None,
        inherit: bool = True,
        loops: bool = False,
        edge_weights: Optional[List[str]] = None,
        inplace: bool = True,
        names: bool = False,
        safe: bool = True,
        **kwargs,
    ) -> Tuple[Graph, List[Node]]:
        """docstring stub"""
        sequence = scc_tearing_algorithm(
            graph, nodes, inherit, loops, decision, decision_args  # type: ignore
        )
        seq = list(sequence)
        return graph, seq

## tarjans_dfs

    tarjans_dfs(
        graph: Graph,
        root: Optional[Union[str, Node]] = None,
        nodes: Optional[Union[List[str], List[Node]]] = None,
        inherit: bool = True,
        loops: bool = False,
        edge_weights: Optional[List[str]] = None,
        inplace: bool = True,
        names: bool = False,
        safe: bool = True,
        **kwargs
    ) -> Tuple[Graph, List[Node]]

docstring stub

Source code in ragraph/analysis/sequence/_tarjan.py

    @tarjans_dfs_analysis
    def tarjans_dfs(
        graph: Graph,
        root: Optional[Union[str, Node]] = None,
        nodes: Optional[Union[List[str], List[Node]]] = None,
        inherit: bool = True,
        loops: bool = False,
        edge_weights: Optional[List[str]] = None,
        inplace: bool = True,
        names: bool = False,
        safe: bool = True,
        **kwargs,
    ) -> Tuple[Graph, List[Node]]:
        """docstring stub"""
        assert nodes
        out = []
        unmarked = [n for n in nodes if isinstance(n, Node)]
        state = {n: "todo" for n in nodes}
        targets_of: Dict[Node, List[Node]] = {
            source: [
                target
                for target in unmarked
                if any(graph.edges_between(source, target, inherit=inherit, loops=False))
            ]
            for source in unmarked
        }

        def visit(n: Node):
            s = state[n]
            if s == "done":
                return
            if s == "temp":
                raise ValueError("Not a Directed Acyclic Graph (DAG).")

            state[n] = "temp"
            for m in targets_of[n]:
                visit(m)
            state[n] = "done"
            out.append(n)

        while unmarked:
            node = unmarked.pop()
            if state[node] == "done":
                continue
            visit(node)

        return graph, out[::-1]
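
The same depth-first scheme can be tried on a plain dictionary of targets, which may help when reasoning about the resulting order. This is a minimal sketch: `topological_order` and the toy graph are hypothetical and only mirror the three-state marking ("todo", "temp", "done") used in the source.

```python
from typing import Dict, List


def topological_order(targets_of: Dict[str, List[str]]) -> List[str]:
    """Depth-first topological sort; raises ValueError when a cycle is found."""
    state = {node: "todo" for node in targets_of}
    out: List[str] = []

    def visit(node: str) -> None:
        if state[node] == "done":
            return
        if state[node] == "temp":
            # Reaching a node that is still being visited means a cycle exists.
            raise ValueError("Not a Directed Acyclic Graph (DAG).")
        state[node] = "temp"
        for target in targets_of[node]:
            visit(target)
        state[node] = "done"
        out.append(node)

    for node in targets_of:
        visit(node)

    # Nodes are appended after all their targets, so reverse to put sources first.
    return out[::-1]


dag = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}
order = topological_order(dag)
print(order)  # ['a', 'c', 'b', 'd']
```

Every edge in `dag` points from an earlier to a later node in `order`, so the sequence contains no feedback.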

## _axis

Sort nodes in a typical axis-ready format.

### get_axis_sequence

    get_axis_sequence(
        nodes: Iterable[Node],
        kinds: Optional[List[str]] = None,
        sort_by_bus: bool = True,
        sort_by_width: bool = True,
        root_cache: Optional[Dict[Node, Node]] = None,
        child_cache: Optional[Dict[Node, List[Node]]] = None,
        width_cache: Optional[Dict[Node, int]] = None,
    ) -> List[Node]

Get nodes in a plot-axis-ready order.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `nodes` | `Iterable[Node]` | Axis nodes to order. | required |
| `kinds` | `Optional[List[str]]` | Optional order of node kinds. | `None` |
| `sort_by_bus` | `bool` | Whether to put bus nodes first. | `True` |
| `sort_by_width` | `bool` | Whether to sort nodes by width in terms of leaf nodes. | `True` |
| `root_cache` | `Optional[Dict[Node, Node]]` | Node to resulting root node cache. Any supplied dictionary will be updated in-place. | `None` |
| `child_cache` | `Optional[Dict[Node, List[Node]]]` | Node to child nodes that have been seen cache. | `None` |
| `width_cache` | `Optional[Dict[Node, int]]` | Node to node width cache. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `List[Node]` | Ordered nodes. |

Note

Axis ready means ordered by kind, with bus nodes first where possible. Any supplied cache dictionary will be updated in-place.

Warning

When supplying node kinds, it is your own responsibility to make sure that each node's kind is actually included in that node kind order.
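
The warning follows from how the kind order is used: kinds are turned into a position dictionary and looked up per node, so a kind missing from the supplied order fails the lookup. A minimal sketch with made-up node names and kinds (the `root_key` helper is hypothetical):

```python
kinds = ["system", "component"]
kind_order = {kind: position for position, kind in enumerate(kinds)}

# Hypothetical nodes mapped to their kind; "variable" is not in `kinds`.
node_kinds = {"frame": "system", "pump": "component", "flow": "variable"}


def root_key(name: str) -> int:
    """Look up the position of a node's kind, as the sorting key does."""
    return kind_order[node_kinds[name]]


try:
    sorted(node_kinds, key=root_key)
    missing = None
except KeyError as err:
    missing = err.args[0]

print(missing)  # variable
```

Leaving `kinds` as `None` avoids this, since the kinds are then collected from the nodes themselves.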

Source code in ragraph/analysis/sequence/_axis.py

    def get_axis_sequence(
        nodes: Iterable[Node],
        kinds: Optional[List[str]] = None,
        sort_by_bus: bool = True,
        sort_by_width: bool = True,
        root_cache: Optional[Dict[Node, Node]] = None,
        child_cache: Optional[Dict[Node, List[Node]]] = None,
        width_cache: Optional[Dict[Node, int]] = None,
    ) -> List[Node]:
        """Get nodes in a plot-axis-ready order.

        Arguments:
            nodes: Axis nodes to order.
            kinds: Optional order of node kinds.
            sort_by_bus: Whether to put bus nodes first.
            sort_by_width: Whether to sort nodes by width in terms of leaf nodes.
            root_cache: Node to resulting root node cache. Any supplied dictionary will
                be updated in-place.
            child_cache: Node to child nodes that have been seen cache.
            width_cache: Node to node width cache.

        Returns:
            Ordered nodes.

        Note:
            Axis ready means ordered by kind, with bus nodes first where possible. Any
            supplied cache dictionary will be updated in-place.

        Warning:
            When supplying node kinds it's your own responsibility to make sure that each
            node's kind actually is included in that node kind order.
        """
        root_cache = dict() if root_cache is None else root_cache
        child_cache = dict() if child_cache is None else child_cache
        width_cache = dict() if width_cache is None else width_cache

        kinds = get_kinds(nodes) if kinds is None else kinds
        kind_order = {kind: position for position, kind in enumerate(kinds)}

        roots = get_roots(nodes, root_cache, child_cache)
        roots.sort(key=lambda x: get_root_key(x, kind_order, sort_by_width, width_cache))

        # Sort by is_bus status with buses first, width second, if applicable.
        if sort_by_bus or sort_by_width:
            for parent in child_cache.keys():
                child_cache[parent].sort(
                    key=lambda x: get_sibling_key(x, sort_by_bus, sort_by_width, width_cache)
                )

        # Complement width_cache for roots for completeness sake.
        if sort_by_width:
            for root in roots:
                width_cache[root] = sum(
                    width_cache.get(child, 1) for child in child_cache.get(root, [])
                )

        leafs = [leaf for root in roots for leaf in get_leafs(root, child_cache)]
        return leafs

### get_kinds

    get_kinds(nodes: Iterable[Node]) -> List[str]

Get node kinds in order of occurrence.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `nodes` | `Iterable[Node]` | Nodes to iterate over. | required |

Returns:

| Type | Description |
| --- | --- |
| `List[str]` | List of Node kinds. |

Source code in ragraph/analysis/sequence/_axis.py

    def get_kinds(nodes: Iterable[Node]) -> List[str]:
        """Get node kinds in order of occurrence.

        Arguments:
            nodes: Nodes to iterate over.

        Returns:
            List of Node kinds.
        """
        ordered = []
        seen = set()
        for node in nodes:
            kind = node.kind
            if kind not in seen:
                ordered.append(kind)
                seen.add(kind)
        return ordered
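
The loop above is an order-preserving de-duplication. For plain strings, the same result can be obtained with `dict.fromkeys`, since dictionaries keep insertion order; the sketch below compares both on made-up kinds. This is an equivalent formulation for illustration, not how the library implements it.

```python
kinds_seen = ["component", "function", "component", "variable", "function"]

# Explicit loop, as in get_kinds: keep the first occurrence of each kind.
ordered = []
seen = set()
for kind in kinds_seen:
    if kind not in seen:
        ordered.append(kind)
        seen.add(kind)

# Compact alternative: dict keys are unique and remember insertion order.
compact = list(dict.fromkeys(kinds_seen))

print(ordered)  # ['component', 'function', 'variable']
print(compact == ordered)  # True
```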

### get_leafs

    get_leafs(
        parent: Node, child_cache: Dict[Node, List[Node]]
    ) -> List[Node]

Get leaf nodes from a linked-list style child_cache (e.g. parent to children).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `parent` | `Node` | Parent node. | required |
| `child_cache` | `Dict[Node, List[Node]]` | Linked-list style parent-children dictionary. | required |

Returns:

| Type | Description |
| --- | --- |
| `List[Node]` | Displayed leaf nodes as they occur under this parent. |

Source code in ragraph/analysis/sequence/_axis.py

    def get_leafs(parent: Node, child_cache: Dict[Node, List[Node]]) -> List[Node]:
        """Get leaf nodes from a linked-list style child_cache (e.g. parent to children).

        Arguments:
            parent: Parent node.
            child_cache: Linked-list style parent-children dictionary.

        Returns:
            Displayed leaf nodes as they occur under this parent.
        """
        # Handle the case where this node is actually a leaf node.
        if parent not in child_cache:
            return [parent]

        # Fetch leaf nodes from deeper into the cached hierarchy.
        leafs = []
        for child in child_cache[parent]:
            leafs.extend(get_leafs(child, child_cache))
        return leafs
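
A small sketch with string keys instead of `Node` objects shows what "displayed leaf nodes" means here: any entry without a key in the child cache is treated as a leaf, whether or not it has children elsewhere. The `leafs_under` helper and the cache contents are hypothetical.

```python
from typing import Dict, List


def leafs_under(parent: str, child_cache: Dict[str, List[str]]) -> List[str]:
    """Collect displayed leafs depth-first, following cached children in order."""
    if parent not in child_cache:
        return [parent]
    leafs: List[str] = []
    for child in child_cache[parent]:
        leafs.extend(leafs_under(child, child_cache))
    return leafs


# "b" has no entry in the cache, so it is displayed as a leaf.
child_cache = {"root": ["a", "b"], "a": ["a1", "a2"]}
print(leafs_under("root", child_cache))  # ['a1', 'a2', 'b']
print(leafs_under("b", child_cache))  # ['b']
```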

### get_root

    get_root(
        node: Node,
        root_cache: Optional[Dict[Node, Node]] = None,
        child_cache: Optional[Dict[Node, List[Node]]] = None,
    ) -> Node

Get root node of a node.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `node` | `Node` | Node to find the corresponding root node for. | required |
| `root_cache` | `Optional[Dict[Node, Node]]` | Node to resulting root node cache. Any supplied dictionary will be updated in-place. | `None` |
| `child_cache` | `Optional[Dict[Node, List[Node]]]` | Node to child nodes that have been seen cache. Any supplied dictionary will be updated in-place. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Node` | Root node. |

Note

Supply cache dictionaries to store intermediate results.

Source code in ragraph/analysis/sequence/_axis.py

    def get_root(
        node: Node,
        root_cache: Optional[Dict[Node, Node]] = None,
        child_cache: Optional[Dict[Node, List[Node]]] = None,
    ) -> Node:
        """Get root node of a node.

        Arguments:
            node: Node to find the corresponding root node for.
            root_cache: Node to resulting root node cache. Any supplied dictionary will
                be updated in-place.
            child_cache: Node to child nodes that have been seen cache. Any supplied
                dictionary will be updated in-place.

        Returns:
            Root node.

        Note:
            Supply cache dictionaries to store intermediate results.
        """
        # Cache from any seen node to the resulting root node.
        root_cache = dict() if root_cache is None else root_cache

        # Cache from any parent we came across to the included children in the search.
        child_cache = dict() if child_cache is None else child_cache

        # Cached, return cache value.
        if node in root_cache:
            return root_cache[node]

        # Is a root, return.
        if node.parent is None:
            return node

        # Find and update caches accordingly.
        if node.parent not in child_cache:
            child_cache[node.parent] = [node]
        else:
            child_cache[node.parent].append(node)

        root = get_root(node.parent, root_cache, child_cache)
        root_cache[node] = root
        return root
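
To see how the two caches fill up, the lookup can be replayed on a tiny hierarchy. The `Item` class and `find_root` function below are hypothetical stand-ins that follow the same steps as the source; they are a sketch, not the library API.

```python
from typing import Dict, List, Optional


class Item:
    """Hypothetical stand-in exposing only `name` and `parent`."""

    def __init__(self, name: str, parent: Optional["Item"] = None):
        self.name = name
        self.parent = parent


def find_root(
    node: Item, root_cache: Dict[Item, Item], child_cache: Dict[Item, List[Item]]
) -> Item:
    """Walk up to the root, recording children per parent along the way."""
    if node in root_cache:
        return root_cache[node]
    if node.parent is None:
        return node
    child_cache.setdefault(node.parent, []).append(node)
    root = find_root(node.parent, root_cache, child_cache)
    root_cache[node] = root
    return root


root = Item("root")
mid = Item("mid", root)
leaf1 = Item("leaf1", mid)
leaf2 = Item("leaf2", mid)

root_cache: Dict[Item, Item] = {}
child_cache: Dict[Item, List[Item]] = {}
find_root(leaf1, root_cache, child_cache)
find_root(leaf2, root_cache, child_cache)  # Stops early at the cached "mid".

print([n.name for n in child_cache[mid]])  # ['leaf1', 'leaf2']
print([n.name for n in child_cache[root]])  # ['mid']
print(root_cache[leaf2].name)  # root
```

Root nodes themselves never get an entry in `root_cache`; only nodes with a parent are cached.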

### get_root_key

    get_root_key(
        node: Node,
        kind_order: Dict[str, int],
        sort_by_width: bool,
        width_cache: Optional[Dict[Node, int]] = None,
    ) -> Union[int, Tuple[int, int]]

Calculate a sorting score for root nodes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `node` | `Node` | Root node to obtain a score for. | required |
| `kind_order` | `Dict[str, int]` | Node kind to position dictionary. | required |
| `sort_by_width` | `bool` | Whether to sort nodes by width in terms of leaf nodes. | required |
| `width_cache` | `Optional[Dict[Node, int]]` | Node to node width cache. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Union[int, Tuple[int, int]]` | Root node sorting key. |

Note

Sort by kinds first, width in terms of actual (not display) leaf nodes second.

Source code in ragraph/analysis/sequence/_axis.py

    def get_root_key(
        node: Node,
        kind_order: Dict[str, int],
        sort_by_width: bool,
        width_cache: Optional[Dict[Node, int]] = None,
    ) -> Union[int, Tuple[int, int]]:
        """Calculate a sorting score for root nodes.

        Arguments:
            node: Root node to obtain a score for.
            kind_order: Node kind to position dictionary.
            sort_by_width: Whether to sort nodes by width in terms of leaf nodes.
            width_cache: Node to node width cache.

        Returns:
            Root node sorting key.

        Note:
            Sort by kinds first, width in terms of actual (not display) leaf nodes second.
        """
        if sort_by_width:
            return (
                kind_order[node.kind],
                -get_width(node, width_cache),
            )
        else:
            return kind_order[node.kind]
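
The tuple key sorts by kind position first and uses the negated width as tie-breaker, so wider roots come first within a kind. A minimal sketch with made-up `(name, kind, width)` tuples in place of `Node` objects:

```python
kind_order = {"system": 0, "component": 1}

# Hypothetical roots as (name, kind, width in leaf nodes).
roots = [
    ("pump", "component", 3),
    ("frame", "system", 1),
    ("motor", "component", 5),
    ("plant", "system", 4),
]

# Same shape as the key in get_root_key when sort_by_width is True.
ordered = sorted(roots, key=lambda r: (kind_order[r[1]], -r[2]))
print([name for name, _, _ in ordered])  # ['plant', 'frame', 'motor', 'pump']
```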

### get_roots

    get_roots(
        nodes: Iterable[Node],
        root_cache: Optional[Dict[Node, Node]] = None,
        child_cache: Optional[Dict[Node, List[Node]]] = None,
    ) -> List[Node]

Get list of root nodes corresponding to some collection of nodes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `nodes` | `Iterable[Node]` | Collection of nodes to find corresponding root nodes for. | required |
| `root_cache` | `Optional[Dict[Node, Node]]` | Node to resulting root node cache. Any supplied dictionary will be updated in-place. | `None` |
| `child_cache` | `Optional[Dict[Node, List[Node]]]` | Node to child nodes that have been seen cache. Any supplied dictionary will be updated in-place. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `List[Node]` | Root nodes. |

Note

Supply cache dictionaries to store intermediate results.

Source code in ragraph/analysis/sequence/_axis.py

    def get_roots(
        nodes: Iterable[Node],
        root_cache: Optional[Dict[Node, Node]] = None,
        child_cache: Optional[Dict[Node, List[Node]]] = None,
    ) -> List[Node]:
        """Get list of root nodes corresponding to some collection of nodes.

        Arguments:
            nodes: Collection of nodes to find corresponding root nodes for.
            root_cache: Node to resulting root node cache. Any supplied dictionary will
                be updated in-place.
            child_cache: Node to child nodes that have been seen cache. Any supplied
                dictionary will be updated in-place.

        Returns:
            Root nodes.

        Note:
            Supply cache dictionaries to store intermediate results.
        """
        # Cache from any seen node to the resulting root node.
        root_cache = dict() if root_cache is None else root_cache

        # Cache from any parent we came across to the included children in the search.
        child_cache = dict() if child_cache is None else child_cache

        seen = set()  # Seen root nodes.
        ordered = []  # Seen root nodes in order of appearance.
        for node in nodes:
            root = get_root(node, root_cache, child_cache)
            if root not in seen:
                seen.add(root)
                ordered.append(root)
        return ordered

### get_sibling_key

    get_sibling_key(
        node: Node,
        sort_by_bus: bool,
        sort_by_width: bool,
        width_cache: Optional[Dict[Node, int]] = None,
    ) -> Union[int, Tuple[int, int]]

Calculate a sorting score for sibling nodes. Lower scores go first.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `node` | `Node` | Node to score. | required |
| `sort_by_bus` | `bool` | Whether to put bus nodes first. | required |
| `sort_by_width` | `bool` | Whether to sort nodes by width in terms of leaf nodes. | required |
| `width_cache` | `Optional[Dict[Node, int]]` | Node to node width cache. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Union[int, Tuple[int, int]]` | Sibling node sorting score. |

Note

Bus nodes first, width in terms of actual (not display) leaf nodes second.

Source code in ragraph/analysis/sequence/_axis.py

    def get_sibling_key(
        node: Node,
        sort_by_bus: bool,
        sort_by_width: bool,
        width_cache: Optional[Dict[Node, int]] = None,
    ) -> Union[int, Tuple[int, int]]:
        """Calculate a sorting score for sibling nodes. Lower scores go first.

        Arguments:
            node: Node to score.
            sort_by_bus: Whether to put bus nodes first.
            sort_by_width: Whether to sort nodes by width in terms of leaf nodes.
            width_cache: Node to node width cache.

        Returns:
            Sibling node sorting score.

        Note:
            Bus nodes first, width in terms of actual (not display) leaf nodes second."""
        if sort_by_bus and sort_by_width:
            return (not node.is_bus, -get_width(node, width_cache))
        elif sort_by_bus:
            return not node.is_bus
        elif sort_by_width:
            return -get_width(node, width_cache)
        else:
            return 0
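
The key relies on `False` sorting before `True`: `not node.is_bus` is `False` for bus nodes, which puts them first. A minimal sketch with made-up `(name, is_bus, width)` tuples in place of `Node` objects:

```python
# Hypothetical siblings as (name, is_bus, width in leaf nodes).
siblings = [
    ("a", False, 2),
    ("bus", True, 1),
    ("b", False, 5),
]

# Same shape as the key when both sort_by_bus and sort_by_width are True.
ordered = sorted(siblings, key=lambda s: (not s[1], -s[2]))
print([name for name, _, _ in ordered])  # ['bus', 'b', 'a']
```

The bus node leads despite being the narrowest; the remaining siblings follow from widest to narrowest.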

### get_width

    get_width(
        node: Node,
        width_cache: Optional[Dict[Node, int]] = None,
    ) -> int

Get width of a node and update width cache in-place.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `node` | `Node` | Node to get the width of. | required |
| `width_cache` | `Optional[Dict[Node, int]]` | Node to node width cache. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `int` | Node width. |

Source code in ragraph/analysis/sequence/_axis.py

    def get_width(node: Node, width_cache: Optional[Dict[Node, int]] = None) -> int:
        """Get width of a node and update width cache in-place.

        Arguments:
            node: Node to get the width of.
            width_cache: Node to node width cache.

        Returns:
            Node width.
        """
        width_cache = dict() if width_cache is None else width_cache

        if node not in width_cache:
            if node.children:
                width_cache[node] = sum(get_width(child, width_cache) for child in node.children)
            else:
                width_cache[node] = 1

        return width_cache[node]
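
Width here is the number of leaf nodes underneath a node, with a leaf counting as 1. The sketch below replays the memoized recursion on a dictionary-based tree; the `width` helper and the `children` mapping are hypothetical replacements for `Node.children`.

```python
from typing import Dict, List


def width(node: str, children: Dict[str, List[str]], cache: Dict[str, int]) -> int:
    """Number of leafs under `node`; results are stored in `cache` in-place."""
    if node not in cache:
        kids = children.get(node, [])
        if kids:
            cache[node] = sum(width(kid, children, cache) for kid in kids)
        else:
            cache[node] = 1
    return cache[node]


children = {"root": ["a", "b"], "a": ["a1", "a2", "a3"]}
cache: Dict[str, int] = {}
print(width("root", children, cache))  # 4
print(cache["a"], cache["b"])  # 3 1
```

Passing the same cache to later calls avoids recomputing widths of subtrees that were already visited.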

## _genetic

Genetic algorithms for sequencing purposes.

### genetic_sequencing

    genetic_sequencing(
        graph: Graph,
        nodes: List[Node],
        n_chromosomes: int,
        n_generations: int,
        n_hall_of_fame: int,
        p_crossover: float,
        p_mutation: float,
        p_swap: float,
        evaluator: str = "feedback_distance",
        n_records: Optional[int] = None,
        inherit: bool = True,
        edge_weights: Optional[List[str]] = None,
    ) -> Lineage

Genetic sequencing of nodes in a graph.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `graph` | `Graph` | Graph holding data. | required |
| `nodes` | `List[Node]` | Nodes to sequence. | required |
| `evaluator` | `str` | Evaluation method to use. One of "feedback_distance", "feedback_marks", or "lower_left_distance". | `'feedback_distance'` |
| `n_chromosomes` | `int` | Number of chromosomes in each generation. | required |
| `n_generations` | `int` | Number of generations to simulate. | required |
| `n_hall_of_fame` | `int` | Hall of Fame size of best performing chromosomes. | required |
| `p_crossover` | `float` | Probability for a pair to be subjected to crossover. | required |
| `p_mutation` | `float` | Probability for each chromosome to be subjected to mutation. | required |
| `p_swap` | `float` | Probability for each gene to be swapped with another during mutation. | required |
| `n_records` | `Optional[int]` | Number of generation records to keep. | `None` |
| `inherit` | `bool` | Whether to inherit edges between children when getting the adjacency matrix. | `True` |
| `edge_weights` | `Optional[List[str]]` | Edge weights to consider when getting the adjacency matrix. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `Lineage` | Lineage object containing generations of chromosomes, generation records and a hall of fame of best performing chromosomes. |
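
To give a feel for what a chromosome and an evaluator operate on, here is a rough sketch. It treats a chromosome's genes as a permutation of node indices and scores it by counting marks above the diagonal of the permuted adjacency matrix. Both the convention that `matrix[i][j]` holds the edge from node `j` to node `i` and the exact scoring are assumptions made for this illustration; the library's evaluators may be defined differently. `permute` and `feedback_marks` are hypothetical helpers.

```python
from typing import List, Sequence

Matrix = List[List[int]]


def permute(matrix: Matrix, order: Sequence[int]) -> Matrix:
    """Reorder rows and columns of a square matrix by the same index sequence."""
    return [[matrix[i][j] for j in order] for i in order]


def feedback_marks(matrix: Matrix) -> int:
    """Count nonzero entries above the diagonal (assumed to be feedback)."""
    size = len(matrix)
    return sum(1 for i in range(size) for j in range(i + 1, size) if matrix[i][j])


# Chain n2 -> n1 -> n0 under the assumed matrix[target][source] convention.
matrix = [
    [0, 1, 0],
    [0, 0, 1],
    [0, 0, 0],
]
vec = [x for row in matrix for x in row]  # Row-major flattening of the matrix.

nodes = ["n0", "n1", "n2"]
genes = [2, 1, 0]  # A candidate chromosome: indices into `nodes`.

print(feedback_marks(matrix))  # 2
print(feedback_marks(permute(matrix, genes)))  # 0
print([nodes[i] for i in genes])  # ['n2', 'n1', 'n0']
```

A genetic search would generate many such permutations and keep the best-scoring ones in the hall of fame.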

Source code in ragraph/analysis/sequence/_genetic.py

    def genetic_sequencing(
        graph: Graph,
        nodes: List[Node],
        n_chromosomes: int,
        n_generations: int,
        n_hall_of_fame: int,
        p_crossover: float,
        p_mutation: float,
        p_swap: float,
        evaluator: str = "feedback_distance",
        n_records: Optional[int] = None,
        inherit: bool = True,
        edge_weights: Optional[List[str]] = None,
    ) -> Lineage:
        """Genetic sequencing of nodes in a graph.

        Arguments:
            graph: Graph holding data.
            nodes: Nodes to sequence.
            evaluator: Evaluation method to use. One of "feedback_distance",
                "feedback_marks", or "lower_left_distance".
            n_chromosomes: Number of chromosomes in each generation.
            n_generations: Number of generations to simulate.
            n_hall_of_fame: Hall of Fame size of best performing chromosomes.
            p_crossover: Probability for a pair to be subjected to crossover.
            p_mutation: Probability for each chromosome to be subjected to mutation.
            p_swap: Probability for each gene to be swapped with another during mutation.
            n_records: Number of generation records to keep.
            inherit: Whether to inherit edges between children when getting the
                adjacency matrix.
            edge_weights: Edge weights to consider when getting the adjacency matrix.

        Returns:
            Lineage object containing generations of chromosomes, generation records and
            a hall of fame of best performing chromosomes.
        """
        matrix = graph.get_adjacency_matrix(nodes, inherit=inherit, only=edge_weights)
        vec = [x for row in matrix for x in row]

        ekinds = dict(
            feedback_distance=EvaluatorKinds.FeedbackDistance,
            feedback_marks=EvaluatorKinds.FeedbackMarks,
            lower_left_distance=EvaluatorKinds.LowerLeftDistance,
        )
        evaluator = EvaluatorMatrix(ekinds[evaluator], vec, 1)

        settings = SequencingSettings(
            n_genes=len(nodes),
            p_crossover=p_crossover,
            p_mutation=p_mutation,
            n_chromosomes=n_chromosomes,
            n_generations=n_generations,
            n_records=n_records,
            n_hall_of_fame=n_hall_of_fame,
        )

        lineage = sequence_sga(
            settings,
            GeneratorKinds.RandomSequence,
            evaluator,
            RecorderKinds.FitnessStatistics,
            SelectorKinds.Roulette,
            CrossoverKinds.IPX,
            MutatorSwap(p_swap),
            ConvergenceKinds.Never,
        )

        return lineage

## _markov

Sorting algorithm based on the node dependency/influence in a Markov chain.

### markov

```python
markov(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    edge_weights: Optional[List[str]] = None,
    inf: float = markov_params["inf"].default,
    dep: float = markov_params["dep"].default,
    mu: float = markov_params["mu"].default,
    scale: bool = markov_params["scale"].default,
    inherit: bool = True,
    loops: bool = False,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
) -> Tuple[Graph, List[Node]]
```

docstring stub

Source code in ragraph/analysis/sequence/_markov.py
```python
@markov_sequencing_analysis
def markov(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    edge_weights: Optional[List[str]] = None,
    inf: float = markov_params["inf"].default,  # type: ignore
    dep: float = markov_params["dep"].default,  # type: ignore
    mu: float = markov_params["mu"].default,  # type: ignore
    scale: bool = markov_params["scale"].default,  # type: ignore
    inherit: bool = True,
    loops: bool = False,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
) -> Tuple[Graph, List[Node]]:
    """docstring stub"""
    assert nodes is not None
    adj = graph.get_adjacency_matrix(nodes=nodes, inherit=inherit, loops=loops, only=edge_weights)
    assert isinstance(adj, np.ndarray)

    # Obtain node penalty scores.
    penalties = net_markov_flow_adjacency(adj, inf=inf, dep=dep, mu=mu, scale=scale)

    # Calculate final sequence.
    idxs = np.argsort(penalties)

    # Reorder nodes
    seq = [nodes[i] for i in idxs]

    return graph, seq
```
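The last two steps of `markov` (sorting by penalty and reordering the nodes) can be tried in isolation. In this sketch the penalty values and the node names are made up for illustration; in the library they come from `net_markov_flow_adjacency`.

```python
import numpy as np

# Hypothetical node names and penalty scores (not computed from a real graph).
names = ["a", "b", "c", "d"]
penalties = np.array([0.4, -1.2, 0.0, 2.5])

# Ascending sort: the node with the lowest penalty comes first in the sequence.
idxs = np.argsort(penalties)
seq = [names[i] for i in idxs]
print(seq)
```

With these values the lowest penalty belongs to `"b"`, so it is placed first, and `"d"` with the highest penalty ends up last.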

## _name

Sequence nodes by name.

### name

```python
name(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[str], List[Node]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
) -> Tuple[Graph, List[Node]]
```

Sequence nodes by node name.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `graph` | `Graph` | Graph to sequence nodes of. | required |
| `nodes` | `Optional[Union[List[str], List[Node]]]` | Nodes to sequence. | `None` |

Returns:

| Type | Description |
|---|---|
| `Tuple[Graph, List[Node]]` | Sequenced nodes. |

Source code in ragraph/analysis/sequence/_name.py
```python
@sort_by_name_analysis
def name(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[str], List[Node]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
) -> Tuple[Graph, List[Node]]:
    """Sequence nodes by node name.

    Arguments:
        graph: Graph to sequence nodes of.
        nodes: Nodes to sequence.

    Returns:
        Sequenced nodes.
    """
    return graph, sorted(nodes, key=lambda n: n.name)
```

## _scc_tearing

A sequencing algorithm for cyclic graphs that tears cycles based on a metric (penalty) function. By default, the penalty is the same metric that the Markov sequencing algorithm uses to sort all nodes in a single pass.

### scc_tearing

```python
scc_tearing(
    graph: Graph,
    decision: Callable[[Graph, List[Node]], Node],
    decision_args: Optional[Dict[str, Any]] = None,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
) -> Tuple[Graph, List[Node]]
```

docstring stub

Source code in ragraph/analysis/sequence/_scc_tearing.py
```python
@scc_tearing_analysis
def scc_tearing(
    graph: Graph,
    decision: Callable[[Graph, List[Node]], Node],
    decision_args: Optional[Dict[str, Any]] = None,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
) -> Tuple[Graph, List[Node]]:
    """docstring stub"""
    sequence = scc_tearing_algorithm(
        graph, nodes, inherit, loops, decision, decision_args  # type: ignore
    )
    seq = list(sequence)
    return graph, seq
```
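The internals of `scc_tearing_algorithm` are not shown on this page. The following is one plausible reading of the idea described above, written on plain dictionaries instead of `Graph` and `Node` objects: order the strongly connected components without feedback, and inside each cycle let a decision function pick the node to tear out first. The names `strongly_connected_components` and `scc_tearing_sketch`, and the alphabetical decision rule, are assumptions for illustration only.

```python
from typing import Callable, Dict, List


def strongly_connected_components(
    nodes: List[str], targets_of: Dict[str, List[str]]
) -> List[List[str]]:
    """Tarjan's SCC algorithm; components come out in reverse topological order."""
    index: Dict[str, int] = {}
    low: Dict[str, int] = {}
    stack: List[str] = []
    comps: List[List[str]] = []

    def connect(v: str) -> None:
        index[v] = low[v] = len(index)
        stack.append(v)
        for w in targets_of[v]:
            if w not in index:
                connect(w)
                low[v] = min(low[v], low[w])
            elif w in stack:
                low[v] = min(low[v], index[w])
        if low[v] == index[v]:  # v is the root of a component: pop it off the stack.
            comp = []
            while True:
                w = stack.pop()
                comp.append(w)
                if w == v:
                    break
            comps.append(comp)

    for v in nodes:
        if v not in index:
            connect(v)
    return comps


def scc_tearing_sketch(
    nodes: List[str],
    targets_of: Dict[str, List[str]],
    decision: Callable[[List[str]], str],
) -> List[str]:
    """Sequence nodes; cycles are torn at the node chosen by `decision`."""
    members = set(nodes)
    sub = {n: [t for t in targets_of[n] if t in members] for n in nodes}
    seq: List[str] = []
    for comp in reversed(strongly_connected_components(nodes, sub)):
        if len(comp) == 1:
            seq.extend(comp)  # Trivial component: no cycle to tear.
            continue
        torn = decision(comp)
        seq.append(torn)
        rest = [n for n in nodes if n in comp and n != torn]
        seq.extend(scc_tearing_sketch(rest, targets_of, decision))
    return seq


# Edges: a -> b, b -> c, c -> b (cycle), c -> d.
targets_of = {"a": ["b"], "b": ["c"], "c": ["b", "d"], "d": []}
order = scc_tearing_sketch(["a", "b", "c", "d"], targets_of, decision=min)
```

Here `b` and `c` form the only cycle; the hypothetical decision rule `min` tears `b` out first, after which `c` is a trivial component.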

## _tarjan

Tarjan's Depth First Search Algorithm.

### tarjans_dfs

```python
tarjans_dfs(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[str], List[Node]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs
) -> Tuple[Graph, List[Node]]
```

docstring stub

Source code in ragraph/analysis/sequence/_tarjan.py
```python
@tarjans_dfs_analysis
def tarjans_dfs(
    graph: Graph,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[str], List[Node]]] = None,
    inherit: bool = True,
    loops: bool = False,
    edge_weights: Optional[List[str]] = None,
    inplace: bool = True,
    names: bool = False,
    safe: bool = True,
    **kwargs,
) -> Tuple[Graph, List[Node]]:
    """docstring stub"""
    assert nodes
    out = []
    unmarked = [n for n in nodes if isinstance(n, Node)]
    state = {n: "todo" for n in nodes}
    targets_of: Dict[Node, List[Node]] = {
        source: [
            target
            for target in unmarked
            if any(graph.edges_between(source, target, inherit=inherit, loops=False))
        ]
        for source in unmarked
    }

    def visit(n: Node):
        s = state[n]
        if s == "done":
            return
        if s == "temp":
            raise ValueError("Not a Directed Acyclic Graph (DAG).")

        state[n] = "temp"
        for m in targets_of[n]:
            visit(m)
        state[n] = "done"
        out.append(n)

    while unmarked:
        node = unmarked.pop()
        if state[node] == "done":
            continue
        visit(node)

    return graph, out[::-1]
```
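The same depth-first scheme can be tried without a `Graph`. The sketch below restates the visiting logic on a plain dictionary that maps each node name to its targets; the function name `tarjans_dfs_sketch` and the example graph are illustrative assumptions, not part of the library.

```python
from typing import Dict, List


def tarjans_dfs_sketch(targets_of: Dict[str, List[str]]) -> List[str]:
    """Topologically sort a DAG given as {source: [targets]}; raise on cycles."""
    state = {n: "todo" for n in targets_of}
    out: List[str] = []

    def visit(n: str) -> None:
        if state[n] == "done":
            return
        if state[n] == "temp":  # Reached a node that is still being visited: a cycle.
            raise ValueError("Not a Directed Acyclic Graph (DAG).")
        state[n] = "temp"
        for m in targets_of[n]:
            visit(m)
        state[n] = "done"
        out.append(n)  # A node is appended only after all of its targets.

    for n in targets_of:
        visit(n)
    return out[::-1]  # Reverse post-order puts sources before their targets.


dag = {"a": ["b", "c"], "b": ["d"], "c": ["d"], "d": []}
order = tarjans_dfs_sketch(dag)

try:
    tarjans_dfs_sketch({"a": ["b"], "b": ["a"]})
    cycle_detected = False
except ValueError:
    cycle_detected = True
```

Every node in `order` appears before all of its targets, and the two-node cycle is rejected with a `ValueError`, mirroring the behavior of the listing above.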

## metrics

Sequence metrics.

### feedback_crossover

```python
feedback_crossover(
    mat: ndarray, fb: float = 0.9, co: float = 0.1
) -> Tuple[float, Tuple[ndarray, ndarray]]
```

Jointly measure feedback marks and crossovers in an adjacency matrix.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `mat` | `ndarray` | Adjacency matrix to analyze. | required |
| `fb` | `float` | Weight of feedback marks. | `0.9` |
| `co` | `float` | Weight of feedback crossovers. | `0.1` |

Returns:

| Type | Description |
|---|---|
| `float` | Resulting metric. |
| `Tuple[ndarray, ndarray]` | Cell contribution matrices (feedback, crossover). |

Note

Crossovers are the cells where the lines ("axes") drawn from feedback marks towards the diagonal cross each other, except when the crossing cell is itself a feedback mark. Multiple lines coming from "above" in the matrix are summed; those coming from the right are NOT (they are treated as binary).

References

McCulley, C., and Bloebaum, C. L., 1996, “A Genetic Tool for Optimal Design Sequencing in Complex Engineering Systems,” Struct. Optim., 12(2), pp. 186-201.

Source code in ragraph/analysis/sequence/metrics.py
```python
def feedback_crossover(
    mat: np.ndarray, fb: float = 0.9, co: float = 0.1
) -> Tuple[float, Tuple[np.ndarray, np.ndarray]]:
    """Jointly measure feedback marks and crossovers in an adjacency matrix.

    Arguments:
        mat: Adjacency matrix to analyze.
        fb: Weight of feedback marks.
        co: Weight of feedback crossovers.

    Returns:
        Resulting metric.
        Cell contribution matrices (feedback, crossover).

    Note:
        Crossovers are where the "axis" from feedback marks towards the diagonal
        cross, except when this crossing is a feedback mark in itself. Multiple
        lines from "above" in the matrix are summed, those from the right are
        **NOT** (treated as binary).

    References:
        McCulley, C., and Bloebaum, C. L., 1996, “A Genetic Tool for Optimal Design
        Sequencing in Complex Engineering Systems,” Struct. Optim., 12(2), pp.
        186-201.
    """
    # Treat binary matrix
    mat = (mat > 0) * 1.0

    # contrib_fb is upper triangularized
    total_fb, contrib_fb = feedback_marks(mat)

    # vertical cum sum (number of accumulated marks from the top until diagonal)
    vcumsum = np.triu(np.cumsum(contrib_fb, axis=0), 1)

    # horizontal "line" multiplier, 1s toward the left from the rightmost 1 except the
    # marks themselves:
    # rows of [0,0,1,0,0,1,0,0,0]
    # become  [1,1,0,1,1,0,0,0,0]
    hline = np.cumsum(contrib_fb[:, ::-1], axis=1)[:, ::-1]
    hline = (hline > 0) * 1.0 - contrib_fb
    # no need to np.triu() here, since vcumsum already is upper triangularized.

    # Contribution matrix at crossover points and total score.
    contrib_co = vcumsum * hline
    total_co = contrib_co.sum()

    return fb * total_fb + co * total_co, (contrib_fb, contrib_co)
```
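A small worked example may help to see where a crossover arises. The sketch below repeats the steps of the listing on a hand-made 4x4 matrix with two feedback marks; the matrix itself is an illustrative assumption, and the steps are inlined rather than calling the library.

```python
import numpy as np

# Two feedback marks above the diagonal, at cells (0, 2) and (1, 3).
mat = np.zeros((4, 4))
mat[0, 2] = 1.0
mat[1, 3] = 1.0

contrib_fb = np.triu(mat, 1)
total_fb = contrib_fb.sum()

# Vertical lines: marks accumulated from the top down to the diagonal.
vcumsum = np.triu(np.cumsum(contrib_fb, axis=0), 1)

# Horizontal lines: 1s to the left of the rightmost mark, excluding the marks.
hline = np.cumsum(contrib_fb[:, ::-1], axis=1)[:, ::-1]
hline = (hline > 0) * 1.0 - contrib_fb

contrib_co = vcumsum * hline
total_co = contrib_co.sum()
score = 0.9 * total_fb + 0.1 * total_co  # Default weights fb=0.9, co=0.1.
```

The vertical line running down from the mark at (0, 2) meets the horizontal line running left from the mark at (1, 3) in cell (1, 2), so that cell is the single crossover and the score is roughly 0.9 * 2 + 0.1 * 1.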

### feedback_distance

```python
feedback_distance(mat: ndarray) -> Tuple[float, ndarray]
```

Measure the feedback length, i.e. the distance of feedback marks from the adjacency matrix diagonal.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `mat` | `ndarray` | Adjacency matrix to analyze. | required |

Returns:

| Type | Description |
|---|---|
| `float` | Resulting metric. |
| `ndarray` | Cell contribution matrix. |

References

Gebala, D. A., & Eppinger, S. D. (1991). Methods for analyzing design procedures. ASME Design Technical Conferences (Design Theory and Methodology), 227-233. Miami.

Source code in ragraph/analysis/sequence/metrics.py
```python
def feedback_distance(mat: np.ndarray) -> Tuple[float, np.ndarray]:
    """Measure the feedback length, e.g. distance from the adjacency matrix diagonal.

    Arguments:
        mat: Adjacency matrix to analyze.

    Returns:
        Resulting metric.
        Cell contribution matrix.

    References:
        Gebala, D. A., & Eppinger, S. D. (1991). Methods for analyzing design
        procedures. ASME Design Technical Conferences (Design Theory and
        Methodology), 227-233. Miami.
    """
    dim = len(mat)
    dimtile = np.tile(np.arange(dim), (dim, 1))
    ddist = dimtile - dimtile.T
    contrib = np.triu(mat, 1) * ddist

    return contrib.sum(), contrib
```
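To see how this metric responds to a change in sequence, the sketch below restates the function body and evaluates it on a hand-made matrix before and after reordering. The matrix, the weights, and the chosen permutation are illustrative assumptions.

```python
import numpy as np


def feedback_distance(mat):
    # Restated from the listing above so that the example is self-contained.
    dim = len(mat)
    dimtile = np.tile(np.arange(dim), (dim, 1))
    ddist = dimtile - dimtile.T  # ddist[i, j] == j - i
    contrib = np.triu(mat, 1) * ddist
    return contrib.sum(), contrib


mat = np.array(
    [
        [0.0, 0.0, 3.0],  # A mark of weight 3 two cells above the diagonal.
        [1.0, 0.0, 0.0],
        [0.0, 1.0, 0.0],
    ]
)
before, _ = feedback_distance(mat)

# Move the last node to the front and re-evaluate on the permuted matrix.
order = [2, 0, 1]
after, _ = feedback_distance(mat[np.ix_(order, order)])
```

In the original order the heavy mark contributes 3 * 2 = 6. After reordering, the only mark above the diagonal has weight 1 at distance 2, so the metric drops to 2.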

### feedback_lower_left

```python
feedback_lower_left(
    mat: ndarray, fb: float = 100.0, ff: float = 1.0
) -> Tuple[float, ndarray]
```

Jointly measure lower left distance to the adjacency matrix diagonal and feedback marks. Feedback and feedforward are penalized differently, but both via a quadratic lower-left distance factor.

Parameters:

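| Name | Type | Description | Default |
|---|---|---|---|
| `mat` | `ndarray` | Adjacency matrix to analyze. | required |
| `fb` | `float` | Feedback adjacency multiplier. | `100.0` |
| `ff` | `float` | Feedforward adjacency multiplier. | `1.0` |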

Returns:

| Type | Description |
|---|---|
| `float` | Resulting metric. |
| `ndarray` | Cell contribution matrix. |

Note

With the default weights (`fb=100.0`, `ff=1.0`), marks above the diagonal (feedback) are weighted 100 times more than those below the diagonal (feedforward). The multiplier is offset by (n-1)^2.

References

Scott, J. A. (1999). A strategy for modelling the design-development phase of a product. University of Newcastle. See Equation 6.1 and Figure 6.4.

Source code in ragraph/analysis/sequence/metrics.py
```python
def feedback_lower_left(
    mat: np.ndarray, fb: float = 100.0, ff: float = 1.0
) -> Tuple[float, np.ndarray]:
    """Jointly measure lower left distance to the adjacency matrix diagonal and
    feedback marks. Feedback and feedforward are penalized differently, but both via
    a quadratic lower-left distance factor.

    Arguments:
        mat: Adjacency matrix to analyze.
        fb: Feedback adjacency multiplier.
        ff: Feedforward adjacency multiplier.

    Returns:
        Resulting metric.
        Cell contribution matrix.

    Note:
        Feedback marks above the diagonal are weighted 100 times more than those
        below the diagonal. The multiplier is offset by (n-1)^2.

    References:
        Scott, J. A. (1999). A strategy for modelling the design-development phase of
        a product. University of Newcastle. See Equation 6.1 and Figure 6.4.
    """
    dim = len(mat)
    dimtile = np.tile(np.arange(1, dim + 1), (dim, 1))
    lldist = dimtile - dimtile.T + dim
    omega = np.square(lldist)

    # Lower triangle
    omgl = ff * omega
    tril = np.tril(mat, -1)
    resl = omgl * tril

    # Upper triangle
    omgu = fb * omega
    triu = np.triu(mat, 1)
    resu = omgu * triu

    contrib = resl + resu

    return contrib.sum(), contrib
```
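The quadratic distance factor is easiest to read off a small case. The sketch below inlines the steps of the listing for a hand-made 3x3 matrix using the default weights; the matrix is an illustrative assumption.

```python
import numpy as np

mat = np.array(
    [
        [0.0, 0.0, 1.0],  # One mark above the diagonal (feedback).
        [1.0, 0.0, 0.0],  # Two marks below the diagonal (feedforward).
        [0.0, 1.0, 0.0],
    ]
)
fb, ff = 100.0, 1.0

dim = len(mat)
dimtile = np.tile(np.arange(1, dim + 1), (dim, 1))
omega = np.square(dimtile - dimtile.T + dim)  # Squared lower-left distance factor.

contrib = ff * omega * np.tril(mat, -1) + fb * omega * np.triu(mat, 1)
total = contrib.sum()
print(omega)
```

For this 3x3 case `omega` grows from 1 in the lower-left corner to 25 in the upper-right corner, so the single feedback mark contributes 100 * 25 = 2500 while the two feedforward marks contribute 4 each.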

### feedback_marks

```python
feedback_marks(mat: ndarray) -> Tuple[float, ndarray]
```

Measure the sum of feedback marks in an adjacency matrix.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `mat` | `ndarray` | Adjacency matrix to analyze. | required |

Returns:

| Type | Description |
|---|---|
| `float` | Resulting metric. |
| `ndarray` | Cell contribution matrix. |

References

Steward, D. V., 1981, Systems Analysis and Management: Structure, Strategy and Design, Petrocelli Books, New York.

Kusiak, A., & Wang, J. (1993). Decomposition of the Design Process. Journal of Mechanical Design, 115(4), 687. https://doi.org/10.1115/1.2919255

Source code in ragraph/analysis/sequence/metrics.py
```python
def feedback_marks(mat: np.ndarray) -> Tuple[float, np.ndarray]:
    """Measure the sum of feedback marks in an adjacency matrix.

    Arguments:
        mat: Adjacency matrix to analyze.

    Returns:
        Resulting metric.
        Cell contribution matrix.

    References:
        Steward, D. V., 1981, Systems Analysis and Management: Structure, Strategy and
        Design, Petrocelli Books, New York.
        Kusiak, A., & Wang, J. (1993). Decomposition of the Design Process. Journal of
        Mechanical Design, 115(4), 687. https://doi.org/10.1115/1.2919255
    """
    contrib = np.triu(mat, 1)
    return contrib.sum(), contrib
```
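As a quick check of what counts as a feedback mark, the sketch below restates the two-line function body and applies it to a hand-made matrix. The matrix is an illustrative assumption.

```python
import numpy as np


def feedback_marks(mat):
    # Restated from the listing above: keep only entries strictly above the diagonal.
    contrib = np.triu(mat, 1)
    return contrib.sum(), contrib


mat = np.array(
    [
        [0.0, 1.0, 0.0],
        [1.0, 0.0, 1.0],
        [1.0, 0.0, 0.0],
    ]
)
total, contrib = feedback_marks(mat)
```

Only the entries at (0, 1) and (1, 2) lie above the diagonal, so the metric is 2; the two entries below the diagonal do not count.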

### lower_left

```python
lower_left(mat: ndarray) -> Tuple[float, ndarray]
```

Measure the distance to the lower left of the adjacency matrix.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `mat` | `ndarray` | Adjacency matrix to analyze. | required |

Returns:

| Type | Description |
|---|---|
| `float` | Resulting metric. |
| `ndarray` | Cell contribution matrix. |

References

Todd, D. S. (1997). Multiple criteria genetic algorithms in engineering design and operation. University of Newcastle.

Source code in ragraph/analysis/sequence/metrics.py
```python
def lower_left(mat: np.ndarray) -> Tuple[float, np.ndarray]:
    """Measure the distance to the lower left of the adjacency matrix.

    Arguments:
        mat: Adjacency matrix to analyze.

    Returns:
        Resulting metric.
        Cell contribution matrix.

    References:
        Todd, D. S. (1997). Multiple criteria genetic algorithms in engineering design
        and operation. University of Newcastle.
    """
    dim = len(mat)
    dimtile = np.tile(np.arange(dim), (dim, 1))
    lldist = dimtile - dimtile.T + dim - 1
    contrib = mat * lldist

    return contrib.sum(), contrib
```
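The distance factor used by `lower_left` can be inspected directly. The sketch below inlines the steps of the listing for a hand-made 3x3 matrix; the matrix is an illustrative assumption.

```python
import numpy as np

mat = np.array(
    [
        [0.0, 0.0, 1.0],
        [1.0, 0.0, 0.0],
        [0.0, 1.0, 0.0],
    ]
)

dim = len(mat)
dimtile = np.tile(np.arange(dim), (dim, 1))
lldist = dimtile - dimtile.T + dim - 1  # 0 in the lower-left corner.
contrib = mat * lldist
total = contrib.sum()
print(lldist)
```

The factor is 0 in the lower-left corner, `dim - 1` on the diagonal, and largest in the upper-right corner, so the mark at (0, 2) contributes 4 and the two marks just below the diagonal contribute 1 each.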

### net_markov_flow_adjacency

```python
net_markov_flow_adjacency(
    mat: ndarray,
    inf: float = 1.0,
    dep: float = 1.0,
    mu: float = 2.0,
    scale: bool = True,
) -> ndarray
```

Calculate a flow balance as if the matrix would constitute a (weighted) Markov chain.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `mat` | `ndarray` | Adjacency matrix to analyze. | required |
| `inf` | `float` | The weight to subtract outgoing flow by. | `1.0` |
| `dep` | `float` | The weight to add incoming flow by. | `1.0` |
| `mu` | `float` | Evaporation constant when calculating flow through nodes. | `2.0` |
| `scale` | `bool` | Whether to scale the inflow vector according to the adjacency matrix column sums. | `True` |

Returns:

| Type | Description |
|---|---|
| `ndarray` | Nodal flow balance as an array. |

Source code in ragraph/analysis/sequence/metrics.py
```python
def net_markov_flow_adjacency(
    mat: np.ndarray,
    inf: float = 1.0,
    dep: float = 1.0,
    mu: float = 2.0,
    scale: bool = True,
) -> np.ndarray:
    """Calculate a flow balance as if the matrix would constitute a (weighted)
    Markov chain.

    Arguments:
        mat: Adjacency matrix to analyze.
        inf: The weight to subtract outgoing flow by.
        dep: The weight to add incoming flow by.
        mu: Evaporation constant when calculating flow through nodes.
        scale: Whether to scale the inflow vector according to the adjacency matrix
            column sums.

    Returns:
        Nodal flow balance as an array.
    """
    from ragraph.analysis.cluster._markov import MarkovFlow

    flow = MarkovFlow(mat, mu, scale).flow_matrix[:-1, :-1]  # Exclude sink.
    inf_vector = flow.sum(axis=0)
    dep_vector = flow.sum(axis=1)
    return dep * dep_vector - inf * inf_vector
```
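The final balance step can be illustrated without `MarkovFlow`. In the sketch below the flow matrix is made up by hand (it is not the output of `MarkovFlow`), and only the last three lines of the listing are reproduced.

```python
import numpy as np

# Hypothetical flow matrix: entry [i, j] is the flow from node j into node i.
flow = np.array(
    [
        [0.0, 0.0, 0.0],
        [1.0, 0.0, 0.0],
        [0.25, 0.5, 0.0],
    ]
)
inf, dep = 1.0, 1.0

inf_vector = flow.sum(axis=0)  # Column sums: flow leaving each node.
dep_vector = flow.sum(axis=1)  # Row sums: flow arriving at each node.
balance = dep * dep_vector - inf * inf_vector
print(balance)
```

Node 0 only emits flow and gets the lowest balance, while node 2 only receives flow and gets the highest. Sorting by this balance in ascending order therefore places influential nodes early and dependent nodes late.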

### net_markov_flow_graph

```python
net_markov_flow_graph(
    graph: Graph,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    inf: float = 1.0,
    dep: float = 1.0,
    mu: float = 2.0,
) -> ndarray
```

Calculate a flow balance as if the graph would constitute a (weighted) Markov chain.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `graph` | `Graph` | Graph to analyze nodes of. | required |
| `nodes` | `Optional[Union[List[Node], List[str]]]` | Set of node (names) to calculate the flow with. | `None` |
| `inherit` | `bool` | Whether to take into account edges between descendants of nodes. | `True` |
| `loops` | `bool` | Whether to take into account self-loop edges. | `False` |
| `inf` | `float` | The weight to subtract outgoing flow by. | `1.0` |
| `dep` | `float` | The weight to add incoming flow by. | `1.0` |
| `mu` | `float` | Evaporation constant when calculating flow through nodes. | `2.0` |

Returns:

| Type | Description |
|---|---|
| `ndarray` | Nodal flow balance as an array. |

Note

Uses edge inheritance to calculate an adjacency matrix.

Source code in ragraph/analysis/sequence/metrics.py
```python
def net_markov_flow_graph(
    graph: Graph,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = True,
    loops: bool = False,
    inf: float = 1.0,
    dep: float = 1.0,
    mu: float = 2.0,
) -> np.ndarray:
    """Calculate a flow balance as if the graph would constitute a (weighted)
    Markov chain.

    Arguments:
        graph: Graph to analyze nodes of.
        nodes: Set of node (names) to calculate the flow with.
        inherit: Whether to take into account edges between descendants of nodes.
        loops: Whether to take into account self-loop edges.
        inf: The weight to subtract outgoing flow by.
        dep: The weight to add incoming flow by.
        mu: Evaporation constant when calculating flow through nodes.

    Returns:
        Nodal flow balance as an array.

    Note:
        Uses edge inheritance to calculate an adjacency matrix.
    """
    mat = graph.get_adjacency_matrix(nodes, inherit=inherit, loops=loops)
    return net_markov_flow_adjacency(mat, inf=inf, dep=dep, mu=mu)
```

## utils

Sequencing utils.

### branchsort

```python
branchsort(
    graph: Graph,
    algo: Callable,
    algo_args: Optional[Dict[str, Any]] = None,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    leafs: Optional[Union[List[Node], List[str]]] = None,
    edge_weights: Optional[List[str]] = None,
    inherit: bool = True,
    loops: bool = False,
    inplace: bool = True,
    recurse: bool = True,
    names: bool = False,
    safe: bool = True,
) -> Tuple[Graph, List[Node], List[Node]]
```

docstring stub

Source code in ragraph/analysis/sequence/utils.py
```python
@branchsort_analysis
def branchsort(
    graph: Graph,
    algo: Callable,
    algo_args: Optional[Dict[str, Any]] = None,
    root: Optional[Union[str, Node]] = None,
    nodes: Optional[Union[List[Node], List[str]]] = None,
    leafs: Optional[Union[List[Node], List[str]]] = None,
    edge_weights: Optional[List[str]] = None,
    inherit: bool = True,
    loops: bool = False,
    inplace: bool = True,
    recurse: bool = True,
    names: bool = False,
    safe: bool = True,
) -> Tuple[Graph, List[Node], List[Node]]:
    """docstring stub"""
    # Sort roots
    assert isinstance(algo_args, dict)
    graph, node_sequence = algo(
        graph=graph,
        root=None,
        nodes=nodes,
        inherit=inherit,
        loops=loops,
        edge_weights=edge_weights,
        inplace=inplace,
        **algo_args,
    )
    assert node_sequence

    # Start with empty leaf sequence.
    leaf_sequence: List[Node] = []

    # Traverse branches
    for node in node_sequence:
        branchsort_analysis.log(f"Traversing '{node.name}'...")

        # Nothing to do for leaf nodes or nodes that should be treated as leafs.
        if node.is_leaf or node in leafs:  # type: ignore
            branchsort_analysis.log(f"Skipped leaf node '{node.name}'.")
            leaf_sequence.append(node)
            continue

        # Recursive case, get sorted children as the sorted "roots" of recursive call.
        if recurse:
            branchsort_analysis.log(f"Recursing into '{node.name}'...")
            _, _child_sequence, _leaf_sequence = branchsort(
                algo=algo,
                algo_args=algo_args,
                graph=graph,
                root=node,
                leafs=leafs,
                inherit=inherit,
                loops=loops,
                edge_weights=edge_weights,
                inplace=inplace,
                recurse=recurse,
            )
            leaf_sequence.extend(_leaf_sequence)
            branchsort_analysis.log(
                f"Leaf sequence extended with {[n.name for n in _leaf_sequence]}."
            )

        # Non-recursive case, get sorted children in this call.
        else:
            branchsort_analysis.log("Non-recursive call to algorithm...")
            _, _leaf_sequence = algo(
                graph=graph,
                root=node,
                inherit=inherit,
                loops=loops,
                edge_weights=edge_weights,
                inplace=inplace,
                **algo_args,
            )
            leaf_sequence.extend(_leaf_sequence)
            branchsort_analysis.log(
                f"Leaf sequence extended with {[n.name for n in _leaf_sequence]}."
            )

    return graph, node_sequence, leaf_sequence
```

### markov_decision

```python
markov_decision(
    graph: Graph,
    options: List[Node],
    inherit: bool = True,
    loops: bool = False,
    inf: float = 1.0,
    dep: float = 1.0,
    mu: float = 1.5,
    context: Optional[List[Node]] = None,
) -> int
```

Make a decision based on a Markov flow analysis of the adjacency matrix. The node with the lowest net Markov flow is picked.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `graph` | `Graph` | Graph data. | required |
| `options` | `List[Node]` | Nodes to decide between. | required |
| `inf` | `float` | The weight to subtract outgoing flow by. | `1.0` |
| `dep` | `float` | The weight to add incoming flow by. | `1.0` |
| `mu` | `float` | Evaporation constant when calculating flow through nodes. | `1.5` |
| `context` | `Optional[List[Node]]` | Optional superset of nodes with respect to the options argument that constitutes the "complete" Markov chain that should be taken into account. | `None` |

Returns:

| Type | Description |
|---|---|
| `int` | Index of node to pick. |

Source code in ragraph/analysis/sequence/utils.py
```python
def markov_decision(
    graph: Graph,
    options: List[Node],
    inherit: bool = True,
    loops: bool = False,
    inf: float = 1.0,
    dep: float = 1.0,
    mu: float = 1.5,
    context: Optional[List[Node]] = None,
) -> int:
    """Make a decision based on a Markov flow analysis of the adjacency matrix. The
    node with the lowest net markov flow is picked.

    Arguments:
        graph: Graph data.
        options: Nodes to decide between.
        inf: The weight to subtract outgoing flow by.
        dep: The weight to add incoming flow by.
        mu: Evaporation constant when calculating flow through nodes.
        context: Optional superset of nodes with respect to the options argument that
            constitutes the "complete" Markov chain that should be taken into account.

    Returns:
        Index of node to pick.
    """
    if context:
        adj = graph.get_adjacency_matrix(context, inherit=inherit, loops=loops)
        mapping = [context.index(option) for option in options]
        penalties = net_markov_flow_adjacency(adj, inf=inf, dep=dep, mu=mu)[mapping]
    else:
        adj = graph.get_adjacency_matrix(options, inherit=inherit, loops=loops)
        penalties = net_markov_flow_adjacency(adj, inf=inf, dep=dep, mu=mu)

    idx = np.argmin(penalties)
    logger.debug(
        f"Markov decision:\nNode {options[idx].name} has the lowest of penalties of "
        + f"{[n.name for n in options]}.\n{penalties}"
    )
    return np.argmin(penalties)
```
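The effect of the `context` argument is easiest to see on plain lists. In this sketch the node names and the penalty values are made up; in the library the penalties are computed over the whole context with `net_markov_flow_adjacency` and then restricted to the options.

```python
import numpy as np

context = ["a", "b", "c", "d"]
options = ["d", "b"]

# Hypothetical penalties for every node in the context, in context order.
context_penalties = np.array([0.3, -0.7, 1.1, -0.2])

# Restrict the penalties to the options, keeping the order of `options`.
mapping = [context.index(option) for option in options]
penalties = context_penalties[mapping]

idx = int(np.argmin(penalties))
picked = options[idx]
```

Note that the returned index refers to a position in `options`, not in `context`: here `"b"` has the lowest penalty and sits at index 1 of `options`.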