Skip to content

ezpz.pbs⚓︎

pbs.py

Helper functions for working with the PBS Pro scheduler at ALCF.

See docs/pbs.md for more information.

build_launch_cmd(ngpus=None, nhosts=None, ngpu_per_host=None, hostfile=None, cpu_bind=None) ⚓︎

Build the launch command for the current job, dispatching to the PBS or Slurm implementation based on the detected scheduler (any other scheduler raises ValueError).

Returns:

str: The launch command.

Source code in src/ezpz/pbs.py
def build_launch_cmd(
    ngpus: Optional[int] = None,
    nhosts: Optional[int] = None,
    ngpu_per_host: Optional[int] = None,
    hostfile: Optional[Union[str, Path, os.PathLike]] = None,
    cpu_bind: Optional[str] = None,
) -> str:
    """Build the launch command for the current job.

    Dispatches on the scheduler reported by ``ezpz.configs.get_scheduler``
    (compared case-insensitively): PBS jobs are delegated to
    :func:`get_pbs_launch_cmd`, Slurm jobs to
    ``ezpz.slurm.build_launch_cmd``. All arguments are forwarded unchanged.

    Args:
        ngpus: Total number of GPUs to launch on.
        nhosts: Number of hosts (nodes) to launch on.
        ngpu_per_host: GPUs per host.
        hostfile: Path to the hostfile.
        cpu_bind: CPU binding specification forwarded to the launcher.

    Returns:
        str: The launch command.

    Raises:
        ValueError: If the scheduler is neither PBS nor Slurm.
    """
    from ezpz.configs import get_scheduler

    launch_kwargs = dict(
        ngpus=ngpus,
        nhosts=nhosts,
        ngpu_per_host=ngpu_per_host,
        hostfile=hostfile,
        cpu_bind=cpu_bind,
    )
    scheduler = get_scheduler().lower()
    if scheduler == "pbs":
        return get_pbs_launch_cmd(**launch_kwargs)
    if scheduler == "slurm":
        # Imported lazily so PBS-only environments never load the Slurm helpers.
        import ezpz.slurm

        return ezpz.slurm.build_launch_cmd(**launch_kwargs)
    raise ValueError(f"Unsupported scheduler: {scheduler}")

get_pbs_env(hostfile=None, jobid=None, verbose=None) ⚓︎

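Get the PBS environment variables. When the hostfile exists, they are augmented with launch information derived from it. The result is exported into os.environ and returned.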

Source code in src/ezpz/pbs.py
def get_pbs_env(
    hostfile: Optional[Union[str, Path]] = None,
    jobid: Optional[Union[int, str]] = None,
    verbose: Optional[bool] = None,
) -> dict[str, str]:
    """Get the PBS environment variables.

    Collects every environment variable whose name contains ``"PBS"``.
    When the hostfile exists, the collection is augmented with the
    (upper-cased) launch info from :func:`get_pbs_launch_info`. The
    result is exported back into ``os.environ`` and returned.

    Args:
        hostfile: Hostfile to use. Defaults to ``$PBS_NODEFILE``, falling
            back to :func:`get_pbs_nodefile` for ``jobid``.
        jobid: Job ID used to locate the nodefile when no hostfile is
            given and ``$PBS_NODEFILE`` is unset.
        verbose: If truthy, rank 0 logs the resulting variables.

    Returns:
        dict[str, str]: The PBS environment. When the hostfile exists, it
        includes the launch-info keys (``HOSTFILE``, ``NHOSTS``,
        ``LAUNCH_CMD``, ...).
    """
    from ezpz.configs import get_scheduler

    assert get_scheduler() == "PBS"
    pbsenv = {k: v for k, v in os.environ.items() if "PBS" in k}
    if hostfile is None:
        hostfile = os.environ.get("PBS_NODEFILE")
    if hostfile is None:
        hostfile = get_pbs_nodefile(jobid=jobid)

    assert hostfile is not None
    if (hfp := Path(hostfile)).is_file():
        # get_pbs_launch_info already builds LAUNCH_CMD with
        # get_pbs_launch_cmd(hostfile=...) for this same hostfile.
        # Calling get_pbs_launch_cmd again would repeat the topology
        # inference and emit its GPU info/warning log lines twice.
        pbsenv |= {
            f"{k.upper()}": f"{v}" for k, v in get_pbs_launch_info(hfp).items()
        }
    os.environ |= pbsenv
    if verbose and ezpz.distributed.get_rank() == 0:
        ezpz.distributed.log_dict_as_bulleted_list(pbsenv, name="pbsenv")
    return pbsenv

get_pbs_jobid_of_active_job() ⚓︎

Get the jobid of the currently active job.

Short-circuits on $PBS_JOBID if set, which is the case in any job spawned via qsub (PBS exports it into the script's environment). Falling back to qstat is only needed when this function is called outside of a job context — and that fallback requires the qstat binary on PATH, which is NOT available on compute nodes on systems like Aurora where PBS server binaries live under /opt/pbs/bin on login nodes only. Without this short-circuit, ezpz launch raises ImportError: cannot import name 'qstat' from 'sh' from any compute-node Python subprocess.

Note: when $PBS_NODEFILE is also set and readable, the fast path first checks that the local hostname appears in it before trusting $PBS_JOBID. This guards against the (rare) case where a user manually exports PBS_JOBID=<some-other-job> for backwards compatibility. It also covers running ezpz from an ssh-into-a-compute-node shell of a different job. If the hostname is missing from the nodefile, we log a message and fall through to the qstat lookup. When $PBS_NODEFILE is absent (e.g. compute-node Python subprocesses that don't inherit the full PBS env), we skip the check and trust $PBS_JOBID; that is the original motivating case.

Source code in src/ezpz/pbs.py
def get_pbs_jobid_of_active_job() -> str | None:
    """Get the jobid of the currently active job.

    Short-circuits on ``$PBS_JOBID`` if set, which is the case in any
    job spawned via ``qsub`` (PBS exports it into the script's
    environment). Falling back to ``qstat`` is only needed when this
    function is called outside of a job context — and that fallback
    requires the ``qstat`` binary on ``PATH``, which is NOT available
    on compute nodes on systems like Aurora where PBS server binaries
    live under ``/opt/pbs/bin`` on login nodes only. Without this
    short-circuit, ``ezpz launch`` raises
    ``ImportError: cannot import name 'qstat' from 'sh'`` from any
    compute-node Python subprocess.

    .. note::
       When ``$PBS_NODEFILE`` is *also* set and readable, the fast
       path cross-checks the local hostname against it before
       trusting ``$PBS_JOBID``. This catches the (rare) case where
       a user manually exports ``PBS_JOBID=<some-other-job>`` for
       backwards compatibility or runs ``ezpz`` from an
       ``ssh``-into-a-compute-node shell of a different job. If the
       hostname is missing from the nodefile we log a warning and
       fall through to the ``qstat`` lookup. When ``$PBS_NODEFILE``
       is absent (e.g. compute-node Python subprocesses that don't
       inherit the full PBS env) we skip the check and trust
       ``$PBS_JOBID`` — that's the original motivating case.
    """
    # Fast path: $PBS_JOBID is set by qsub and propagated by
    # mpiexec --envall — skip the qstat shell-out entirely.
    pbs_jobid = os.environ.get("PBS_JOBID")
    if pbs_jobid:
        # PBS_JOBID is of the form "12345.aurora-pbs-0001..." — strip
        # the server suffix to match the rest of ezpz's jobid handling.
        short = pbs_jobid.split(".")[0]

        # Sanity check: when PBS_NODEFILE is readable, verify our
        # hostname is actually in it. If not, $PBS_JOBID is lying
        # (manual override, stale env, etc.) and we should fall
        # through to the qstat-based lookup. Skip silently when
        # PBS_NODEFILE is absent — that's the compute-node-subprocess
        # case the fast path was built for.
        #
        # IMPORTANT: PBS_NODEFILE on Aurora contains FQDNs like
        # `x4114c1s7b0n0.hostmgmt2.cm.aurora.alcf.anl.gov` while
        # `socket.getfqdn()` returns the SHORT name `x4114c1s7b0n0` on
        # compute nodes (because reverse DNS doesn't include the
        # hostmgmt suffix from inside a job). Compare on short names
        # only — strip the first `.`-separated segment from BOTH
        # sides before membership check.
        pbs_nodefile = os.environ.get("PBS_NODEFILE")
        if pbs_nodefile:
            try:
                raw_nodes = Path(pbs_nodefile).read_text().split()
            except OSError:
                raw_nodes = None
            if raw_nodes is not None:
                nodes_short = {n.split(".", 1)[0] for n in raw_nodes}
                local = socket.getfqdn().split(".", 1)[0]
                if local not in nodes_short:
                    # Demoted from warning to debug: on 96-rank jobs
                    # this previously produced 96 identical lines of
                    # yellow output, and the function self-recovers
                    # via qstat. Keep the message so it's visible
                    # under EZPZ_LOG_LEVEL=debug when actually
                    # debugging a stale-jobid case.
                    logger.debug(
                        "$PBS_JOBID=%s but local hostname %s is not "
                        "in $PBS_NODEFILE=%s; falling through to qstat.",
                        pbs_jobid,
                        local,
                        pbs_nodefile,
                    )
                else:
                    return short
            else:
                return short
        else:
            return short

    # Slow path: walk the user's running jobs and match by hostname.
    # Requires the qstat binary on PATH, which is NOT installed on
    # compute nodes on Aurora/Sunspot/Polaris — keep this confined to
    # login-node callers (e.g. `ezpz doctor`).
    #
    #    ```python
    #    jobs = {
    #        jobid_A: [host_A0, host_A1, host_A2, ..., host_AN],
    #        jobid_B: [host_B0, host_B0, host_B2, ..., host_BN],
    #        ...,
    #    }
    #    ```
    #
    # Loop over {jobid, [hosts]} dictionary. At each iteration, look
    # and see if _our_ `hostname` is anywhere in the `[hosts]` list.
    # If so, then we know that we are currently participating in the
    # `jobid` of that entry.
    jobs = get_pbs_running_jobs_for_user()
    for jobid, nodelist in jobs.items():
        # NOTE:
        # - `socket.fqdn()` (fully qualified domain name):
        #   - This will be of the form `x[0-9]+.cm.aurora.alcf.anl.gov`
        #     We only need the part before the first '.'
        if socket.getfqdn().split(".")[0] in nodelist:
            return jobid
    return None

get_pbs_launch_cmd(ngpus=None, nhosts=None, ngpu_per_host=None, hostfile=None, cpu_bind=None, *, verbose=False) ⚓︎

Get the PBS launch command.

Parameters:

- ngpus (int, optional): Total number of GPUs to use. If None, inferred from the other arguments or the maximum available.
- nhosts (int, optional): Number of hosts (nodes). If None, defaults to the maximum available.
- ngpu_per_host (int, optional): GPUs per host. If None, inferred when possible.
- hostfile (path-like, optional): Hostfile to use. If None, uses a fallback from get_hostfile_with_fallback.
- cpu_bind (str, optional): CPU binding passed to mpiexec via --cpu-bind; takes precedence over $CPU_BIND.
- verbose (bool, keyword-only): If True, pass --verbose and request verbose CPU binding (where applicable).

Source code in src/ezpz/pbs.py
def get_pbs_launch_cmd(
    ngpus: Optional[int] = None,
    nhosts: Optional[int] = None,
    ngpu_per_host: Optional[int] = None,
    hostfile: Optional[Pathish] = None,
    cpu_bind: Optional[str] = None,
    *,
    verbose: bool = False,
) -> str:
    """Get the PBS launch command.

    Parameters
    ----------
    ngpus : int, optional
        Total number of GPUs to use. If None, inferred from other args or max.
    nhosts : int, optional
        Number of hosts (nodes). If None, defaults to max available.
    ngpu_per_host : int, optional
        GPUs per host. If None, inferred when possible.
    hostfile : path-like, optional
        Hostfile to use. If None, uses a fallback from `get_hostfile_with_fallback`.
    cpu_bind : str, optional
        CPU binding passed (after normalization) to ``mpiexec`` as
        ``--cpu-bind=...``. Takes precedence over ``$CPU_BIND``. Ignored
        on Sophia, which launches with ``mpirun``.
    verbose : bool, keyword-only
        If True, append ``--verbose``. For ``mpiexec``, also prefix
        every emitted ``--cpu-bind`` value with ``verbose,``.

    Returns
    -------
    str
        The space-joined launch command.
    """

    hostfile = hostfile or get_hostfile_with_fallback(hostfile)
    hostfile_path = Path(hostfile)

    ngpus, nhosts, ngpu_per_host = _infer_topology(
        ngpus=ngpus,
        nhosts=nhosts,
        ngpu_per_host=ngpu_per_host,
        hostfile=hostfile_path,
    )

    ngpus_max = ezpz.get_world_size(total=True)
    emoji = "✅" if ngpus == ngpus_max else "⚠️"
    logger.info(
        f"{emoji} Using [{ngpus}/{ngpus_max}] GPUs "
        f"[{nhosts} hosts] x [{ngpu_per_host} GPU/host]"
    )

    if ngpus != ngpus_max:
        logger.warning(
            f"[🚧 WARNING] Using only [{ngpus}/{ngpus_max}] available GPUs!!"
        )

    machine_name = ezpz.get_machine().lower()
    hostfile_str = str(hostfile_path)

    if machine_name == "sophia":
        # Sophia launches with mpirun (no --cpu-bind handling).
        cmd_list = [
            "mpirun",
            f"-n={ngpus}",
            f"-N={ngpu_per_host}",
            f"--hostfile={hostfile_str}",
            "-x PATH",
            "-x LD_LIBRARY_PATH",
        ]
        if verbose:
            cmd_list.append("--verbose")
        return " ".join(cmd_list)

    # Default: mpiexec
    cmd_list = [
        "mpiexec",
        "--envall",
        "--line-buffer",
        f"--np={ngpus}",
        f"--ppn={ngpu_per_host}",
        f"--hostfile={hostfile_str}",
    ]
    if verbose:
        cmd_list.append("--verbose")

    # Used for every --cpu-bind flag below, so `verbose` behaves the same
    # whichever binding source wins.
    cpu_bind_prefix = "--cpu-bind=verbose," if verbose else "--cpu-bind="

    # Precedence: explicit `cpu_bind` argument, then $CPU_BIND, then a
    # machine-specific default. Blank/whitespace values count as unset.
    cpu_bind_env = os.environ.get("CPU_BIND")
    cpu_bind_cli = (
        _normalize_cpu_bind_value(cpu_bind)
        if cpu_bind is not None and cpu_bind.strip()
        else None
    )
    cpu_bind_env_value = (
        _normalize_cpu_bind_value(cpu_bind_env)
        if cpu_bind_env is not None and cpu_bind_env.strip()
        else None
    )
    selected_cpu_bind = cpu_bind_cli or cpu_bind_env_value

    if selected_cpu_bind:
        if cpu_bind_cli is not None:
            logger.info("Using cpu bind from --cpu-bind: %s", cpu_bind_cli)
        else:
            logger.warning(
                "Detected CPU_BIND from environment: %s", cpu_bind_env
            )
        cmd_list.append(f"{cpu_bind_prefix}{selected_cpu_bind}")
    elif machine_name in {"aurora", "sunspot"}:
        # Intel XPU machines. `--no-vni` was previously auto-added here
        # for a transient network-interface workaround; dropped as of
        # v0.18.x. Pass via launcher passthrough or set a custom
        # CPU_BIND if you still need it.
        cmd_list.append(f"{cpu_bind_prefix}{_CPU_BIND_AURORA}")
    else:
        # Generic CPU binding. Previously this hard-coded
        # "--cpu-bind=depth" and silently ignored `verbose`.
        cmd_list.extend([f"{cpu_bind_prefix}depth", "--depth=8"])

    return " ".join(cmd_list)

get_pbs_launch_info(hostfile=None, jobid=None) ⚓︎

Get the PBS launch info: the hostfile, short hostnames, host and GPU counts, machine, device, backend, and launch command.

Source code in src/ezpz/pbs.py
def get_pbs_launch_info(
    hostfile: Optional[str | Path] = None,
    jobid: Optional[int | str] = None,
) -> dict[str, str]:
    """Get the PBS launch info.

    Summarize the job described by ``hostfile`` as a flat dict of strings.

    Args:
        hostfile: Hostfile to read. When None, it is resolved with
            :func:`get_pbs_nodefile` for ``jobid``.
        jobid: Job ID, consulted only when ``hostfile`` is None.

    Returns:
        dict[str, str]: The keys are:

        - ``HOSTFILE``
        - ``HOSTS``: short hostnames, replaced by a placeholder at 1000
          or more hosts
        - ``NHOSTS``
        - ``NGPU_PER_HOST``
        - ``NGPUS``: hosts x GPUs per host
        - ``NGPUS_AVAILABLE``
        - ``MACHINE``
        - ``DEVICE``
        - ``BACKEND``
        - ``LAUNCH_CMD``
        - ``world_size_total``
    """
    from ezpz.configs import get_scheduler

    assert get_scheduler() == "PBS"
    if hostfile is None:
        hostfile = get_pbs_nodefile(jobid=jobid)
    assert hostfile is not None

    dist = ezpz.distributed
    hfp = Path(hostfile)
    # Keep only the first `.`-separated label of each hostname.
    short_hosts = [
        node.split(".")[0] for node in dist.get_nodes_from_hostfile(hfp)
    ]
    host_count = len(short_hosts)
    gpus_per_host = dist.get_gpus_per_node()
    gpus_available = dist.get_world_size(total=True)
    gpus_requested = host_count * gpus_per_host
    total_world_size = dist.get_world_size_total()
    launch_cmd = get_pbs_launch_cmd(hostfile=hostfile)

    # Very large jobs would produce an unwieldy HOSTS value; elide it.
    if host_count >= 1000:
        hosts_field = "[truncated (>1000 nodes)]"
    else:
        hosts_field = "[" + ", ".join(short_hosts) + "]"

    return {
        "HOSTFILE": hfp.as_posix(),
        "HOSTS": hosts_field,
        "NHOSTS": f"{host_count}",
        "NGPU_PER_HOST": f"{gpus_per_host}",
        "NGPUS": f"{gpus_requested}",
        "NGPUS_AVAILABLE": f"{gpus_available}",
        "MACHINE": dist.get_machine(),
        "DEVICE": dist.get_torch_device_type(),
        "BACKEND": dist.get_torch_backend(),
        "LAUNCH_CMD": launch_cmd,
        "world_size_total": f"{total_world_size}",
    }

get_pbs_nodefile(jobid=None) ⚓︎

Get the nodefile for a given jobid.

Parameters:

jobid (int | str, optional): The jobid to get the nodefile for. When None, the currently active job is used. Defaults to None.

Returns:

str | None: The path to the nodefile, or None (after logging a warning) if no active job is found.

Source code in src/ezpz/pbs.py
def get_pbs_nodefile(jobid: Optional[int | str] = None) -> str | None:
    """Get the nodefile for a given jobid.

    Args:
        jobid (int | str, optional): The job to look up. When None, the
            currently active job (per :func:`get_pbs_jobid_of_active_job`)
            is used. Defaults to None.

    Returns:
        str | None: The nodefile path from
        :func:`get_pbs_nodefile_from_jobid`. Returns None, after logging
        a warning, when no jobid is given and no active job is found.
    """
    resolved_jobid = get_pbs_jobid_of_active_job() if jobid is None else jobid
    if resolved_jobid is not None:
        return get_pbs_nodefile_from_jobid(resolved_jobid)
    logger.warning("No active job found.")
    return None

get_pbs_nodefile_from_jobid(jobid) ⚓︎

Get the nodefile for a given jobid.

Source code in src/ezpz/pbs.py
def get_pbs_nodefile_from_jobid(jobid: int | str) -> str:
    """Get the nodefile for a given jobid."""
    assert jobid is not None, "No jobid provided and no active job found."

    pbs_parent = Path("/var/spool/pbs/aux")
    pfiles = [
        Path(pbs_parent).joinpath(f)
        for f in os.listdir(pbs_parent)
        if str(jobid) in f
    ]
    assert len(pfiles) == 1, (
        f"Found {len(pfiles)} files matching {jobid} in {pbs_parent}"
    )
    pbs_nodefile = pfiles[0]
    assert pbs_nodefile.is_file(), f"Nodefile {pbs_nodefile} does not exist."
    return pbs_nodefile.absolute().resolve().as_posix()

get_pbs_nodefile_of_active_job() ⚓︎

Get the nodefile for the currently active job.

Source code in src/ezpz/pbs.py
def get_pbs_nodefile_of_active_job() -> str | None:
    """Get the nodefile for the currently active job.

    Returns:
        str | None: The nodefile path for the job reported by
        :func:`get_pbs_jobid_of_active_job`, or None when no active job
        is found.
    """
    active_jobid = get_pbs_jobid_of_active_job()
    if active_jobid is None:
        return None
    return get_pbs_nodefile_from_jobid(active_jobid)

get_pbs_nodelist_from_jobid(jobid) ⚓︎

Get the nodelist for a given jobid.

Source code in src/ezpz/pbs.py
def get_pbs_nodelist_from_jobid(jobid: int | str) -> list[str]:
    """Get the nodelist for a given jobid.

    Args:
        jobid: Short (``12345``) or full (``12345.<server>``) job ID.

    Returns:
        list[str]: The hosts assigned to the job, as parsed from qstat by
        :func:`get_pbs_running_jobs_for_user`.

    Raises:
        AssertionError: If ``jobid`` is None, or if the job is not among
            the current user's running jobs.
    """
    assert jobid is not None, "No jobid provided and no active job found."
    jobs = get_pbs_running_jobs_for_user()
    # get_pbs_running_jobs_for_user keys jobs by the short ID (the text
    # before the first '.'). Normalize full IDs such as `12345.server`,
    # which previously could never be found.
    short_jobid = str(jobid).split(".")[0]
    assert short_jobid in jobs, (
        f"Job ID {jobid} not found in running jobs for user {getuser()}"
    )
    return jobs[short_jobid]

get_pbs_running_jobs_for_user() ⚓︎

Get all running jobs for the current user.

Results are cached for up to 30 s to avoid redundant qstat calls during a single launch sequence. The cache is per-process: there is no rank coordination, so any process whose cache is empty or stale runs qstat itself.

Source code in src/ezpz/pbs.py
def get_pbs_running_jobs_for_user() -> dict[str, list[str]]:
    """Get all running jobs for the current user.

    Runs ``qstat -fn1wru <user>`` and parses every output line that
    contains `` R ``. For each such line, the first field is the job ID
    and the last field is the ``+``-separated exec-host list.

    Results are cached at module level (``_pbs_jobs_cache``) for
    ``_PBS_JOBS_CACHE_TTL`` seconds (30 s) to avoid redundant qstat calls
    during a single launch sequence. The cache is per-process: there is
    no rank coordination here, so every process whose cache is empty or
    stale runs qstat itself. The cached dict is returned as-is, so
    callers should not mutate it.

    Returns:
        dict[str, list[str]]: Mapping of short job ID (text before the
        first ``.``) to the job's hosts, with any ``/<vnode>`` suffix
        stripped.

    Raises:
        ImportError: If ``sh`` cannot provide a ``qstat`` command, e.g. on
            compute nodes where qstat is not on ``PATH``.
    """
    global _pbs_jobs_cache
    if _pbs_jobs_cache is not None:
        ts, cached = _pbs_jobs_cache
        if time.monotonic() - ts < _PBS_JOBS_CACHE_TTL:
            return cached

    try:
        from sh import qstat  # type:ignore
    except Exception as e:
        logger.debug("Error importing sh.qstat: %s", e)
        raise

    # Pass the flag cluster and the user name as separate arguments,
    # matching get_running_jobs_from_qstat. `sh` turns each positional
    # argument into one argv entry. If _run_qstat_with_retry forwards
    # its arguments unchanged, a combined "-fn1wru <user>" string reaches
    # qstat's option parser with the user name as " <user>".
    output = _run_qstat_with_retry(qstat, "-fn1wru", getuser())
    jobs: dict[str, list[str]] = {}
    for row in output.split("\n"):
        if " R " not in row:
            continue
        fields = [field for field in row.split(" ") if field]
        jobid = fields[0].split(".")[0]
        # Exec-host entries look like `host/0*64`; keep only the host.
        jobs[jobid] = [h.split("/")[0] for h in fields[-1].split("+")]

    _pbs_jobs_cache = (time.monotonic(), jobs)
    return jobs

get_running_jobs_from_qstat() ⚓︎

Get the integer IDs of the current user's running jobs from qstat.

Source code in src/ezpz/pbs.py
def get_running_jobs_from_qstat() -> list[int]:
    """Get the running jobs from qstat.

    Runs ``qstat -u <user>``, skips the first two output lines, and keeps
    rows that contain `` R ``.

    Returns:
        list[int]: The integer job IDs (text before the first ``.``) of
        those rows.

    Raises:
        ImportError: If ``sh`` cannot provide a ``qstat`` command.
    """
    # The previous `try: ... except Exception as e: raise e` wrapper was a
    # no-op; let the import error propagate directly.
    from sh import qstat as shqstat  # type: ignore

    # getuser() is what the rest of this module uses. It checks LOGNAME,
    # USER, LNAME and USERNAME before the password database, whereas
    # os.environ.get("USER") could pass None to qstat.
    output = _run_qstat_with_retry(shqstat, "-u", getuser())
    # splitlines() drops a trailing newline without also discarding the
    # last real row when output lacks one (the old `[2:-1]` slice did).
    return [
        int(line.split(".")[0])
        for line in output.splitlines()[2:]
        if " R " in line
    ]