Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ arbitrarily applied mapper pass.
- `CNOT2CZDecomposer` decomposer pass
- `RoutingChecker` routing pass
- Restore SGMQ notation for barrier groups in cQASMv1 Exporter
- `CircuitAnalyzer` analyzer pass for computing structural circuit metrics (size, interaction graph, gate dependency graph, density)
Comment thread
elenbaasc marked this conversation as resolved.
Outdated

### Changed

Expand Down
13 changes: 13 additions & 0 deletions opensquirrel/circuit.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
if TYPE_CHECKING:
from opensquirrel.ir.ir import IR
from opensquirrel.ir.unitary import Gate
from opensquirrel.passes.analyzer.general_analyzer import Analyzer
from opensquirrel.passes.decomposer.general_decomposer import Decomposer
from opensquirrel.passes.exporter.general_exporter import Exporter
from opensquirrel.passes.mapper.general_mapper import Mapper
Expand Down Expand Up @@ -148,6 +149,18 @@ def asm_filter(self, backend_name: str) -> None:
or (isinstance(statement, AsmDeclaration) and backend_name in str(statement.backend_name))
]

def analyze(self, analyzer: Analyzer) -> dict[str, Any]:
"""Analyzes the circuit using the specified analyzer.

Args:
analyzer (Analyzer): The analyzer to apply.

Returns:
dict[str, Any]: The metrics computed by the analyzer.

"""
return analyzer.analyze(self)

def decompose(self, decomposer: Decomposer) -> None:
"""Decomposes the circuit using to the specified decomposer.

Expand Down
5 changes: 5 additions & 0 deletions opensquirrel/passes/analyzer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from opensquirrel.passes.analyzer.circuit_analyzer import CircuitAnalyzer

__all__ = [
"CircuitAnalyzer",
]
287 changes: 287 additions & 0 deletions opensquirrel/passes/analyzer/circuit_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
from __future__ import annotations

import math
from typing import TYPE_CHECKING, Any

import networkx as nx

from opensquirrel.ir.two_qubit_gate import TwoQubitGate
from opensquirrel.ir.unitary import Gate
from opensquirrel.passes.analyzer.general_analyzer import Analyzer

if TYPE_CHECKING:
from opensquirrel.circuit import Circuit


class CircuitAnalyzer(Analyzer):
"""Computes structural metrics describing a quantum circuit.

The metrics are grouped into four categories:

* Size: number of qubits, gates, two-qubit gates, two-qubit gate percentage, depth.
* Interaction graph (IG): metrics derived from the qubit interaction graph,
where nodes are qubits and edges are two-qubit gates.
Comment thread
elenbaasc marked this conversation as resolved.
Outdated
* Gate dependency graph (GDG): metrics derived from the directed acyclic graph
of gate-to-gate dependencies on shared qubits.
* Density: parallelisation-related metrics (density score, idling score).

The metric set follows the structural circuit profiling approach proposed
in Bandic et al., "Profiling quantum circuits for their efficient execution
on single- and multi-core architectures" (Quantum Sci. Technol. 10, 015060, 2025).

"""

def analyze(self, circuit: Circuit) -> dict[str, Any]:
"""Run the analyzer on the given circuit and return all metrics.

Args:
circuit (Circuit): The circuit to analyze.

Returns:
dict[str, Any]: A flat dictionary mapping metric name to its value.

