Amazon Braket Intermediate Free 6/11 in series 14 min read

Quantum-Classical Hybrid Jobs on Amazon Braket

Run variational quantum algorithms using Amazon Braket Hybrid Jobs: managed classical compute + quantum access in one job, with checkpointing and cost tracking.

What you'll learn

  • Amazon Braket
  • hybrid jobs
  • VQE
  • QAOA
  • AWS
  • hybrid quantum-classical

Prerequisites

  • Python proficiency
  • Beginner quantum computing concepts (superposition, entanglement)
  • Linear algebra basics

What Braket Hybrid Jobs Are

Amazon Braket Hybrid Jobs is a managed execution environment that runs your Python script on a classical EC2 instance while giving it direct access to quantum hardware or simulators, all in a single, trackable job.

Without Hybrid Jobs, running a variational algorithm means manually polling for circuit results, managing retries when a QPU goes offline, and assembling partial results across many API calls. Hybrid Jobs handles all of that. Your script runs to completion, checkpoints are preserved, and costs are attributed to a single job ID.

The key architectural difference: the classical optimizer loop runs inside the job, not on your laptop. This eliminates the round-trip latency between your machine and the AWS API on every optimizer step.

When to Use Hybrid Jobs

Hybrid Jobs is the right tool whenever your algorithm requires many circuits in a feedback loop:

  • VQE: hundreds of Estimator calls per optimizer step, across many steps
  • QAOA: expectation values of a cost Hamiltonian evaluated at each parameter update
  • Quantum machine learning: gradient descent on a parameterized circuit with shot-based gradients
  • Adaptive algorithms: algorithms that choose their next circuit based on prior measurements

Ad-hoc scripting from a local machine is fine for one-off experiments. For anything that runs for more than a few minutes of quantum time, Hybrid Jobs provides better reliability and observability.

Job Structure

A Braket Hybrid Job has four components:

  1. Algorithm script: A Python file that imports braket.jobs and calls save_job_result() at the end.
  2. Input data: Optional S3 files mounted into the job container.
  3. Instance type: The EC2 instance class running the classical compute (e.g., ml.m5.large).
  4. Device ARN: The quantum backend, either a real QPU or a managed simulator like SV1 or TN1.

Architecture Deep-Dive

When you submit a Hybrid Job, Braket provisions an EC2 instance of the type you specified and launches a managed Docker container on it. This container comes pre-installed with the Amazon Braket SDK, NumPy, SciPy, and other common scientific Python packages. Your algorithm script runs inside this container as a regular Python process.

The Braket runtime injects several environment variables into the container before your script starts. These variables tell your script which device to use, where to write results, and where to find checkpoints:

VariablePurpose
AMZN_BRAKET_DEVICE_ARNThe device ARN passed to AwsQuantumJob.create()
AMZN_BRAKET_JOB_NAMEThe unique name of this job
AMZN_BRAKET_JOB_RESULTS_DIRLocal filesystem path for writing output files
AMZN_BRAKET_CHECKPOINT_DIRLocal filesystem path for reading/writing checkpoints
AMZN_BRAKET_TASK_RESULTS_S3_URIThe S3 URI where quantum task results are stored

Your algorithm script should always read the device ARN from the environment rather than hardcoding it. This makes the same script work with any backend, whether SV1, TN1, or a real QPU like IonQ Aria or IQM Garnet.

Here is a utility snippet that prints all Braket-injected environment variables at the start of your job, which is useful for debugging:

# print_env.py
import os

BRAKET_ENV_VARS = [
    "AMZN_BRAKET_DEVICE_ARN",
    "AMZN_BRAKET_JOB_NAME",
    "AMZN_BRAKET_JOB_RESULTS_DIR",
    "AMZN_BRAKET_CHECKPOINT_DIR",
    "AMZN_BRAKET_TASK_RESULTS_S3_URI",
]

def print_braket_env():
    """Print all Braket runtime environment variables."""
    print("=== Braket Runtime Environment ===")
    for var in BRAKET_ENV_VARS:
        value = os.environ.get(var, "<not set>")
        print(f"  {var} = {value}")
    print("==================================")

if __name__ == "__main__":
    print_braket_env()

When the job completes, the Braket runtime automatically uploads the contents of AMZN_BRAKET_JOB_RESULTS_DIR to S3. Anything you write to that directory, including custom files, numpy arrays, or serialized models, becomes part of the job’s output.

Step 1: Write the Algorithm Script (QAOA MaxCut)

The algorithm script is an ordinary Python file. It uses environment variables injected by the Braket runtime and calls save_job_result() to persist outputs to S3.

# algorithm_script.py
import os
import numpy as np
from braket.aws import AwsDevice
from braket.circuits import Circuit
from braket.jobs import save_job_result
from braket.jobs.metrics import log_metric
from scipy.optimize import minimize

DEVICE_ARN = os.environ.get("AMZN_BRAKET_DEVICE_ARN")

