Skip to content

ragraph.io

Graph import/export

Supports multiple formats via its submodules.

archimate

XML format support using The Open Group ArchiMate® Model Exchange File Format.

Reference

from_archimate

from_archimate(*args, **kwargs)

Archimate XML import is not implemented.

Source code in ragraph/io/archimate/__init__.py
def from_archimate(*args, **kwargs):
    """Archimate XML import is not implemented.

    Any positional or keyword arguments are accepted and ignored.

    Raises:
        NotImplementedError: always.
    """
    message = "Archimate XML import is not implemented."
    raise NotImplementedError(message)

to_archimate

to_archimate(
    graph: Graph,
    path: Optional[Union[str, Path]] = None,
    elem: Optional[Element] = None,
    tostring_args: Optional[Dict[str, Any]] = None,
    bundle_schemas: bool = False,
) -> str

Encode Graph to an Archimate model XML element.

Parameters:

Name Type Description Default
graph Graph

Graph to convert to XML.

required
path Optional[Union[str, Path]]

Optional file path to write XML to.

None
elem Optional[Element]

Optional object to append the Graph to.

None
tostring_args Optional[Dict[str, Any]]

Optional argument overrides to lxml.etree.tostring.

None
bundle_schemas bool

When exporting to a file, bundle the .xsd schemas.

False

Returns:

Type Description
str

XML element.

Note

For both nodes and edges we use the following: The annotations.archimate["type"] determines the type (and therefore layer). The annotations.archimate["documentation"] populates the documentation field. Refer to https://www.opengroup.org/xsd/archimate/ and the XSD schema for all possible XML element types.

Source code in ragraph/io/archimate/__init__.py
def to_archimate(
    graph: Graph,
    path: Optional[Union[str, Path]] = None,
    elem: Optional[etree.Element] = None,
    tostring_args: Optional[Dict[str, Any]] = None,
    bundle_schemas: bool = False,
) -> etree.Element:
    """Encode Graph to an Archimate model XML element.

    Arguments:
        graph: Graph to convert to XML.
        path: Optional file path to write XML to.
        elem: Optional object to append the Graph to. When omitted, the bundled
            ``bare.xml`` document is parsed and its root is used.
        tostring_args: Optional argument overrides to
            [`lxml.etree.tostring`][lxml.etree.tostring].
        bundle_schemas: When exporting to a file, bundle the .xsd schemas.

    Returns:
        Root XML element holding the encoded model (an element, not a string),
        regardless of whether a file was written.

    Note:
        For both nodes and edges we use the following:
        The `annotations.archimate["type"]` determines the type (and therefore layer).
        The `annotations.archimate["documentation"]` populates the documentation field.
        Refer to https://www.opengroup.org/xsd/archimate/ and the XSD schema for all
        possible XML element types.
    """
    if elem is None:
        parser = etree.XMLParser(remove_blank_text=True)
        doc = etree.parse(str(here / "bare.xml"), parser)
        root = doc.getroot()
    else:
        root = elem

    elements = maker.elements()
    elements.extend(_node_to_element(n) for n in graph.nodes)

    relationships = maker.relationships()
    relationships.extend(_hierarchy_to_relationships(graph))
    relationships.extend(_edge_to_relationship(e) for e in graph.edges)

    propdefs = _make_property_definitions(graph)

    # Expand requirements/constraints BEFORE deciding which containers to attach.
    # The helper receives both containers and its return value is unused, so it
    # presumably appends to them (NOTE(review): confirm). Running it after the
    # emptiness checks would leave a container it populated detached from the root
    # whenever that container started out empty (e.g. a graph without edges).
    for n in graph.nodes:
        _expand_requirements_constraints(n, elements, relationships)

    # Attach only the non-empty containers, in a fixed order.
    # ``len()`` is used on purpose: it counts the children of an lxml element.
    if len(elements):
        root.append(elements)
    if len(relationships):
        root.append(relationships)
    if len(propdefs):
        root.append(propdefs)

    if path is not None:
        p = Path(path)

        # Defaults for serialization; caller-supplied overrides win.
        args = dict(encoding="UTF-8", xml_declaration=True, pretty_print=True)
        if tostring_args is not None:
            args.update(tostring_args)

        string = etree.tostring(root, **args)
        p.write_bytes(string)

        if bundle_schemas:
            # Place the schema next to the exported file.
            shutil.copy2(here / "archimate3_Model.xsd", p.parent)

    return root

canopy

Canopy by Ratio CASE format support

from_canopy

from_canopy(path: Union[str, Path]) -> List[Graph]

Get the graph(s) from a Canopy data file.

Parameters:

Name Type Description Default
path Union[str, Path]

Path of the file to load.

required

Returns:

Type Description
List[Graph]

Graph objects contained in the Canopy data file, which may be a graph, tab, session, or workspace export of Canopy.

Raises:

Type Description
InconsistencyError

if graph is inconsistent.

Source code in ragraph/io/canopy.py
def from_canopy(
    path: Union[str, Path],
) -> List[Graph]:
    """Get the graph(s) from a Canopy data file.

    The file is read as UTF-8 JSON. Its ``"$schema"`` entry selects the decoder:
    workspace, session and tab exports may yield several graphs, anything else is
    decoded as a single graph.

    Arguments:
        path: Path of the file to load.

    Returns:
        Graph objects contained in Canopy data formats being either a graph, tab,
        session, or workspace export of Canopy.

    Raises:
        InconsistencyError: if graph is inconsistent.
    """
    # Read explicitly as UTF-8 instead of the platform's locale encoding, so the
    # result does not depend on the machine the file is loaded on.
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    schema = _match_schema(data.get("$schema", ""))
    if schema == "workspace":
        graphs = list(_decode_workspace(data))
    elif schema == "session":
        graphs = list(_decode_session(data))
    elif schema == "tab":
        graphs = list(_decode_tab(data))
    else:
        # Unrecognized or missing schema: treat the payload as a single graph.
        graphs = [_decode_graph(data)]

    for graph in graphs:
        graph.check_consistency(raise_error=True)

    return graphs

to_canopy

to_canopy(
    graph: Graph,
    path: Optional[Union[Path, str]] = None,
    fmt: str = "graph",
) -> Optional[str]

Save graph as a Canopy dataset.

Parameters:

Name Type Description Default
graph Graph

Graph to save.

required
path Optional[Union[Path, str]]

Path to write to.

None
fmt str

One of 'session' or 'graph'.

'graph'

Returns:

Type Description
Optional[str]

JSON encoded string if no path was provided to write to.

