Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Construct k-local ProductDual instances from 1-local ProductPOVM instances. #105

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
# (C) Copyright IBM 2024.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""dual_from_empirical_frequencies."""

from __future__ import annotations

from copy import deepcopy
from typing import cast

import numpy as np
from qiskit.quantum_info import DensityMatrix

from povm_toolbox.post_processor.povm_post_processor import POVMPostProcessor
from povm_toolbox.quantum_info import ProductDual, ProductPOVM
from povm_toolbox.quantum_info.base import BaseDual


class SparseArray:
def __init__(self, shape: tuple[int, ...]):
self._array = dict()
self._shape = shape

def __getitem__(self, key: tuple[int, ...]):
return self._array.get(key, 0.0)

def __setitem__(self, key: tuple[int, ...], value: float):
if len(key) != len(self._shape):
raise KeyError(
f"Index length ({len(key)}) does not match the dimension ({len(self._shape)}) of the array"
)
for i, n_max in zip(key, self._shape):
if i < 0 or i >= n_max:
raise KeyError(f"Index {key} has some elements out of range.")
self._array[key] = value

def __repr__(self) -> str:
return self._array.__repr__()


class MutualInformationOptimizer:
def __init__(self, outcome_counts, outcome_dims, subsystem_sizes):
self.counts = outcome_counts
self._outcome_dims = outcome_dims
self._subsystem_sizes = subsystem_sizes

def mutual_info(self, subset):
mask = np.asarray(
[idx in subset for idx in range(len(self._outcome_dims))],
dtype=np.bool,
)

shots = sum(self.counts.values())
marginals = [SparseArray(self._outcome_dims[mask]), SparseArray(self._outcome_dims[~mask])]
for outcome, count in self.counts.items():
outcome_arr = np.asarray(outcome)
marginals[0][tuple(outcome_arr[mask])] += count / shots
marginals[1][tuple(outcome_arr[~mask])] += count / shots

mut_info = 0.0
for outcome, count in self.counts.items():
outcome_arr = np.asarray(outcome)
mut_info += (
count
/ shots
* np.log(
count
/ shots
/ (
marginals[0][tuple(outcome_arr[mask])]
* marginals[1][tuple(outcome_arr[~mask])]
)
)
)
return mut_info

def total_mi(self, partition):
total_mi = 0.0
for subset in partition:
total_mi += self.mutual_info(subset)
return total_mi

def subsystem_size(self, subset):
k = 0
for i in subset:
k += self._subsystem_sizes[i]
return k

def greedy_combine(self, partition, k_max):
argmin = None
min_value = self.total_mi(partition)
for i in range(len(partition)):
for j in range(i + 1, len(partition)):
if self.subsystem_size(partition[i] | partition[j]) <= k_max:
test_partition = deepcopy(partition)
test_partition.remove(partition[i])
test_partition.remove(partition[j])
test_partition.append(partition[i] | partition[j])
mut_info = self.total_mi(test_partition)
if mut_info < min_value:
min_value = mut_info
argmin = test_partition
return argmin

def greedy_search(self, k_max):
partition = [{i} for i in range(len(self._outcome_dims))]
argmin = partition
while argmin is not None:
argmin = self.greedy_combine(partition, k_max)
partition = argmin if argmin is not None else partition
return partition


def dual_from_k_local_empirical_frequencies(
povm_post_processor: POVMPostProcessor,
*,
loc: int | tuple[int, ...] | None = None,
bias: int | None = None,
k_max: int = 1,
) -> BaseDual:
"""TODO (Return the k-local Dual frame of ``povm`` based on the frequencies of the sampled outcomes.)
Given outcomes sampled from a :class:`.ProductPOVM`, each local Dual frame is parametrized with
the alpha-parameters set as the marginal outcome frequencies. For stability, the (local)
empirical frequencies can be biased towards the (marginal) outcome probabilities of an
``ansatz`` state.
Args:
povm_post_processor: the :class:`.POVMPostProcessor` object from which to extract the
:attr:`.POVMPostProcessor.povm` and the empirical frequencies to build the Dual frame.
loc: index of the results to use. This is relevant if multiple sets of parameter values were
supplied to the sampler in the same Pub. If ``None``, it is assumed that the supplied
circuit was not parametrized or that a unique set of parameter values was supplied. In
this case, ``loc`` is trivially set to 0.
k_max: TODO.
Raises:
NotImplementedError: if :attr:`.POVMPostProcessor.povm` is not a :class:`.ProductPOVM`
instance.
ValueError: if ``loc`` is ``None`` and :attr:`.POVMPostProcessor.counts` stores more than a
single counter (i.e., multiple sets of parameter values were supplied to the sampler in
a single Pub).
Returns:
TODO (The Dual frame based on the empirical outcome frequencies from the post-processed result.)
"""
povm = povm_post_processor.povm
if not isinstance(povm, ProductPOVM):
raise NotImplementedError("This method is only implemented for `ProductPOVM` objects.")