"""
metrics: dict[str, Any] = {}
metrics.update(self._size_metrics(circuit))
metrics.update(self._interaction_graph_metrics(circuit))
metrics.update(self._gate_dependency_graph_metrics(circuit))
metrics.update(self._density_metrics(circuit))
return metrics

# ------------------------------------------------------------------ #
# Size metrics #
# ------------------------------------------------------------------ #
@staticmethod
def _size_metrics(circuit: Circuit) -> dict[str, Any]:
n_qubits = circuit.qubit_register_size
gate_statements = [s for s in circuit.ir.statements if isinstance(s, Gate)]
Comment thread
elenbaasc marked this conversation as resolved.
Outdated
n_gates = len(gate_statements)
n_two_qubit_gates = sum(1 for s in gate_statements if isinstance(s, TwoQubitGate))
two_qubit_pct = round(n_two_qubit_gates / n_gates, 4) if n_gates > 0 else 0.0
depth = CircuitAnalyzer._compute_depth(circuit, gate_statements)

return {
"n_qubits": n_qubits,
"n_gates": n_gates,
"n_two_qubit_gates": n_two_qubit_gates,
"two_qubit_pct": two_qubit_pct,
"depth": depth,
}

@staticmethod
def _compute_depth(circuit: Circuit, gate_statements: list[Gate]) -> int:
"""ASAP-style circuit depth (longest dependency chain)."""
n_qubits = circuit.qubit_register_size
if n_qubits == 0 or not gate_statements:
return 0

layer = [0] * n_qubits
for gate in gate_statements:
qubit_indices = list(gate.qubit_indices)
Comment thread
elenbaasc marked this conversation as resolved.
Outdated
if not qubit_indices:
continue
Comment thread
elenbaasc marked this conversation as resolved.
Outdated
new_layer = max(layer[i] for i in qubit_indices) + 1
for i in qubit_indices:
layer[i] = new_layer
Comment thread
elenbaasc marked this conversation as resolved.
Outdated
return max(layer)

# ------------------------------------------------------------------ #
# Interaction graph metrics #
# ------------------------------------------------------------------ #
@staticmethod
def _interaction_graph_metrics(circuit: Circuit) -> dict[str, Any]:
empty: dict[str, Any] = {
"ig_avg_shortest_path": 0.0,
"ig_std_adjacency": 0.0,
"ig_diameter": 0,
"ig_central_dominance": 0.0,
"ig_avg_degree": 0.0,
"ig_n_maximal_cliques": 0,
"ig_clustering_coefficient": 0.0,
}

weighted_edges = circuit.interaction_graph
if not weighted_edges:
return empty
Comment thread
elenbaasc marked this conversation as resolved.
Outdated

graph = nx.Graph()
graph.add_nodes_from(range(circuit.qubit_register_size))
for (i, j), weight in weighted_edges.items():
graph.add_edge(i, j, weight=weight)

# avg_shortest_path: computed on the largest connected component.
try:
largest_cc_nodes = max(nx.connected_components(graph), key=len)
largest_cc = graph.subgraph(largest_cc_nodes)
avg_shortest_path = (
round(nx.average_shortest_path_length(largest_cc), 4) if largest_cc.number_of_nodes() > 1 else 0.0
)
except (nx.NetworkXError, ValueError):
avg_shortest_path = 0.0

# std of adjacency matrix entries
adjacency_matrix = nx.to_numpy_array(graph)
std_adjacency = round(float(adjacency_matrix.std()), 4)

# diameter: undefined for disconnected graphs so we use 0 in that case.
try:
diameter = nx.diameter(graph) if nx.is_connected(graph) else 0
except nx.NetworkXError:
diameter = 0

# central point of dominance: max betweenness across nodes.
betweenness = nx.betweenness_centrality(graph)
central_dominance = round(max(betweenness.values()), 4) if betweenness else 0.0

# average degree
degrees = [d for _, d in graph.degree()]
avg_degree = round(sum(degrees) / len(degrees), 4) if degrees else 0.0

# number of maximal cliques
try:
n_maximal_cliques = sum(1 for _ in nx.find_cliques(graph))
except nx.NetworkXError:
n_maximal_cliques = 0

# average clustering coefficient
clustering_coefficient = round(nx.average_clustering(graph), 4)

return {
"ig_avg_shortest_path": avg_shortest_path,
"ig_std_adjacency": std_adjacency,
"ig_diameter": diameter,
"ig_central_dominance": central_dominance,
"ig_avg_degree": avg_degree,
"ig_n_maximal_cliques": n_maximal_cliques,
"ig_clustering_coefficient": clustering_coefficient,
}

# ------------------------------------------------------------------ #
# Gate dependency graph metrics #
# ------------------------------------------------------------------ #
@staticmethod
def _gate_dependency_graph_metrics(circuit: Circuit) -> dict[str, Any]:
gate_statements = [s for s in circuit.ir.statements if isinstance(s, Gate)]
n_gates = len(gate_statements)
if n_gates == 0:
return {
"gdg_critical_path_length": 0,
"gdg_path_length_mean": 0.0,
"gdg_path_length_std": 0.0,
"gdg_pct_gates_in_critical_path": 0.0,
}

gdg = CircuitAnalyzer._build_gate_dependency_graph(gate_statements)
critical_path_length = CircuitAnalyzer._safe_critical_path_length(gdg)
longest_to, longest_from = CircuitAnalyzer._compute_longest_paths(gdg)
mean_length, std_length = CircuitAnalyzer._path_length_stats(longest_to)
pct_gates_in_cp = CircuitAnalyzer._critical_path_membership_fraction(
gdg, longest_to, longest_from, critical_path_length, n_gates
)

return {
"gdg_critical_path_length": critical_path_length,
"gdg_path_length_mean": round(mean_length, 4),
"gdg_path_length_std": round(std_length, 4),
"gdg_pct_gates_in_critical_path": pct_gates_in_cp,
}

@staticmethod
def _build_gate_dependency_graph(gate_statements: list[Gate]) -> nx.DiGraph:
"""Build a DAG where edge (i, j) means gate i must run before gate j."""
Comment thread
elenbaasc marked this conversation as resolved.
Outdated
gdg: nx.DiGraph = nx.DiGraph()
Comment thread
SoufiTNO marked this conversation as resolved.
Outdated
last_gate_on_qubit: dict[int, int] = {}
for index, gate in enumerate(gate_statements):
gdg.add_node(index)
for qubit_index in gate.qubit_indices:
if qubit_index in last_gate_on_qubit:
gdg.add_edge(last_gate_on_qubit[qubit_index], index)
last_gate_on_qubit[qubit_index] = index
return gdg

@staticmethod
def _safe_critical_path_length(gdg: nx.DiGraph) -> int:
try:
return nx.dag_longest_path_length(gdg)
except nx.NetworkXError:
return 0

@staticmethod
def _compute_longest_paths(gdg: nx.DiGraph) -> tuple[dict[int, int], dict[int, int]]:
"""Return (longest_to, longest_from) for every node in the DAG.