Source code in ragraph/io/canopy.py
def to_canopy(
    graph: Graph,
    path: Optional[Union[Path, str]] = None,
    fmt: str = "graph",
) -> Optional[str]:
    """Save graph as a Canopy dataset.

    Arguments:
        graph: Graph to save.
        path: Path to write to.
        fmt: One of 'session' or 'graph'. Any value other than 'session' is
            encoded as a graph.

    Returns:
        JSON encoded string (indented) if no path was provided to write to,
        otherwise None after writing compact JSON to the path.
    """
    encode = _encode_session if fmt == "session" else _encode_graph
    payload = encode(graph, schema=True)

    if path is None:
        return json.dumps(payload, indent=2)

    Path(path).write_text(json.dumps(payload), encoding="utf-8")
    return None

csv

CSV format support

from_csv

from_csv(
    nodes_path: Optional[Union[str, Path]] = None,
    edges_path: Optional[Union[str, Path]] = None,
    csv_delimiter: str = ";",
    iter_delimiter: str = ";",
    node_weights: Optional[List[str]] = None,
    edge_weights: Optional[List[str]] = None,
    **graph_kwargs: Any
) -> Graph

Convert CSV files to a graph.

Nodes file (optional) requires at least a name column. Optional special property columns are kind, labels, parent, children and is_bus.

Edges file requires at least a source and target column. Optional special property columns are kind and labels.

Automatically completes one sided parent-child relationships.

Parameters:

Name Type Description Default
nodes_path Optional[Union[str, Path]]

Path of optional nodes CSV file.

None
edges_path Optional[Union[str, Path]]

Path of edges CSV file.

None
csv_delimiter str

Delimiter of CSV file.

';'
iter_delimiter str

Iterable delimiter (i.e. for children names list).

';'
node_weights Optional[List[str]]

Columns to interpret as node weights.

None
edge_weights Optional[List[str]]

Columns to interpret as edge weights.

None
**graph_kwargs Any

Optional Graph arguments when instantiating.

{}

Returns:

Type Description
Graph

Graph object.

Raises:

Type Description
InconsistencyError

if graph is inconsistent.

Source code in ragraph/io/csv.py
def from_csv(
    nodes_path: Optional[Union[str, Path]] = None,
    edges_path: Optional[Union[str, Path]] = None,
    csv_delimiter: str = ";",
    iter_delimiter: str = ";",
    node_weights: Optional[List[str]] = None,
    edge_weights: Optional[List[str]] = None,
    **graph_kwargs: Any,
) -> Graph:
    """Convert CSV files to a graph.

    Nodes file (optional) requires at least a `name` column. Optional special
    property columns are `kind`, `labels`, `parent`, `children` and `is_bus`.

    Edges file requires at least a `source` and `target` column. Optional special
    property columns are `kind` and `labels`.

    Automatically completes one sided parent-child relationships.

    Arguments:
        nodes_path: Path of optional nodes CSV file. When omitted, nodes are
            derived from the edges file.
        edges_path: Path of edges CSV file.
        csv_delimiter: Delimiter of CSV file.
        iter_delimiter: Iterable delimiter (i.e. for children names list).
        node_weights: Columns to interpret as node weights.
        edge_weights: Columns to interpret as edge weights.
        **graph_kwargs: Optional [`Graph`][ragraph.graph.Graph] arguments when instantiating.

    Returns:
        Graph object. An empty graph when neither path is given.

    Raises:
        InconsistencyError: if graph is inconsistent.
    """
    graph = Graph(**graph_kwargs)

    # Nothing to read: hand back the freshly created graph as is.
    if not (nodes_path or edges_path):
        return graph

    node_weights = node_weights or []
    edge_weights = edge_weights or []

    node_dict = (
        _load_nodes(nodes_path, csv_delimiter, iter_delimiter, node_weights)
        if nodes_path
        else _derive_nodes(edges_path, csv_delimiter)
    )
    edges = (
        _load_edges(edges_path, csv_delimiter, iter_delimiter, node_dict, edge_weights)
        if edges_path
        else []
    )

    # Enable completion of one-sided hierarchy links before assigning the nodes.
    graph.add_parents = True
    graph.add_children = True
    graph.nodes = node_dict
    graph.edges = edges
    graph.check_consistency(raise_error=True)
    return graph

to_csv

to_csv(
    graph: Graph,
    stem_path: Union[str, Path],
    csv_delimiter: str = ";",
    iter_delimiter: str = ";",
    use_uuid: bool = False,
)

Save graph to nodes and edges CSV files.

Parameters:

Name Type Description Default
graph Graph

Graph to save.

required
stem_path Union[str, Path]

Stem path for output CSV's. Appended with _nodes.csv and _edges.csv.

required
csv_delimiter str

CSV delimiter.

';'
iter_delimiter str

Iterable delimiter (i.e. for children names list).

';'
use_uuid bool

Whether to export UUIDs, too.

False
Source code in ragraph/io/csv.py
def to_csv(
    graph: Graph,
    stem_path: Union[str, Path],
    csv_delimiter: str = ";",
    iter_delimiter: str = ";",
    use_uuid: bool = False,
):
    """Save graph to nodes and edges CSV files.

    Arguments:
        graph: Graph to save.
        stem_path: Stem path for output CSV's. Appended with _nodes.csv and _edges.csv.
        csv_delimiter: CSV delimiter.
        iter_delimiter: Iterable delimiter (i.e. for children names list).
        use_uuid: Whether to export UUIDs, too.
    """
    stem = Path(stem_path)

    # Nodes are written first, then edges; each file sits next to the stem.
    writers = (("_nodes.csv", _save_nodes), ("_edges.csv", _save_edges))
    for suffix, save in writers:
        target = stem.with_name(stem.name + suffix)
        save(graph, target, csv_delimiter, iter_delimiter, use_uuid=use_uuid)

esl

Elephant Specification Language format support

from_esl

from_esl(*paths: Union[str, Path]) -> Graph

Convert ESL file(s) into a ragraph.graph.Graph.

Parameters:

Name Type Description Default
paths Union[str, Path]

Paths to resolve into ESL files. May be any number of files and directories to scan.

()

Returns:

Type Description
Graph

Instantiated graph.

Source code in ragraph/io/esl.py
def from_esl(
    *paths: Union[str, Path],
) -> Graph:
    """Convert ESL file(s) into a [`Graph`][ragraph.graph.Graph].

    Arguments:
        paths: Paths to resolve into ESL files. May be any number of files and
            directories to scan.

    Returns:
        Instantiated graph.
    """
    # No output file is requested; only the in-memory graph is wanted.
    graph = to_graph(*paths, output=None, force=False)
    return graph

grip