if loc is None:
if povm_post_processor.counts.shape == (1,):
loc = (0,)
else:
raise ValueError(
"`loc` has to be specified if the POVM post-processor stores"
" more than one counter (i.e., if multiple sets of parameter"
" values were supplied to the sampler in a single pub). The"
f" array of counters is of shape {povm_post_processor.counts.shape}."
)
counts = povm_post_processor.counts[loc]

povm_shape = np.asarray(povm.shape)
povm_dims = [list(povm._frames.values())[i].num_subsystems for i in range(len(povm._frames))]
optimizer = MutualInformationOptimizer(counts, povm_shape, subsystem_sizes=povm_dims)
partition = optimizer.greedy_search(k_max)
partition = [list(subset) for subset in partition]
povm_grouped = povm.group(partition)

marginals = [np.zeros(np.prod(povm_shape[np.asarray(sub_system)])) for sub_system in partition]

# Computing marginals
shots = sum(counts.values())
for outcome, count in counts.items():
outcome_arr = np.asarray(outcome)
for i, subset in enumerate(partition):
mask = np.asarray([idx in subset for idx in range(len(povm_shape))], dtype=np.bool)
sub_outcome = np.ravel_multi_index(outcome_arr[mask], povm_shape[mask])
marginals[i][sub_outcome] += count / shots

alphas = []
# Computing alphas for each subsystem
for i, sub_system in enumerate(povm_grouped.sub_systems):
sub_povm = povm_grouped[sub_system]
dim = sub_povm.dimension
ansatz_state = DensityMatrix(np.eye(dim) / dim)
sub_bias = bias or sub_povm.num_outcomes

sub_alphas = shots * marginals[i] + sub_bias * cast(
np.ndarray, sub_povm.get_prob(ansatz_state)
)

alphas.append(tuple(sub_alphas / (shots + sub_bias)))