def build_qaoa_circuit(params: np.ndarray, edges: list) -> Circuit:
    gamma, beta = params[0], params[1]
    n = 1 + max(max(e) for e in edges)
    circ = Circuit()
    for q in range(n):
        circ.h(q)
    for u, v in edges:
        circ.cnot(u, v).rz(v, 2 * gamma).cnot(u, v)
    for q in range(n):
        circ.rx(q, 2 * beta)
    return circ

def expected_cut(params, device, edges):
    n = 1 + max(max(e) for e in edges)
    circ = build_qaoa_circuit(params, edges)
    circ.probability()
    probs = device.run(circ, shots=512).result().values[0]
    cost = 0.0
    for idx, p in enumerate(probs):
        bits = format(idx, f"0{n}b")
        cost -= sum(1 for u, v in edges if bits[u] != bits[v]) * p
    return cost

def main():
    edges = [[0, 1], [1, 2], [2, 3], [3, 0]]
    device = AwsDevice(DEVICE_ARN)
    step = [0]

    def objective(params):
        cost = expected_cut(params, device, edges)
        log_metric("cost", float(cost), step[0])
        step[0] += 1
        return cost

    res = minimize(objective, np.array([0.1, 0.5]), method="COBYLA",
                   options={"maxiter": 30})
    save_job_result({"optimized_params": res.x.tolist(), "best_cost": float(res.fun)})

if __name__ == "__main__":
    main()

Step 2: Submit with AwsQuantumJob.create()

from braket.aws import AwsQuantumJob

job = AwsQuantumJob.create(
    device="arn:aws:braket:::device/quantum-simulator/amazon/sv1",
    source_module="algorithm_script.py",
    entry_point="algorithm_script:main",
    job_name="qaoa-maxcut-demo",
    instance_config=AwsQuantumJob.InstanceConfig(instanceType="ml.m5.large"),
    wait_until_complete=False,   # Submit and return immediately
)

print(f"Job ARN: {job.arn}")
print(f"Job state: {job.state()}")

Setting wait_until_complete=False lets you submit and move on. The job runs on AWS until it completes or fails.

Passing Hyperparameters

Hardcoding algorithm parameters in the script limits reusability. The hyperparameters argument to AwsQuantumJob.create() lets you pass configuration values that are accessible inside the algorithm script as environment variables.

from braket.aws import AwsQuantumJob

job = AwsQuantumJob.create(
    device="arn:aws:braket:::device/quantum-simulator/amazon/sv1",
    source_module="algorithm_script.py",
    entry_point="algorithm_script:main",
    job_name="qaoa-sweep-01",
    instance_config=AwsQuantumJob.InstanceConfig(instanceType="ml.m5.large"),
    hyperparameters={
        "n_iterations": "50",
        "n_shots": "512",
        "edges": "0-1,1-2,2-3,3-0",
    },
    wait_until_complete=False,
)

All hyperparameter values must be strings. Inside the algorithm script, retrieve them using os.environ and parse as needed:

# algorithm_script_with_hyperparams.py
import os
import json
import numpy as np
from braket.aws import AwsDevice
from braket.circuits import Circuit
from braket.jobs import save_job_result
from braket.jobs.metrics import log_metric
from scipy.optimize import minimize

def parse_edges(edge_str: str) -> list:
    """Parse edge string like '0-1,1-2,2-3,3-0' into a list of pairs."""
    edges = []
    for pair in edge_str.split(","):
        u, v = pair.split("-")
        edges.append([int(u), int(v)])
    return edges

def main():
    # Read hyperparameters from environment
    device_arn = os.environ["AMZN_BRAKET_DEVICE_ARN"]
    n_iterations = int(os.environ.get("n_iterations", "30"))
    n_shots = int(os.environ.get("n_shots", "512"))
    edges_str = os.environ.get("edges", "0-1,1-2,2-3,3-0")

    edges = parse_edges(edges_str)
    device = AwsDevice(device_arn)
    n_qubits = 1 + max(max(e) for e in edges)

    print(f"Running QAOA on {n_qubits}-qubit graph")
    print(f"Edges: {edges}")
    print(f"Max iterations: {n_iterations}, Shots per eval: {n_shots}")

    step = [0]

    def objective(params):
        circ = build_qaoa_circuit(params, edges)
        circ.probability()
        probs = device.run(circ, shots=n_shots).result().values[0]
        cost = 0.0
        for idx, p in enumerate(probs):
            bits = format(idx, f"0{n_qubits}b")
            cost -= sum(1 for u, v in edges if bits[u] != bits[v]) * p
        log_metric("cost", float(cost), step[0])
        step[0] += 1
        return cost

    res = minimize(objective, np.array([0.1, 0.5]), method="COBYLA",
                   options={"maxiter": n_iterations})
    save_job_result({
        "optimized_params": res.x.tolist(),
        "best_cost": float(res.fun),
        "n_evals": step[0],
    })

if __name__ == "__main__":
    main()

This pattern makes it easy to run parameter sweeps. Submit several jobs with different hyperparameter values and compare results across runs.

VQE Example: Finding the Ground State of H2

VQE (Variational Quantum Eigensolver) is one of the most common use cases for Hybrid Jobs. Here is a complete VQE implementation that estimates the ground state energy of molecular hydrogen using a simplified Hamiltonian.