GRIP format support

add_SI_Functie

add_SI_Functie(
    sub: Element,
    func: Dict[str, Any],
    fcount: int,
    node: Node,
)

Add function reference to super element.

Note: This is a bit of a weird quirk of the GRIP data structure.

Parameters:

Name Type Description Default
sub Element

super element to add function element to

required
func Dict[str, Any]

function reference.

required
fcount int

Number of referenced function.

required
node Node

Node to which the reference is added.

required
Source code in ragraph/io/grip.py
def add_SI_Functie(sub: etree.Element, func: Node, fcount: int, node: Node):
    """Add function reference to super element.

    Creates an ``SI_Functie`` relation element under ``sub`` and, inside it, a
    child element named after the capitalized kind of ``func``.

    Note: This is a bit of a weird quirk of the GRIP data structure.

    Arguments:
        sub: Parent element to add the function element to.
        func: Referenced function node. Its ``kind`` and its
            ``ConfigurationOfRef`` and ``GUID`` annotations are read.
        fcount: Number of referenced function, written as ``R2Sequence``.
        node: Node to which the reference is added. Its ``RootRefFunctie``
            annotation is written as ``RootRef``.
    """
    relation = etree.SubElement(
        sub,
        "SI_Functie",
        attrib=dict(
            R1Sequence="1",
            R2Sequence=str(fcount),
            Type="RELATION_ELEMENT",
            RootRef=node.annotations["RootRefFunctie"],
        ),
    )

    # The referenced function itself is nested inside the relation element.
    etree.SubElement(
        relation,
        func.kind.capitalize(),
        attrib=dict(
            ConfigurationOfRef=func.annotations["ConfigurationOfRef"],
            GUID=func.annotations["GUID"],
        ),
    )

add_labels

add_labels(g: Graph)

Add labels to edges based on the description of Raakvlakken.

Source code in ragraph/io/grip.py
def add_labels(g: Graph):
    """Add labels to edges based on the description of Raakvlakken.

    For every node of kind ``"raakvlak"`` with a non-empty ``Description``
    annotation, the comma separated description is split into labels. These are
    merged into the labels of every edge running between the nodes that are
    sources or targets of that raakvlak. The ``"default"`` label is dropped from
    edges that received labels this way.

    Arguments:
        g: Graph whose edges are updated in place.
    """
    for raakvlak in g.nodes:
        if raakvlak.kind != "raakvlak":
            continue

        description = raakvlak.annotations.get("Description", None)
        if not description:
            # No labels to derive.
            continue

        labels = [part.strip() for part in description.split(",")]
        variants = list(g.sources_of(raakvlak)) + list(g.targets_of(raakvlak))
        for e in g.edges_between_all(variants, variants):
            # dict.fromkeys removes duplicates while keeping first-seen order, so
            # the resulting label order is reproducible between runs (a set is not).
            e.labels = list(dict.fromkeys(e.labels + labels))

            if "default" in e.labels:
                e.labels.remove("default")

add_raakvlak_annotations

add_raakvlak_annotations(g: Graph)

Add Raakvlak IDs as a string to edge annotations.

Source code in ragraph/io/grip.py
def add_raakvlak_annotations(g: Graph):
    """Add Raakvlak IDs as a string to edge annotations.

    Only edges between two nodes of kind ``"object"`` are considered. For such an
    edge, the raakvlak nodes targeted by both its source and its target are
    collected, and the last ``" | "``-separated part of each of their names is
    stored, comma separated, in ``annotations.raakvlakken``. The annotation is an
    empty string when source and target share no raakvlak.

    Arguments:
        g: Graph whose edges are annotated in place.
    """
    for e in g.edges:
        if e.source.kind != "object" or e.target.kind != "object":
            continue

        source_raakvlakken = {t for t in g.targets_of(e.source) if t.kind == "raakvlak"}
        target_raakvlakken = {t for t in g.targets_of(e.target) if t.kind == "raakvlak"}
        shared = source_raakvlakken & target_raakvlakken

        # Sort the IDs: iterating a set gives no stable order, which would make
        # the stored string differ between otherwise identical runs.
        ids = sorted(r.name.split(" | ")[-1] for r in shared)

        # Joining an empty list yields "", covering the "nothing shared" case.
        e.annotations.raakvlakken = ",".join(ids)

from_grip

from_grip(
    path: Union[str, Path], hierarchy_mode: str = "top-down"
) -> Graph

Decode a GRIP XML file into a Graph.

Parameters:

Name Type Description Default
path Union[str, Path]

GRIP XML file path.

required
hierarchy_mode str

One of "bottom-up" or "top-down". Defines how the hierarchy relations are stored.

'top-down'

Returns:

Type Description
Graph

Graph object.

Source code in ragraph/io/grip.py
def from_grip(path: Union[str, Path], hierarchy_mode: str = "top-down") -> Graph:
    """Decode a GRIP XML file into a Graph.

    Arguments:
        path: GRIP XML file path.
        hierarchy_mode: One of "bottom-up" or "top-down". Defines how
           the hierarchy relations are stored.

    Returns:
        Graph object.
    """
    root = etree.parse(str(path)).getroot()
    workspace = root.attrib

    graph = Graph(
        uuid=workspace.get("WorkspaceID"),
        name=workspace.get("WorkspaceName"),
        annotations=dict(workspace),
    )
    parse_params(graph, root)

    # Objects and object types each get their hierarchy parsed right after
    # their nodes have been added.
    parse_collection(graph, root, "Objecten", "Object", OBJECT_KIND)
    parse_objectenboom(graph, root, hierarchy_mode)

    parse_collection(graph, root, "Objecttypen", "Objecttype", OBJECTTYPE_KIND)
    parse_objecttypenboom(graph, root)

    # Remaining node collections, in file order.
    remaining_collections = (
        ("Functies", "Functie", FUNCTIE_KIND),
        ("Systeemeisen", "Systeemeis", SYSTEEMEIS_KIND),
        ("Scope", "Scope", SCOPE_KIND),
        ("Raakvlakken", "Raakvlak", RAAKVLAK_KIND),
    )
    for collection, item, kind in remaining_collections:
        parse_collection(graph, root, collection, item, kind)

    # Edges are parsed once all nodes exist.
    edge_parsers = (
        parse_systeemeis_edges,
        parse_object_edges,
        parse_scope_edges,
        parse_raakvlak_edges,
    )
    for parse_edges in edge_parsers:
        parse_edges(graph, root)

    # Enrich graph.
    add_raakvlak_annotations(graph)
    add_labels(graph)

    return graph

parse_CI_MEEisObjects