longest_to[n] = length of the longest path ending at n
longest_from[n] = length of the longest path starting at n
"""
topo_order = list(nx.topological_sort(gdg))
longest_to: dict[int, int] = dict.fromkeys(gdg.nodes, 0)
for node in topo_order:
for successor in gdg.successors(node):
if longest_to[node] + 1 > longest_to[successor]:
longest_to[successor] = longest_to[node] + 1

longest_from: dict[int, int] = dict.fromkeys(gdg.nodes, 0)
for node in reversed(topo_order):
for successor in gdg.successors(node):
if longest_from[successor] + 1 > longest_from[node]:
longest_from[node] = longest_from[successor] + 1

return longest_to, longest_from
Comment thread
elenbaasc marked this conversation as resolved.
Outdated

@staticmethod
def _path_length_stats(longest_to: dict[int, int]) -> tuple[float, float]:
path_lengths = list(longest_to.values())
if not path_lengths:
return 0.0, 0.0
mean_length = sum(path_lengths) / len(path_lengths)
variance = sum((x - mean_length) ** 2 for x in path_lengths) / len(path_lengths)
Comment thread
elenbaasc marked this conversation as resolved.
Outdated
return mean_length, math.sqrt(variance)

@staticmethod
def _critical_path_membership_fraction(
gdg: nx.DiGraph,
longest_to: dict[int, int],
longest_from: dict[int, int],
critical_path_length: int,
n_gates: int,
) -> float:
"""Fraction of gates that lie on some critical path.

A node lies on a critical path iff the longest path through it
(longest_to[n] + longest_from[n]) equals the overall critical path length.
"""
n_in_cp = sum(1 for node in gdg.nodes if longest_to[node] + longest_from[node] == critical_path_length)
return round(n_in_cp / n_gates, 4)

# ------------------------------------------------------------------ #
# Density metrics #
# ------------------------------------------------------------------ #
@staticmethod
def _density_metrics(circuit: Circuit) -> dict[str, Any]:
n_qubits = circuit.qubit_register_size
gate_statements = [s for s in circuit.ir.statements if isinstance(s, Gate)]
n_gates = len(gate_statements)
n_two_qubit_gates = sum(1 for s in gate_statements if isinstance(s, TwoQubitGate))
n_one_qubit_gates = n_gates - n_two_qubit_gates
depth = CircuitAnalyzer._compute_depth(circuit, gate_statements)

# Density score: parallelisation level of the circuit (0..1).
# Formula from Bandic et al. 2025, eq. (1).
if n_qubits > 1 and depth > 1:
density_score = (2 * n_two_qubit_gates + n_one_qubit_gates) / ((depth - 1) * (n_qubits - 1))
density_score = round(min(density_score, 1.0), 4)
else:
density_score = 0.0

# Idling score: average qubit idling fraction (0..1).
if n_qubits > 0 and depth > 0:
qubit_active_layers: dict[int, int] = dict.fromkeys(range(n_qubits), 0)
for gate in gate_statements:
for qubit_index in gate.qubit_indices:
Comment thread
elenbaasc marked this conversation as resolved.
Outdated
qubit_active_layers[qubit_index] = qubit_active_layers.get(qubit_index, 0) + 1
total_idle = sum(depth - active for active in qubit_active_layers.values())
idling_score = round(max(0.0, min(total_idle / (n_qubits * depth), 1.0)), 4)
else:
idling_score = 0.0

return {"density_score": density_score, "idling_score": idling_score}
14 changes: 14 additions & 0 deletions opensquirrel/passes/analyzer/general_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from opensquirrel.circuit import Circuit


class Analyzer(ABC):
def __init__(self, **kwargs: Any) -> None: ...

@abstractmethod
def analyze(self, circuit: Circuit) -> dict[str, Any]: ...
Empty file.
Loading
Loading