The H2 Hamiltonian in the STO-3G basis, after qubit mapping, can be approximated as a sum of Pauli terms:

H = -1.0523 * II + 0.3979 * IZ - 0.3979 * ZI - 0.0112 * ZZ + 0.1809 * XX

Each term is a tensor product of Pauli operators acting on two qubits, with a real coefficient. The goal is to find the parameters of a quantum circuit (the ansatz) that minimize the expectation value of this Hamiltonian.

# vqe_h2.py
import os
import numpy as np
from braket.circuits import Circuit
from braket.devices import LocalSimulator
from braket.jobs import save_job_result
from braket.jobs.metrics import log_metric
from scipy.optimize import minimize

# H2 Hamiltonian coefficients (STO-3G, Jordan-Wigner mapping)
HAMILTONIAN = [
    (-1.0523, "II"),
    ( 0.3979, "IZ"),
    (-0.3979, "ZI"),
    (-0.0112, "ZZ"),
    ( 0.1809, "XX"),
]

def build_ansatz(params: np.ndarray) -> Circuit:
    """UCCSD-inspired ansatz for 2 qubits.

    The ansatz applies parameterized Ry rotations followed by a CNOT
    entangling gate, then another layer of Ry rotations. This is
    expressive enough to capture the ground state of H2.
    """
    theta0, theta1, theta2, theta3 = params
    circ = Circuit()
    # First rotation layer
    circ.ry(0, theta0)
    circ.ry(1, theta1)
    # Entangling gate
    circ.cnot(0, 1)
    # Second rotation layer
    circ.ry(0, theta2)
    circ.ry(1, theta3)
    return circ

def measure_pauli_expectation(circ: Circuit, pauli_str: str,
                               device, n_shots: int) -> float:
    """Measure the expectation value of a 2-qubit Pauli string.

    For each qubit, the function applies a basis rotation before
    measurement: H for X, Rx(pi/2) for Y, or nothing for Z/I.
    Then it computes <P> from the measurement counts.
    """
    meas_circ = circ.copy()

    # Apply basis rotations
    for i, pauli in enumerate(pauli_str):
        if pauli == "X":
            meas_circ.h(i)
        elif pauli == "Y":
            meas_circ.rx(i, -np.pi / 2)
        # Z and I need no rotation

    # Skip measurement for identity terms
    if pauli_str == "II":
        return 1.0

    # Add measurement on active qubits
    active_qubits = [i for i, p in enumerate(pauli_str) if p != "I"]
    meas_circ.probability()

    result = device.run(meas_circ, shots=n_shots).result()
    probs = result.values[0]

    # Compute expectation value: sum over all bitstrings
    n_qubits = len(pauli_str)
    expectation = 0.0
    for idx, p in enumerate(probs):
        bits = format(idx, f"0{n_qubits}b")
        # Eigenvalue is (-1)^(sum of measured bits on active qubits)
        parity = sum(int(bits[q]) for q in active_qubits) % 2
        eigenvalue = 1.0 - 2.0 * parity
        expectation += eigenvalue * p

    return expectation

def compute_energy(params: np.ndarray, device, n_shots: int) -> float:
    """Compute <psi(params)|H|psi(params)> by measuring each Pauli term."""
    circ = build_ansatz(params)
    energy = 0.0
    for coeff, pauli_str in HAMILTONIAN:
        exp_val = measure_pauli_expectation(circ, pauli_str, device, n_shots)
        energy += coeff * exp_val
    return energy

def main():
    device = LocalSimulator()
    n_shots = int(os.environ.get("n_shots", "1024"))
    n_iterations = int(os.environ.get("n_iterations", "100"))

    step = [0]
    energy_history = []

    def objective(params):
        energy = compute_energy(params, device, n_shots)
        energy_history.append(energy)
        log_metric("energy", float(energy), step[0])
        log_metric("param_norm", float(np.linalg.norm(params)), step[0])
        step[0] += 1
        return energy

    # Random initial parameters
    init_params = np.random.uniform(-np.pi, np.pi, size=4)

    res = minimize(objective, init_params, method="COBYLA",
                   options={"maxiter": n_iterations, "rhobeg": 0.5})

    print(f"VQE converged to energy: {res.fun:.6f} Ha")
    print(f"Exact ground state energy: -1.1373 Ha")
    print(f"Error: {abs(res.fun - (-1.1373)):.6f} Ha")

    save_job_result({
        "best_params": res.x.tolist(),
        "final_energy": float(res.fun),
        "energy_history": energy_history,
        "n_evaluations": step[0],
    })

if __name__ == "__main__":
    main()

The exact ground state energy of H2 in this basis is approximately -1.1373 Hartree. With enough shots and optimizer iterations, VQE converges to within a few milliHartree of this value.

Metrics and Monitoring with log_metric

The log_metric function writes structured metrics to CloudWatch. Each call records a metric name, a numeric value, and an iteration index. These metrics appear in the Braket console under the job’s detail page, and you can query them programmatically.