parse_CI_MEEisObjects(el: Element, annotations: Dict)

Parse CI_MEEisObjects and store within annotations.

Parameters:

Name Type Description Default
el Element

Systeemeis to be parsed

required
annotations Dict

Dictionary in which information must be stored

required
Source code in ragraph/io/grip.py
def parse_CI_MEEisObjects(el: etree.Element, annotations: Dict):
    """Parse CI_MEEisObjects and store within annotations.

    Every ``CI_MEEisObject`` child of ``el`` becomes one nested dictionary that
    mirrors the XML nesting ``CI_MEEisObject > EisObject > SI_Object > Object``.
    The list of these dictionaries is stored under ``"CI_MEEisObjects"``,
    replacing any previous value.

    Arguments:
       el: Systeemeis to be parsed
       annotations: Dictionary in which information must be stored
    """
    relation_keys = ("R1Sequence", "R2Sequence", "RootRef", "Type")
    eis_keys = ("Name", "ConfigurationOfRef", "Type", "GUID")

    annotations["CI_MEEisObjects"] = []

    for wrapper in el.iterfind("CI_MEEisObject"):
        # Built outside-in so attributes are read in document nesting order.
        entry = {key: wrapper.attrib[key] for key in relation_keys}

        eis = wrapper.find("EisObject")
        eis_entry = {key: eis.attrib[key] for key in eis_keys}

        si = eis.find("SI_Object")
        si_entry = {key: si.attrib[key] for key in relation_keys}
        si_entry["Object"] = si.find("Object").attrib

        eis_entry["SI_Object"] = si_entry
        entry["EisObject"] = eis_entry
        annotations["CI_MEEisObjects"].append({"CI_MEEisObject": entry})

parse_CI_MEEisObjecttypen

parse_CI_MEEisObjecttypen(el: Element, annotations: Dict)

Parse CI_MEEisObjecttypen and store within annotations.

Parameters:

Name Type Description Default
el Element

Systeemeis to be parsed

required
annotations Dict

Dictionary in which information must be stored

required
Source code in ragraph/io/grip.py
def parse_CI_MEEisObjecttypen(el: etree.Element, annotations: Dict):
    """Parse CI_MEEisObjecttypen and store within annotations.

    Every ``CI_MEEisObjecttype`` child of ``el`` becomes one nested dictionary
    that mirrors the XML nesting
    ``CI_MEEisObjecttype > EisObjecttype > SI_Objecttype > Objecttype``.
    The list of these dictionaries is stored under ``"CI_MEEisObjecttypen"``,
    replacing any previous value.

    Arguments:
       el: Systeemeis to be parsed
       annotations: Dictionary in which information must be stored
    """
    relation_keys = ("R1Sequence", "R2Sequence", "RootRef", "Type")
    eis_keys = ("Name", "ConfigurationOfRef", "Type", "GUID")

    annotations["CI_MEEisObjecttypen"] = []

    for wrapper in el.iterfind("CI_MEEisObjecttype"):
        # Built outside-in so attributes are read in document nesting order.
        entry = {key: wrapper.attrib[key] for key in relation_keys}

        eis = wrapper.find("EisObjecttype")
        eis_entry = {key: eis.attrib[key] for key in eis_keys}

        si = eis.find("SI_Objecttype")
        si_entry = {key: si.attrib[key] for key in relation_keys}
        si_entry["Objecttype"] = si.find("Objecttype").attrib

        eis_entry["SI_Objecttype"] = si_entry
        entry["EisObjecttype"] = eis_entry
        annotations["CI_MEEisObjecttypen"].append({"CI_MEEisObjecttype": entry})

parse_collection

parse_collection(
    graph: Graph,
    root: Element,
    collection: str,
    item: str,
    kind: str,
)

Decode contents of XML file.

Parameters:

Name Type Description Default
graph Graph

Graph object to add nodes to

required
root Element

Root of XML file

required
collection str

Sub-collection to parse (e.g. objecten or Objecttypen)

required
item str

item kind to parse.

required
kind str

kind to assign to created nodes.

required
Source code in ragraph/io/grip.py
def parse_collection(graph: Graph, root: etree.Element, collection: str, item: str, kind: str):
    """Decode contents of XML file.

    Looks up the ``collection`` element under ``root`` and adds one node to the
    graph for each of its ``item`` children. The node UUID is taken from the
    child's ``GUID`` attribute.

    Arguments:
       graph: Graph object to add nodes to
       root: Root of XML file
       collection: Sub-collection to parse (e.g. objecten or Objecttypen)
       item: item kind to parse.
       kind: kind to assign to created nodes.
    """
    container = root.find(collection)
    for element in container.iterfind(item):
        node_name, node_annotations = parse_element(el=element, item=item)
        node = Node(
            uuid=element.attrib.get("GUID"),
            name=node_name,
            kind=kind,
            annotations=node_annotations,
        )
        graph.add_node(node)

parse_eisteksten

parse_eisteksten(el: Element, annotations: Dict)

Parse eisteksten and store within annotations.

Parameters:

Name Type Description Default
el Element

Systeemeis to be parsed

required
annotations Dict

Dictionary in which information must be stored

required
Source code in ragraph/io/grip.py
def parse_eisteksten(el: etree.Element, annotations: Dict):
    """Parse eisteksten and store within annotations.

    Handles the optional ``CI_EistekstDefinitief`` and ``CI_EistekstOrigineel``
    children of ``el``. For each one present, its relation attributes and the
    attributes of its nested text element (``Eistekst`` respectively
    ``EistekstOrigineel``) are stored under the child's tag name.

    Arguments:
       el: Systeemeis to be parsed
       annotations: Dictionary in which information must be stored
    """
    relation_keys = ("R1Sequence", "R2Sequence", "RootRef", "Type")
    # (wrapper tag, nested text tag) pairs, handled in this order.
    variants = (
        ("CI_EistekstDefinitief", "Eistekst"),
        ("CI_EistekstOrigineel", "EistekstOrigineel"),
    )

    for wrapper_tag, text_tag in variants:
        wrapper = el.find(wrapper_tag)
        if wrapper is None:
            continue

        entry = {key: wrapper.attrib[key] for key in relation_keys}
        entry[text_tag] = wrapper.find(text_tag).attrib
        annotations.update({wrapper_tag: entry})

parse_element

parse_element(el: Element, item: str) -> Dict

Parse an element and store relevant information within Node annotations.

Parameters:

Name Type Description Default
el Element

Element to parse.

required
item str

item kind being parsed.

required

Returns:

Type Description
Dict

Name string and Annotations dictionary.