# Building ProductDual from frequencies
return ProductDual.build_dual_from_frame(povm_grouped, alphas=tuple(alphas))
2 changes: 1 addition & 1 deletion povm_toolbox/quantum_info/multi_qubit_dual.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def build_dual_from_frame(
# Convert dual operators from double-ket to operator representation.
dual_operators = [Operator(double_ket_to_matrix(op)) for op in dual_operators_array.T]

return cls(dual_operators)
return cls(dual_operators, shape=frame.shape)

# We could build a ``MultiQubitDual`` instance (i.e. joint dual frame) that
# is a dual frame to a ``ProductFrame``, but we have not implemented this yet.
Expand Down
3 changes: 2 additions & 1 deletion povm_toolbox/quantum_info/multi_qubit_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ def pauli_operators(self) -> list[dict[str, complex]]:
if self._pauli_operators is None:
try:
self._pauli_operators = [
dict(SparsePauliOp.from_operator(op).label_iter()) for op in self.operators
dict(SparsePauliOp.from_operator(op, atol=1e-20, rtol=1e-20).label_iter())
for op in self.operators
]
except QiskitError as exc:
raise QiskitError(
Expand Down
12 changes: 10 additions & 2 deletions povm_toolbox/quantum_info/product_dual.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,23 @@ def is_dual_to(self, frame: BaseFrame) -> bool:
# (1,2) but ``frame`` on (0,1) and (2,). ``self`` could still be a valid dual frame but
# we have not implemented the check for this. Then we should raise an
# NotImplementedError.
return True
raise NotImplementedError

@override
@classmethod
def build_dual_from_frame(
cls, frame: BaseFrame, alphas: tuple[tuple[float, ...] | None, ...] | None = None
cls,
frame_supplied: BaseFrame,
alphas: tuple[tuple[float, ...] | None, ...] | None = None,
indices_grouping: list[list[tuple[int, ...]]] | None = None,
) -> ProductDual:
dual_operators = {}
if isinstance(frame, ProductFrame):
if isinstance(frame_supplied, ProductFrame):
if indices_grouping is not None:
frame = frame_supplied.group(partition=indices_grouping)
else:
frame = frame_supplied
if alphas is None:
alphas = len(frame.sub_systems) * (None,)
elif len(alphas) != len(frame.sub_systems):
Expand Down
73 changes: 73 additions & 0 deletions povm_toolbox/quantum_info/product_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

from .base import BaseFrame
from .multi_qubit_frame import MultiQubitFrame
from .multi_qubit_povm import MultiQubitPOVM

T = TypeVar("T", bound=MultiQubitFrame)

Expand Down Expand Up @@ -353,3 +354,75 @@ def analysis(
if isinstance(frame_op_idx, tuple):
return self._trace_of_prod(hermitian_op, frame_op_idx)
raise TypeError("Wrong type for ``frame_op_idx``.")

def _tensor_product(
self, indices: list[tuple[int, ...]]
) -> tuple[tuple[int, ...], list[Operator]]:
"""Explicitly construct the tensor product of some local frames.
Args:
indices: list of tuples specifying the local frames whose operators are to be tensor-
multiplied.
Returns:
The subsystem index and the list of tensor product operators which act upon it.
"""
shape = tuple(self[subsystem_label].num_operators for subsystem_label in indices)
subset = list(idx for subsystem_label in indices for idx in subsystem_label)
joint_operators = np.empty(shape, dtype=object)
for outcome in np.ndindex(shape):
operator = self[indices[0]][outcome[0]]
for i in range(1, len(outcome)):
operator = Operator(np.kron(operator.data, self[indices[i]][outcome[i]].data))
joint_operators[outcome] = operator
return tuple(subset), joint_operators.flatten(order="C")

def group(self, partition: list[list[tuple[int, ...]]]) -> ProductFrame:
"""Group some local frames together into a single multi-qubit frame representation.
Args:
partition: partition of product frame, where all local frames are grouped into different
subsets. Each such subset is specified as a list of integer tuples, each of which
specifies a local frame. All local frames in a given subset are grouped together and
then represented by a single multi-qubit frame (we lose track of the product
structure within the set).
Returns:
A new ``ProductFrame`` instance representing ``self`` as specified by ``partition``.
Raises:
ValueError: if a subset contains twice the same index.
ValueError: if an index is in two different subsets.
ValueError: if ``partition`` does not exactly cover all subsystems.
"""
keys = list(self._frames.keys())
key_partition = [[keys[i] for i in subset] for subset in partition]
# Check that ``partition`` is indeed a partition:
subsystem_indices = set()
for subset in key_partition:
if len(subset) != len(set(subset)):
raise ValueError(
"A subsystem index must not be specified more than once in a subset. However,"
f" the subset {subset} has duplicate elements."
)
if any(i in subsystem_indices for i in subset):
raise ValueError(
"The subsets specifying the partition must be mutually exclusive. However, at"
f" least one of the indices in '{subset}' was already encountered before."
)
subsystem_indices.update(set(subset))
if subsystem_indices != set(self._frames.keys()):
raise ValueError(
"The partition must cover exactly all the subsystems. However, this condition is"
f" not fulfilled by {key_partition}."
)

frames = {}
for set_indices in key_partition:
idx, joint_operators = self._tensor_product(set_indices)
shape = tuple(s for subsystem_label in set_indices for s in self[subsystem_label].shape)
frames[idx] = MultiQubitPOVM(joint_operators, shape=shape)
return type(self)(frames=frames)

def group_all(self) -> MultiQubitFrame:
"""Group all local frames together into a single multi-qubit frame representation.
Returns:
A new ``MultiQubitFrame`` instance representing ``self``.
"""
grouped_frame = self.group(partition=[list(range(len(self._frames)))])
return grouped_frame[grouped_frame.sub_systems[0]]
Loading