Inside the algorithm script, log metrics at every optimizer step:

from braket.jobs.metrics import log_metric

def objective(params):
    energy = compute_energy(params, device, n_shots)
    grad_norm = estimate_gradient_norm(params, device, n_shots)

    log_metric("energy", float(energy), iteration)
    log_metric("gradient_norm", float(grad_norm), iteration)
    log_metric("step_size", float(current_step_size), iteration)

    return energy

After the job completes, retrieve the metrics programmatically:

from braket.aws import AwsQuantumJob

job = AwsQuantumJob(arn="arn:aws:braket:us-east-1:123456789:job/vqe-h2-run")

# metrics() returns a dict: metric_name -> list of (value, timestamp) tuples
metrics = job.metrics()

for name, values in metrics.items():
    print(f"Metric: {name}")
    for value, timestamp in values[-5:]:  # Last 5 entries
        print(f"  {timestamp}: {value}")

This is especially useful for monitoring long-running VQE jobs. You can check whether the energy is still decreasing without waiting for the job to finish.

Multiple Device Support Per Job

A single Hybrid Job script can use multiple quantum backends in sequence. This is useful for a common workflow: prototype the first few iterations on a fast, cheap simulator to find a good starting point, then switch to a real QPU for the final optimization.

# multi_device_vqe.py
import os
import numpy as np
from braket.aws import AwsDevice
from braket.jobs import save_job_result
from braket.jobs.metrics import log_metric

SV1_ARN = "arn:aws:braket:::device/quantum-simulator/amazon/sv1"

def main():
    # Primary device from the job's device ARN (could be a QPU)
    primary_arn = os.environ["AMZN_BRAKET_DEVICE_ARN"]

    sv1_device = AwsDevice(SV1_ARN)
    primary_device = AwsDevice(primary_arn)

    params = np.random.uniform(-np.pi, np.pi, size=4)
    total_iterations = 50
    simulator_warmup = 10

    print(f"Phase 1: {simulator_warmup} iterations on SV1 simulator")
    for i in range(simulator_warmup):
        energy = compute_energy(params, sv1_device, n_shots=4096)
        params = cobyla_step(params, energy)
        log_metric("energy", float(energy), i)
        log_metric("device", 0.0, i)  # 0 = simulator

    print(f"Phase 2: {total_iterations - simulator_warmup} iterations on {primary_arn}")
    for i in range(simulator_warmup, total_iterations):
        energy = compute_energy(params, primary_device, n_shots=512)
        params = cobyla_step(params, energy)
        log_metric("energy", float(energy), i)
        log_metric("device", 1.0, i)  # 1 = QPU

    save_job_result({
        "best_params": params.tolist(),
        "final_energy": float(energy),
    })

The SV1 simulator supports high shot counts cheaply (no per-shot fees for circuits under 25 qubits), making it ideal for the initial exploration phase. When you submit this job, set the device parameter to the QPU ARN; the script hardcodes SV1 for the warmup phase.

Parallel Circuit Execution

Within a single Hybrid Job, you can parallelize independent circuit evaluations to reduce wall-clock time. This is particularly useful for QAOA parameter sweeps where you evaluate the cost function at multiple candidate parameter sets simultaneously.

# parallel_evaluation.py
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from braket.aws import AwsDevice
from braket.circuits import Circuit

def evaluate_params(params, device, edges, n_shots):
    """Evaluate a single parameter set. Thread-safe because each
    call creates its own circuit and quantum task."""
    circ = build_qaoa_circuit(params, edges)
    circ.probability()
    result = device.run(circ, shots=n_shots).result()
    probs = result.values[0]
    n_qubits = 1 + max(max(e) for e in edges)
    cost = 0.0
    for idx, p in enumerate(probs):
        bits = format(idx, f"0{n_qubits}b")
        cost -= sum(1 for u, v in edges if bits[u] != bits[v]) * p
    return params, cost

def parallel_sweep(param_sets, device, edges, n_shots, max_workers=4):
    """Evaluate multiple parameter sets in parallel.

    Each evaluation submits an independent quantum task, so they can
    run concurrently. On SV1, this can reduce wall-clock time by
    up to max_workers times.
    """
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(evaluate_params, p, device, edges, n_shots): i
            for i, p in enumerate(param_sets)
        }
        for future in as_completed(futures):
            idx = futures[future]
            params, cost = future.result()
            results[idx] = (params, cost)
            print(f"  Set {idx}: cost = {cost:.4f}")

    return results

# Usage: sweep over a 4x4 grid of (gamma, beta) values
gammas = np.linspace(0.0, np.pi, 4)
betas = np.linspace(0.0, np.pi, 4)
param_sets = [np.array([g, b]) for g in gammas for b in betas]

device = AwsDevice(os.environ["AMZN_BRAKET_DEVICE_ARN"])
edges = [[0, 1], [1, 2], [2, 3], [3, 0]]

results = parallel_sweep(param_sets, device, edges, n_shots=512, max_workers=4)
best_idx = min(results, key=lambda i: results[i][1])
best_params, best_cost = results[best_idx]
print(f"Best params: gamma={best_params[0]:.3f}, beta={best_params[1]:.3f}")
print(f"Best cost: {best_cost:.4f}")