Source code in ragraph/io/grip.py
def parse_element(el: etree.Element, item: str) -> "tuple[Optional[str], Dict[str, Any]]":
    """Parse an element and store relevant information within Node annotations.

    The annotations start as a copy of the element's attributes and are extended
    with the attributes of selected child elements and the ``RootRef`` of the
    relation children that are present.

    The name is the element's ``Name`` attribute. When an ``ID`` child with an
    ``ID1`` attribute exists, ``ID1`` is appended as ``"<name> | <ID1>"`` unless
    the name already contains it, and it replaces a missing or empty name.

    Arguments:
      el: Element to parse.
      item: item kind being parsed. ``"Systeemeis"`` additionally parses the
        requirement texts and object / object type references.

    Returns:
      Name string (may be None) and Annotations dictionary, as a tuple.
    """
    annotations = dict(el.attrib)
    name = el.attrib.get("Name")

    id_el = el.find("ID")
    if id_el is not None:
        annotations.update(dict(ID=id_el.attrib))
        id1 = id_el.attrib.get("ID1")
        if not name:
            name = id1
        elif id1 and id1 not in name:
            # Guard on id1: an ID element without ID1 would otherwise raise a
            # TypeError in the substring test against None.
            name = f"{name} | {id1}"

    for key in ["Code", "BronID", "Eiscodering", "ExterneCode"]:
        child = el.find(key)
        if child is None:
            continue
        annotations.update({key: child.attrib})

    if item == "Systeemeis":
        parse_eisteksten(el, annotations)

        if el.find("CI_MEEisObject") is not None:
            parse_CI_MEEisObjects(el=el, annotations=annotations)

        if el.find("CI_MEEisObjecttype") is not None:
            parse_CI_MEEisObjecttypen(el=el, annotations=annotations)

    # Relation children whose RootRef is kept, as (child tag, annotation key):
    # hierarchy relations for objects, hierarchy relations for object types,
    # relations to functions, and assignment relations to object types.
    root_refs = (
        ("SI_Onderliggend", "RootRefOnderliggend"),
        ("SI_Bovenliggend", "RootRefBovenliggend"),
        ("SI_Onderliggende", "RootRefOnderliggende"),
        ("SI_Functie", "RootRefFunctie"),
        ("SI_Objecttype", "RootRefObjecttype"),
    )
    for tag, key in root_refs:
        relation = el.find(tag)
        if relation is not None:
            annotations[key] = relation.attrib["RootRef"]

    return name, annotations

parse_params

parse_params(graph: Graph, root: Element)

Decode parameter section of GRIP XML file.

Parameters:

Name Type Description Default
graph Graph

Graph object to add nodes to.

required
root Element

Root of XML file.

required
Source code in ragraph/io/grip.py
def parse_params(graph: Graph, root: etree.Element):
    """Decode parameter section of GRIP XML file.

    Adds one node of the parameter kind for every ``RelaticsParameter`` found
    under ``Scope/RelaticsParameters``, using the element's attributes as
    annotations.

    Arguments:
       graph: Graph object to add nodes to.
       root: Root of XML file.
    """
    scope = root.find("Scope")
    parameters = scope.find("RelaticsParameters")
    for parameter in parameters.iterfind("RelaticsParameter"):
        node = Node(kind=PARAMETER_KIND, annotations=parameter.attrib)
        graph.add_node(node)

to_grip

to_grip(
    graph: Graph,
    path: Optional[Union[str, Path]] = None,
    hierarchy_mode: str = "top-down",
) -> Optional[str]

Convert a graph with GRIP content structure to a GRIP XML.

Parameters:

Name Type Description Default
graph Graph

Graph to convert.

required
path Optional[Union[str, Path]]

Optional path to write converted XML text to.

None

Returns:

Type Description
Optional[str]

String contents when no path was given.

Source code in ragraph/io/grip.py
def to_grip(
    graph: Graph, path: Optional[Union[str, Path]] = None, hierarchy_mode: str = "top-down"
) -> Optional[str]:
    """Convert a graph with GRIP content structure to a GRIP XML.

    Arguments:
        graph: Graph to convert.
        path: Optional path to write converted XML text to.
        hierarchy_mode: Forwarded to the report builder; defaults to "top-down".

    Returns:
        String contents when no path was given, otherwise None.
    """
    report = _build_report(graph, hierarchy_mode)
    xml_bytes = etree.tostring(
        report,
        encoding="UTF-8",
        xml_declaration=True,
        pretty_print=True,
    )

    if not path:
        return xml_bytes.decode()

    Path(path).write_bytes(xml_bytes)
    return None

json

JSON format support

from_json

from_json(
    path: Optional[Union[str, Path]] = None,
    enc: Optional[str] = None,
    use_uuid: bool = True,
) -> Graph

Decode JSON file or string into a Graph.

Parameters:

Name Type Description Default
path Optional[Union[str, Path]]

JSON file path.

None
enc Optional[str]

JSON encoded string.

None

Returns:

Type Description
Graph

Graph object.

Source code in ragraph/io/json.py
def from_json(
    path: Optional[Union[str, Path]] = None,
    enc: Optional[str] = None,
    use_uuid: bool = True,
) -> Graph:
    """Decode JSON file or string into a Graph.

    Exactly one of `path` and `enc` must be given.

    Arguments:
        path: JSON file path.
        enc: JSON encoded string.
        use_uuid: Forwarded to `graph_from_json_dict`.

    Returns:
        Graph object.

    Raises:
        ValueError: if `path` and `enc` are both `None` or both set.
    """
    if path is None and enc is None:
        raise ValueError("`path` and `enc` arguments cannot both be `None`.")
    if path is not None and enc is not None:
        raise ValueError("`path` and `enc` arguments cannot both be set.")

    # Same `is not None` test as the validation above. A truthiness test would let
    # an empty-string path slip through and reach json.loads with `enc` still None.
    if path is not None:
        enc = Path(path).read_text(encoding="utf-8")

    json_dict = json.loads(enc)

    return graph_from_json_dict(json_dict, use_uuid=use_uuid)

graph_from_json_dict

graph_from_json_dict(
    graph_dict: Dict[str, Any], use_uuid: bool = True
) -> Graph

Recreate Graph object from JSON dictionary.

