diff --git a/glue/generalized_sim/circuit_simulation.py b/glue/generalized_sim/circuit_simulation.py new file mode 100644 index 00000000..e78546de --- /dev/null +++ b/glue/generalized_sim/circuit_simulation.py @@ -0,0 +1,451 @@ +import os +import time +from itertools import product +from typing import TypeAlias, Optional, Union +import random + +import numpy as np +import numpy.typing as npt +import sinter +import stim +from panqec.codes import StabilizerCode, Color666PlanarCode + +from pauli_sum import PauliSum +from utils import stringify, str_to_bsf, bsf_to_str, isclose +from logger import Logger +from circuits import generate_circuit +from gates import propagation + +PauliBSF: TypeAlias = npt.NDArray[np.uint8] +BinaryVect: TypeAlias = npt.NDArray[np.uint8] +WeightedPauliBSF: TypeAlias = tuple[float, PauliBSF] + +np.random.seed(0) + + +logical_I = PauliSum({'I': 1}) +logical_H = PauliSum({'X': np.sqrt(2), 'Z': np.sqrt(2)}) +logical_X = PauliSum({'X': 1}) + +direction_dict = { + 'DEPOLARIZE1': tuple([1/3 for _ in range(3)]), + 'DEPOLARIZE2': tuple([1/15 for _ in range(15)]), + 'X_ERROR': (1, 0, 0), + 'Y_ERROR': (0, 1, 0), + 'Z_ERROR': (0, 0, 1), +} + + +def bs_product(matrix: npt.NDArray[np.uint8], pauli: npt.NDArray[np.uint8]) -> bool: + """ + Calculate the syndrome of a Pauli vector with respect to a matrix, + both given in the bsf format. + """ + n = matrix.shape[1] // 2 + + matrix_X = matrix[:, :n] + matrix_Z = matrix[:, n:] + pauli_X = pauli[:n] + pauli_Z = pauli[n:] + + prod = (matrix_X.dot(pauli_Z) + matrix_Z.dot(pauli_X)) % 2 + + return prod + + +class CircuitSimulation: + def __init__( + self, + code: StabilizerCode, + circuit: stim.Circuit, + global_measured_op: str = 'H', + logger=None + ): + if logger is None: + self.logger = Logger('log.txt', stdout_print=False) + else: + self.logger = logger + + self.code = code + self.circuit = circuit + self.n_qubits_circuit = circuit.num_qubits + self.global_measured_op = global_measured_op + + self.Hx = self.code.Hx.toarray() + self.Hz = self.code.Hz.toarray() + self.Lx = self.code.logicals_x[0][:self.code.n] + self.Lz = self.code.logicals_z[0][self.code.n:] + + def generate_error( + self, + positions: list[int], + p: float, + direction: tuple[float] + ) -> PauliSum: + if len(direction) == 3: + restricted_error = random.choices( + ['I', 'X', 'Y', 'Z'], + k=len(positions), + weights=[1-p, p * direction[0], p * direction[1], p * direction[2]] + ) + else: + restricted_error = ''.join(random.choices( + list(map(''.join, list(product(['I', 'X', 'Y', 'Z'], repeat=2)))), + k=len(positions)//2, + weights=[1-p] + [p/15] * 15 + )) + + error = ['I'] * self.n_qubits_circuit + for i, pos in enumerate(positions): + error[pos] = restricted_error[i] + + return PauliSum({''.join(error): 1}) + + def get_syndrome(self, error_bsf: npt.NDArray) -> str: + """Measure the syndrome of a given error using the stabilizer matrices""" + n = self.code.n + + syndrome = ( + stringify(self.Hx@error_bsf[n:] % 2) + + stringify(self.Hz@error_bsf[:n] % 2) + ) + + return syndrome + + # @profile + def logical_errors(self, error_bsf: npt.NDArray) -> str: + """Measure the syndrome of a given error using the stabilizer matrices""" + n = self.code.n + + logical_syndrome = [self.Lz@error_bsf[:n] % 2, self.Lx@error_bsf[n:] % 2] + + return logical_syndrome + + # @profile + def project( + self, + meas_positions: list, + syndrome: str, + error: PauliSum, + pauli_type: str = 'Z' + ) -> PauliSum: + projected_op = PauliSum() + + for pauli, coeff in error: + commutation = stringify([ + int(pauli[i] not in [pauli_type, 'I']) + for i in meas_positions + ]) + if commutation == syndrome: + projected_op[pauli] = coeff + + return projected_op + + # @profile + def decompose_error(self, error: PauliSum): + """Decompose error into Pauli error times logical gate, assuming + all the terms in the error have the same syndrome + """ + + first_pauli, coeff = list(error.items())[0] + pauli_error = PauliSum({first_pauli: 1}) + factorized_error = pauli_error * error + logical_error = self.physical_to_logical(factorized_error) + + return pauli_error, logical_error + + # @profile + def physical_to_logical(self, pauli_sum: PauliSum) -> PauliSum: + """Convert a physical error to a logical error using the code's stabilizer matrix.""" + logical_terms = dict() + + for pauli, coeff in pauli_sum.items(): + logical_pauli = bsf_to_str( + self.logical_errors( + str_to_bsf(pauli[:self.code.n]) + ) + ) + logical_terms[logical_pauli] = logical_terms.get(logical_pauli, 0) + coeff + + return PauliSum(logical_terms) + + # @profile + def measurement_distribution( + self, + meas_positions: list, + error: PauliSum, + is_H_meas: bool = False, + pauli_type: str = 'Z' + ) -> tuple[dict[str, float], list[PauliSum]]: + new_error: dict[PauliSum] = dict() + syndrome_prob: dict[float] = dict() + + possible_syndromes = set([ + stringify([ + int(pauli[i] != pauli_type and pauli[i] != 'I') + for i in meas_positions + ]) + for pauli in error.keys() + ]) + + for s in possible_syndromes: + self.logger.print(f"Syndrome: {s}") + syndrome_prob[s] = 0 + + new_error[s] = self.project(meas_positions, s, error, pauli_type) + self.logger.print(f"New error: {new_error[s]}") + + # If H measurement, we cannot rely on the fact that all terms + # in the Pauli sum have the same syndrome + if is_H_meas: + sandwiched_op: PauliSum = error.dag() * new_error[s] + self.logger.print(f"Initial sandwiched op: {sandwiched_op}") + simplified_sandwiched_op = PauliSum() + for pauli, coeff in sandwiched_op: + restricted_pauli_bsf = str_to_bsf(pauli[:self.code.n]) + if not ('1' in self.get_syndrome(restricted_pauli_bsf)): + simplified_sandwiched_op[pauli] = coeff + sandwiched_op = simplified_sandwiched_op + self.logger.print(f"Final sandwiched op: {sandwiched_op}") + else: + sandwiched_op: PauliSum = new_error[s].dag() * new_error[s] + self.logger.print(f"Sandwiched op: {sandwiched_op}") + + sandwiched_op_log = self.physical_to_logical(sandwiched_op) + self.logger.print(f"Logical sandwiched op: {sandwiched_op_log}") + + for pauli, coeff in sandwiched_op_log: + if pauli == 'I': + syndrome_prob[s] += coeff + elif pauli in ['X', 'Z']: + syndrome_prob[s] += coeff / np.sqrt(2) + + if not isclose(abs(syndrome_prob[s]), syndrome_prob[s]): + raise ValueError( + f"Syndrome probability {syndrome_prob[s]} is not a positive real number" + ) + + syndrome_prob[s] = abs(syndrome_prob[s]) + + sum_prob = sum(list(syndrome_prob.values())) + + if sum_prob == 0: + raise ValueError( + f"The syndrome probabilities are all zero." + f"\nPossible syndromes {syndrome_prob}" + ) + + for s in syndrome_prob.keys(): + syndrome_prob[s] /= sum_prob + + return syndrome_prob, new_error + + # @profile + def measure( + self, + meas_positions: list, + error: PauliSum, + is_H_meas: bool = False + ) -> tuple[dict[str, float], list[PauliSum]]: + possible_syndromes, possible_propagations = self.measurement_distribution( + meas_positions, error, is_H_meas + ) + + selected_syndrome = random.choices( + list(possible_syndromes.keys()), + weights=list(possible_syndromes.values()) + )[0] + + self.logger.print(f"Possible syndromes: {possible_syndromes}") + self.logger.print(f"Possible propagations\n {possible_propagations}") + self.logger.print(f"Selected syndrome: {selected_syndrome}") + + norm = np.sqrt(possible_syndromes[selected_syndrome]) + new_error = possible_propagations[selected_syndrome] / norm + + self.logger.print(f"New error: {new_error}") + + return selected_syndrome, new_error + + # @profile + def run_once(self): + self.logger.print("-" * 40 + " Run " + "-" * 40) + + result = {'accept': False, 'fail': False} + + error = PauliSum(self.n_qubits_circuit) + # logical_error = PauliSum(1) + for layer in self.circuit: + self.logger.print('\n' + '=' * 50) + self.logger.print(layer) + self.logger.print('=' * 50) + + if layer.name in ['TICK', 'R', 'QUBIT_COORDS', 'DETECTOR']: + pass + elif layer.name in ['DEPOLARIZE1', 'DEPOLARIZE2', 'X_ERROR', 'Y_ERROR', 'Z_ERROR']: + new_error = self.generate_error( + list(map(lambda t: t.value, layer.targets_copy())), + layer.gate_args_copy()[0], + direction_dict[layer.name] + ) + # new_pauli = 'IIIIZXX' if len(error.keys()) == 1 else 'IIXXZIX' + # new_error = PauliSum({new_pauli + 'I'*13: 1}) + + self.logger.print(f"New error: {new_error}") + + error = new_error * error + elif layer.name == 'MR': + syndrome, error = self.measure( + list(map(lambda t: t.value, layer.targets_copy())), + error, + (layer.tag == 'H') + ) + + if '1' in syndrome: + return result + + error = error.restrict_up_to(self.code.n) + + # if layer.tag != 'H': + # if '1' in self.get_syndrome(str_to_bsf(list(error.keys())[0][:self.code.n])): + # error, new_logical_error = self.decompose_error(error) + # logical_error *= new_logical_error + # else: + # logical_error = self.physical_to_logical(error) + # error = PauliSum(self.n_qubits_circuit) + + # self.logger.print(f"Logical error: {logical_error}") + else: + propagated_error = propagation(layer, error) + + self.logger.print(f"Propagated error {error}") + + # if len(error.keys()) > 1: + # raise ValueError( + # f"The final error must be a Pauli error. Current error: {error}" + # ) + + restricted_error_bsf = str_to_bsf(list(error.keys())[0][:self.code.n]) + + if '1' in self.get_syndrome(restricted_error_bsf): + raise ValueError( + f"The final error must be in the codespace. Current error: {error}" + ) + + # logical_error *= self.physical_to_logical(error) + logical_error = self.physical_to_logical(error) + + self.logger.print(f"Final physical error {error}") + self.logger.print(f"Final logical error {logical_error}") + + # print(f"Logical error {logical_error}") + + result['accept'] = True + + if not ( + logical_error.proportional_to(logical_I) or + (self.global_measured_op == 'H' and logical_error.proportional_to(logical_H)) or + (self.global_measured_op == 'X' and logical_error.proportional_to(logical_X)) + ): + result['fail'] = True + self.logger.print("Failure") + self.logger.print(f"Logical error {logical_error}") + + self.logger.print("Terminates") + return result + + +class Sampler(sinter.Sampler): + def compiled_sampler_for_task(self, task: sinter.Task) -> sinter.CompiledSampler: + n_layers = task.json_metadata['n_layers'] + p_phys = task.json_metadata['p_phys'] + p_meas = task.json_metadata['p_meas'] + replace_T_with_S = task.json_metadata['replace_T_with_S'] + L = task.json_metadata['L'] + + return CompiledSampler(n_layers, p_phys, p_meas, replace_T_with_S, L) + + +class CompiledSampler(sinter.CompiledSampler): + def __init__(self, n_layers, p_phys, p_meas, replace_T_with_S, L): + self.n_layers = n_layers + self.p_phys = p_phys + self.p_meas = p_meas + self.L = L + + logger = Logger('log.txt', logging=False, stdout_print=False) + code = Color666PlanarCode(L) + circuit = generate_circuit( + code, + n_rounds=n_layers, + p_phys=p_phys, + p_meas=p_meas, + noise_model='pheno', + replace_T_with_S=replace_T_with_S + ) + global_measured_op = 'X' if replace_T_with_S else 'H' + + self.sim = CircuitSimulation( + code, circuit, global_measured_op=global_measured_op, logger=logger + ) + + def sample(self, shots: int) -> sinter.AnonTaskStats: + t0 = time.monotonic() + n_accept, n_fail = 0, 0 + for k in range(shots): + run = self.sim.run_once() + n_fail += int(run['fail']) + n_accept += int(run['accept']) + + t1 = time.monotonic() + + return sinter.AnonTaskStats( + shots=shots, + errors=n_fail, + discards=shots - n_accept, + seconds=t1-t0 + ) + + +if __name__ == "__main__": + max_shots = int(1e9) + max_errors = 2000 + p_phys_list = [0.01, 0.02, 0.05, 0.1, 0.15, 0.2] + n_layers_list = [1] + L_list = [2] + replace_T_with_S_list = [True, False] + + parameters = product( + p_phys_list, n_layers_list, replace_T_with_S_list, L_list + ) + + tasks = [] + for p_phys, n_layers, replace_T_with_S, L in parameters: + tasks.append(sinter.Task(circuit=stim.Circuit(), json_metadata={ + 'n_layers': n_layers, + 'p_phys': p_phys, + 'p_meas': np.sqrt(p_phys), + 'replace_T_with_S': replace_T_with_S, + 'L': L + })) + + state = sinter.collect( + print_progress=True, + save_resume_filepath='result-circuit-2.csv', + tasks=tasks, + max_shots=max_shots, + max_errors=max_errors, + num_workers=os.cpu_count(), + decoders=['simulation'], + custom_decoders={'simulation': Sampler()} + ) + + +# if __name__ == "__main__": +# code = Color666PlanarCode(1) +# logger = Logger('log.txt', logging=False, stdout_print=False) + +# sim = Simulation(code, logger=logger) + +# for i in range(100): +# sim.run_once() \ No newline at end of file diff --git a/glue/generalized_sim/circuits.py b/glue/generalized_sim/circuits.py new file mode 100644 index 00000000..e04a774f --- /dev/null +++ b/glue/generalized_sim/circuits.py @@ -0,0 +1,227 @@ +import stim +import numpy as np +from panqec.codes import StabilizerCode + + +def generate_circuit_measure_all( + code: StabilizerCode, + n_rounds=1, + p_phys=0.1, + p_meas=0.1, + noise_model="pheno", + replace_T_with_S=False, + y_state_init=True +) -> stim.Circuit: + """ + Generate a circuit for the given stabilizer code. + + Args: + code (StabilizerCode): The stabilizer code to use. + n_rounds (int): The number of rounds of error correction. + p_phys (float): The physical error rate. + p_meas (float): The measurement error rate. + noise_model (str): The noise model to use. Can be "pheno" or "full". + replace_T_with_S (bool): If True, replace T gates with S gates. + Returns: + stim.Circuit: The generated circuit. + """ + + if noise_model not in ["pheno", "full"]: + raise ValueError("noise_model must be 'pheno' or 'full'") + + circuit = stim.Circuit() + data_qubit_indices = list(range(code.n)) + data_qubit_indices_part1 = [1, 3, 5, 6] + data_qubit_indices_part2 = [0, 2, 4] + n_z = code.Hz.shape[0] + z_stabilizer_indices = list(range(code.n, code.n + n_z)) + x_stabilizer_indices = list( + range(code.n + n_z, code.n + code.n_stabilizers) + ) + h_meas_index = code.n + code.n_stabilizers + + if y_state_init: + # Initialize the data qubits in the Y state + circuit.append("H", data_qubit_indices) + circuit.append("S", data_qubit_indices_part1) + circuit.append("S_DAG", data_qubit_indices_part2) + + def h_measurement(noisy=True): + if replace_T_with_S: + circuit.append("SQRT_Y_DAG", data_qubit_indices) + else: + circuit.append("I", data_qubit_indices, tag="Tdag") + + for data_qubit in data_qubit_indices: + circuit.append("CNOT", [data_qubit, h_meas_index]) + + if replace_T_with_S: + circuit.append("SQRT_Y", data_qubit_indices) + else: + circuit.append("I", data_qubit_indices, tag="T") + + if noisy: + circuit.append("X_ERROR", [h_meas_index], p_meas) + circuit.append("MR", [h_meas_index], tag='H') + + + def x_syndrome_extraction(noisy=True): + circuit.append("H", x_stabilizer_indices) + for x_stab in range(code.Hx.shape[0]): + qubits = code.Hx[x_stab].nonzero()[1] + for qubit in qubits: + circuit.append("CNOT", [code.n + n_z + x_stab, qubit]) + circuit.append("H", x_stabilizer_indices) + if noisy: + circuit.append("X_ERROR", x_stabilizer_indices, p_meas) + + def z_syndrome_extraction(noisy=True): + for z_stab in range(n_z): + qubits = code.Hz[z_stab].nonzero()[1] + for qubit in qubits: + circuit.append("CNOT", [qubit, code.n + z_stab]) + if noisy: + circuit.append("X_ERROR", z_stabilizer_indices, p_meas) + + for i_round in range(n_rounds): + noisy = (i_round != n_rounds - 1) + + circuit.append("DEPOLARIZE1", data_qubit_indices, p_phys) + x_syndrome_extraction(noisy) + if i_round > 0 or y_state_init: + z_syndrome_extraction(noisy) + + circuit.append("MR", z_stabilizer_indices + x_stabilizer_indices) + circuit.append("DEPOLARIZE1", data_qubit_indices, p_phys) + + # if i_round == 0: + # circuit.append("CNOT", [1, 3]) + # circuit.append("CNOT", [5, 3]) + # circuit.append("I", [3], tag="T") + # circuit.append("CNOT", [5, 3]) + # circuit.append("CNOT", [1, 3]) + h_measurement(noisy) + + return circuit + + +def generate_circuit_cn( + code: StabilizerCode, + n_rounds=1, + p_phys=0.1, + p_meas=0.1, + noise_model="pheno", + replace_T_with_S=False, +) -> stim.Circuit: + """ + Generate the Chamberland-Noh H state preparation circuit + + Args: + code (StabilizerCode): The stabilizer code to use. + n_rounds (int): The number of rounds of error correction. + p_phys (float): The physical error rate. + p_meas (float): The measurement error rate. + noise_model (str): The noise model to use. Can be "pheno" or "full". + replace_T_with_S (bool): If True, replace T gates with S gates. + Returns: + stim.Circuit: The generated circuit. + """ + + if noise_model not in ["pheno", "full"]: + raise ValueError("noise_model must be 'pheno' or 'full'") + + circuit = stim.Circuit() + data_qubit_indices = list(range(code.n)) + n_z = code.Hz.shape[0] + n_x = code.Hx.shape[0] + z_stabilizer_indices = list(range(code.n, code.n + n_z)) + x_stabilizer_indices = list( + range(code.n + n_z, code.n + code.n_stabilizers) + ) + weight2_x_meas_index = code.n + code.n_stabilizers + weight2_z_meas_index = code.n + code.n_stabilizers + 1 + h_meas_index = code.n + code.n_stabilizers + 2 + + def h_measurement(noisy=True): + if replace_T_with_S: + circuit.append("SQRT_Y_DAG", data_qubit_indices) + else: + circuit.append("I", data_qubit_indices, tag="Tdag") + + for data_qubit in data_qubit_indices: + circuit.append("CNOT", [data_qubit, h_meas_index]) + + if replace_T_with_S: + circuit.append("SQRT_Y", data_qubit_indices) + else: + circuit.append("I", data_qubit_indices, tag="T") + + if noisy: + circuit.append("X_ERROR", [h_meas_index], p_meas) + circuit.append("MR", [h_meas_index], tag='H') + + + def x_syndrome_extraction(noisy=True, subindices=None): + if subindices is None: + subindices = range(n_x) + circuit.append("H", x_stabilizer_indices) + for x_stab in subindices: + qubits = code.Hx[x_stab].nonzero()[1] + for qubit in qubits: + circuit.append("CNOT", [code.n + n_z + x_stab, qubit]) + circuit.append("H", x_stabilizer_indices) + if noisy: + circuit.append("X_ERROR", x_stabilizer_indices, p_meas) + + def z_syndrome_extraction(noisy=True, subindices=None): + if subindices is None: + subindices = range(n_z) + + for z_stab in subindices: + qubits = code.Hz[z_stab].nonzero()[1] + for qubit in qubits: + circuit.append("CNOT", [qubit, code.n + z_stab]) + if noisy: + circuit.append("X_ERROR", z_stabilizer_indices, p_meas) + + def weight_2_syndrome_extraction(noisy=True, x_only=False): + # Extracts the weight-2 boundary stabilizers + data_qubits = [2, 3] + circuit.append("H", [weight2_x_meas_index]) + for qubit in data_qubits: + circuit.append("CNOT", [weight2_x_meas_index, qubit]) + circuit.append("H", [weight2_x_meas_index]) + + if not x_only: + for qubit in data_qubits: + circuit.append("CNOT", [qubit, weight2_z_meas_index]) + + if noisy: + circuit.append("X_ERROR", [weight2_x_meas_index, weight2_z_meas_index], p_meas) + + noisy = False + circuit.append("I", 5, tag="T") + x_syndrome_extraction(noisy, subindices=[0, 2]) + weight_2_syndrome_extraction(noisy, x_only=True) + circuit.append("TICK") + circuit.append("MR", [ + code.n + n_z + 0, code.n + n_z + 2, weight2_x_meas_index + ]) + x_syndrome_extraction(noisy, subindices=[1]) + z_syndrome_extraction(noisy, subindices=[1]) + circuit.append("MR", [code.n + 1, code.n + n_z + 1]) + + for i_round in range(n_rounds): + noisy = (i_round != n_rounds - 1) + + circuit.append("DEPOLARIZE1", data_qubit_indices, p_phys) + h_measurement(noisy) + + circuit.append("DEPOLARIZE1", data_qubit_indices, p_phys) + x_syndrome_extraction(noisy) + z_syndrome_extraction(noisy) + + circuit.append("MR", z_stabilizer_indices + x_stabilizer_indices) + + return circuit + diff --git a/glue/generalized_sim/example.ipynb b/glue/generalized_sim/example.ipynb new file mode 100644 index 00000000..ad2725fe --- /dev/null +++ b/glue/generalized_sim/example.ipynb @@ -0,0 +1,71 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "04f4e3a0-3b73-4af8-a13b-dfb6734338a3", + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "b09d4bf6-e7e5-45cd-9e0d-649a829f4d14", + "metadata": {}, + "outputs": [], + "source": [ + "from math import prod, sqrt\n", + "\n", + "import numpy as np\n", + "from circuits import generate_circuit_cn, generate_circuit_measure_all\n", + "from panqec.codes import Color666PlanarCode\n", + "\n", + "from pauli_sum import PauliSum, X, Z\n", + "from general_tableau import GeneralTableau\n", + "from utils import str_to_bsf, stringify\n", + "from logger import Logger" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d1c55fda-5fed-448f-8d8e-15b8239088a3", + "metadata": {}, + "outputs": [], + "source": [ + "circuit = generate_circuit_cn(Color666PlanarCode(1), n_rounds=3, p_phys=0.1, p_meas=0.1)\n", + "logger = Logger('log.txt', stdout_print=False)\n", + "\n", + "for i in range(10):\n", + " logger = Logger('log.txt', stdout_print=False)\n", + " tab = GeneralTableau(16, logger=logger)\n", + " result = tab.run_circuit(circuit)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/glue/generalized_sim/gates.py b/glue/generalized_sim/gates.py new file mode 100644 index 00000000..65b05d78 --- /dev/null +++ b/glue/generalized_sim/gates.py @@ -0,0 +1,87 @@ +from pauli_sum import PauliSum +import numpy as np +import stim + + +def stim_to_pauli(pauli_string): + pauli_string = str(pauli_string) + coeff = {'+': 1, '-': -1}[pauli_string[0]] + if pauli_string[1] == 'i': + coeff *= 1j + pauli = pauli_string[2:] + else: + pauli = pauli_string[1:] + + return coeff, pauli.replace('_', 'I') + + +def single_pauli(pauli_type: str, pos: int, n: int) -> str: + """Generate the n-qubit Pauli string corresponding to $X_i$, $Y_i$ or $Z_i$""" + error = ['I'] * n + error[pos] = pauli_type + return ''.join(error) + + +def propagator_T(positions: list[int], pauli: str): + n = len(pauli) + initial_pauli = list(pauli) + for i_pos in positions: + if initial_pauli[i_pos] in ['X', 'Z']: + initial_pauli[i_pos] = 'I' + + propagated_error = PauliSum({''.join(initial_pauli): 1}) + for i in range(n): + if i in positions and pauli[i] in ['X', 'Z']: + propagated_error *= PauliSum({ + single_pauli('X', i, n): 1/np.sqrt(2), + single_pauli('Z', i, n): {'X': -1, 'Z': 1}[pauli[i]]*1/np.sqrt(2) + }) + + return propagated_error + + +def propagator_Tdag(positions: list[int], pauli: str): + n = len(pauli) + initial_pauli = list(pauli) + for i_pos in positions: + if initial_pauli[i_pos] in ['X', 'Z']: + initial_pauli[i_pos] = 'I' + + propagated_error = PauliSum({''.join(initial_pauli): 1}) + for i in range(n): + if i in positions and pauli[i] in ['X', 'Z']: + propagated_error *= PauliSum({ + single_pauli('X', i, n): {'Z': -1, 'X': 1}[pauli[i]]*1/np.sqrt(2), + single_pauli('Z', i, n): 1/np.sqrt(2), + }) + + return propagated_error + +def propagation(layer: stim.CircuitInstruction, error: PauliSum, dag=False) -> PauliSum: + propagated_error = PauliSum({}) + for pauli, coeff in error.items(): + if layer.name == 'I' and len(layer.tag) > 0: + positions = list(map(lambda t: t.value, layer.targets_copy())) + prop_fn = propagator_dag[layer.tag] if dag else propagator[layer.tag] + new_pauli_sum = prop_fn(positions, pauli) * coeff + propagated_error += new_pauli_sum + else: + if dag: + new_pauli_stim = stim.PauliString(pauli).before(layer) + else: + new_pauli_stim = stim.PauliString(pauli).after(layer) + + new_coeff, new_pauli = stim_to_pauli(new_pauli_stim) + propagated_error += PauliSum({new_pauli: coeff * new_coeff}) + return propagated_error + + +propagator = { + 'T': propagator_T, + 'Tdag': propagator_Tdag +} + +propagator_dag = { + 'T': propagator_Tdag, + 'Tdag': propagator_T +} \ No newline at end of file diff --git a/glue/generalized_sim/general_tableau.py b/glue/generalized_sim/general_tableau.py new file mode 100644 index 00000000..cb7fde99 --- /dev/null +++ b/glue/generalized_sim/general_tableau.py @@ -0,0 +1,793 @@ +from math import prod, sqrt, acos +from cmath import exp +import random +from itertools import product +from typing import Union + +import numpy as np +import stim +from ldpc.mod2 import nullspace, rank + +from pauli_sum import PauliSum, X, Z, single_pauli_sum +from utils import commute, multiply_paulis, str_to_bsf, bsf_to_str, support +from gates import propagation +from logger import Logger + + +direction_dict = { + 'DEPOLARIZE1': tuple([1/3 for _ in range(3)]), + 'DEPOLARIZE2': tuple([1/15 for _ in range(15)]), + 'X_ERROR': (1, 0, 0), + 'Y_ERROR': (0, 1, 0), + 'Z_ERROR': (0, 0, 1), +} + +logical_H = PauliSum({'IIXXIXIIIIIIIIII': 1/np.sqrt(2), 'ZZIIIZIIIIIIIIII': 1/np.sqrt(2)}) +nonclifford_gates = ['T', 'Tdag'] + +class GeneralTableau: + def __init__(self, n: int, logger=None): + self.n = n + self.stabilizers: list[PauliSum] = [Z(i, n) for i in range(n)] + self.destabilizers: list[PauliSum] = [X(i, n) for i in range(n)] + self.nonpauli_index = -1 + self.buffer_layers: list[stim.CircuitInstruction] = [] + self.is_circuit_buffer_nonclifford = False + self.decomposition_threshold = self.n // 2 - 1 + + self.buffer_errors: list[PauliSum] = [] + self.nontrivial_logicals = None + + if logger is None: + self.logger = Logger('log.txt', stdout_print=False) + else: + self.logger = logger + + def check_commutation_relations(self, stabilizers=None, destabilizers=None): + """Check that all stabilizers/destabilizers commute with each other + and each stabilizer anticommute only with its corresponding destabilizer.""" + + if stabilizers is None: + stabilizers = self.stabilizers + if destabilizers is None: + destabilizers = self.destabilizers + + for i, stab in enumerate(stabilizers): + for j, destab in enumerate(destabilizers): + if i != j and not stab.commutes_with(destab): + raise ValueError( + f"Stabilizer {i} {stab}\nand destabilizer {j} {destab}\ndo not commute." + ) + if i == j and stab.commutes_with(destab): + raise ValueError( + f"Stabilizer {i} {stab}\nand destabilizer {j} {destab}\ncommute." + ) + + for j, other_stab in enumerate(stabilizers): + if i != j and not stab.commutes_with(other_stab): + raise ValueError( + f"Stabilizers {i} {stab}\nand {j} {other_stab}\ndo not commute." + ) + + for i, destab in enumerate(destabilizers): + for j, other_destab in enumerate(destabilizers): + if i != j and not destab.commutes_with(other_destab): + raise ValueError( + f"Destabilizers {i} {destab}\nand {j} {other_destab}\ndo not commute." + ) + + def weak_commute(self, op1: list[PauliSum], op2: PauliSum) -> bool: + commutant = op2 + for op in op1: + commutant = op * commutant * op.dag() + commutant = commutant * op2.dag() + # self.logger.print(f"Commutant: {commutant}") + + if commutant == PauliSum({'I'*self.n: 1}): + return True + + if commutant == PauliSum({'I'*self.n: -1}): + return False + + for i_stab, stab in enumerate(self.stabilizers): + if not self.weak_commute([commutant], stab): + self.logger.print(f"Weak commute failed with stabilizer {i_stab}: {stab}") + self.logger.print(f"Commutant: {commutant}") + return False + + return True + + def weak_anticommute(self, op1: list[PauliSum], op2: PauliSum) -> bool: + commutant = op2 + for op in op1: + commutant = op * commutant * op.dag() + commutant = commutant * op2.dag() + # self.logger.print(f"Commutant: {commutant}") + + if commutant == PauliSum({'I'*self.n: 1}): + return False + + if commutant == PauliSum({'I'*self.n: -1}): + return True + + for i_stab, stab in enumerate(self.stabilizers): + if not self.weak_anticommute([commutant], stab): + self.logger.print(f"Weak commute failed with stabilizer {i_stab}: {stab}") + return False + + return True + + def is_stabilizer(self, op: list[PauliSum], index: int = None) -> bool: + """Check if an operator is a stabilizer of the tableau. + If index is provided, check that the stabilizer anticommutes with the destabilizer + at that index and commutes with all other destabilizers. + """ + self.logger.print(f"Is {index} stabilizer? {op}") + # if index is not None: + # for j, destab in enumerate(self.destabilizers): + # # self.logger.print(f"Checking commutation between destabilizer {j} {destab}\nand stabilizer {index} {op}.") + # if j != index and not self.weak_commute(op, destab): + # self.logger.print(f"Destabilizer {j} {destab}\ndoes not commute with stabilizer {index} {op}.") + # return False + + for i_stab, stab in enumerate(self.stabilizers): + # self.logger.print(f"Checking commutation between stabilizer {i_stab} {stab}\nand stabilizer {op}.") + if not self.weak_commute(op, stab): + self.logger.print(f"Stabilizer {i_stab} {stab}\n does not commute with operator {op}.") + return False + + self.logger.print("Yes") + return True + + def is_destabilizer(self, op: list[PauliSum], index: int = None) -> bool: + """Check if an operator is a destabilizer of the tableau. + If index is provided, check that the destabilizer anticommutes with the stabilizer + at that index and commutes with all other destabilizers/stabilizers. + """ + self.logger.print(f"Is {index} destabilizer? {op}") + # if index is not None: + # if not self.weak_anticommute(op, self.stabilizers[index]): + # self.logger.print(f"Stabilizer {index} {self.destabilizers[index]}\ndoes not anticommute with destabilizer {index} {op}.") + # return False + # for j, stab in enumerate(self.stabilizers): + # if j != index and not self.weak_commute(op, stab): + # self.logger.print(f"Stabilizer {j} {stab}\ndoes not commute with destabilizer {index} {op}.") + # return False + + for i_destab, destab in enumerate(self.destabilizers): + if not self.weak_commute(op, destab): + self.logger.print(f"Destabilizer {i_destab} {destab}\n does not commute with destabilizer {op}.") + return False + + self.logger.print("Yes") + return True + + def generate_error( + self, + positions: list[int], + p: float, + direction: tuple[float] + ) -> PauliSum: + if len(direction) == 3: + restricted_error = random.choices( + ['I', 'X', 'Y', 'Z'], + k=len(positions), + weights=[1-p, p * direction[0], p * direction[1], p * direction[2]] + ) + else: + restricted_error = ''.join(random.choices( + list(map(''.join, list(product(['I', 'X', 'Y', 'Z'], repeat=2)))), + k=len(positions)//2, + weights=[1-p] + [p/15] * 15 + )) + + error = ['I'] * self.n + for i, pos in enumerate(positions): + error[pos] = restricted_error[i] + + return PauliSum({''.join(error): 1}) + + def get_pauli_tableau_matrix(self): + bsf_stabilizers = [] + bsf_destabilizers = [] + for i_stab in range(self.n): + if i_stab != self.nonpauli_index: + stab_pauli = list(self.stabilizers[i_stab].keys())[0] + destab_pauli = list(self.destabilizers[i_stab].keys())[0] + bsf_stabilizers.append(str_to_bsf(stab_pauli)) + bsf_destabilizers.append(str_to_bsf(destab_pauli)) + + tableau_matrix = np.array(bsf_stabilizers + bsf_destabilizers, dtype=np.uint8) + + return tableau_matrix + + def get_nontrivial_logicals(self): + tableau_matrix = self.get_pauli_tableau_matrix() + normalizer = nullspace(tableau_matrix).toarray() + logicals = [] + + for i in range(normalizer.shape[0]): + normalizer_bsf = np.hstack( + [normalizer[i, self.n:], normalizer[i, :self.n]] + ) + if rank(np.vstack([tableau_matrix, normalizer_bsf])) > len(tableau_matrix): + logicals.append( + bsf_to_str(normalizer_bsf) + ) + tableau_matrix = np.vstack([tableau_matrix, normalizer_bsf]) + + self.logger.print(f"\nLogicals: {logicals}\n") + + self.nontrivial_logicals = logicals + + return logicals + + def simplify(self, op: Union[PauliSum, list[PauliSum]], round_eps=13): + # print("Op", op) + # print("Nontrivial logicals:", self.nontrivial_logicals) + + if self.nontrivial_logicals is None: + raise ValueError("Non-trivial logicals are not set, cannot simplify operator.") + elif len(self.nontrivial_logicals) == 0: + raise ValueError("Set of non-trivial logicals is empty, cannot simplify operator.") + + + X_L_pauli, Z_L_pauli = self.nontrivial_logicals + Y_L_pauli = multiply_paulis(X_L_pauli, Z_L_pauli)[1] + + X_L = PauliSum({X_L_pauli: 1}) + Y_L = PauliSum({Y_L_pauli: 1}) + Z_L = PauliSum({Z_L_pauli: 1}) + + UXU = X_L.apply_unitary(op) + UYU = Y_L.apply_unitary(op) + UZU = Z_L.apply_unitary(op) + + tr_xx = self.lds_decompose(UXU * X_L).coefficient('I'*self.n).real + tr_zz = self.lds_decompose(UZU * Z_L).coefficient('I'*self.n).real + tr_xy = self.lds_decompose(UXU * Y_L).coefficient('I'*self.n).real + tr_xz = self.lds_decompose(UXU * Z_L).coefficient('I'*self.n).real + tr_yy = self.lds_decompose(UYU * Y_L).coefficient('I'*self.n).real + + self.logger.print(f"op {op}") + self.logger.print(f"UXU {(UXU)}") + self.logger.print(f"UXUZ {(UXU * Z_L)}") + self.logger.print(f"UXUZ decompose {self.lds_decompose(UXU * Z_L)}") + self.logger.print(f"UXUY {(UXU * Y_L)}") + self.logger.print(f"UXUY decompose {self.lds_decompose(UXU * Y_L)}") + self.logger.print(f"tr_xx {tr_xx}") + self.logger.print(f"tr_yy {tr_yy}") + self.logger.print(f"tr_zz {tr_zz}") + self.logger.print(f"tr_xy {tr_xy}") + self.logger.print(f"tr_xz {tr_xz}") + + try: + mod_d = sqrt(round(1/4 * (tr_xx + tr_yy + tr_zz + 1).real, round_eps)) + mod_a = sqrt(round(-0.5 * (tr_yy + tr_zz).real + mod_d**2, round_eps)) + mod_b = sqrt(round(-0.5 * (tr_xx + tr_zz).real + mod_d**2, round_eps)) + mod_c = sqrt(round(-0.5 * (tr_xx + tr_yy).real + mod_d**2, round_eps)) + except ValueError as e: + print("Op", op) + print("Nontrivial logicals:", self.nontrivial_logicals) + print("UXUX", (UXU * X_L)) + print("UXUX decompose", self.lds_decompose(UXU * X_L)) + print("UYUY", (UYU * Y_L)) + print("UYUY decompose", self.lds_decompose(UYU * Y_L)) + print("UZUZ", (UZU * Z_L)) + print("UZUZ decompose", self.lds_decompose(UZU * Z_L)) + # print("LDS decomposed op", self.lds_decompose(prod(op))) + + print("tr_xx", tr_xx) + print("tr_yy", tr_yy) + print("tr_zz", tr_zz) + print("mod_a_square", -0.5 * (tr_yy + tr_zz).real) + print("mod_b_square", -0.5 * (tr_xx + tr_zz).real) + print("mod_c_square", -0.5 * (tr_xx + tr_yy).real) + print("mod_d_square", 1/4 * (tr_xx + tr_yy + tr_zz + 1).real) + print(self) + # raise ValueError(e) + + theta_a = 0 + + if mod_a * mod_d != 0: + tr_xi = self.lds_decompose(UXU).coefficient('I'*self.n).real + theta_d = acos(round(0.5 * tr_xi / (mod_d * mod_a), round_eps)) + elif mod_b * mod_d != 0: + tr_yi = self.lds_decompose(UYU).coefficient('I'*self.n).real + theta_d = acos(round(0.5 * tr_yi / (mod_d * mod_b), round_eps)) + elif mod_c * mod_d != 0: + tr_zi = self.lds_decompose(UZU).coefficient('I'*self.n).real + theta_d = acos(round(0.5 * tr_zi / (mod_d * mod_c), round_eps)) + else: + theta_d = 0 + + if mod_a * mod_b != 0: + cos_theta_b = round(0.5 * (tr_xy / (mod_a * mod_b)), round_eps) + theta_b = acos(cos_theta_b) + else: + theta_b = 0 + + if mod_a * mod_c != 0: + cos_theta_c = round(0.5 * (tr_xz / (mod_a * mod_c)), round_eps) + theta_c = acos(cos_theta_c) + elif mod_b * mod_c != 0: + tr_yz = self.lds_decompose(UYU * Z_L).coefficient('I'*self.n).real + cos_theta_c = round(0.5 * (tr_yz / (mod_b * mod_c)), round_eps) + theta_c = acos(cos_theta_c) + theta_b + else: + theta_c = 0 + + self.logger.print(f"theta_a {theta_a}") + self.logger.print(f"theta_b {theta_b}") + self.logger.print(f"theta_c {theta_c}") + + simplified_op = PauliSum({ + X_L_pauli: mod_a * exp(1j * theta_a), + Y_L_pauli: mod_b * exp(1j * theta_b), + Z_L_pauli: mod_c * exp(1j * theta_c), + 'I'*self.n: mod_d * exp(1j * theta_d) + }).remove_zeros() + + return simplified_op + + def lds_decompose(self, op: PauliSum): + """Simplify a Pauli operator by removing identity terms.""" + + new_op = PauliSum({}) + for pauli, coeff in op.items(): + # self.logger.print("Old pauli:", pauli, coeff) + anticommuting_stabilizers = [] + anticommuting_destabilizers = [] + anticommuting_logicals = [] + for i in range(self.n): + if i != self.nonpauli_index: + stab_pauli = list(self.stabilizers[i].keys())[0] + destab_pauli = list(self.destabilizers[i].keys())[0] + if not commute(stab_pauli, pauli): + anticommuting_stabilizers.append(i) + if not commute(destab_pauli, pauli): + anticommuting_destabilizers.append(i) + + if self.nontrivial_logicals is not None: + for i in range(len(self.nontrivial_logicals)): + if not commute(self.nontrivial_logicals[i], pauli): + anticommuting_logicals.append(i) + + if ( + len(anticommuting_stabilizers) == 0 and + len(anticommuting_destabilizers) == 0 and + len(anticommuting_logicals) == 0 + ): + new_pauli, new_coeff = pauli, coeff + else: + decomposition_d = 'I'*self.n + decomposition_s = 'I'*self.n + decomposition_l = 'I'*self.n + phase_s = 1 + for i in anticommuting_stabilizers: + destab_pauli = list(self.destabilizers[i].keys())[0] + _, decomposition_d = multiply_paulis(decomposition_d, destab_pauli) + for i in anticommuting_destabilizers: + # We keep the stabilizer phase because it is absorbed in the state + # when simplifying the operator + stab_pauli = list(self.stabilizers[i].keys())[0] + coeff_s = self.stabilizers[i][stab_pauli] + new_phase_s, decomposition_s = multiply_paulis(decomposition_s, stab_pauli) + phase_s *= new_phase_s * coeff_s + for i in anticommuting_logicals: + logical = self.nontrivial_logicals[1-i] + _, decomposition_l = multiply_paulis(decomposition_l, logical) + + # self.logger.print("Decomposition S:", decomposition_s, phase_s) + + LD_coeff, LD_pauli = multiply_paulis(decomposition_l, decomposition_d) + LDS_coeff, LDS_pauli = multiply_paulis(LD_pauli, decomposition_s) + LDS_coeff *= LD_coeff + + final_phase = LDS_coeff / phase_s + new_coeff = coeff * final_phase + new_pauli = LD_pauli + + if LDS_pauli != pauli: + raise ValueError( + "Decomposition does not match original pauli\n" + + f"Original Pauli: {pauli}\n" + + f"Decomposition S: {decomposition_s}\n" + + f"Decomposition D: {decomposition_d}\n" + + f"Decomposition L: {decomposition_l}\n" + + f"Recomposition: {LDS_pauli}\n" + + f"Anticommuting stabilizers: {anticommuting_stabilizers}" + + f"Anticommuting destabilizers: {anticommuting_destabilizers}" + + f"Anticommuting logicals: {anticommuting_logicals}" + + f"Tableau:\n {self}" + ) + + new_op += {new_pauli: new_coeff} + # self.logger.print("New pauli:", new_pauli, new_coeff) + return new_op + + def get_measurement_probability(self, qubit: int) -> float: + """Get the probability of measuring a qubit in the |0> state.""" + Zq = Z(qubit, self.n) + M = self.backpropagate_buffer(Zq) + M_simplified = self.simplify(M) + a = (M_simplified * self.stabilizers[self.nonpauli_index]).coefficient('I'*self.n) + d = M_simplified.coefficient('I'*self.n) + p = 0.5 * (1 + abs(a) + abs(d)) + + return p + + def get_orthogonal(self, op: PauliSum): + """Find element that anticommutes with op, assuming it is a sum of logicals. + We use the Gram-Schmidt process to the find an orthogonal element to the vector (a, b, c) + where op=aX + bY + cZ + """ + + if self.nontrivial_logicals is None: + raise ValueError("Non-trivial logicals are not set. Call get_nontrivial_logicals() first.") + + X_L_pauli, Z_L_pauli = self.nontrivial_logicals + Y_L_pauli = multiply_paulis(X_L_pauli, Z_L_pauli)[1] + + if not set(op.keys()).issubset(set([X_L_pauli, Y_L_pauli, Z_L_pauli])): + raise ValueError( + f"Operator {op} is not a sum of logicals. " + "It should be a sum of X, Y, Z logicals" + f"\nNon-trivial logicals:\n{self.nontrivial_logicals}." + ) + + a, b, c = op.coefficient(X_L_pauli), op.coefficient(Y_L_pauli), op.coefficient(Z_L_pauli) + + if b == c == 0: + ortho_vector = [0, 0, 1] + else: + ortho_vector = [1-a**2, -a*b, -a*c] + + ortho_op = PauliSum({ + X_L_pauli: ortho_vector[0], + Y_L_pauli: ortho_vector[1], + Z_L_pauli: ortho_vector[2] + }).remove_zeros() + + return ortho_op + + + def measure_and_reset(self, qubit: int, meas_prob=None) -> int: + self.logger.print(f"------- Measure and reset qubit {qubit} -------") + noncommuting_stabilizers = [] + noncommuting_destabilizers = [] + anticommuting_destabilizers = [] + Zq = Z(qubit, self.n) + Xq = X(qubit, self.n) + + # self.logger.print("Check commutation relations before measurement") + # self.check_commutation_relations(self.stabilizers, self.destabilizers) + + i_s0 = -1 # Index of a Pauli stabilizer that anticommutes with Zq + commuting_non_pauli_index = -1 + for i, s in enumerate(self.stabilizers): + # self.logger.print("Check stabilizer ", i, ":", s) + if s * Zq != Zq * s: + noncommuting_stabilizers.append(i) + if i_s0 == -1 and len(s) == 1: + i_s0 = i + elif len(s) > 1: + if commuting_non_pauli_index == -1: + commuting_non_pauli_index = i + else: + raise ValueError( + "More than one non-Pauli stabilizer that commutes with Zq. " + "This is not supported." + ) + + for i, d in enumerate(self.destabilizers): + if d * Zq != Zq * d: + noncommuting_destabilizers.append(i) + if d * Zq == - Zq * d: + anticommuting_destabilizers.append(i) + + self.logger.print(f"Noncommuting stabilizers:\n {noncommuting_stabilizers}") + self.logger.print(f"Pivot:\n {i_s0}") + self.logger.print(f"Anticommuting destabilizers:\n {anticommuting_destabilizers}") + self.logger.print(f"Noncommuting destabilizers:\n {noncommuting_destabilizers}") + + if len(noncommuting_stabilizers) == 0: + S: PauliSum = prod([self.stabilizers[s] for s in anticommuting_destabilizers]) + S *= Zq + self.logger.print(f"Deterministic. This should be r x I: {S}") + assert list(S.keys()) == ['I'*self.n], ( + f"Not identity. Tableau:\n{self.stabilizers}\n{self.destabilizers}" + ) + + r = S['I'*self.n] + else: + if i_s0 == -1: + if len(noncommuting_stabilizers) > 1: + raise ValueError( + "More than one non-Pauli stabilizer that does not commute with Zq. " + "This is not supported." + ) + else: + i_s0 = noncommuting_stabilizers[0] + else: + pivot_pauli = list(self.stabilizers[i_s0].keys())[0] + pivot_coeff = self.stabilizers[i_s0][pivot_pauli] + + for i_s in noncommuting_stabilizers: + if i_s != i_s0: + new_stab = PauliSum({}) + for pauli, coeff in self.stabilizers[i_s].items(): + if pauli[qubit] in ['X', 'Y']: + new_coeff, new_pauli = multiply_paulis(pauli, pivot_pauli) + new_coeff *= pivot_coeff * coeff + new_stab += {new_pauli: new_coeff} + else: + new_stab += {pauli: coeff} + self.stabilizers[i_s] = new_stab + + for i_d in noncommuting_destabilizers: + if i_d != i_s0: + new_destab = PauliSum({}) + for pauli, coeff in self.destabilizers[i_d].items(): + if pauli[qubit] in ['X', 'Y']: + new_coeff, new_pauli = multiply_paulis(pauli, pivot_pauli) + new_coeff *= pivot_coeff * coeff + new_destab += {new_pauli: new_coeff} + else: + new_destab += {pauli: coeff} + self.destabilizers[i_d] = new_destab + + self.destabilizers[i_s0] = Xq + p = 0.5 if meas_prob is None else meas_prob + # r = np.random.choice([1, -1], p=[p, 1-p]) + r = 1 # Cheating for testing purposes + # It is not r * Zq because of resetting + self.stabilizers[i_s0] = Zq + + if commuting_non_pauli_index != -1: + self.nonpauli_index = commuting_non_pauli_index + self.get_nontrivial_logicals() + self.logger.print(f"Non-trivial logicals: {self.nontrivial_logicals}") + self.logger.print(f"Commuting non-Pauli stabilizer index: {commuting_non_pauli_index}") + self.logger.print(f"Old stabilizer: {self.stabilizers[commuting_non_pauli_index]}") + self.logger.print(f"Old destabilizer: {self.destabilizers[commuting_non_pauli_index]}") + + new_stab_without_Zq = self.simplify(self.stabilizers[commuting_non_pauli_index]) + new_stab = self.simplify(self.stabilizers[commuting_non_pauli_index]) * Zq + + self.logger.print(f"New stabilizer: {new_stab}") + self.stabilizers[commuting_non_pauli_index] = new_stab + + # Calculate the new destablizer by finding an orthogonal element + new_destab = self.get_orthogonal(new_stab_without_Zq) + self.destabilizers[commuting_non_pauli_index] = new_destab + + + + # Reseting by multiplying with r all stabilizers/destabilizers containing Zq + # for i in range(self.n): + # if i != i_s0: + # for tableau in [self.stabilizers, self.destabilizers]: + # for pauli, _ in tableau[i].items(): + # if pauli[qubit] == 'Z': + # tableau[i][pauli] *= r + # Reseting by multiplying by Zq/D[Zq] all the stabilizers/destabilizers containing Zq + + # self.logger.print("Tableau before resetting:") + # self.logger.print(self) + + for i in range(self.n): + if i != i_s0: + new_stab = PauliSum({}) + for pauli, coeff in self.stabilizers[i]: + if pauli[qubit] == 'Z': + new_pauli = pauli[:qubit] + 'I' + pauli[qubit+1:] + new_coeff = coeff * r + else: + new_pauli = pauli + new_coeff = coeff + new_stab += {new_pauli: new_coeff} + self.stabilizers[i] = new_stab + + new_destab = PauliSum({}) + for pauli, coeff in self.destabilizers[i]: + if pauli[qubit] == 'Z': + new_pauli = pauli[:qubit] + 'I' + pauli[qubit+1:] + new_coeff = coeff * r + else: + new_pauli = pauli + new_coeff = coeff + new_destab += {new_pauli: new_coeff} + self.destabilizers[i] = new_destab + + # self.logger.print() + # self.logger.print("Check commutation relations after measurement") + self.check_commutation_relations(self.stabilizers, self.destabilizers) + + self.logger.print(f"Result: {r}") + return r + + def backpropagate_buffer(self, op: PauliSum): + """Backpropagate a Pauli operator through the buffer layers.""" + backpropagated_op = [op.copy()] + for i_layer in range(len(self.buffer_layers)-1, -1, -1): + if (self.buffer_layers[i_layer].tag in nonclifford_gates and + len(backpropagated_op) == 1 and + len(backpropagated_op[0]) == 1 and + support((list(backpropagated_op[0].keys())[0])) >= self.decomposition_threshold + ): + # Decompose the operator before a non-Clifford gate if not already decomposed + current_op = backpropagated_op[0] + pauli = list(current_op.keys())[0] + backpropagated_op = [] + for i in range(self.n): + if pauli[i] != 'I': + backpropagated_op.append(single_pauli_sum(i, pauli[i], self.n)) + backpropagated_op[0] *= current_op[pauli] + + for i_op, op in enumerate(backpropagated_op): + new_op = propagation( + self.buffer_layers[i_layer], op, dag=True + ) + backpropagated_op[i_op] = new_op + + # self.logger.print(f"Backpropagated operator:\n{backpropagated_op}") + return backpropagated_op + + def propagate_buffer(self, op: PauliSum): + """Propagate a Pauli operator through the buffer layers.""" + propagated_op = op.copy() + for layer in self.buffer_layers: + propagated_op = propagation(layer, propagated_op) + + return propagated_op + + def apply_buffer(self, check_preservation=True): + new_stabilizers = [] + new_destabilizers = [] + new_nonpauli_index = -1 + + for i_stab, stab in enumerate(self.stabilizers): + self.logger.print(f"Original stabilizer {i_stab}: {stab}") + destab = self.destabilizers[i_stab] + + backpropagated_stab = self.backpropagate_buffer(stab) + backpropagated_destab = self.backpropagate_buffer(destab) + self.logger.print(f"Backpropagated stabilizer {i_stab}: {backpropagated_stab}") + self.logger.print(f"Backpropagated destabilizer {i_stab}: {backpropagated_destab}") + + # if check_preservation and ( + # backpropagated_stab == PauliSum({'I'*self.n: 1}) or + # backpropagated_stab == self.stabilizers[self.nonpauli_index] + # ): + if (check_preservation and + self.is_stabilizer(backpropagated_stab, i_stab) and True + # self.is_destabilizer(backpropagated_destab, i_stab) + ): + self.logger.print("Preserve stabilizer\n") + propagated_stab = self.stabilizers[i_stab] + propagated_destab = self.destabilizers[i_stab] + else: + propagated_stab = self.propagate_buffer(stab) + propagated_destab = self.propagate_buffer(self.destabilizers[i_stab]) + self.logger.print(f"Change stabilizer to: {propagated_stab}\n") + + new_stabilizers.append(propagated_stab) + new_destabilizers.append(propagated_destab) + + if len(propagated_stab) != 1: + # if new_nonpauli_index != -1: + # raise ValueError( + # "More than one non-Pauli stabilizer found. " + # "This is not currently supported." + # ) + new_nonpauli_index = i_stab + + self.stabilizers = new_stabilizers + self.destabilizers = new_destabilizers + self.nonpauli_index = new_nonpauli_index + self.buffer_layers = [] + self.is_circuit_buffer_nonclifford = False + + def apply_error(self): + self.logger.print(f"Applying error to the tableau:\n{self.buffer_errors}") + + for tableau in [self.stabilizers, self.destabilizers]: + for i in range(self.n): + for error in self.buffer_errors: + tableau[i] = error * tableau[i] * error.dag() + + self.buffer_errors = [] + + def evolve(self, layer: stim.CircuitInstruction): + """Evolve the tableau with a given layer of the circuit.""" + measurement_results = [] + ignored_gates = [ + 'QUBIT_COORDS', 'DETECTOR' + ] + if layer.name == 'RX': + layer = stim.CircuitInstruction('H', layer.targets_copy()) + + if layer.name in ignored_gates: + pass + elif layer.name in ['DEPOLARIZE1', 'DEPOLARIZE2', 'X_ERROR', 'Y_ERROR', 'Z_ERROR']: + new_error = self.generate_error( + list(map(lambda t: t.value, layer.targets_copy())), + layer.gate_args_copy()[0], + direction_dict[layer.name] + ) + self.buffer_errors.append(new_error) + + elif layer.name == 'TICK': + self.logger.print("Tableau before TICK:") + self.logger.print(self) + + self.apply_buffer(check_preservation=False) + self.apply_error() + + self.logger.print("Tableau after TICK:") + self.logger.print(self) + + elif layer.name == 'MR' or layer.name == 'R': + self.get_nontrivial_logicals() + meas_indices = list(map(lambda t: t.value, layer.targets_copy())) + + # if self.is_circuit_buffer_nonclifford: + meas_prob = self.get_measurement_probability(meas_indices[0]) + self.logger.print(f"Measurement probability: {meas_prob}") + + self.apply_buffer(check_preservation=self.is_circuit_buffer_nonclifford) + + self.logger.print("Tableau before applying error:") + self.logger.print(self) + self.apply_error() + + self.logger.print("Tableau before measurement:") + self.logger.print(self) + + for qubit in meas_indices: + measurement_results.append(self.measure_and_reset(qubit, meas_prob=meas_prob)) + self.check_commutation_relations() + + self.logger.print("Tableau after measurement:") + self.logger.print(self) + else: + self.buffer_layers.append(layer) + if layer.tag in nonclifford_gates: + self.is_circuit_buffer_nonclifford = True + for i in range(len(self.buffer_errors)): + self.buffer_errors[i] = propagation(layer, self.buffer_errors[i]) + + return measurement_results + + def run_circuit(self, circuit: stim.Circuit): + """Run a circuit on the tableau.""" + result = {'accept': False, 'fail': False} + for layer in circuit: + self.logger.print("=======================================================================") + self.logger.print(layer) + self.logger.print("=======================================================================") + measurement_results = self.evolve(layer) + + if -1 in measurement_results: + return result + + result['accept'] = True + if self.stabilizers[self.nonpauli_index] != logical_H: + result['fail'] = True + + return result + + def __str__(self): + lines = ["== General Tableau =="] + lines.append("Stabilizers") + for stabilizer in self.stabilizers: + lines.append(f"{stabilizer}") + lines.append("\nDestabilizers") + for destabilizer in self.destabilizers: + lines.append(f"{destabilizer}") + + return "\n".join(lines) + + def __repr__(self): + return self.__str__() \ No newline at end of file diff --git a/glue/generalized_sim/logger.py b/glue/generalized_sim/logger.py new file mode 100644 index 00000000..318b7f9b --- /dev/null +++ b/glue/generalized_sim/logger.py @@ -0,0 +1,15 @@ +class Logger: + def __init__(self, filename, logging=True, stdout_print=False): + self.filename = filename + self.logging = logging + self.stdout_print = stdout_print + # Overwrite the file at initialization + with open(self.filename, 'w'): + pass # Just truncate the file + + def print(self, message): + if self.logging: + with open(self.filename, 'a') as f: + f.write(str(message) + '\n') + if self.stdout_print: + print(message) diff --git a/glue/generalized_sim/pauli_sum.py b/glue/generalized_sim/pauli_sum.py new file mode 100644 index 00000000..b2203a15 --- /dev/null +++ b/glue/generalized_sim/pauli_sum.py @@ -0,0 +1,197 @@ +import numpy as np +from typing import Union, TypeAlias, Optional +from math import prod + +from utils import multiply_paulis, isclose + +SumPauliStrings: TypeAlias = dict[str, complex] + + +class PauliSum: + def __init__(self, terms: Optional[Union[int, SumPauliStrings]] = None): + if terms is None: + self._terms = dict() + elif isinstance(terms, int): + self._terms = {'I' * terms: 1} + elif isinstance(terms, dict): + self._terms = terms + else: + raise TypeError("Invalid type for terms. Expected int or dict.") + + self._n_qubits = None + + # @profile + def __add__(self, op: Union["PauliSum", SumPauliStrings]): + new_dict = self._terms.copy() + + for pauli, coeff in op.items(): + new_coeff = new_dict.get(pauli, 0) + coeff + if isclose(new_coeff, 0): + new_dict.pop(pauli, None) # Remove key if it exists + else: + new_dict[pauli] = new_coeff + + return PauliSum(new_dict) + + def __sub__(self, op: Union["PauliSum", SumPauliStrings]): + return self + -1 * op + + # @profile + def __mul__(self, op: Union[int, float, complex, "PauliSum"]): + if isinstance(op, (int, float, complex)): + if op == 0: + new_dict = {} + else: + new_dict = {pauli: op * coeff for pauli, coeff in self} + + elif isinstance(op, PauliSum): + new_dict = {} + for pauli1, coeff1 in self._terms.items(): + for pauli2, coeff2 in op._terms.items(): + phase, new_pauli = multiply_paulis(pauli1, pauli2) + new_coeff = coeff1 * coeff2 * phase + new_value = new_dict.get(new_pauli, 0) + new_coeff + if isclose(new_value, 0): + new_dict.pop(new_pauli, None) # Remove key if it exists + else: + new_dict[new_pauli] = new_value + else: + raise NotImplementedError("Multiplication not implemented yet for those types") + + return PauliSum(new_dict) + + def remove(self, pauli: str): + self._terms.pop(pauli, None) + + @property + def n_qubits(self) -> int: + if self._n_qubits is None: + if len(self) == 0: + self._n_qubits = 0 + else: + self._n_qubits = len(list(self.keys())[0]) + + return self._n_qubits + + def remove_zeros(self): + """Remove terms with zero coefficients from the PauliSum.""" + self._terms = {pauli: coeff for pauli, coeff in self._terms.items() if not isclose(coeff, 0)} + + return self + + def dag(self) -> "PauliSum": + new_dict = {pauli: complex(coeff).conjugate() for pauli, coeff in self} + + return PauliSum(new_dict) + + def normalize(self) -> "PauliSum": + norm = np.linalg.norm(list(self.values())) + if norm == 0: + raise ValueError("Cannot normalize a zero vector.") + + return self / norm + + def proportional_to(self, op: "PauliSum") -> bool: + self_coeffs = list(self.values()) + op_coeffs = list(op.values()) + if self.keys() == op.keys(): + prop_coeff = self_coeffs[0] / op_coeffs[0] + if np.isclose(self_coeffs, prop_coeff * np.array(op_coeffs)).all(): + return True + + return False + + def commutes_with(self, op: "PauliSum") -> bool: + return self * op == op * self + + def anticommutes_with(self, op: "PauliSum") -> bool: + return self * op == - op * self + + def restrict_up_to(self, end: int, include_identities: bool = True) -> "PauliSum": + new_dict = {} + for pauli, coeff in self.items(): + new_key = pauli[:end] + if include_identities: + new_key += 'I' * (len(pauli) - end) + new_dict[new_key] = coeff + + return PauliSum(new_dict) + + def apply_unitary(self, unitary: Union["PauliSum", list["PauliSum"]]) -> "PauliSum": + evolved_op = self + if not isinstance(unitary, list): + unitary = [unitary] + for u in unitary: + evolved_op = u * evolved_op * u.dag() + + return evolved_op + + def coefficient(self, pauli: str) -> complex: + return self._terms.get(pauli, 0) + + def keys(self): + return self._terms.keys() + + def values(self): + return self._terms.values() + + def items(self): + return self._terms.items() + + def copy(self) -> "PauliSum": + return PauliSum(self._terms.copy()) + + def __len__(self): + return len(self._terms) + + def __eq__(self, op: "PauliSum"): + return ( + self.keys() == op.keys() and + all([isclose(op[p], self[p]) for p in op.keys()]) + ) + + def __rmul__(self, op: Union[int, float, complex, "PauliSum"]): + return self.__mul__(op) + + def __pow__(self, power): + return prod([self for _ in range(power)]) + + def __neg__(self): + return -1 * self + + def __truediv__(self, coeff: Union[int, float, complex]): + return 1 / coeff * self + + def __iter__(self): + return iter(self._terms.items()) + + def __setitem__(self, key, value): + self._terms[key] = value + + def __getitem__(self, key): + return self._terms[key] + + def __str__(self): + lines = ["PauliSum"] + for pauli, coeff in self: + lines.append(f" {coeff:+.2f} * {pauli}") + return "\n".join(lines) + + def __repr__(self): + return self.__str__() + + +def single_pauli_sum(i: int, pauli_type: str, n: int) -> PauliSum: + pauli_string = ['I'] * n + pauli_string[i] = pauli_type + + return PauliSum({''.join(pauli_string): 1.0}) + +def X(i, n=7): + return single_pauli_sum(i, 'X', n) + +def Z(i, n=7): + return single_pauli_sum(i, 'Z', n) + +def Y(i, n=7): + return single_pauli_sum(i, 'Y', n) diff --git a/glue/generalized_sim/pheno_simulation.py b/glue/generalized_sim/pheno_simulation.py new file mode 100644 index 00000000..d2d4f37e --- /dev/null +++ b/glue/generalized_sim/pheno_simulation.py @@ -0,0 +1,417 @@ +import os +import time +from itertools import product +from typing import TypeAlias, Optional + +import numpy as np +import numpy.typing as npt +import sinter +import stim +from panqec.codes import StabilizerCode, Color666PlanarCode + +from pauli_sum import PauliSum +from utils import stringify, unstringify, str_to_bsf, bsf_to_str, isclose +from logger import Logger + +PauliBSF: TypeAlias = npt.NDArray[np.uint8] +BinaryVect: TypeAlias = npt.NDArray[np.uint8] +WeightedPauliBSF: TypeAlias = tuple[float, PauliBSF] + + +def hadamard(op: PauliSum) -> np.ndarray: + new_terms = dict() + + h_map = {'I': 'I', 'X': 'Z', 'Y': 'Y', 'Z': 'X'} + for pauli, coeff in op._terms.items(): + new_pauli = ''.join([h_map[p] for p in pauli]) + new_coeff = -coeff if pauli.count('Y') % 2 == 1 else coeff + new_terms[new_pauli] = new_coeff + + h_op = PauliSum(new_terms) + + return h_op + + +logical_I = PauliSum({'I': 1}) +logical_H = PauliSum({'X': np.sqrt(2), 'Z': np.sqrt(2)}) +logical_X = PauliSum({'X': 1}) +logical_Y = PauliSum({'Y': 1}) +logical_Z = PauliSum({'Z': 1}) + + +class PhenomenologicalSimulation: + def __init__( + self, + code: StabilizerCode, + error_model: tuple[float] = (1/3, 1/3, 1/3), + logger=None + ): + self.code = code + self.error_model = error_model + + self.H = code.stabilizer_matrix.toarray() + self.Hx = code.Hx.toarray() + self.Hz = code.Hz.toarray() + + if logger is None: + self.logger = Logger('log.txt', stdout_print=False) + else: + self.logger = logger + + self.project = { + 'H': self.project_H, + 'C': self.project_codespace, + 'Y': self.project_Y, + 'X': self.project_X, + 'Z': self.project_Z} + + def is_stabilizer(self, pauli: str): + pauli_bsf = str_to_bsf(pauli) + return ( + np.all(self.get_syndrome(pauli_bsf) == 0) and + np.all(self.code.logical_errors(pauli_bsf) == 0) + ) + + def generate_error(self, p_phys): + sx, sy, sz = self.error_model + + return ''.join(list(np.random.choice( + ['I', 'X', 'Y', 'Z'], + size=self.code.n, + p=[1-p_phys, sx*p_phys, sy*p_phys, sz*p_phys] + ))) + + def physical_to_logical(self, pauli_sum: PauliSum) -> PauliSum: + """Convert a physical error to a logical error using the code's stabilizer matrix.""" + logical_terms = dict() + + for pauli, coeff in pauli_sum.items(): + logical_pauli = bsf_to_str(self.code.logical_errors(str_to_bsf(pauli))) + logical_terms[logical_pauli] = logical_terms.get(logical_pauli, 0) + coeff + + return PauliSum(logical_terms) + + def project_H(self, syndrome: BinaryVect, error: PauliSum) -> PauliSum: + """Calculate Pi^s E when acting on the state |H>, + for a syndrome s and error E that takes the form of a Pauli sum. + The syndrome can only be [0] (corresponding to +1) or [1] (corresponding to -1). + """ + projected_op = 0.5 * (error + (-1)**syndrome[0] * hadamard(error)) + + return projected_op + + def project_X(self, syndrome: BinaryVect, error: PauliSum) -> PauliSum: + """Calculate Pi^s E when acting on the state |Y>, + for a syndrome s and error E that takes the form of a Pauli sum. + The syndrome can only be [0] (corresponding to +1) or [1] (corresponding to -1). + """ + projected_op = PauliSum() + + for pauli, coeff in error: + if (pauli.count('Z') + pauli.count('Y')) % 2 == syndrome[0]: + projected_op += {pauli: coeff} + + return projected_op + + def project_Y(self, syndrome: BinaryVect, error: PauliSum) -> PauliSum: + """Calculate Pi^s E when acting on the state |Y>, + for a syndrome s and error E that takes the form of a Pauli sum. + The syndrome can only be [0] (corresponding to +1) or [1] (corresponding to -1). + """ + projected_op = PauliSum() + + for pauli, coeff in error: + if (pauli.count('X') + pauli.count('Z')) % 2 == syndrome[0]: + projected_op += {pauli: coeff} + + return projected_op + + def project_Z(self, syndrome: BinaryVect, error: PauliSum) -> PauliSum: + """Calculate Pi^s E when acting on the state |Z>, + for a syndrome s and error E that takes the form of a Pauli sum. + The syndrome can only be [0] (corresponding to +1) or [1] (corresponding to -1). + """ + projected_op = PauliSum() + + for pauli, coeff in error: + if (pauli.count('X') + pauli.count('Y')) % 2 == syndrome[0]: + projected_op += {pauli: coeff} + + return projected_op + + def project_codespace(self, syndrome: BinaryVect, error: PauliSum) -> PauliSum: + projected_op = PauliSum() + + for pauli, coeff in error: + pauli_bsf = str_to_bsf(pauli) + if np.all(self.get_syndrome(pauli_bsf) == syndrome): + projected_op += {pauli: coeff} + + return projected_op + + def measure(self, proj_key: str, error: PauliSum) -> tuple[dict[str, float], list[PauliSum]]: + new_error = dict() + syndrome_prob = dict() + + if proj_key in ['H', 'Y', 'X', 'Z']: + possible_syndromes = {'0', '1'} + elif proj_key == "C": + possible_syndromes = set([ + stringify(self.get_syndrome(str_to_bsf(pauli))) for pauli in error.keys() + ]) + else: + raise ValueError("Invalid projection operator. Use 'H', 'X', 'Y', 'Z' or 'C'.") + + for s in possible_syndromes: + syndrome_prob[s] = 0 + + new_error[s] = self.project[proj_key](unstringify(s), error) + + sandwiched_op: PauliSum = error.dag() * new_error[s] + self.logger.print(f"Initial sandwiched op: {sandwiched_op}") + + simplified_sandwiched_op = PauliSum() + for pauli, coeff in sandwiched_op: + pauli_bsf = str_to_bsf(pauli) + if np.all(self.get_syndrome(pauli_bsf) == 0): + simplified_sandwiched_op[pauli] = coeff + sandwiched_op = simplified_sandwiched_op + + self.logger.print(f"Final sandwiched op: {sandwiched_op}") + + sandwiched_op_log = self.physical_to_logical(sandwiched_op) + self.logger.print(f"Logical sandwiched op: {sandwiched_op_log}") + + for pauli, coeff in sandwiched_op_log: + if pauli == 'I': + syndrome_prob[s] += coeff + elif pauli in ['X', 'Z']: + syndrome_prob[s] += coeff / np.sqrt(2) + + if not isclose(abs(syndrome_prob[s]), syndrome_prob[s]): + raise ValueError( + f"Syndrome probability {syndrome_prob[s]} is not a positive real number" + ) + + syndrome_prob[s] = abs(syndrome_prob[s]) + + sum_prob = np.sum(list(syndrome_prob.values())) + + if sum_prob == 0: + raise ValueError("The syndrome probabilities are all zero") + + for s in syndrome_prob.keys(): + syndrome_prob[s] /= sum_prob + + return syndrome_prob, new_error + + def get_syndrome(self, error_bsf: npt.NDArray) -> npt.NDArray: + """Measure the syndrome of a given error using the stabilizer matrices""" + n = self.Hx.shape[1] + + syndrome = np.concatenate([self.Hx@error_bsf[n:], self.Hz@error_bsf[:n]]) % 2 + + return syndrome + + def noisy_measurement_layer( + self, + measured_op: str, + p_phys: float, + p_meas: float = 0, + init_error: Optional[PauliSum] = None, + ) -> tuple[BinaryVect, PauliSum]: + """Perform a noisy measurement layer on the code. + + Args: + measured_op (str): + The operator to measure ('H', 'C', 'Y', or 'X'). + p_phys (float): + The physical error probability. + p_meas (float, optional): + The measurement error probability. Defaults to 0. + init_error (PauliSum, optional): + Initial error to apply before measurement. Defaults to None. + Returns: + tuple: A tuple containing the syndrome and new error + """ + self.logger.print('\n' + '=' * 50) + self.logger.print(f"Measuring {measured_op}") + self.logger.print('=' * 50) + self.logger.print(f" Initial error:\n\t{init_error}") + + random_pauli = self.generate_error(p_phys) + # random_pauli = 'IIIIZXX' if len(init_error.keys()) == 1 else 'IIXXZIX' + new_error = PauliSum({random_pauli: 1}) + + total_error_before = new_error * init_error if init_error else new_error + + self.logger.print(f"New error:\n\t{new_error}") + self.logger.print(f"Total error before measurement:\n\t{total_error_before}") + + probs, possible_propagations = self.measure(measured_op, total_error_before) + + selected_syndrome = np.random.choice(list(probs.keys()), p=list(probs.values())) + + norm = np.sqrt(probs[selected_syndrome]) + propagated_error = possible_propagations[selected_syndrome] / norm + + if p_meas > 0: + syndrome_perturbation = np.random.choice( + [0, 1], + size=len(selected_syndrome), + p=[1-p_meas, p_meas] + ) + selected_syndrome_perturbed = stringify( + (syndrome_perturbation + unstringify(selected_syndrome)) % 2 + ) + else: + selected_syndrome_perturbed = selected_syndrome + + self.logger.print(f"Syndrome probabilities:\n\t{probs}") + self.logger.print(f"Possible propagations:\n\t{possible_propagations}") + self.logger.print(f"Selected syndrome: {selected_syndrome}") + self.logger.print(f"Selected syndrome perturbed: {selected_syndrome_perturbed}") + self.logger.print(f"Final error:\n\t{propagated_error}") + + return selected_syndrome_perturbed, propagated_error + + def decompose_error(self, error: PauliSum): + """Decompose error into Pauli error times logical gate, assuming + all the terms in the error have the same syndrome + """ + + first_pauli, coeff = list(error.items())[0] + pauli_error = PauliSum({first_pauli: 1}) + factorized_error = pauli_error * error + logical_error = self.physical_to_logical(factorized_error) + + return pauli_error, logical_error + + # @profile + def run_once(self, n_layers, p_phys, p_meas=0, global_measured_op='H'): + result = {'accept': False, 'fail': False} + + physical_error = PauliSum({'I' * self.code.n: 1}) + logical_error = PauliSum({'I': 1}) + for layer in range(n_layers): + if layer == n_layers - 1: + p_meas = 0 + + for measured_op in [global_measured_op, 'C']: + syndrome, physical_error = self.noisy_measurement_layer( + measured_op, + p_phys, + p_meas=p_meas, + init_error=physical_error, + ) + + if '1' in syndrome: + return result + + self.logger.print(f"Physical error:\n{physical_error}") + + if np.all(self.get_syndrome(str_to_bsf(list(physical_error.keys())[0])) == 0): + log_err = self.physical_to_logical(physical_error) + physical_error = PauliSum(self.code.n) + else: + physical_error, log_err = self.decompose_error(physical_error) + self.logger.print(f"Decomposition:\n{physical_error}\n{log_err}\n") + logical_error *= log_err + + + result['accept'] = True + + self.logger.print("Accept!") + self.logger.print(f"Logical error:\n{logical_error}") + + # print(f"Logical error {logical_error}") + + if not ( + logical_error.proportional_to(logical_I) or + (global_measured_op == 'Y' and logical_error.proportional_to(logical_Y)) or + (global_measured_op == 'X' and logical_error.proportional_to(logical_X)) or + (global_measured_op == 'Z' and logical_error.proportional_to(logical_Z)) or + (global_measured_op == 'H' and logical_error.proportional_to(logical_H)) + ): + self.logger.print("Failure!") + self.logger.print(f"Logical error {logical_error}") + result['fail'] = True + + return result + + +class Sampler(sinter.Sampler): + def compiled_sampler_for_task(self, task: sinter.Task) -> sinter.CompiledSampler: + n_layers = task.json_metadata['n_layers'] + measured_op = task.json_metadata['measured_op'] + p_phys = task.json_metadata['p_phys'] + p_meas = task.json_metadata['p_meas'] + + return CompiledSampler(n_layers, measured_op, p_phys, p_meas) + + +class CompiledSampler(sinter.CompiledSampler): + def __init__(self, n_layers, measured_op, p_phys, p_meas): + self.n_layers = n_layers + self.measured_op = measured_op + self.p_phys = p_phys + self.p_meas = p_meas + + logger = Logger('log.txt', logging=False, stdout_print=False) + code = Color666PlanarCode(1) + + self.sim = PhenomenologicalSimulation(code, logger=logger) + + def sample(self, shots: int) -> sinter.AnonTaskStats: + t0 = time.monotonic() + n_accept, n_fail = 0, 0 + for k in range(shots): + run = self.sim.run_once( + self.n_layers, + self.p_phys, + global_measured_op=self.measured_op, + p_meas=self.p_meas + ) + n_fail += int(run['fail']) + n_accept += int(run['accept']) + + t1 = time.monotonic() + + return sinter.AnonTaskStats( + shots=shots, + errors=n_fail, + discards=shots - n_accept, + seconds=t1-t0 + ) + + +if __name__ == "__main__": + max_shots = int(1e8) + max_errors = 1000 + p_phys_list = [0.01, 0.02, 0.05] + measured_op_list = ['H', 'X'] + n_layers_list = [1] + + parameters = product( + p_phys_list, measured_op_list, n_layers_list + ) + + tasks = [] + for p_phys, measured_op, n_layers in parameters: + tasks.append(sinter.Task(circuit=stim.Circuit(), json_metadata={ + 'n_layers': n_layers, + 'measured_op': measured_op, + 'p_phys': p_phys, + 'p_meas': np.sqrt(p_phys) + })) + + state = sinter.collect( + print_progress=True, + save_resume_filepath='result-pheno.csv', + tasks=tasks, + max_shots=max_shots, + max_errors=max_errors, + num_workers=os.cpu_count(), + decoders=['simulation'], + custom_decoders={'simulation': Sampler()} + ) \ No newline at end of file diff --git a/glue/generalized_sim/utils.py b/glue/generalized_sim/utils.py new file mode 100644 index 00000000..00f4894c --- /dev/null +++ b/glue/generalized_sim/utils.py @@ -0,0 +1,114 @@ +from typing import Union + +import numpy as np +import numpy.typing as npt + +table_mult = { + 'II': ('I', 1), + 'IX': ('X', 1), + 'IY': ('Y', 1), + 'IZ': ('Z', 1), + 'XI': ('X', 1), + 'XX': ('I', 1), + 'XY': ('Z', 1j), + 'XZ': ('Y', -1j), + 'YI': ('Y', 1), + 'YX': ('Z', -1j), + 'YY': ('I', 1), + 'YZ': ('X', 1j), + 'ZI': ('Z', 1), + 'ZX': ('Y', 1j), + 'ZY': ('X', -1j), + 'ZZ': ('I', 1), +} + +table_anticommute = { + 'II': 0, + 'IX': 0, + 'IY': 0, + 'IZ': 0, + 'XI': 0, + 'XX': 0, + 'XY': 1, + 'XZ': 1, + 'YI': 0, + 'YX': 1, + 'YY': 0, + 'YZ': 1, + 'ZI': 0, + 'ZX': 1, + 'ZY': 1, + 'ZZ': 0 +} + + +def stringify(vect: Union[npt.NDArray, list]) -> str: + if isinstance(vect, np.ndarray): + vect = vect.tolist() + vect = [str(a) for a in vect] + return ''.join(vect) + + +def unstringify(string: str) -> npt.NDArray: + vect = np.array(list(string), dtype=np.uint8) + return vect + + +def bsf_to_str(bsf_vec: npt.NDArray[np.uint8]) -> str: + n = len(bsf_vec) // 2 + x_part = bsf_vec[:n] + z_part = bsf_vec[n:] + result = [] + for x, z in zip(x_part, z_part): + if x == 0 and z == 0: + result.append('I') + elif x == 1 and z == 0: + result.append('X') + elif x == 0 and z == 1: + result.append('Z') + elif x == 1 and z == 1: + result.append('Y') + return ''.join(result) + + +def str_to_bsf(str_pauli: Union[str, list[str]]) -> npt.NDArray[np.uint8]: + x = np.array([int(c in ('X', 'Y')) for c in str_pauli], dtype=np.uint8) + z = np.array([int(c in ('Z', 'Y')) for c in str_pauli], dtype=np.uint8) + + return np.concatenate([x, z]) + + +def multiply_single_qubit_paulis(p1: str, p2: str): + # Returns (resulting_pauli: str, phase: complex) + return table_mult[p1 + p2] + + +# @profile +def multiply_paulis(p1: str, p2: str) -> tuple[complex, str]: + phase = 1 + pauli_string = [] + for a, b in zip(p1, p2): + pauli, p = multiply_single_qubit_paulis(a, b) + pauli_string.append(pauli) + phase *= p + pauli_string = ''.join(pauli_string) + + return phase, pauli_string + + +def commute(p1: str, p2: str) -> bool: + n_anticommute = 0 + for a, b in zip(p1, p2): + if table_anticommute[a + b]: + n_anticommute += 1 + + return 1 - (n_anticommute % 2) + + +def isclose(a, b, eps=1e-9): + return abs(a - b) < eps + + +def support(pauli: str) -> int: + """Returns the support of a Pauli string, i.e., the number of non-identity terms.""" + return sum(c != 'I' for c in pauli)