Note that parallelism works best with managed simulators (SV1, TN1) because they can process multiple tasks concurrently. For real QPUs, tasks enter a queue and execute sequentially on the hardware, so parallelism only helps overlap the classical processing and network overhead.

Error Handling and Retry Logic

Braket tasks can fail due to QPU unavailability, transient network issues, or service throttling. A production-quality algorithm script should include retry logic rather than failing on the first error.

# retry_helpers.py
import time
from braket.aws import AwsDevice
from braket.circuits import Circuit

class DeviceUnavailableError(Exception):
    pass

def run_with_retry(circuit: Circuit, device: AwsDevice, shots: int,
                   max_retries: int = 3, base_delay: float = 2.0):
    """Run a circuit with exponential backoff retry on failure.

    Catches common transient errors and retries up to max_retries
    times. The delay doubles after each failed attempt.
    """
    last_exception = None

    for attempt in range(max_retries + 1):
        try:
            task = device.run(circuit, shots=shots)
            result = task.result()
            return result
        except Exception as e:
            last_exception = e
            error_msg = str(e).lower()

            # Do not retry on permanent errors
            if "invalid" in error_msg or "validation" in error_msg:
                raise

            if attempt < max_retries:
                delay = base_delay * (2 ** attempt)
                print(f"  Attempt {attempt + 1} failed: {e}")
                print(f"  Retrying in {delay:.1f}s...")
                time.sleep(delay)

    raise RuntimeError(
        f"Task failed after {max_retries + 1} attempts: {last_exception}"
    )

def run_with_fallback(circuit: Circuit, primary_device: AwsDevice,
                      fallback_arn: str, shots: int):
    """Try the primary device first; fall back to SV1 if it is offline.

    Checks the device availability window before attempting the task.
    If the primary device is unavailable, uses the fallback simulator.
    """
    try:
        # Check if the primary device is currently available
        status = primary_device.status
        if status != "ONLINE":
            print(f"  Primary device is {status}, falling back to {fallback_arn}")
            fallback = AwsDevice(fallback_arn)
            return fallback.run(circuit, shots=shots).result(), "fallback"

        result = run_with_retry(circuit, primary_device, shots)
        return result, "primary"

    except Exception as e:
        print(f"  Primary device failed: {e}")
        print(f"  Falling back to {fallback_arn}")
        fallback = AwsDevice(fallback_arn)
        return fallback.run(circuit, shots=shots).result(), "fallback"

Use these helpers in your objective function to make your VQE or QAOA script resilient to transient failures:

SV1_ARN = "arn:aws:braket:::device/quantum-simulator/amazon/sv1"

def objective(params):
    circ = build_ansatz(params)
    circ.probability()
    result, source = run_with_fallback(circ, qpu_device, SV1_ARN, shots=512)
    log_metric("fallback_used", 1.0 if source == "fallback" else 0.0, step[0])
    # ... compute energy from result

Step 3: Monitor and Retrieve Results

import time
from braket.aws import AwsQuantumJob

job = AwsQuantumJob(arn="arn:aws:braket:us-east-1:123456789:job/qaoa-maxcut-demo")
while job.state() in ("QUEUED", "RUNNING"):
    time.sleep(30)

result = job.result() if job.state() == "COMPLETED" else None
if result:
    print(result["optimized_params"], result["best_cost"])
else:
    print(job.state(), job.logs())

Results are stored in S3 under the job’s output prefix and fetched automatically by job.result().

Job Lifecycle and States

A Braket Hybrid Job transitions through a well-defined set of states:

  1. QUEUED: The job has been submitted and is waiting for EC2 resources to become available. For Spot instances, this may take longer during periods of high demand.
  2. INITIALIZING: An EC2 instance has been provisioned and the Docker container is starting. The Braket SDK and your source module are being loaded.
  3. RUNNING: Your algorithm script is executing. Quantum tasks submitted during this phase appear as child tasks under the job.
  4. COMPLETED: The script exited successfully (exit code 0). Results are available via job.result().
  5. FAILED: The script raised an unhandled exception or exited with a non-zero code. Use job.logs() to inspect the error.
  6. CANCELLING: A cancellation request has been sent but the job has not yet stopped.
  7. CANCELLED: The job was cancelled before it finished.

You can manage jobs programmatically:

from braket.aws import AwsQuantumJob

# Cancel a running job
job = AwsQuantumJob(arn="arn:aws:braket:us-east-1:123456789:job/my-job")
job.cancel()
print(f"Job state after cancel: {job.state()}")

# List all jobs in the account (returns a list of job summary dicts)
all_jobs = AwsQuantumJob.get_all_jobs()
for j in all_jobs[:5]:
    print(f"  {j['jobName']}: {j['status']}")

# Stream logs from a failed job
failed_job = AwsQuantumJob(arn="arn:aws:braket:us-east-1:123456789:job/failed-job")
failed_job.logs(poll_interval_seconds=5)

