
🧑‍🍳 Recipes

Short, copy-pasteable patterns for common ezpz tasks. For a full walkthrough with progressive examples, see the Distributed Training Guide.

FSDP Training

Set up FSDP with a single flag change from DDP.

import torch
import ezpz

rank = ezpz.setup_torch()
model = torch.nn.Linear(32, 16).to(ezpz.get_torch_device())
model = ezpz.wrap_model(model, use_fsdp=True)  # use_fsdp=False for DDP
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
print(f"[rank {rank}] model wrapped, optimizer ready")
ezpz.cleanup()

$ ezpz launch -np 2 -- python3 recipe_fsdp.py
[I][ezpz/launch] ----[🍋 ezpz.launch][started]----
[I][ezpz/launch] mpirun -np 2 python3 recipe_fsdp.py
[I][ezpz/launch] Execution started...
Using [2 / 2] available "cpu" devices !!
FSDP is not supported on cpu devices; falling back to DDP.
[rank 0] model wrapped, optimizer ready
[rank 1] model wrapped, optimizer ready
[I][ezpz/launch] ----[🍋 ezpz.launch][stop]----
[I][ezpz/launch] Execution finished with 0.
[I][ezpz/launch] Executing finished in 2.97 seconds.

$ ezpz launch -np 8 -- python3 recipe_fsdp.py
[I][ezpz/launch] ----[🍋 ezpz.launch][started]----
[I][ezpz/pbs] ✅ Using [8/8] GPUs [2 hosts] x [4 GPU/host]
[I][ezpz/launch] mpiexec --np=8 --ppn=4 python3 recipe_fsdp.py
[I][ezpz/launch] Execution started...
Using [8 / 8] available "cuda" devices !!
[rank 0] model wrapped, optimizer ready
[rank 1] model wrapped, optimizer ready
[rank 2] model wrapped, optimizer ready
[rank 3] model wrapped, optimizer ready
[rank 4] model wrapped, optimizer ready
[rank 5] model wrapped, optimizer ready
[rank 6] model wrapped, optimizer ready
[rank 7] model wrapped, optimizer ready
[I][ezpz/launch] ----[🍋 ezpz.launch][stop]----
[I][ezpz/launch] Execution finished with 0.
[I][ezpz/launch] Executing finished in 9.49 seconds.
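
To make the DDP/FSDP trade-off concrete, here is a rough back-of-the-envelope sketch of steady-state per-rank memory for parameters, gradients, and Adam state. It assumes fp32 values, two Adam moment buffers per parameter, and perfectly even sharding, and it ignores activations and the temporary all-gathers FSDP performs. `per_rank_state_bytes` is a hypothetical helper, not part of ezpz or PyTorch.

```python
def per_rank_state_bytes(n_params, world_size, sharded, bytes_per_value=4):
    """Approximate bytes per rank for params + grads + Adam moments.

    Assumptions (illustrative only): fp32 values, Adam keeps two moment
    buffers per parameter, sharding is perfectly even, activations ignored.
    """
    copies = 1 + 1 + 2  # parameters, gradients, Adam exp_avg and exp_avg_sq
    total = n_params * bytes_per_value * copies
    # DDP replicates everything on every rank; FSDP splits it across ranks.
    return total / world_size if sharded else float(total)


# The recipe's Linear(32, 16): 32 * 16 weights + 16 biases = 528 parameters.
n_params = 32 * 16 + 16
ddp_bytes = per_rank_state_bytes(n_params, world_size=8, sharded=False)
fsdp_bytes = per_rank_state_bytes(n_params, world_size=8, sharded=True)
print(f"DDP:  {ddp_bytes:.0f} bytes per rank")
print(f"FSDP: {fsdp_bytes:.0f} bytes per rank")
```

For a model this small the difference is irrelevant; the same ratio applied to billions of parameters is what makes sharding worthwhile.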

W&B Logging

History dispatches metrics to Weights & Biases by default (via the EZPZ_TRACKER_BACKENDS env var, which defaults to wandb).

import ezpz

rank = ezpz.setup_torch()

history = ezpz.History(
    project_name="ezpz-wandb-recipe",
    # backends defaults to EZPZ_TRACKER_BACKENDS env var, then "wandb"
)
num_steps = 10
for step in range(num_steps):
    loss_val = 1.0 / (step + 1)
    lr_val = 1e-3
    history.update({"loss": loss_val, "lr": lr_val}, step=step)

if rank == 0:
    history.finalize(outdir="wandb-recipe-outputs", plot=False)

$ ezpz launch -np 2 -- python3 recipe_wandb.py
[I][ezpz/launch] ----[🍋 ezpz.launch][started]----
[I][ezpz/launch] mpirun -np 2 python3 recipe_wandb.py
[I][ezpz/launch] Execution started...
Using [2 / 2] available "mps" devices !!
[I][ezpz/history] Using History with distributed_history=True
[I][utils] Saving dataset to: ./outputs/dataset_dataset.nc
[I][ezpz/history] Saving history report to ./outputs/report.md
[I][ezpz/launch] ----[🍋 ezpz.launch][stop]----
[I][ezpz/launch] Execution finished with 0.
[I][ezpz/launch] Executing finished in 2.76 seconds.

$ ezpz launch -np 8 -- python3 recipe_wandb.py
[I][ezpz/launch] ----[🍋 ezpz.launch][started]----
[I][ezpz/pbs] ✅ Using [8/8] GPUs [2 hosts] x [4 GPU/host]
[I][ezpz/launch] mpiexec --np=8 --ppn=4 python3 recipe_wandb.py
[I][ezpz/launch] Execution started...
Using [8 / 8] available "cuda" devices !!
[I][ezpz/history] Using History with distributed_history=True
[I][utils] Saving dataset to: wandb-recipe-outputs/dataset_dataset.h5
[I][ezpz/history] Saving history report to wandb-recipe-outputs/report.md
[I][ezpz/launch] ----[🍋 ezpz.launch][stop]----
[I][ezpz/launch] Execution finished with 0.
[I][ezpz/launch] Executing finished in 9.80 seconds.

MLflow Tracking

Same as W&B: just change the backend, or use both at once.

# MLflow only
EZPZ_TRACKER_BACKENDS=mlflow ezpz launch -np 2 -- python3 recipe_wandb.py

# Both W&B and MLflow
EZPZ_TRACKER_BACKENDS=wandb,mlflow ezpz launch -np 2 -- python3 recipe_wandb.py

No code changes are needed: when backends= is not passed explicitly, History reads the EZPZ_TRACKER_BACKENDS env var automatically (defaulting to wandb). Set your tracking server in ~/.amsc.env or the project .env:

~/.amsc.env
AMSC_API_KEY=your-api-key
MLFLOW_TRACKING_URI=https://mlflow.american-science-cloud.org
MLFLOW_TRACKING_INSECURE_TLS=true

See Experiment Tracking > MLflow for the full setup guide.
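
The precedence described above (explicit argument, then environment variable, then wandb) can be sketched as follows. `resolve_backends` is a hypothetical helper written for illustration; it is not ezpz's implementation, and the comma-splitting and whitespace handling are assumptions based on the `wandb,mlflow` example.

```python
import os


def resolve_backends(backends=None, env=None):
    """Return the list of tracker backends to use.

    Hypothetical helper mirroring the precedence described above:
    explicit argument, then EZPZ_TRACKER_BACKENDS, then "wandb".
    """
    env = os.environ if env is None else env
    if backends is None:
        backends = env.get("EZPZ_TRACKER_BACKENDS", "wandb")
    if isinstance(backends, str):
        backends = backends.split(",")
    # Trim whitespace and drop empty entries such as a trailing comma.
    return [b.strip() for b in backends if b.strip()]


print(resolve_backends(env={"EZPZ_TRACKER_BACKENDS": "wandb,mlflow"}))
```

Passing `env` explicitly keeps the function easy to test without touching the real process environment.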

Custom Hostfile

Use --hostfile with ezpz launch to target specific nodes.

# hostfile.txt: one hostname per line, with slots
# node01 slots=4
# node02 slots=4

ezpz launch --hostfile hostfile.txt -- python3 train.py
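
Before launching, it can help to sanity-check how many processes a hostfile allows. The sketch below assumes the `hostname slots=N` layout shown in the comment above; the exact syntax accepted depends on the underlying MPI launcher, and `parse_hostfile` is a hypothetical helper, not an ezpz function.

```python
def parse_hostfile(text):
    """Parse 'hostname slots=N' lines into a {hostname: slots} dict.

    Assumes the layout shown above; blank lines and '#' comments are
    skipped, and a missing 'slots=' entry defaults to 1.
    """
    hosts = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()
        if not line:
            continue
        name, *options = line.split()
        slots = 1
        for opt in options:
            key, _, value = opt.partition("=")
            if key == "slots":
                slots = int(value)
        hosts[name] = slots
    return hosts


example = """
node01 slots=4
node02 slots=4
"""
hosts = parse_hostfile(example)
total_slots = sum(hosts.values())
print(hosts, total_slots)
```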

Forcing a Specific Device/Backend

Override auto-detection with TORCH_DEVICE and TORCH_BACKEND environment variables.

# Force CPU + gloo (useful for debugging on a GPU node)
TORCH_DEVICE=cpu TORCH_BACKEND=gloo ezpz launch -np 2 -- python3 train.py

# Force XPU + xccl on Intel GPUs
TORCH_DEVICE=xpu TORCH_BACKEND=xccl ezpz launch -np 4 -- python3 train.py
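
As a mental model, the overrides layer on top of whatever auto-detection would have chosen. The sketch below is an illustration only: the device-to-backend table lists conventional pairings (an assumption, not ezpz's actual detection logic), and `pick_device_and_backend` is a hypothetical name.

```python
import os

# Conventional device -> collective backend pairings (an assumption for this
# sketch; ezpz's own auto-detection may choose differently).
DEFAULT_BACKEND = {"cuda": "nccl", "xpu": "xccl", "mps": "gloo", "cpu": "gloo"}


def pick_device_and_backend(detected_device, env=None):
    """Apply TORCH_DEVICE / TORCH_BACKEND overrides on top of a detected device."""
    env = os.environ if env is None else env
    device = env.get("TORCH_DEVICE", detected_device)
    backend = env.get("TORCH_BACKEND", DEFAULT_BACKEND.get(device, "gloo"))
    return device, backend


# Debugging on a GPU node: the environment wins over detection.
print(pick_device_and_backend("cuda", env={"TORCH_DEVICE": "cpu", "TORCH_BACKEND": "gloo"}))
```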

Timing with ezpz.synchronize()

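Use ezpz.synchronize() for correct cross-backend timing on CUDA, XPU, MPS, and CPU. Accelerator kernels launch asynchronously, so without a synchronize before reading the clock you would time only the kernel launches, not the work itself.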

import time
import torch
import ezpz

rank = ezpz.setup_torch()
model = torch.nn.Linear(32, 16).to(ezpz.get_torch_device())
batch = torch.randn(8, 32, device=ezpz.get_torch_device())

ezpz.synchronize()
t0 = time.perf_counter()

output = model(batch)
loss = output.sum()
loss.backward()

ezpz.synchronize()
dt = time.perf_counter() - t0
print(f"[rank {rank}] step time: {dt:.4f}s")
ezpz.cleanup()

$ ezpz launch -np 2 -- python3 recipe_timing.py
[I][ezpz/launch] ----[🍋 ezpz.launch][started]----
[I][ezpz/launch] mpirun -np 2 python3 recipe_timing.py
[I][ezpz/launch] Execution started...
Using [2 / 2] available "mps" devices !!
[rank 1] step time: 0.2058s
[rank 0] step time: 0.3189s
[I][ezpz/launch] ----[🍋 ezpz.launch][stop]----
[I][ezpz/launch] Execution finished with 0.
[I][ezpz/launch] Executing finished in 2.41 seconds.

$ ezpz launch -np 8 -- python3 recipe_timing.py
[I][ezpz/launch] ----[🍋 ezpz.launch][started]----
[I][ezpz/pbs] ✅ Using [8/8] GPUs [2 hosts] x [4 GPU/host]
[I][ezpz/launch] mpiexec --np=8 --ppn=4 python3 recipe_timing.py
[I][ezpz/launch] Execution started...
Using [8 / 8] available "cuda" devices !!
[rank 0] step time: 0.0988s
[rank 1] step time: 0.0893s
[rank 2] step time: 0.0907s
[rank 3] step time: 0.0957s
[rank 4] step time: 0.0866s
[rank 5] step time: 0.0912s
[rank 6] step time: 0.0838s
[rank 7] step time: 0.0910s
[I][ezpz/launch] ----[🍋 ezpz.launch][stop]----
[I][ezpz/launch] Execution finished with 0.
[I][ezpz/launch] Executing finished in 7.11 seconds.
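
If the synchronize/timer pairing is repeated in several places, it can be wrapped in a small context manager. The sketch below is one possible shape: `timed` is a hypothetical helper, and its `sync` argument stands in for a device barrier such as ezpz.synchronize (the default no-op keeps the example runnable without an accelerator).

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(sync=lambda: None):
    """Time a block, calling `sync` before starting and before stopping the clock.

    `sync` stands in for a device barrier such as ezpz.synchronize(); the
    default no-op keeps this sketch runnable without an accelerator.
    """
    result = {}
    sync()  # drain previously queued work so it is not billed to this block
    start = time.perf_counter()
    try:
        yield result
    finally:
        sync()  # wait for work launched inside the block to finish
        result["seconds"] = time.perf_counter() - start


calls = []
with timed(sync=lambda: calls.append("sync")) as t:
    sum(i * i for i in range(10_000))
print(f"elapsed: {t['seconds']:.6f}s after {len(calls)} syncs")
```

In a training script the call would look like `with timed(sync=ezpz.synchronize) as t:` around the forward and backward pass.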

Multi-Node Launch Patterns

Use -np (total processes), -ppn (processes per node), and --nhosts to control placement.

# 4 GPUs on a single node
ezpz launch -np 4 -- python3 train.py

# 2 nodes, 4 GPUs each (8 total)
ezpz launch -np 8 -ppn 4 --nhosts 2 -- python3 train.py

# Pass extra env vars to workers
ezpz launch -np 8 -ppn 4 -x NCCL_DEBUG=INFO -- python3 train.py
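
The three numbers are related by np = ppn × nhosts. The sketch below maps each global rank to a host index and local rank, assuming block placement (ranks fill one host before moving to the next). That ordering is a common default but is ultimately decided by the MPI launcher, and `placement` is a hypothetical helper.

```python
def placement(np_total, ppn):
    """Map each global rank to (host_index, local_rank).

    Assumes block placement (ranks fill one host before moving to the next),
    which is a common default but ultimately depends on the MPI launcher.
    """
    if np_total % ppn != 0:
        raise ValueError("np must be a multiple of ppn for even placement")
    return {rank: divmod(rank, ppn) for rank in range(np_total)}


layout = placement(np_total=8, ppn=4)
nhosts = len({host for host, _ in layout.values()})
print(nhosts, layout[5])
```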

Disabling Distributed History

Set EZPZ_NO_DISTRIBUTED_HISTORY=1 to skip cross-rank metric aggregation on large runs where the all-gather overhead is noticeable.

# Disable distributed history aggregation (auto-disabled above 384 ranks)
EZPZ_NO_DISTRIBUTED_HISTORY=1 ezpz launch -np 512 -- python3 train.py

The same switch is available in code via the distributed_history argument:

import ezpz

rank = ezpz.setup_torch()
history = ezpz.History(distributed_history=False)

for step in range(5):
    history.update({"loss": 1.0 / (step + 1)}, step=step)

print(f"[rank {rank}] distributed_history={history.distributed_history}")
ezpz.cleanup()

$ ezpz launch -np 2 -- python3 recipe_no_dist_history.py
[I][ezpz/launch] ----[🍋 ezpz.launch][started]----
[I][ezpz/launch] mpirun -np 2 python3 recipe_no_dist_history.py
[I][ezpz/launch] Execution started...
Using [2 / 2] available "mps" devices !!
[I][ezpz/history] Using History with distributed_history=False
[rank 0] distributed_history=False
[rank 1] distributed_history=False
[I][ezpz/launch] ----[🍋 ezpz.launch][stop]----
[I][ezpz/launch] Execution finished with 0.
[I][ezpz/launch] Executing finished in 2.95 seconds.

$ ezpz launch -np 8 -- python3 recipe_no_dist_history.py
[I][ezpz/launch] ----[🍋 ezpz.launch][started]----
[I][ezpz/pbs] ✅ Using [8/8] GPUs [2 hosts] x [4 GPU/host]
[I][ezpz/launch] mpiexec --np=8 --ppn=4 python3 recipe_no_dist_history.py
[I][ezpz/launch] Execution started...
Using [8 / 8] available "cuda" devices !!
[I][ezpz/history] Using History with distributed_history=False
[rank 0] distributed_history=False
[rank 1] distributed_history=False
[rank 2] distributed_history=False
[rank 3] distributed_history=False
[rank 4] distributed_history=False
[rank 5] distributed_history=False
[rank 6] distributed_history=False
[rank 7] distributed_history=False
[I][ezpz/launch] ----[🍋 ezpz.launch][stop]----
[I][ezpz/launch] Execution finished with 0.
[I][ezpz/launch] Executing finished in 8.38 seconds.
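
Putting the three inputs together (the env var, the constructor argument, and the 384-rank threshold mentioned above), the decision might look roughly like this. `use_distributed_history` is a hypothetical function for illustration; how ezpz actually parses the env var, and the order in which it checks these inputs, are assumptions.

```python
import os

AUTO_DISABLE_ABOVE = 384  # rank threshold quoted in the recipe comment above


def use_distributed_history(world_size, requested=True, env=None):
    """Decide whether to aggregate metrics across ranks (illustrative only)."""
    env = os.environ if env is None else env
    # Treating any non-empty value other than "0" as "set" is an assumption.
    if env.get("EZPZ_NO_DISTRIBUTED_HISTORY", "0") not in ("", "0"):
        return False
    if world_size > AUTO_DISABLE_ABOVE:
        return False
    return requested


print(use_distributed_history(512, env={}))
```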

Distributed Data Loading

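Use DistributedSampler to shard data across ranks. The key detail: call sampler.set_epoch(epoch) at the start of each epoch so the shuffle order changes from epoch to epoch. Without it, every epoch replays the same ordering; the ranks receive disjoint shards either way.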

recipe_dataloader.py
import torch
from torch.utils.data import DataLoader, DistributedSampler
import ezpz

rank = ezpz.setup_torch()
device = ezpz.get_torch_device()

# Any standard torch Dataset works
dataset = torch.utils.data.TensorDataset(
    torch.randn(1000, 32),  # inputs
    torch.randint(0, 10, (1000,)),  # labels
)

sampler = DistributedSampler(dataset) if ezpz.get_world_size() > 1 else None

dataloader = DataLoader(
    dataset,
    batch_size=64,
    sampler=sampler,
    shuffle=(sampler is None),  # don't shuffle when using sampler
    drop_last=True,  # consistent batch size across ranks
)

model = torch.nn.Linear(32, 10).to(device)
model = ezpz.wrap_model(model)
optimizer = torch.optim.Adam(model.parameters())

for epoch in range(3):
    if sampler is not None:
        sampler.set_epoch(epoch)  # re-shuffle each epoch
    for batch_inputs, batch_labels in dataloader:
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)
        loss = torch.nn.functional.cross_entropy(model(batch_inputs), batch_labels)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    print(f"[rank {rank}] epoch {epoch} done")

ezpz.cleanup()

When to skip the sampler

For single-process runs (world_size == 1), a sampler isn't needed. The if ezpz.get_world_size() > 1 guard keeps the same code working on a laptop and a cluster.
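
To see why set_epoch matters, here is a simplified, pure-Python model of how a distributed sampler picks indices: shuffle with a seed derived from the epoch, pad so the length divides evenly, then give each rank every world_size-th index. It follows the same overall scheme as DistributedSampler but uses Python's `random` module, so the exact permutation differs from PyTorch's; `shard_indices` is a hypothetical name.

```python
import random


def shard_indices(n_samples, world_size, rank, epoch, seed=0):
    """Return the dataset indices one rank sees in one epoch.

    Simplified model of DistributedSampler: shuffle with a seed derived from
    `seed + epoch`, pad so the length divides evenly, then take every
    `world_size`-th index starting at `rank`.
    """
    indices = list(range(n_samples))
    random.Random(seed + epoch).shuffle(indices)
    remainder = len(indices) % world_size
    if remainder:
        indices += indices[: world_size - remainder]  # pad by repeating
    return indices[rank::world_size]


epoch0 = [shard_indices(100, world_size=4, rank=r, epoch=0) for r in range(4)]
epoch1 = [shard_indices(100, world_size=4, rank=r, epoch=1) for r in range(4)]
print(epoch0[0][:5], epoch1[0][:5])
```

Because every rank derives the permutation from the same seed and epoch, the shards stay disjoint without any communication; forgetting set_epoch is equivalent to passing epoch=0 forever.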

Checkpointing

Save and load model checkpoints that work across DDP and FSDP.

DDP Checkpointing

With DDP, every rank holds a full copy, so save from rank 0:

recipe_checkpoint_ddp.py
import torch
import ezpz

rank = ezpz.setup_torch()
device = ezpz.get_torch_device()

model = torch.nn.Linear(32, 10).to(device)
model = ezpz.wrap_model(model, use_fsdp=False)
optimizer = torch.optim.Adam(model.parameters())

# ... training loop ...

# Save (rank 0 only)
if rank == 0:
    torch.save({
        "model": model.module.state_dict(),  # unwrap DDP
        "optimizer": optimizer.state_dict(),
        "epoch": 10,
    }, "checkpoint.pt")

# Load (all ranks); the barrier ensures rank 0 has finished writing
torch.distributed.barrier()
ckpt = torch.load("checkpoint.pt", map_location=device, weights_only=True)
model.module.load_state_dict(ckpt["model"])
optimizer.load_state_dict(ckpt["optimizer"])

ezpz.cleanup()
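
The write-then-barrier-then-read ordering is the part that is easy to get wrong. The sketch below simulates it with threads standing in for ranks and `threading.Barrier` standing in for torch.distributed.barrier(); it uses JSON rather than torch.save so it runs with the standard library alone. It is an analogy for the pattern, not distributed code.

```python
import json
import os
import tempfile
import threading

WORLD_SIZE = 4
barrier = threading.Barrier(WORLD_SIZE)  # stand-in for torch.distributed.barrier()
loaded = [None] * WORLD_SIZE


def worker(rank, path):
    if rank == 0:
        # Only "rank 0" writes; every replica holds identical weights.
        with open(path, "w") as f:
            json.dump({"epoch": 10, "weights": [0.1, 0.2]}, f)
    barrier.wait()  # nobody reads until the writer has finished
    with open(path) as f:
        loaded[rank] = json.load(f)


with tempfile.TemporaryDirectory() as tmp:
    ckpt_path = os.path.join(tmp, "checkpoint.json")
    threads = [
        threading.Thread(target=worker, args=(r, ckpt_path))
        for r in range(WORLD_SIZE)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

print(all(state == loaded[0] for state in loaded))
```

Removing the `barrier.wait()` line lets the other workers race ahead of the writer and fail on a missing or partial file, which is the failure mode the barrier in the recipe prevents.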

FSDP Checkpointing

FSDP shards parameters, so you need to gather the full state dict first:

recipe_checkpoint_fsdp.py
import torch
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP, StateDictType
import ezpz

rank = ezpz.setup_torch()
device = ezpz.get_torch_device()

model = torch.nn.Linear(32, 10).to(device)
model = ezpz.wrap_model(model)  # FSDP by default
optimizer = torch.optim.Adam(model.parameters())

# ... training loop ...

# Save: gather the full state dict on every rank, then write from rank 0
with FSDP.state_dict_type(model, StateDictType.FULL_STATE_DICT):
    state = model.state_dict()
    if rank == 0:
        torch.save(state, "checkpoint_fsdp.pt")

# Wait until rank 0 has finished writing before any rank reads the file
torch.distributed.barrier()

# Load: read the full state dict on each rank; FSDP re-shards it on load
with FSDP.state_dict_type(model, StateDictType.FULL_STATE_DICT):
    state = torch.load("checkpoint_fsdp.pt", map_location="cpu", weights_only=True)
    model.load_state_dict(state)

ezpz.cleanup()

FSDP2 (PyTorch 2.4+)

If you're using FSDP2, use torch.distributed.checkpoint instead of FSDP.state_dict_type(). See the PyTorch docs.

Gradient Accumulation

Accumulate gradients over multiple micro-batches before stepping. This raises the effective batch size without raising peak per-GPU activation memory, because only one micro-batch is in flight at a time.

recipe_grad_accum.py
import torch
import ezpz

rank = ezpz.setup_torch()
device = ezpz.get_torch_device()

model = torch.nn.Linear(32, 10).to(device)
model = ezpz.wrap_model(model)
optimizer = torch.optim.Adam(model.parameters())

accum_steps = 4  # accumulate over 4 micro-batches
effective_batch_size = 16 * accum_steps  # = 64 per rank

for step in range(100):
    optimizer.zero_grad()
    for micro_step in range(accum_steps):
        x = torch.randn(16, 32, device=device)
        loss = model(x).sum() / accum_steps  # scale loss
        loss.backward()  # gradients accumulate
    optimizer.step()
    if step % 10 == 0:
        print(f"[rank {rank}] step {step}, loss={loss.item() * accum_steps:.4f}")

ezpz.cleanup()
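
The division by accum_steps is what keeps the accumulated gradient on the same scale as a single large batch. The plain-Python sketch below checks this for a one-parameter model with a mean-squared-error loss (an assumption chosen for easy arithmetic; the recipe itself uses .sum()). With equal-sized micro-batches and a mean-reduced loss, the two gradients match.

```python
def mean_grad(w, xs, ts):
    """d/dw of mean((w*x - t)**2) over one batch."""
    return sum(2 * (w * x - t) * x for x, t in zip(xs, ts)) / len(xs)


w = 0.5
xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ts = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0]

# Gradient from one pass over the full batch of 8.
full_batch_grad = mean_grad(w, xs, ts)

# Gradient accumulated over 4 micro-batches of 2.
accum_steps = 4
micro = len(xs) // accum_steps
accumulated = 0.0
for k in range(accum_steps):
    chunk = slice(k * micro, (k + 1) * micro)
    # Dividing by accum_steps mirrors `loss / accum_steps` in the recipe.
    accumulated += mean_grad(w, xs[chunk], ts[chunk]) / accum_steps

print(full_batch_grad, accumulated)
```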

With FSDP: use no_sync()

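Both DDP and FSDP synchronize gradients on every backward() by default (an all-reduce under DDP, a reduce-scatter under FSDP). Wrap all but the last micro-step in model.no_sync() to defer that communication until the final micro-step. Under FSDP this trades memory for bandwidth, because unsharded gradients are held until the sync happens: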

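from contextlib import nullcontext

optimizer.zero_grad()
for micro_step in range(accum_steps):
    x = torch.randn(16, 32, device=device)
    # Skip gradient sync on all but the last micro-step
    ctx = model.no_sync() if micro_step < accum_steps - 1 else nullcontext()
    with ctx:
        loss = model(x).sum() / accum_steps
        loss.backward()
optimizer.step()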

MFU Tracking

Track Model FLOPS Utilization (MFU): the fraction of the hardware's peak compute your model actually uses.

recipe_mfu.py
import time
import torch
import ezpz
from ezpz.flops import try_estimate, compute_mfu

rank = ezpz.setup_torch()
device = ezpz.get_torch_device()

model = torch.nn.Linear(4096, 4096).to(device)

# Count FLOPS once before wrapping (FSDP/DDP breaks FlopCounterMode).
# try_estimate is the recommended wrapper: it handles errors and
# logs the FLOPS count on rank 0.
model_flops = try_estimate(model, input_shape=(32, 4096))

model = ezpz.wrap_model(model)
optimizer = torch.optim.Adam(model.parameters())

for step in range(100):
    ezpz.synchronize()
    t0 = time.perf_counter()
    x = torch.randn(32, 4096, device=device)
    loss = model(x).sum()
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()
    ezpz.synchronize()
    dt = time.perf_counter() - t0

    mfu = compute_mfu(model_flops, dt)  # per-device MFU
    if step % 10 == 0 and rank == 0:
        print(f"step={step} loss={loss.item():.4f} mfu={mfu:.2f}%")

ezpz.cleanup()

compute_mfu returns per-device MFU. Both model_flops and peak_flops are per-device quantities, so world_size isn't needed. The device's peak FLOPS is auto-detected. Supported accelerators:

  • NVIDIA: A100, H100 (SXM/NVL/PCIe), H200, B200, L40S
  • AMD: MI250X, MI300X, MI325X, MI355X
  • Intel: Data Center GPU Max 1550 (PVC, computed dynamically)

For unrecognized devices compute_mfu returns 0.0 (with a warning).
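
For intuition, the usual definition of MFU is achieved FLOP/s divided by peak FLOP/s. The sketch below works through the recipe's Linear(4096, 4096) by hand. Several inputs are assumptions made for the arithmetic: that the backward pass costs about twice the forward pass, an A100 dense BF16 peak of 312 TFLOP/s, and a 1 ms step time. `mfu_percent` is a hypothetical function; what try_estimate counts and which peak value compute_mfu uses may differ.

```python
def mfu_percent(flops_per_step, step_seconds, peak_flops_per_second):
    """Model FLOPS Utilization as a percentage of the device's peak."""
    achieved = flops_per_step / step_seconds  # FLOP/s actually sustained
    return 100.0 * achieved / peak_flops_per_second


# Linear(4096, 4096) on a batch of 32: a matmul costs 2 * m * n * k FLOPs.
forward_flops = 2 * 32 * 4096 * 4096
# Assumption: backward costs about twice the forward pass.
step_flops = 3 * forward_flops
# Assumption: an A100's dense BF16 peak of 312 TFLOP/s, and a 1 ms step.
mfu = mfu_percent(step_flops, step_seconds=1e-3, peak_flops_per_second=312e12)
print(f"{mfu:.2f}%")
```

A single small linear layer lands around one percent under these assumptions, which is expected: there is far too little arithmetic per step to keep the device busy.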

When try_estimate is not enough

The recipe above measures FLOPS once at startup with a fixed (batch_size, seq_len) shape and re-uses that count for every step. This is fine for the recipe (a fixed-shape nn.Linear workload) but breaks for two important cases:

  1. Variable sequence length: attention is O(seq²) but the startup count is for a single shape; a batch of shorter sequences will report MFU that's too high, longer sequences too low.
  2. Autoregressive generation: multiplying by n_new_tokens ignores KV-cache savings (over-counts) and growing context (under-counts). In practice this routinely produces MFU values >100%.

For these cases, measure exact FLOPS per step via FlopCounterMode. See ezpz.examples.inference's --flops flag for the pattern: opt-in measurement, ~15-40% per-step overhead, optional --flops-every-n-steps N to amortize the cost.
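
The size of the variable-length error is easy to estimate for the quadratic term alone. The sketch below counts only the two attention matmuls (QKᵀ and the weighted sum over V) and ignores projections and MLP layers, so it is a deliberately narrow illustration; the shapes are made up and `attention_score_flops` is a hypothetical helper.

```python
def attention_score_flops(batch, seq_len, hidden):
    """FLOPs for QK^T plus the attention-weighted sum over V (no projections)."""
    return 2 * (2 * batch * seq_len * seq_len * hidden)


# FLOPs counted once at startup for the nominal shape.
counted = attention_score_flops(batch=8, seq_len=2048, hidden=1024)
# FLOPs the device actually performs on a batch of half-length sequences.
actual = attention_score_flops(batch=8, seq_len=1024, hidden=1024)

# Reported MFU uses `counted`, true MFU uses `actual`, over the same step time.
overstatement = counted / actual
print(f"reported MFU is {overstatement:.0f}x too high for this term")
```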

When to use MFU

MFU measures how much of the hardware's peak compute a step achieves; it does not say where the lost time went. Low MFU can mean:

  • Memory-bound model: the model doesn't have enough compute per byte of data movement (e.g. small batch size)
  • Communication overhead: gradient all-reduce takes too long (try FSDP or reduce world size)
  • Kernel launch overhead: too many small ops (try torch.compile)