Source code in ragraph/io/json.py
def graph_from_json_dict(graph_dict: Dict[str, Any], use_uuid: bool = True) -> Graph:
    """Recreate Graph object from JSON dictionary.

    Arguments:
        graph_dict: Dictionary with the graph's own properties and its "nodes"
            and "edges" entries, each either a dictionary (its values are used)
            or a list of node/edge dictionaries.
        use_uuid: Whether node references (parent, children, edge source and
            target) are resolved by UUID (True) or by name (False).

    Returns:
        Graph object.

    Raises:
        ValueError: if the "nodes" or "edges" entry is neither a dict nor a list.
    """

    def _entries(container: Any, label: str) -> list:
        """Normalize a dict-or-list container into a list of dictionaries."""
        if isinstance(container, dict):
            return list(container.values())
        if isinstance(container, list):
            return container
        raise ValueError(f"Unrecognizable {label} property.")

    raw_nodes = graph_dict.get("nodes", {})
    raw_edges = graph_dict.get("edges", {})
    graph = Graph(
        name=graph_dict.get("name"),
        kind=graph_dict.get("kind"),
        labels=graph_dict.get("labels"),
        weights=graph_dict.get("weights"),
        annotations=graph_dict.get("annotations"),
        uuid=graph_dict.get("uuid"),
    )

    node_dicts = _entries(raw_nodes, "nodes")
    edge_dicts = _entries(raw_edges, "edges")

    # First pass: create all nodes without hierarchy so that references can be
    # resolved afterwards regardless of the order in which nodes are listed.
    for entry in node_dicts:
        node = Node(
            name=entry.get("name"),
            kind=entry.get("kind"),
            labels=entry.get("labels"),
            parent=None,
            children=[],
            weights=entry.get("weights"),
            annotations=entry.get("annotations"),
            uuid=entry.get("uuid"),
        )
        graph.add_node(node)

    if use_uuid:
        ref_key = "uuid"
        node_ref = {str(node.uuid): node for node in graph.nodes}
    else:
        ref_key = "name"
        node_ref = graph.node_dict

    # Second pass: resolve references. The node itself is only looked up when
    # there is something to set on it.
    for entry in node_dicts:
        if entry.get("parent", False):
            node_ref[entry[ref_key]].parent = node_ref[entry["parent"]]
        if entry.get("children", False):
            node_ref[entry[ref_key]].children = [node_ref[c] for c in entry["children"]]
        if entry.get("is_bus", False):
            node_ref[entry[ref_key]].is_bus = True

    # Finally add the edges between the resolved nodes.
    for entry in edge_dicts:
        edge = Edge(
            node_ref[entry["source"]],
            node_ref[entry["target"]],
            name=entry.get("name"),
            kind=entry.get("kind"),
            labels=entry.get("labels"),
            weights=entry.get("weights"),
            annotations=entry.get("annotations"),
            uuid=entry.get("uuid"),
        )
        graph.add_edge(edge)

    return graph

to_json

to_json(
    graph: Graph, path: Optional[Union[str, Path]] = None
) -> Optional[str]

Encode Graph to JSON file or string.

Parameters:

Name Type Description Default
path Optional[Union[str, Path]]

Optional file path to write JSON to.

None

Returns:

Type Description
Optional[str]

JSON string.

Source code in ragraph/io/json.py
def to_json(graph: Graph, path: Optional[Union[str, Path]] = None) -> Optional[str]:
    """Serialize a Graph as JSON, either to a file or to a string.

    The graph is encoded using `GraphEncoder`, with sorted keys and an indentation
    of four spaces.

    Arguments:
        graph: Graph to encode.
        path: Optional file path to write JSON to.

    Returns:
        The JSON string when no `path` is given. When `path` is given, the JSON is
        written to that file (UTF-8 encoded) and `None` is returned.
    """
    encoded = json.dumps(graph, cls=GraphEncoder, sort_keys=True, indent=4)

    # Without a target file, hand the encoded text straight back to the caller.
    if path is None:
        return encoded

    Path(path).write_text(encoded, encoding="utf-8")
    return None

matrix

Adjacency and mapping matrices support

from_matrix

from_matrix(
    matrix: Union[
        ndarray, List[List[int]], List[List[float]]
    ],
    rows: Optional[Union[List[Node], List[str]]] = None,
    cols: Optional[Union[List[Node], List[str]]] = None,
    weight_label: str = "default",
    empty: Optional[Union[int, float]] = 0.0,
    convention: Convention = IR_FAD,
    **graph_args: Dict[str, Any]
) -> Graph

Create a graph from an adjacency or mapping matrix.

Parameters:

Name Type Description Default
matrix Union[ndarray, List[List[int]], List[List[float]]]

Matrix to convert into a graph.

required
rows Optional[Union[List[Node], List[str]]]

Nodes or node labels corresponding to the rows of the matrix.

None
cols Optional[Union[List[Node], List[str]]]

Nodes or node labels corresponding to the columns of the matrix. If none are provided, the row labels are re-used.

None
weight_label str

Weight label to use for matrix values.

'default'
empty Optional[Union[int, float]]

Cell value to be considered "empty", e.g. no edge should be created.

0.0
**graph_args Dict[str, Any]

Additional arguments to Graph constructor.

{}

Returns:

Type Description
Graph

Graph object.

Note

If no row labels are provided, they are generated in a "node#" format. If no column labels are provided, they are assumed to be equal to the rows. For non-square matrices, you should provide node labels!

