|
""" |
|
quantum_utils.py |
|
Helper library for running a simplified VQE or quantum simulation |
|
that references CUDA-Q for computations. |
|
|
|
References: |
|
- @Cuda-Q_install.md for installation details |
|
- @VQE_Example.md for example VQE code using nvidia or nvidia-mqpu backend |
|
""" |
|
|
|
import cudaq |
|
import cudaq_solvers as solvers |
|
import numpy as np |
|
from scipy.optimize import minimize |
|
import spaces |
|
from typing import Dict, List, Union, Any, Tuple |
|
import sys |
|
import logging |
|
import os |
|
from logging.handlers import RotatingFileHandler |
|
import openfermion |
|
import openfermionpyscf |
|
from openfermion.transforms import jordan_wigner, get_fermion_operator |
|
|
|
|
|
os.makedirs('logs', exist_ok=True) |
|
|
|
|
|
file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
|
console_formatter = logging.Formatter('%(levelname)s - %(message)s') |
|
|
|
|
|
file_handler = RotatingFileHandler( |
|
'logs/vqe_simulation.log', |
|
maxBytes=10*1024*1024, |
|
backupCount=5 |
|
) |
|
file_handler.setFormatter(file_formatter) |
|
file_handler.setLevel(logging.DEBUG) |
|
|
|
|
|
console_handler = logging.StreamHandler() |
|
console_handler.setFormatter(console_formatter) |
|
console_handler.setLevel(logging.INFO) |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
logger.setLevel(logging.DEBUG) |
|
|
|
|
|
for handler in logger.handlers[:]: |
|
logger.removeHandler(handler) |
|
|
|
|
|
logger.addHandler(file_handler) |
|
logger.addHandler(console_handler) |
|
|
|
|
|
logger.info("VQE Simulation module initialized") |
|
|
|
print("quantum_utils imported", file=sys.stderr, flush=True) |
|
|
|
def setup_target(): |
|
"""Set up CUDA-Q target based on available hardware.""" |
|
try: |
|
print("Setting up target...", file=sys.stderr, flush=True) |
|
gpu_count = cudaq.num_available_gpus() |
|
print(f"Number of available GPUs: {gpu_count}", file=sys.stderr, flush=True) |
|
|
|
if gpu_count > 0: |
|
print("Attempting to set NVIDIA GPU target...", file=sys.stderr, flush=True) |
|
cudaq.set_target("nvidia") |
|
print("Successfully set NVIDIA GPU target", file=sys.stderr, flush=True) |
|
else: |
|
print("No GPU found, attempting to set CPU target...", file=sys.stderr, flush=True) |
|
cudaq.set_target("qpp-cpu") |
|
print("Successfully set CPU target", file=sys.stderr, flush=True) |
|
except Exception as e: |
|
print(f"Error setting up quantum target: {str(e)}", file=sys.stderr, flush=True) |
|
import traceback |
|
print(f"Traceback:\n{traceback.format_exc()}", file=sys.stderr, flush=True) |
|
raise |
|
|
|
def vqe_callback(xk): |
|
"""Callback function for VQE optimization to track progress.""" |
|
try: |
|
logging.info(f"VQE iteration - Current parameters: {xk}") |
|
return True |
|
except Exception as e: |
|
logging.error(f"Error in VQE callback: {str(e)}") |
|
return False |
|
|
|
def validate_ansatz(kernel_generator, init_params, qubit_num, electron_num): |
|
"""Validate the ansatz kernel before VQE.""" |
|
try: |
|
logging.debug("Validating ansatz kernel...") |
|
|
|
setup_target() |
|
|
|
|
|
kernel = kernel_generator(qubit_num, electron_num) |
|
|
|
|
|
logging.debug(f"Kernel parameters type: {type(init_params)}, values: {init_params}") |
|
logging.debug(f"Kernel qubits type: {type(qubit_num)}, value: {qubit_num}") |
|
logging.debug(f"Kernel electrons type: {type(electron_num)}, value: {electron_num}") |
|
|
|
|
|
result = cudaq.sample(kernel, init_params.tolist()) |
|
logging.debug(f"Ansatz validation successful. Sample result: {result}") |
|
return True |
|
except Exception as e: |
|
logging.error(f"Ansatz validation failed: {str(e)}") |
|
return False |
|
|
|
|
|
|
|
|
|
def create_molecular_hamiltonian(geometry: list[tuple[str, tuple[float, float, float]]], |
|
basis: str, |
|
multiplicity: int, |
|
charge: int) -> cudaq.SpinOperator: |
|
""" |
|
Create a SpinOperator for a given molecule using PySCF + OpenFermion + CUDA-Q. |
|
|
|
Parameters |
|
---------- |
|
geometry : list of (str, (float, float, float))) |
|
List describing each atom, e.g. [('H', (0.,0.,0.)), ('H',(0.,0.,bond_length))] |
|
basis : str |
|
Basis set, e.g. "sto-3g" |
|
multiplicity : int |
|
Spin multiplicity |
|
charge : int |
|
Net charge of the molecule |
|
|
|
Returns |
|
------- |
|
cudaq.SpinOperator |
|
The qubit Hamiltonian in CUDA-Q spin-operator form. |
|
""" |
|
try: |
|
|
|
setup_target() |
|
|
|
|
|
molecule = openfermionpyscf.run_pyscf( |
|
openfermion.MolecularData(geometry, basis, multiplicity, charge) |
|
) |
|
|
|
|
|
molecular_hamiltonian = molecule.get_molecular_hamiltonian() |
|
fermion_hamiltonian = get_fermion_operator(molecular_hamiltonian) |
|
qubit_hamiltonian = jordan_wigner(fermion_hamiltonian) |
|
|
|
|
|
spin_op = cudaq.SpinOperator(qubit_hamiltonian) |
|
return spin_op |
|
except Exception as e: |
|
logging.error(f"Failed to create molecular Hamiltonian: {str(e)}") |
|
raise |
|
|
|
|
|
|
|
|
|
@cudaq.kernel |
|
def kernel(qubit_num: int, electron_num: int, thetas: list[float]): |
|
""" |
|
Generate a UCCSD ansatz kernel given the number of qubits and electrons. |
|
|
|
Parameters |
|
---------- |
|
qubit_num : int |
|
Number of qubits in the system |
|
electron_num : int |
|
Number of electrons (which is the same as number of X gates for HF ref.) |
|
|
|
Returns |
|
------- |
|
A CUDA-Q kernel that accepts parameters for the UCCSD ansatz. |
|
""" |
|
qubits = cudaq.qvector(qubit_num) |
|
|
|
for i in range(electron_num): |
|
x(qubits[i]) |
|
|
|
|
|
cudaq.kernels.uccsd(qubits, thetas, electron_num, qubit_num) |
|
|
|
|
|
def cost_function(kernel, spin_hamiltonian, qubit_count, electron_count, thetas): |
|
|
|
exp_val = cudaq.observe(kernel, spin_hamiltonian, qubit_count, electron_count, thetas).expectation() |
|
|
|
logging.info(f"Cost function evaluation: {exp_val:.6f}") |
|
return exp_val |
|
|
|
def expand_geometry(geometry_template: List[List[Any]], scale_factor: float) -> List[Tuple[str, Tuple[float, float, float]]]: |
|
""" |
|
Expand or contract a molecule's geometry by a scale factor. |
|
|
|
Args: |
|
geometry_template: List of [atom_symbol, [x, y, z]] coordinates |
|
scale_factor: Factor to scale the geometry by (1.0 = no change) |
|
|
|
Returns: |
|
List of (atom_symbol, (x, y, z)) tuples with scaled coordinates |
|
""" |
|
scaled_geometry = [] |
|
logging.debug("expand_geometry: Starting with scale_factor=%f", scale_factor) |
|
logging.debug("expand_geometry: Input geometry_template=%s", geometry_template) |
|
|
|
|
|
coords = np.array([coord for _, coord in geometry_template]) |
|
logging.debug("expand_geometry: Computed coordinates array: %s", coords) |
|
center = np.mean(coords, axis=0) |
|
logging.debug("expand_geometry: Computed center of mass: %s", center) |
|
|
|
for atom_symbol, coord in geometry_template: |
|
|
|
coord = np.array(coord) |
|
logging.debug("expand_geometry: Processing atom: %s", atom_symbol) |
|
logging.debug("expand_geometry: Original coordinate: %s", coord) |
|
|
|
vec = coord - center |
|
logging.debug("expand_geometry: Computed vector from center: %s", vec) |
|
|
|
|
|
scaled_coord = center + vec * scale_factor |
|
logging.debug("expand_geometry: Scaled coordinate: %s", scaled_coord) |
|
|
|
scaled_geometry.append((atom_symbol, tuple(scaled_coord))) |
|
|
|
logging.debug("expand_geometry: Final scaled geometry: %s", scaled_geometry) |
|
return scaled_geometry |
|
|
|
|
|
def generate_hamiltonian(molecule_data: Dict[str, Any], |
|
scale_factor: float) -> Dict[str, Any]: |
|
""" |
|
Generate the Hamiltonian and its parameters for a given molecule without running VQE optimization. |
|
|
|
Parameters |
|
---------- |
|
molecule_data : Dict[str, Any] |
|
Dictionary containing all molecule metadata and parameters |
|
scale_factor : float |
|
Factor to scale the molecule geometry by (1.0 = original size) |
|
|
|
Returns |
|
------- |
|
Dict[str, Any] |
|
Dictionary containing Hamiltonian information: |
|
- 'hamiltonian': The CUDA-Q SpinOperator Hamiltonian |
|
- 'qubit_count': Number of qubits needed |
|
- 'electron_count': Number of electrons in the system |
|
- 'parameter_count': Number of UCCSD parameters needed |
|
- 'hamiltonian_terms': Number of terms in the Hamiltonian |
|
- 'circuit_latex': LaTeX representation of the quantum circuit |
|
""" |
|
|
|
gpu_time = molecule_data.get('GPU_time', 60) |
|
logger.info(f"Generating Hamiltonian with GPU time: {gpu_time}") |
|
|
|
def _generate_hamiltonian_inner(): |
|
logging.info(f"Generating Hamiltonian for {molecule_data['name']} with scale factor {scale_factor}") |
|
|
|
|
|
setup_target() |
|
|
|
|
|
geometry = expand_geometry(molecule_data['geometry_template'], scale_factor) |
|
logging.info(f"Created scaled geometry with factor = {scale_factor}") |
|
|
|
|
|
spin_hamiltonian = create_molecular_hamiltonian( |
|
geometry=geometry, |
|
basis=molecule_data['basis'], |
|
multiplicity=molecule_data['multiplicity'], |
|
charge=molecule_data['charge'] |
|
) |
|
|
|
|
|
term_count = spin_hamiltonian.get_term_count() |
|
logging.debug(f"Hamiltonian has {term_count} terms.") |
|
|
|
|
|
|
|
|
|
qubit_count = 2 * molecule_data['spatial_orbitals'] |
|
electron_count = molecule_data['electron_count'] |
|
|
|
|
|
parameter_count = cudaq.kernels.uccsd_num_parameters(electron_count, qubit_count) |
|
logging.info(f"Number of UCCSD parameters needed = {parameter_count}") |
|
|
|
|
|
try: |
|
logging.info("Starting circuit LaTeX generation") |
|
thetas_draw = np.random.normal(0, 1, parameter_count) |
|
|
|
circuit_latex = cudaq.draw(kernel, qubit_count, electron_count, thetas_draw) |
|
|
|
|
|
MAX_CIRCUIT_LENGTH = 15000 |
|
if len(circuit_latex) > MAX_CIRCUIT_LENGTH: |
|
circuit_latex = circuit_latex[:MAX_CIRCUIT_LENGTH] + "\n... (circuit visualization clipped for size)" |
|
|
|
logging.info("Successfully generated circuit LaTeX representation") |
|
logging.debug(f"Circuit LaTeX length: {len(circuit_latex)} characters") |
|
except Exception as e: |
|
logging.error(f"Failed to generate circuit LaTeX: {str(e)}") |
|
circuit_latex = "Error generating circuit visualization" |
|
|
|
return { |
|
'hamiltonian': spin_hamiltonian, |
|
'qubit_count': qubit_count, |
|
'electron_count': electron_count, |
|
'parameter_count': parameter_count, |
|
'hamiltonian_terms': term_count, |
|
'circuit_latex': circuit_latex |
|
} |
|
|
|
return _generate_hamiltonian_inner() |
|
|
|
def run_vqe_simulation(molecule_data: Dict[str, Any], |
|
scale_factor: float, |
|
hamiltonian_only: bool = False) -> Dict[str, Any]: |
|
""" |
|
Run a VQE simulation using CUDA-Q. |
|
|
|
Parameters |
|
---------- |
|
molecule_data : Dict[str, Any] |
|
Dictionary containing all molecule metadata and parameters |
|
scale_factor : float |
|
Factor to scale the molecule geometry by (1.0 = original size) |
|
hamiltonian_only : bool |
|
If True, only generate the Hamiltonian without running VQE optimization |
|
|
|
Returns |
|
------- |
|
Dict[str, Any] |
|
The dictionary containing either just Hamiltonian info or full VQE results |
|
""" |
|
|
|
gpu_time = molecule_data.get('GPU_time', 60) |
|
logger.info(f"Running VQE simulation with GPU time: {gpu_time}") |
|
|
|
@spaces.GPU(duration=gpu_time) |
|
def _run_vqe_simulation_inner(): |
|
setup_target() |
|
|
|
|
|
logger.info("Generating Hamiltonian") |
|
ham_info = generate_hamiltonian(molecule_data, scale_factor) |
|
|
|
|
|
if hamiltonian_only: |
|
return { |
|
'parameter_count': ham_info['parameter_count'], |
|
'hamiltonian_terms': ham_info['hamiltonian_terms'], |
|
'qubit_count': ham_info['qubit_count'], |
|
'electron_count': ham_info['electron_count'], |
|
'message': "Hamiltonian generated successfully", |
|
'circuit_latex': ham_info['circuit_latex'] |
|
} |
|
|
|
|
|
optimizer_max_iterations = molecule_data.get('iterations', 25) |
|
|
|
|
|
thetas0 = np.random.normal(0, 1, ham_info['parameter_count']) |
|
logging.info(f"Initial parameters: {thetas0}") |
|
|
|
|
|
def objective_wrapper(thetas): |
|
return cost_function(kernel, ham_info['hamiltonian'], ham_info['qubit_count'], ham_info['electron_count'], thetas) |
|
|
|
|
|
exp_vals = [] |
|
def callback(xk): |
|
val = objective_wrapper(xk) |
|
exp_vals.append(val) |
|
return True |
|
|
|
optimization_success = False |
|
optimization_message = "" |
|
final_energy = float("inf") |
|
final_parameters = [] |
|
|
|
try: |
|
|
|
test_val = objective_wrapper(thetas0) |
|
logging.info(f"Debug cost check (initial params): {test_val:.6f}") |
|
|
|
if not np.isfinite(test_val): |
|
logging.warning("Debug cost check returned non-finite value. The optimizer may fail.") |
|
|
|
|
|
result = minimize(objective_wrapper, |
|
thetas0, |
|
method='COBYLA', |
|
callback=callback, |
|
options={'maxiter': optimizer_max_iterations}) |
|
|
|
|
|
final_energy = result.fun |
|
final_parameters = result.x |
|
optimization_success = result.success |
|
optimization_message = ( |
|
"VQE optimization completed successfully." if result.success |
|
else f"VQE optimization completed with status: {result.message}" |
|
) |
|
|
|
except Exception as e: |
|
logging.error(f"VQE optimization error: {str(e)}") |
|
optimization_message = f"VQE optimization error: {str(e)}" |
|
optimization_success = False |
|
|
|
logging.info(optimization_message) |
|
logging.debug(f"Final energy: {final_energy}") |
|
logging.debug(f"Optimized parameters: {final_parameters}") |
|
|
|
|
|
results = { |
|
"final_energy": float(final_energy), |
|
"parameters": list(final_parameters), |
|
"success": optimization_success, |
|
"iterations": len(exp_vals), |
|
"history": exp_vals, |
|
"message": optimization_message, |
|
"parameter_count": ham_info['parameter_count'], |
|
"hamiltonian_terms": ham_info['hamiltonian_terms'] |
|
} |
|
return results |
|
|
|
return _run_vqe_simulation_inner() |