The logs() method streams CloudWatch logs to your terminal, similar to docker logs -f. For failed jobs, the log output typically includes the Python traceback that caused the failure.

Result Post-Processing and S3 Storage

The save_job_result() function serializes a Python dict to JSON and uploads it to the job’s S3 output location. For simple results like optimized parameters and energies, this is sufficient. For richer output, you can also write files directly to the results directory.

# rich_results.py
import os
import json
import numpy as np
from braket.jobs import save_job_result

def save_rich_results(best_params, final_energy, energy_history, param_history):
    """Save both structured results and raw numpy arrays."""

    # Standard result dict (retrieved via job.result())
    save_job_result({
        "best_params": best_params.tolist(),
        "final_energy": float(final_energy),
        "energy_history": [float(e) for e in energy_history],
        "n_iterations": len(energy_history),
    })

    # Save large arrays directly to the results directory
    results_dir = os.environ["AMZN_BRAKET_JOB_RESULTS_DIR"]

    # Full parameter trajectory as a numpy file
    param_array = np.array(param_history)
    np.save(os.path.join(results_dir, "param_trajectory.npy"), param_array)

    # Energy history as a numpy file (for quick plotting)
    energy_array = np.array(energy_history)
    np.save(os.path.join(results_dir, "energy_history.npy"), energy_array)

    # Metadata as JSON
    metadata = {
        "n_qubits": 2,
        "ansatz": "uccsd_inspired",
        "optimizer": "COBYLA",
        "hamiltonian": "H2_STO3G",
    }
    with open(os.path.join(results_dir, "metadata.json"), "w") as f:
        json.dump(metadata, f, indent=2)

    print(f"Results saved to {results_dir}")

After the job completes, retrieve results in two ways:

from braket.aws import AwsQuantumJob
import boto3
import numpy as np
import io

job = AwsQuantumJob(arn="arn:aws:braket:us-east-1:123456789:job/vqe-h2-run")

# Method 1: Structured result dict
result = job.result()
print(f"Final energy: {result['final_energy']:.6f}")
print(f"Iterations: {result['n_iterations']}")

# Method 2: Download numpy files from S3
s3 = boto3.client("s3")

# The job's output S3 path is in the job metadata
job_metadata = job.metadata()
output_s3_uri = job_metadata["outputDataConfig"]["s3Path"]
# Parse bucket and key from s3://bucket/key format
bucket = output_s3_uri.split("/")[2]
prefix = "/".join(output_s3_uri.split("/")[3:])

# Download the parameter trajectory
obj = s3.get_object(Bucket=bucket, Key=f"{prefix}/param_trajectory.npy")
param_trajectory = np.load(io.BytesIO(obj["Body"].read()))
print(f"Parameter trajectory shape: {param_trajectory.shape}")

Checkpointing

If a job fails mid-optimization, checkpointing lets it resume from the last saved state. Inside the algorithm script, call save_job_checkpoint after each optimizer step and load_job_checkpoint at startup:

import os
import numpy as np
from braket.jobs import load_job_checkpoint, save_job_checkpoint

CHECKPOINT_FILE = "optimizer_state"

def load_or_init_params(default: np.ndarray) -> tuple[np.ndarray, int]:
    try:
        ckpt = load_job_checkpoint(
            job_name=os.environ["AMZN_BRAKET_JOB_NAME"],
            checkpoint_file_suffix=CHECKPOINT_FILE,
        )
        return np.array(ckpt["params"]), int(ckpt["iteration"])
    except Exception:
        return default, 0

def save_checkpoint(params: np.ndarray, iteration: int) -> None:
    save_job_checkpoint(
        checkpoint_data={"params": params.tolist(), "iteration": iteration},
        checkpoint_file_suffix=CHECKPOINT_FILE,
    )

On a retry, the job reads the latest checkpoint from S3 and skips the iterations already completed. Checkpointing is especially important when using Spot instances, since your EC2 instance can be reclaimed at any time. When the Braket runtime restarts the job, it automatically makes the previous checkpoint available at AMZN_BRAKET_CHECKPOINT_DIR.

Here is a complete optimization loop with integrated checkpointing:

def run_optimization_with_checkpoints(device, edges, n_shots, n_iterations):
    default_params = np.array([0.1, 0.5])
    params, start_iter = load_or_init_params(default_params)
    energy_history = []

    print(f"Starting from iteration {start_iter}")

    for i in range(start_iter, n_iterations):
        energy = expected_cut(params, device, edges, n_shots)
        energy_history.append(energy)
        log_metric("energy", float(energy), i)

        # Compute a simple finite-difference gradient
        grad = np.zeros_like(params)
        epsilon = 0.01
        for j in range(len(params)):
            params_plus = params.copy()
            params_plus[j] += epsilon
            e_plus = expected_cut(params_plus, device, edges, n_shots)
            grad[j] = (e_plus - energy) / epsilon

        # Gradient descent step
        params = params - 0.1 * grad

        # Checkpoint after every iteration
        save_checkpoint(params, i + 1)
        print(f"  Iteration {i}: energy = {energy:.4f}, checkpointed")

    return params, energy_history