Source code in ragraph/io/matrix.py
def from_matrix(
    matrix: Union["np.ndarray", List[List[int]], List[List[float]]],
    rows: Optional[Union[List[Node], List[str]]] = None,
    cols: Optional[Union[List[Node], List[str]]] = None,
    weight_label: str = "default",
    empty: Optional[Union[int, float]] = 0.0,
    convention: Convention = Convention.IR_FAD,
    **graph_args: Dict[str, Any],
) -> Graph:
    """Create a graph from an adjacency or mapping matrix.

    Arguments:
        matrix: Matrix to convert into a graph.
        rows: Nodes or node labels corresponding to the rows of the matrix.
        cols: Nodes or node labels corresponding to the columns of the matrix. If none
            are provided, the row labels are re-used.
        weight_label: Weight label to use for matrix values.
        empty: Cell value to be considered "empty", e.g. no edge should be created.
        convention: Matrix orientation. With `Convention.IR_FAD` the column node is the
            edge source and the row node is the edge target. With `Convention.IC_FBD`
            the row node is the edge source and the column node is the edge target.
        **graph_args: Additional arguments to [`Graph`][ragraph.graph.Graph] constructor.

    Returns:
        Graph object.

    Raises:
        ValueError: If the matrix dimensions do not agree with the number of row and
            column labels, or if `convention` is not a recognized value.

    Note:
        If no row labels are provided, they are generated in a "node#" format.
        If no column labels are provided, they are assumed to be equal to the rows.
        For non-square matrices, you should provide node labels!
    """
    n_rows = len(matrix)
    rows = [Node(f"node{i}") for i in range(n_rows)] if rows is None else rows

    # init empty graph
    graph = Graph(**graph_args)  # type: ignore

    # Parse row labels first; column labels fall back to the row nodes when absent.
    node_rows = _parse_labels(graph, rows)
    node_cols = node_rows if cols is None else _parse_labels(graph, cols)

    # Dimension check of matrix and labels. A matrix without rows has no first row to
    # measure, so its column count is taken from the labels instead of indexing
    # `matrix[0]` (which would raise an IndexError).
    n_cols = len(matrix[0]) if n_rows else len(node_cols)
    dim = (n_rows, n_cols)
    labdim = (len(node_rows), len(node_cols))
    if dim != labdim:
        raise ValueError(f"Matrix dimensions {dim} do not agree with label dimensions {labdim}.")

    # Decide the edge direction once instead of duplicating the edge construction.
    if convention == Convention.IR_FAD:
        rows_are_targets = True
    elif convention == Convention.IC_FBD:
        rows_are_targets = False
    else:
        raise ValueError("Unknown convention for matrix conversion.")

    # Generate edges in row-major order, skipping "empty" cells.
    edges = []
    for row, row_node in enumerate(node_rows):
        for col, col_node in enumerate(node_cols):
            value = matrix[row][col]
            if value != empty:
                if rows_are_targets:
                    source, target = col_node, row_node
                else:
                    source, target = row_node, col_node
                edges.append(
                    Edge(
                        source,
                        target,
                        name=f"{source.name}->{target.name}",
                        weights={weight_label: value},
                    )
                )

    graph.edges = edges
    return graph

to_matrix

to_matrix(
    graph: Graph,
    rows: Optional[Union[List[Node], List[str]]] = None,
    cols: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = False,
    loops: bool = False,
    only: Optional[List[str]] = None,
    convention: Convention = IR_FAD,
) -> Union[ndarray, List[List[float]]]

Convert graph data into a directed numerical adjacency or mapping matrix.

Parameters:

Name Type Description Default
graph Graph

Graph to fetch data from.

required
rows Optional[Union[List[Node], List[str]]]

Nodes representing the matrix rows.

None
cols Optional[Union[List[Node], List[str]]]

Nodes representing the matrix columns if different from the rows.

None
inherit bool

Whether to count weights between children of the given nodes.

False
loops bool

Whether to calculate self-loops from a node to itself.

False
only Optional[List[str]]

Optional subset of edge weights to consider. See ragraph.edge.Edge for default edge weight implementation.

None

Returns:

Type Description
Union[ndarray, List[List[float]]]

Adjacency matrix as a 2D numpy array if numpy is present. Otherwise it will return a 2D nested list.

Note

Note that the matrix is directed! Columns are inputs to rows.

Source code in ragraph/io/matrix.py
def to_matrix(
    graph: Graph,
    rows: Optional[Union[List[Node], List[str]]] = None,
    cols: Optional[Union[List[Node], List[str]]] = None,
    inherit: bool = False,
    loops: bool = False,
    only: Optional[List[str]] = None,
    convention: Convention = Convention.IR_FAD,
) -> Union["np.ndarray", List[List[float]]]:
    """Build a directed numerical adjacency or mapping matrix from a graph.

    Arguments:
        graph: Graph to fetch data from.
        rows: Nodes (or node names) representing the matrix rows. Defaults to the
            graph's leaf nodes.
        cols: Nodes (or node names) representing the matrix columns. Defaults to the
            row nodes.
        inherit: Whether to also count weights of edges between descendants of the
            given nodes.
        loops: Whether to calculate self-loops from a node to itself.
        only: Optional subset of edge weights to consider. See
            [`ragraph.edge.Edge`][ragraph.edge.Edge] for default edge weight implementation.
        convention: Matrix orientation. With `Convention.IR_FAD` the column node is the
            edge source and the row node is the edge target. With `Convention.IC_FBD`
            it is the other way around.

    Returns:
        Adjacency matrix as a 2D numpy array if numpy is present. Otherwise it will return a 2D
        nested list.

    Raises:
        ValueError: If a cell is computed with an unrecognized `convention`.

    Note:
        Note that the matrix is directed!
    """

    def resolve(item):
        # Node instances are used as-is; anything else is looked up by name.
        return item if isinstance(item, Node) else graph.node_dict[item]

    rows = graph.leafs if rows is None else [resolve(n) for n in rows]
    cols = rows if cols is None else [resolve(n) for n in cols]

    n_rows, n_cols = len(rows), len(cols)
    if np:
        matrix = np.zeros((n_rows, n_cols), dtype=float)
    else:
        matrix = [[0.0] * n_cols for _ in range(n_rows)]

    for col, col_node in enumerate(cols):
        for row, row_node in enumerate(rows):
            # Diagonal cells stay zero unless self-loops are requested.
            if not loops and col_node == row_node:
                continue

            if convention == Convention.IR_FAD:
                source, target = col_node, row_node
            elif convention == Convention.IC_FBD:
                source, target = row_node, col_node
            else:
                raise ValueError("Unknown convention for matrix conversion.")

            if inherit:
                sources = [source, *source.descendants]
                targets = [target, *target.descendants]
            else:
                sources, targets = [source], [target]

            total = sum(
                _get_weight(edge, only=only) for edge in graph.edges_between_all(sources, targets)
            )
            matrix[row][col] = float(total)

    return matrix

xml

XML format support using the XML Metadata Interchange (XMI) standard.

from_xml

from_xml(
    path: Optional[Union[str, Path]] = None,
    enc: Optional[str] = None,
    elem: Optional[Element] = None,
    validate: bool = True,
) -> Graph

Decode XML file, string, or element into a Graph.

Parameters:

Name Type Description Default
path Optional[Union[str, Path]]

XML file path.

None
enc Optional[str]

XML encoded string.

None
elem Optional[Element]

XML Element.

None
validate bool

Whether to validate the XML input.

True

Returns:

Type Description
Graph

Graph object.

Note

You should only provide one of path, enc, or elem, which are handled in that order of precedence.

