# Copyright (C) Unitary Foundation
#
# This source code is licensed under the GPL license (v3) found in the
# LICENSE file in the root directory of this source tree.
"""Defines utilities for efficiently running collections of circuits generated
by error mitigation techniques to compute expectation values."""
import collections
import inspect
import warnings
from collections import Counter
from typing import (
Any,
Callable,
Iterable,
List,
Optional,
Sequence,
Tuple,
Union,
cast,
get_args,
)
import numpy as np
import numpy.typing as npt
from mitiq import QPROGRAM, MeasurementResult, QuantumResult
from mitiq.interface import convert_from_mitiq, convert_to_mitiq
from mitiq.observable.observable import Observable
from mitiq.observable.pauli import PauliString
DensityMatrixLike = [
np.ndarray,
Iterable[np.ndarray], # type: ignore
List[np.ndarray], # type: ignore
Sequence[np.ndarray], # type: ignore
Tuple[np.ndarray],
npt.NDArray[np.complex64],
list[npt.NDArray[np.complex64]],
list[np.ndarray], # type: ignore
tuple[npt.NDArray[np.complex64]],
]
FloatLike = [
None, # Untyped executors are assumed to return floats.
float,
Iterable[float],
List[float],
Sequence[float],
Tuple[float],
list[float],
tuple[float],
]
MeasurementResultLike = [
MeasurementResult,
Iterable[MeasurementResult],
List[MeasurementResult],
Sequence[MeasurementResult],
Tuple[MeasurementResult],
list[MeasurementResult],
tuple[MeasurementResult],
]
[docs]
class Executor:
"""Tool for efficiently scheduling/executing quantum programs and storing
the results.
Args:
executor: A function which inputs a program and outputs a
``mitiq.QuantumResult``, or inputs a sequence of programs and
outputs a sequence of ``mitiq.QuantumResult`` s.
max_batch_size: Maximum number of programs that can be sent in a
single batch (if the executor is batched).
"""
def __init__(
self,
executor: Callable[[Union[QPROGRAM, Sequence[QPROGRAM]]], Any],
max_batch_size: int = 75,
) -> None:
self._executor = executor
executor_annotation = inspect.getfullargspec(executor).annotations
self._executor_return_type = executor_annotation.get("return")
self._max_batch_size = max_batch_size
self._executed_circuits: List[QPROGRAM] = []
self._quantum_results: List[QuantumResult] = []
self._calls_to_executor: int = 0
@property
def can_batch(self) -> bool:
"""Returns True if the executor is recognized as a "batched executor",
else False.
The executor is detected as "batched" if and only if it is annotated
with a return type that is a subclass of ``Iterable``. Common examples
include:
* ``Iterable[QuantumResult]``
* ``List[QuantumResult]``/``list[QuantumResult]``
* ``Sequence[QuantumResult]``
* ``Tuple[QuantumResult]``/``tuple[QuantumResult]``
Otherwise, it is considered "serial".
Batched executors can *run several quantum programs in a single call*.
Returns:
True if the executor is detected as batched, else False.
"""
return_type = self._executor_return_type
if return_type is None:
return False
return return_type in (
BatchedType[T] # type: ignore[index]
for BatchedType in [
Iterable,
List,
Sequence,
Tuple,
list,
tuple,
collections.abc.Sequence,
]
for T in get_args(QuantumResult)
)
@property
def executed_circuits(self) -> List[QPROGRAM]:
return self._executed_circuits
@property
def quantum_results(self) -> List[QuantumResult]:
return self._quantum_results
@property
def calls_to_executor(self) -> int:
return self._calls_to_executor
[docs]
def evaluate(
self,
circuits: Union[QPROGRAM, List[QPROGRAM]],
observable: Optional[Observable] = None,
force_run_all: bool = True,
**kwargs: Any,
) -> List[float]:
"""Returns the expectation value Tr[ρ O] for each circuit in
``circuits`` where O is the observable provided or implicitly defined
by the ``executor``. (The observable is implicitly defined when the
``executor`` returns float(s).)
All executed circuits are stored in ``self.executed_circuits``, and all
quantum results are stored in ``self.quantum_results``.
Args:
circuits: A single circuit or list of circuits.
observable: Observable O in the expression Tr[ρ O]. If None,
the ``executor`` must return a float (which corresponds to
Tr[ρ O] for a specific, fixed observable O).
force_run_all: If True, force every circuit in the input sequence
to be executed (if some are identical). Else, detects identical
circuits and runs a minimal set.
Returns:
List of real valued expectation values.
"""
if not isinstance(circuits, List):
circuits = [circuits]
warn_non_hermitian = False
if observable:
if isinstance(observable, PauliString):
if observable.coeff.imag > 0.0001:
warn_non_hermitian = True
elif isinstance(observable, Observable):
if any(
pauli.coeff.imag > 0.0001 for pauli in observable._paulis
):
warn_non_hermitian = True
if warn_non_hermitian:
warnings.warn(
"Expected observable to be hermitian. Continue with caution."
)
# Check executor and observable compatability with type hinting
# If FloatLike is specified as a return and observable is used
if self._executor_return_type in FloatLike and observable is not None:
# Type hinted as FloatLike and observable passed
if self._executor_return_type is not None:
raise ValueError(
"When using an executor which returns a float-like "
"result, measurements should be added before the circuit "
"is executed instead of with an observable."
)
else:
# Using an observable but no type hinting
raise ValueError(
"When using an observable, the return type of the "
"executor must be specified using typehinting."
)
elif observable is None:
# Type hinted as DensityMatrixLike but no observable is set
if self._executor_return_type in DensityMatrixLike:
raise ValueError(
"When using a density matrix result, an observable "
"is required."
)
# Type hinted as MeasurementResulteLike but no observable is set
elif self._executor_return_type in MeasurementResultLike:
raise ValueError(
"When using a measurement, or bitstring, like result, an "
"observable is required."
)
# Get all required circuits to run.
if (
observable is not None
and self._executor_return_type in MeasurementResultLike
):
all_circuits = [
circuit_with_measurements
for circuit in circuits
for circuit_with_measurements in observable.measure_in(circuit)
]
result_step = observable.ngroups
else:
all_circuits = circuits
result_step = 1
# Run all required circuits.
all_results = self.run(all_circuits, force_run_all, **kwargs)
# Parse the results.
if self._executor_return_type in FloatLike:
results = np.real_if_close(
cast(Sequence[float], all_results)
).tolist()
elif self._executor_return_type in DensityMatrixLike:
observable = cast(Observable, observable)
all_results = cast(List[npt.NDArray[np.complex64]], all_results)
results = [
observable._expectation_from_density_matrix(density_matrix)
for density_matrix in all_results
]
elif self._executor_return_type in MeasurementResultLike:
observable = cast(Observable, observable)
all_results = cast(List[MeasurementResult], all_results)
results = [
observable._expectation_from_measurements(
all_results[i : i + result_step]
)
for i in range(len(all_results) // result_step)
]
else:
raise ValueError(
f"Could not parse executed results from executor with type "
f"{self._executor_return_type}."
)
return results
[docs]
def run(
self,
circuits: Union[QPROGRAM, Sequence[QPROGRAM]],
force_run_all: bool = True,
**kwargs: Any,
) -> Sequence[QuantumResult]:
"""Runs all input circuits using the least number of possible calls to
the executor.
Args:
circuits: Circuit or sequence thereof to execute with the executor.
force_run_all: If True, force every circuit in the input sequence
to be executed (if some are identical). Else, detects identical
circuits and runs a minimal set.
"""
if not isinstance(circuits, Sequence):
circuits = [circuits]
start_result_index = len(self._quantum_results)
if force_run_all:
to_run = circuits
else:
# Make circuits hashable.
# Note: Assumes all circuits are the same type.
# TODO: Bug! These conversions to/from Mitiq are not safe in that,
# e.g., they do not preserve classical register structure in
# Qiskit circuits, potentially causing executed results to be
# incorrect. Safe conversions should follow the logic in
# mitiq.interface.noise_scaling_converter.
_, conversion_type = convert_to_mitiq(circuits[0])
hashable_circuits = [
convert_to_mitiq(circ)[0].freeze() for circ in circuits
]
# Get the unique circuits and counts
collection = Counter(hashable_circuits)
to_run = [
convert_from_mitiq(circ.unfreeze(), conversion_type)
for circ in collection.keys()
]
if not self.can_batch:
for circuit in to_run:
self._call_executor(circuit, **kwargs)
else:
stop = len(to_run)
step = self._max_batch_size
for i in range(int(np.ceil(stop / step))):
batch = to_run[i * step : (i + 1) * step]
self._call_executor(batch, **kwargs)
results = self._quantum_results[start_result_index:]
if not force_run_all:
# Expand computed results to all results using counts.
results_dict = dict(zip(collection.keys(), results))
results = [results_dict[key] for key in hashable_circuits]
return self._post_run(results)
def _post_run(
self, results: Sequence[QuantumResult]
) -> Sequence[QuantumResult]:
"""Post-processes the measurement results.
For example, this method can be overridden by a
readout error mitigation function.
"""
return results
def _call_executor(
self, to_run: Union[QPROGRAM, Sequence[QPROGRAM]], **kwargs: Any
) -> None:
"""Calls the executor on the input circuit(s) to run. Stores the
executed circuits in ``self._executed_circuits`` and the quantum
results in ``self._quantum_results``.
Args:
to_run: Circuit(s) to run.
"""
result = self._executor(to_run, **kwargs)
self._calls_to_executor += 1
if self.can_batch:
self._quantum_results.extend(result)
self._executed_circuits.extend(to_run)
else:
self._quantum_results.append(result)
self._executed_circuits.append(to_run)