Custom Docker Container

The default Braket container includes the Braket SDK, NumPy, SciPy, and a handful of other packages. If your algorithm requires additional dependencies (PennyLane, OpenFermion, custom libraries), you can supply a custom Docker image.

First, create a Dockerfile:

FROM public.ecr.aws/amazonlinux/amazonlinux:2

# Install Python 3.10 and pip
RUN yum update -y && \
    amazon-linux-extras install python3.8 -y && \
    yum install python3-pip -y && \
    pip3 install --upgrade pip

# Install the Braket SDK (required) plus custom packages
RUN pip3 install \
    amazon-braket-sdk>=1.56.0 \
    pennylane>=0.33.0 \
    pennylane-braket>=1.20.0 \
    openfermion>=1.6.0 \
    openfermionpyscf>=0.5

# Copy your algorithm code
COPY algorithm_script.py /opt/braket/code/

WORKDIR /opt/braket/code

Build the image and push it to Amazon ECR:

# Build the Docker image
docker build -t braket-custom:latest .

# Authenticate with ECR
aws ecr get-login-password --region us-east-1 | \
    docker login --username AWS --password-stdin 123456789.dkr.ecr.us-east-1.amazonaws.com

# Create the ECR repository (first time only)
aws ecr create-repository --repository-name braket-custom

# Tag and push
docker tag braket-custom:latest \
    123456789.dkr.ecr.us-east-1.amazonaws.com/braket-custom:latest
docker push 123456789.dkr.ecr.us-east-1.amazonaws.com/braket-custom:latest

Then reference the image in your job submission:

from braket.aws import AwsQuantumJob

job = AwsQuantumJob.create(
    device="arn:aws:braket:::device/quantum-simulator/amazon/sv1",
    source_module="algorithm_script.py",
    entry_point="algorithm_script:main",
    job_name="vqe-pennylane-custom",
    instance_config=AwsQuantumJob.InstanceConfig(instanceType="ml.m5.large"),
    image_uri="123456789.dkr.ecr.us-east-1.amazonaws.com/braket-custom:latest",
    wait_until_complete=False,
)

The Braket runtime still injects the same environment variables into your custom container. Your code accesses AMZN_BRAKET_DEVICE_ARN and calls save_job_result() the same way.

Cost Tracking and Estimation

Hybrid Jobs attributes all charges (QPU task fees and EC2 compute time) to a single job ARN, making per-experiment cost tracking straightforward in the AWS billing console.

Understanding the cost structure is essential for budgeting. Braket charges have two components: classical EC2 compute and quantum task fees. The quantum fees vary dramatically depending on the backend.

Here is a concrete cost calculation for a QAOA experiment on a 4-node graph:

# cost_estimation.py

def estimate_qaoa_cost(n_optimizer_steps, n_shots_per_eval, n_two_qubit_gates,
                       ec2_hours, use_spot=False):
    """Estimate the total cost of a QAOA Hybrid Job."""

    # --- SV1 Simulator Costs ---
    # SV1 charges per task (circuit execution), not per shot, for < 25 qubits.
    sv1_task_fee = 0.075  # USD per task
    sv1_total = n_optimizer_steps * sv1_task_fee

    # --- IonQ Aria Costs ---
    # IonQ charges per shot, with a per-gate component.
    # Approximate: $0.00097 per 2-qubit gate per shot
    ionq_per_shot_per_gate = 0.00097
    ionq_total = (n_optimizer_steps * n_shots_per_eval *
                  n_two_qubit_gates * ionq_per_shot_per_gate)

    # --- EC2 Costs ---
    ec2_ondemand_rate = 0.10  # ml.m5.large per hour
    ec2_spot_rate = 0.03      # ml.m5.large Spot per hour
    ec2_rate = ec2_spot_rate if use_spot else ec2_ondemand_rate
    ec2_total = ec2_hours * ec2_rate

    return {
        "sv1_quantum_cost": sv1_total,
        "ionq_quantum_cost": ionq_total,
        "ec2_cost_ondemand": ec2_hours * ec2_ondemand_rate,
        "ec2_cost_spot": ec2_hours * ec2_spot_rate,
        "total_sv1": sv1_total + ec2_total,
        "total_ionq": ionq_total + ec2_total,
    }

# Example: QAOA on 4-node graph, 30 steps, 512 shots, 20 two-qubit gates
costs = estimate_qaoa_cost(
    n_optimizer_steps=30,
    n_shots_per_eval=512,
    n_two_qubit_gates=20,
    ec2_hours=0.5,
    use_spot=True,
)

print("=== Cost Estimate: QAOA MaxCut (4 nodes, p=1) ===")
print(f"SV1 quantum fees:    ${costs['sv1_quantum_cost']:.2f}")
print(f"  (30 tasks x $0.075/task)")
print(f"IonQ quantum fees:   ${costs['ionq_quantum_cost']:.2f}")
print(f"  (30 steps x 512 shots x 20 gates x $0.00097/gate)")
print(f"EC2 on-demand:       ${costs['ec2_cost_ondemand']:.2f}")
print(f"EC2 Spot:            ${costs['ec2_cost_spot']:.2f}")
print(f"")
print(f"Total with SV1 + Spot EC2:  ${costs['total_sv1']:.2f}")
print(f"Total with IonQ + Spot EC2: ${costs['total_ionq']:.2f}")