Source code in ragraph/io/xml/__init__.py
def from_xml(
    path: Optional[Union[str, Path]] = None,
    enc: Optional[str] = None,
    elem: Optional[etree.Element] = None,
    validate: bool = True,
) -> Graph:
    """Decode XML file, string, or element into a Graph.

    Arguments:
        path: XML file path.
        enc: XML encoded string.
        elem: XML Element.
        validate: Whether to validate the XML input.

    Returns:
        Graph object.

    Raises:
        ValueError: If none of `path`, `enc`, or `elem` is provided, if schema
            validation is requested and fails, or if the input contains no graph
            element.

    Note:
        You should only provide one of `path`, `enc`, or `elem`, which are handled in that order of
        precedence.
    """
    # Parse input XML.
    if path is not None:
        elem = etree.parse(str(Path(path))).getroot()
    elif enc is not None:
        elem = etree.fromstring(enc)
    elif elem is not None:
        # Kept as an assertion so the exception type seen by callers is unchanged.
        assert isinstance(elem, etree._Element)
    else:
        raise ValueError("At least one of the 'path', 'enc', or 'elem' arguments should be set.")

    # Validate if required.
    if validate and not schema.validate(elem):
        raise ValueError(f"Schema validation failed: \n\n{schema.error_log}")

    # Find the Graph XML element. `find` returns None when nothing matches, which can
    # happen when validation is skipped; fail with a clear message in that case.
    graph_elem = elem.find(f".//{q_graph}")
    if graph_elem is None:
        raise ValueError("No graph element found in the XML input.")

    # Parse nodes, keyed by their XML id, then resolve references between them.
    node_dict = dict()
    for node_elem in graph_elem.findall(f".//{q_node}"):
        node_dict[node_elem.attrib[q_id]] = _element_to_node(node_elem)
    _resolve_node_references(node_dict)

    # Parse edges.
    edges = [_element_to_edge(e, node_dict) for e in graph_elem.findall(f".//{q_edge}")]

    # Build graph and return.
    graph = Graph(nodes=node_dict.values(), edges=edges)
    _element_to_metadata(graph_elem, graph)
    return graph

to_xml

to_xml(
    graph: Graph,
    path: Optional[Union[str, Path]] = None,
    elem: Optional[Element] = None,
    tostring_args: Optional[Dict[str, Any]] = None,
    bundle_schemas: bool = False,
) -> str

Encode Graph to an XML element.

Parameters:

Name Type Description Default
graph Graph

Graph to convert to XML.

required
path Optional[Union[str, Path]]

Optional file path to write XML to.

None
elem Optional[Element]

Optional object to append the Graph to.

None
tostring_args Optional[Dict[str, Any]]

Optional argument overrides to lxml.etree.tostring.

None
bundle_schemas bool

When exporting to a file, bundle the .xsd schemas.

False

Returns:

Type Description
str

XML element.

Source code in ragraph/io/xml/__init__.py
def to_xml(
    graph: Graph,
    path: Optional[Union[str, Path]] = None,
    elem: Optional[etree.Element] = None,
    tostring_args: Optional[Dict[str, Any]] = None,
    bundle_schemas: bool = False,
) -> etree._Element:
    """Encode Graph to an XML element.

    Arguments:
        graph: Graph to convert to XML.
        path: Optional file path to write XML to.
        elem: Optional object to append the Graph to. It is modified in place: a new
            graph sub-element is added to it. When omitted, the root of the bundled
            "bare.xml" document is used.
        tostring_args: Optional argument overrides to
            [`lxml.etree.tostring`][lxml.etree.tostring]. Only used when `path` is given.
        bundle_schemas: When exporting to a file, bundle the .xsd schemas.

    Returns:
        XML element that the Graph was appended to (not a serialized string).
    """

    if elem is None:
        parser = etree.XMLParser(remove_blank_text=True)
        doc = etree.parse(str(here / "bare.xml"), parser)
        elem = doc.getroot()

    # Add the graph as a child of the root: metadata first, then nodes, then edges.
    graph_elem = etree.SubElement(elem, q_graph, nsmap=nsmap)
    _metadata_to_subelements(graph_elem, graph)
    graph_elem.extend(_node_to_element(n) for n in graph.nodes)
    graph_elem.extend(_edge_to_element(e) for e in graph.edges)

    if path is not None:
        p = Path(path)

        # Serialization defaults; caller-supplied overrides take precedence.
        args = dict(encoding="UTF-8", xml_declaration=True, pretty_print=True)
        if tostring_args is not None:
            args.update(tostring_args)

        string = etree.tostring(elem, **args)
        p.write_bytes(string)

        # Copy the schema files next to the exported XML file.
        if bundle_schemas:
            shutil.copy2(here / "ragraph.xsd", p.parent)
            shutil.copy2(here / "XMI-Canonical.xsd", p.parent)

    return elem

yaml

YAML format support

from_yaml

from_yaml(
    path: Optional[Union[str, Path]] = None,
    enc: Optional[str] = None,
) -> Graph

Decode YAML file or string into a Graph.

Parameters:

Name Type Description Default
path Optional[Union[str, Path]]

YAML file path.

None
enc Optional[str]

YAML encoded string.

None

Returns:

Type Description
Graph

Graph object.

Source code in ragraph/io/yaml.py
def from_yaml(path: Optional[Union[str, Path]] = None, enc: Optional[str] = None) -> Graph:
    """Decode YAML file or string into a Graph.

    Arguments:
        path: YAML file path.
        enc: YAML encoded string.

    Returns:
        Graph object.

    Raises:
        ValueError: If both `path` and `enc` are `None`, or if both are set.
    """
    if path is None and enc is None:
        raise ValueError("`path` and `enc` arguments cannot both be `None`.")
    if path is not None and enc is not None:
        raise ValueError("`path` and `enc` arguments cannot both be set.")

    # NOTE(review): `yaml` is defined outside this block; confirm it is configured with a
    # safe loader before passing untrusted input.
    # Dispatch on `is not None`, matching the validation above, so a falsy-but-set path
    # (e.g. an empty string) is not silently treated as "load `enc`" with `enc=None`.
    if path is not None:
        json_dict = yaml.load(Path(path))
    else:
        json_dict = yaml.load(enc)

    return graph_from_json_dict(json_dict)

to_yaml

to_yaml(
    graph: Graph, path: Optional[Union[str, Path]] = None
) -> Optional[str]

Encode Graph to YAML file or string.

Parameters:

Name Type Description Default
path Optional[Union[str, Path]]

Optional file path to write YAML to.

None

Returns:

Type Description
Optional[str]

YAML string.

Source code in ragraph/io/yaml.py
def to_yaml(graph: Graph, path: Optional[Union[str, Path]] = None) -> Optional[str]:
    """Serialize a Graph's `json_dict` as YAML.

    Arguments:
        graph: Graph to encode.
        path: Optional file path to write YAML to. A falsy value (such as `None`) means
            no output stream is passed to the YAML dumper.

    Returns:
        Whatever `yaml.dump` returns for the given stream.
    """
    target = None
    if path:
        target = Path(path)
    return yaml.dump(graph.json_dict, stream=target)