Running this produces:

=== Cost Estimate: QAOA MaxCut (4 nodes, p=1) ===
SV1 quantum fees:    $2.25
  (30 tasks x $0.075/task)
IonQ quantum fees:   $298.00
  (30 steps x 512 shots x 20 gates x $0.00097/gate)
EC2 on-demand:       $0.05
EC2 Spot:            $0.01

Total with SV1 + Spot EC2:  $2.26
Total with IonQ + Spot EC2: $298.01

The takeaway: for algorithm development and testing, use SV1. The cost difference is two orders of magnitude. Reserve real QPU time for the final validation run or for studying hardware noise effects.

Spot instances reduce EC2 costs by 60-90% and are safe to use when the script implements checkpointing. A reclaimed instance restarts from the last checkpoint rather than from scratch.

Local Testing with LocalJob

Before spending QPU budget, validate the script locally using LocalJob:

from braket.jobs.local import LocalJob

local_job = LocalJob.create(
    device="local:braket/sv1",
    source_module="algorithm_script.py",
    entry_point="algorithm_script:main",
    job_name="qaoa-local-test",
)
print(local_job.result())

LocalJob requires no AWS credentials, runs synchronously, and prints logs to stdout. It sets the same environment variables that the managed runtime would set, so your script’s os.environ["AMZN_BRAKET_DEVICE_ARN"] calls work identically. Switch the device ARN to a real QPU only after the local run completes successfully.

You can also pass hyperparameters to LocalJob the same way:

local_job = LocalJob.create(
    device="local:braket/sv1",
    source_module="algorithm_script.py",
    entry_point="algorithm_script:main",
    job_name="qaoa-local-test",
    hyperparameters={
        "n_iterations": "10",
        "n_shots": "256",
        "edges": "0-1,1-2,2-0",
    },
)

Common Mistakes

Five pitfalls that catch nearly every new Hybrid Jobs user:

1. Forgetting to call save_job_result()

If your script completes without calling save_job_result(), the job transitions to COMPLETED but job.result() returns None. Always call save_job_result() at the end of your script, even if you only pass an empty dict:

# Always do this, even if your results are saved as files
save_job_result({})

Without this call, there is no signal in the job output that the script finished successfully. You lose the structured result dict entirely.

2. Hardcoding the device ARN

Writing AwsDevice("arn:aws:braket:::device/quantum-simulator/amazon/sv1") directly in the algorithm script works, but it ignores the device you pass to AwsQuantumJob.create(). If you later switch to a QPU, the script still uses SV1. Always read from the environment:

# Correct
device_arn = os.environ["AMZN_BRAKET_DEVICE_ARN"]
device = AwsDevice(device_arn)

# Incorrect: ignores the device configured in the job submission
device = AwsDevice("arn:aws:braket:::device/quantum-simulator/amazon/sv1")

3. Skipping local testing

Submitting directly to a QPU without first running LocalJob is a costly mistake. Common issues that LocalJob catches before they cost real money:

  • Import errors (missing dependencies in the container)
  • Logic bugs in the objective function
  • Incorrect environment variable names
  • Type errors in save_job_result() arguments (values must be JSON-serializable)

A one-minute local test can save hours of debugging and hundreds of dollars in QPU fees.

4. No checkpointing for long runs

Without checkpoints, a Spot instance reclamation or a job timeout (the maximum runtime is 336 hours, or 14 days) means all progress is lost. Even if you are not using Spot instances, network errors or service interruptions can terminate a job. Implement checkpointing from the start:

# Save after every iteration, not just at the end
for i in range(start_iter, n_iterations):
    energy = compute_energy(params, device, n_shots)
    params = update_params(params, energy)
    save_checkpoint(params, i + 1)  # Costs nothing, saves everything

5. Misunderstanding task fees vs. shot fees

On SV1, the 0.075feeappliespertask(percircuitsubmission),notpershot.Ataskwith1shotandataskwith10,000shotscostthesame0.075 fee applies per *task* (per circuit submission), not per shot. A task with 1 shot and a task with 10,000 shots cost the same 0.075 for circuits under 25 qubits. This means that algorithms submitting many small circuits (one shot each) pay drastically more than algorithms that batch shots into fewer tasks.

For example, computing a 4-term Hamiltonian expectation value by submitting 4 separate single-shot tasks costs 4 x 0.075=0.075 = 0.30. Computing the same thing with 4 tasks of 1,000 shots each costs the same $0.30 but gives you 250x more statistical accuracy per dollar. Structure your circuits to use the shots parameter rather than submitting many single-shot tasks.

On QPUs like IonQ, the pricing model is different (per-shot, per-gate), so the optimization strategy changes. Always check the current pricing page for your target device before designing your shot allocation strategy.

Was this tutorial helpful?