ezpz.pbs
- See ezpz/pbs.py
Contains helper functions for working with the PBS Pro scheduler at ALCF.
See docs/pbs.md for more information.
build_launch_cmd(ngpus=None, nhosts=None, ngpu_per_host=None, hostfile=None, cpu_bind=None)
Build the launch command for the current job.
Returns:
| Name | Type | Description |
|---|---|---|
| `str` | `str` | The launch command. |
Source code in src/ezpz/pbs.py
def build_launch_cmd(
    ngpus: Optional[int] = None,
    nhosts: Optional[int] = None,
    ngpu_per_host: Optional[int] = None,
    hostfile: Optional[Union[str, Path, os.PathLike]] = None,
    cpu_bind: Optional[str] = None,
) -> str:
    """Build the launch command for the current job.

    Dispatches on the active scheduler (``ezpz.configs.get_scheduler``,
    compared case-insensitively): PBS jobs are handled by
    ``get_pbs_launch_cmd`` and Slurm jobs by ``ezpz.slurm.build_launch_cmd``.
    Every argument is forwarded unchanged to the selected builder.

    Args:
        ngpus: Total number of GPUs (ranks) to launch.
        nhosts: Number of hosts to launch on.
        ngpu_per_host: GPUs (ranks) per host.
        hostfile: Path to the hostfile.
        cpu_bind: CPU-binding specification.

    Returns:
        str: The launch command.

    Raises:
        ValueError: If the scheduler is neither PBS nor Slurm.
    """
    from ezpz.configs import get_scheduler

    launch_kwargs = dict(
        ngpus=ngpus,
        nhosts=nhosts,
        ngpu_per_host=ngpu_per_host,
        hostfile=hostfile,
        cpu_bind=cpu_bind,
    )
    scheduler = get_scheduler().lower()
    if scheduler == "slurm":
        # Imported lazily so PBS-only environments never load the Slurm module.
        import ezpz.slurm

        return ezpz.slurm.build_launch_cmd(**launch_kwargs)
    if scheduler != "pbs":
        raise ValueError(f"Unsupported scheduler: {scheduler}")
    return get_pbs_launch_cmd(**launch_kwargs)
get_pbs_env(hostfile=None, jobid=None, verbose=None)
Get the PBS environment variables.
Source code in src/ezpz/pbs.py
def get_pbs_env(
    hostfile: Optional[Union[str, Path]] = None,
    jobid: Optional[Union[int, str]] = None,
    verbose: Optional[bool] = None,
) -> dict[str, str]:
    """Get the PBS environment variables, augmented with launch info.

    Collects every environment variable whose name contains ``"PBS"``.
    When the hostfile exists, it adds the upper-cased entries returned by
    ``get_pbs_launch_info``. It then makes sure a ``LAUNCH_CMD`` entry is
    present and exports the whole mapping into ``os.environ``.

    Args:
        hostfile: Hostfile to use. Defaults to ``$PBS_NODEFILE``, falling
            back to ``get_pbs_nodefile(jobid=jobid)``.
        jobid: Job whose nodefile is looked up when no hostfile is given and
            ``$PBS_NODEFILE`` is unset.
        verbose: If truthy, rank 0 logs the resulting mapping.

    Returns:
        dict[str, str]: The PBS-related variables that were exported.

    Raises:
        AssertionError: If the active scheduler is not PBS, or no hostfile
            could be determined.
    """
    from ezpz.configs import get_scheduler

    assert get_scheduler() == "PBS"
    pbsenv = {k: v for k, v in os.environ.items() if "PBS" in k}
    if hostfile is None:
        hostfile = os.environ.get("PBS_NODEFILE")
    if hostfile is None:
        hostfile = get_pbs_nodefile(jobid=jobid)
    assert hostfile is not None
    if (hfp := Path(hostfile)).is_file():
        pbsenv |= {
            f"{k.upper()}": f"{v}" for k, v in get_pbs_launch_info(hfp).items()
        }
    # get_pbs_launch_info already builds LAUNCH_CMD from this same hostfile.
    # Only compute it here when it is missing, so the launch command (and the
    # GPU-usage log lines get_pbs_launch_cmd emits) is not produced twice.
    if "LAUNCH_CMD" not in pbsenv:
        pbsenv["LAUNCH_CMD"] = get_pbs_launch_cmd(hostfile=hostfile)
    os.environ |= pbsenv
    if verbose and ezpz.distributed.get_rank() == 0:
        ezpz.distributed.log_dict_as_bulleted_list(pbsenv, name="pbsenv")
    return pbsenv
get_pbs_jobid_of_active_job()
Get the jobid of the currently active job.
Short-circuits on $PBS_JOBID if set, which is the case in any
job spawned via qsub (PBS exports it into the script's
environment). Falling back to qstat is only needed when this
function is called outside of a job context — and that fallback
requires the qstat binary on PATH, which is NOT available
on compute nodes on systems like Aurora where PBS server binaries
live under /opt/pbs/bin on login nodes only. Without this
short-circuit, ezpz launch raises
ImportError: cannot import name 'qstat' from 'sh' from any
compute-node Python subprocess.
.. note::
When $PBS_NODEFILE is also set and readable, the fast
path cross-checks the local hostname against it before
trusting $PBS_JOBID. This catches the (rare) case where
a user manually exports PBS_JOBID=<some-other-job> for
backwards compatibility or runs ezpz from an
ssh-into-a-compute-node shell of a different job. If the
hostname is missing from the nodefile we log a debug message and
fall through to the qstat lookup. When $PBS_NODEFILE
is absent (e.g. compute-node Python subprocesses that don't
inherit the full PBS env) we skip the check and trust
$PBS_JOBID — that's the original motivating case.
Source code in src/ezpz/pbs.py
def get_pbs_jobid_of_active_job() -> str | None:
"""Get the jobid of the currently active job.
Short-circuits on ``$PBS_JOBID`` if set, which is the case in any
job spawned via ``qsub`` (PBS exports it into the script's
environment). Falling back to ``qstat`` is only needed when this
function is called outside of a job context — and that fallback
requires the ``qstat`` binary on ``PATH``, which is NOT available
on compute nodes on systems like Aurora where PBS server binaries
live under ``/opt/pbs/bin`` on login nodes only. Without this
short-circuit, ``ezpz launch`` raises
``ImportError: cannot import name 'qstat' from 'sh'`` from any
compute-node Python subprocess.
.. note::
When ``$PBS_NODEFILE`` is *also* set and readable, the fast
path cross-checks the local hostname against it before
trusting ``$PBS_JOBID``. This catches the (rare) case where
a user manually exports ``PBS_JOBID=<some-other-job>`` for
backwards compatibility or runs ``ezpz`` from an
``ssh``-into-a-compute-node shell of a different job. If the
hostname is missing from the nodefile we log a warning and
fall through to the ``qstat`` lookup. When ``$PBS_NODEFILE``
is absent (e.g. compute-node Python subprocesses that don't
inherit the full PBS env) we skip the check and trust
``$PBS_JOBID`` — that's the original motivating case.
"""
# Fast path: $PBS_JOBID is set by qsub and propagated by
# mpiexec --envall — skip the qstat shell-out entirely.
pbs_jobid = os.environ.get("PBS_JOBID")
if pbs_jobid:
# PBS_JOBID is of the form "12345.aurora-pbs-0001..." — strip
# the server suffix to match the rest of ezpz's jobid handling.
short = pbs_jobid.split(".")[0]
# Sanity check: when PBS_NODEFILE is readable, verify our
# hostname is actually in it. If not, $PBS_JOBID is lying
# (manual override, stale env, etc.) and we should fall
# through to the qstat-based lookup. Skip silently when
# PBS_NODEFILE is absent — that's the compute-node-subprocess
# case the fast path was built for.
#
# IMPORTANT: PBS_NODEFILE on Aurora contains FQDNs like
# `x4114c1s7b0n0.hostmgmt2.cm.aurora.alcf.anl.gov` while
# `socket.getfqdn()` returns the SHORT name `x4114c1s7b0n0` on
# compute nodes (because reverse DNS doesn't include the
# hostmgmt suffix from inside a job). Compare on short names
# only — strip the first `.`-separated segment from BOTH
# sides before membership check.
pbs_nodefile = os.environ.get("PBS_NODEFILE")
if pbs_nodefile:
try:
raw_nodes = Path(pbs_nodefile).read_text().split()
except OSError:
raw_nodes = None
if raw_nodes is not None:
nodes_short = {n.split(".", 1)[0] for n in raw_nodes}
local = socket.getfqdn().split(".", 1)[0]
if local not in nodes_short:
# Demoted from warning to debug: on 96-rank jobs
# this previously produced 96 identical lines of
# yellow output, and the function self-recovers
# via qstat. Keep the message so it's visible
# under EZPZ_LOG_LEVEL=debug when actually
# debugging a stale-jobid case.
logger.debug(
"$PBS_JOBID=%s but local hostname %s is not "
"in $PBS_NODEFILE=%s; falling through to qstat.",
pbs_jobid,
local,
pbs_nodefile,
)
else:
return short
else:
return short
else:
return short
# Slow path: walk the user's running jobs and match by hostname.
# Requires the qstat binary on PATH, which is NOT installed on
# compute nodes on Aurora/Sunspot/Polaris — keep this confined to
# login-node callers (e.g. `ezpz doctor`).
#
# ```python
# jobs = {
# jobid_A: [host_A0, host_A1, host_A2, ..., host_AN],
# jobid_B: [host_B0, host_B0, host_B2, ..., host_BN],
# ...,
# }
# ```
#
# Loop over {jobid, [hosts]} dictionary. At each iteration, look
# and see if _our_ `hostname` is anywhere in the `[hosts]` list.
# If so, then we know that we are currently participating in the
# `jobid` of that entry.
jobs = get_pbs_running_jobs_for_user()
for jobid, nodelist in jobs.items():
# NOTE:
# - `socket.fqdn()` (fully qualified domain name):
# - This will be of the form `x[0-9]+.cm.aurora.alcf.anl.gov`
# We only need the part before the first '.'
if socket.getfqdn().split(".")[0] in nodelist:
return jobid
return None
get_pbs_launch_cmd(ngpus=None, nhosts=None, ngpu_per_host=None, hostfile=None, cpu_bind=None, *, verbose=False)
Get the PBS launch command.
Parameters
ngpus : int, optional
Total number of GPUs to use. If None, inferred from other args or max.
nhosts : int, optional
Number of hosts (nodes). If None, defaults to max available.
ngpu_per_host : int, optional
GPUs per host. If None, inferred when possible.
hostfile : path-like, optional
Hostfile to use. If None, uses a fallback from get_hostfile_with_fallback.
cpu_bind : str, optional
CPU binding passed to mpiexec via --cpu-bind. Takes precedence over $CPU_BIND; if both are unset or blank, a machine-specific default is used.
verbose : bool, keyword-only
If True, log more and pass --verbose (where applicable).
Source code in src/ezpz/pbs.py
def get_pbs_launch_cmd(
    ngpus: Optional[int] = None,
    nhosts: Optional[int] = None,
    ngpu_per_host: Optional[int] = None,
    hostfile: Optional[Pathish] = None,
    cpu_bind: Optional[str] = None,
    *,
    verbose: bool = False,
) -> str:
    """Get the PBS launch command.

    Parameters
    ----------
    ngpus : int, optional
        Total number of GPUs to use. If None, inferred from other args or max.
    nhosts : int, optional
        Number of hosts (nodes). If None, defaults to max available.
    ngpu_per_host : int, optional
        GPUs per host. If None, inferred when possible.
    hostfile : path-like, optional
        Hostfile to use. If None, uses a fallback from `get_hostfile_with_fallback`.
    cpu_bind : str, optional
        CPU-binding spec for ``mpiexec --cpu-bind``. Takes precedence over
        ``$CPU_BIND``; blank values are ignored. When neither is set, a
        machine-specific default is used (Aurora/Sunspot preset, otherwise
        ``depth`` with ``--depth=8``). Not used on Sophia (``mpirun``).
    verbose : bool, keyword-only
        If True, pass `--verbose`, and for ``mpiexec`` request verbose CPU
        binding (``--cpu-bind=verbose,...``) in every binding branch.

    Returns
    -------
    str
        The space-joined launch command: ``mpirun ...`` on Sophia,
        ``mpiexec ...`` everywhere else.
    """
    hostfile = hostfile or get_hostfile_with_fallback(hostfile)
    hostfile_path = Path(hostfile)
    ngpus, nhosts, ngpu_per_host = _infer_topology(
        ngpus=ngpus,
        nhosts=nhosts,
        ngpu_per_host=ngpu_per_host,
        hostfile=hostfile_path,
    )
    ngpus_max = ezpz.get_world_size(total=True)
    emoji = "✅" if ngpus == ngpus_max else "⚠️"
    logger.info(
        f"{emoji} Using [{ngpus}/{ngpus_max}] GPUs "
        f"[{nhosts} hosts] x [{ngpu_per_host} GPU/host]"
    )
    if ngpus != ngpus_max:
        logger.warning(
            f"[🚧 WARNING] Using only [{ngpus}/{ngpus_max}] available GPUs!!"
        )
    machine_name = ezpz.get_machine().lower()
    hostfile_str = str(hostfile_path)
    if machine_name == "sophia":
        # mpirun style
        cmd_list = [
            "mpirun",
            f"-n={ngpus}",
            f"-N={ngpu_per_host}",
            f"--hostfile={hostfile_str}",
            "-x PATH",
            "-x LD_LIBRARY_PATH",
        ]
        if verbose:
            cmd_list.append("--verbose")
        return " ".join(cmd_list)
    # Default: mpiexec
    cmd_list = [
        "mpiexec",
        "--envall",
        "--line-buffer",
        f"--np={ngpus}",
        f"--ppn={ngpu_per_host}",
        f"--hostfile={hostfile_str}",
    ]
    if verbose:
        cmd_list.append("--verbose")
    cpu_bind_env = os.environ.get("CPU_BIND")
    cpu_bind_cli = (
        _normalize_cpu_bind_value(cpu_bind)
        if cpu_bind is not None and cpu_bind.strip()
        else None
    )
    cpu_bind_env_value = (
        _normalize_cpu_bind_value(cpu_bind_env)
        if cpu_bind_env is not None and cpu_bind_env.strip()
        else None
    )
    # Shared by every binding branch below so `verbose` is honoured
    # uniformly.
    cpu_bind_prefix = "--cpu-bind=verbose," if verbose else "--cpu-bind="
    selected_cpu_bind = cpu_bind_cli or cpu_bind_env_value
    if selected_cpu_bind:
        if cpu_bind_cli is not None:
            logger.info("Using cpu bind from --cpu-bind: %s", cpu_bind_cli)
        else:
            logger.warning(
                "Detected CPU_BIND from environment: %s", cpu_bind_env
            )
        cmd_list.append(f"{cpu_bind_prefix}{selected_cpu_bind}")
    elif machine_name in {"aurora", "sunspot"}:
        # `--no-vni` was previously auto-added here for a transient
        # network-interface workaround; dropped as of v0.18.x. Pass via
        # launcher passthrough or set a custom CPU_BIND if you still
        # need it.
        cmd_list.append(f"{cpu_bind_prefix}{_CPU_BIND_AURORA}")
    else:
        # Generic CPU binding. Previously hard-coded `--cpu-bind=depth`,
        # which silently dropped `verbose` in this branch only.
        cmd_list.extend([f"{cpu_bind_prefix}depth", "--depth=8"])
    return " ".join(cmd_list)
get_pbs_launch_info(hostfile=None, jobid=None)
Get the PBS launch info.
Source code in src/ezpz/pbs.py
def get_pbs_launch_info(
    hostfile: Optional[str | Path] = None,
    jobid: Optional[int | str] = None,
) -> dict[str, str]:
    """Get the PBS launch info.

    Summarises the job topology and launch settings derived from the
    hostfile as a flat mapping of string values.

    Args:
        hostfile: Hostfile to read. Defaults to the nodefile returned by
            ``get_pbs_nodefile(jobid=jobid)``.
        jobid: Job used to locate the nodefile when ``hostfile`` is None.

    Returns:
        dict[str, str]: Keys ``HOSTFILE``, ``HOSTS`` (short hostnames, or a
        placeholder once there are 1000 or more hosts), ``NHOSTS``,
        ``NGPU_PER_HOST``, ``NGPUS`` (``NHOSTS * NGPU_PER_HOST``),
        ``NGPUS_AVAILABLE``, ``MACHINE``, ``DEVICE``, ``BACKEND``,
        ``LAUNCH_CMD`` and ``world_size_total``.

    Raises:
        AssertionError: If the scheduler is not PBS or no hostfile is found.
    """
    from ezpz.configs import get_scheduler

    assert get_scheduler() == "PBS"
    if hostfile is None:
        hostfile = get_pbs_nodefile(jobid=jobid)
    assert hostfile is not None
    hostfile_path = Path(hostfile)
    dist = ezpz.distributed
    short_hosts = [
        name.split(".")[0]
        for name in dist.get_nodes_from_hostfile(hostfile_path)
    ]
    host_count = len(short_hosts)
    gpus_per_host = dist.get_gpus_per_node()
    gpus_available = dist.get_world_size(total=True)
    gpus_requested = host_count * gpus_per_host
    total_world_size = dist.get_world_size_total()
    launch_cmd = get_pbs_launch_cmd(hostfile=hostfile)
    # Avoid building an enormous string for very large allocations.
    if host_count < 1000:
        hosts_repr = "[" + ", ".join(short_hosts) + "]"
    else:
        hosts_repr = "[truncated (>1000 nodes)]"
    return {
        "HOSTFILE": hostfile_path.as_posix(),
        "HOSTS": hosts_repr,
        "NHOSTS": f"{host_count}",
        "NGPU_PER_HOST": f"{gpus_per_host}",
        "NGPUS": f"{gpus_requested}",
        "NGPUS_AVAILABLE": f"{gpus_available}",
        "MACHINE": dist.get_machine(),
        "DEVICE": dist.get_torch_device_type(),
        "BACKEND": dist.get_torch_backend(),
        "LAUNCH_CMD": launch_cmd,
        "world_size_total": f"{total_world_size}",
    }
get_pbs_nodefile(jobid=None)
Get the nodefile for a given jobid.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `jobid` | `int \| str` | The jobid to get the nodefile for. If None, the currently active job is used. | `None` |
Returns:
| Name | Type | Description |
|---|---|---|
| `str` | `str \| None` | The path to the nodefile, or None if no active job is found. |
Source code in src/ezpz/pbs.py
def get_pbs_nodefile(jobid: Optional[int | str] = None) -> str | None:
    """Get the nodefile for a given jobid.

    Args:
        jobid (int | str, optional): The jobid to get the nodefile for.
            When None, the job found by ``get_pbs_jobid_of_active_job`` is
            used. Defaults to None.

    Returns:
        str | None: The path to the nodefile, or None (after logging a
        warning) when ``jobid`` is None and no active job could be found.
    """
    resolved_jobid = (
        jobid if jobid is not None else get_pbs_jobid_of_active_job()
    )
    if resolved_jobid is None:
        logger.warning("No active job found.")
        return None
    return get_pbs_nodefile_from_jobid(resolved_jobid)
get_pbs_nodefile_from_jobid(jobid)
Get the nodefile for a given jobid.
Source code in src/ezpz/pbs.py
def get_pbs_nodefile_from_jobid(jobid: int | str) -> str:
"""Get the nodefile for a given jobid."""
assert jobid is not None, "No jobid provided and no active job found."
pbs_parent = Path("/var/spool/pbs/aux")
pfiles = [
Path(pbs_parent).joinpath(f)
for f in os.listdir(pbs_parent)
if str(jobid) in f
]
assert len(pfiles) == 1, (
f"Found {len(pfiles)} files matching {jobid} in {pbs_parent}"
)
pbs_nodefile = pfiles[0]
assert pbs_nodefile.is_file(), f"Nodefile {pbs_nodefile} does not exist."
return pbs_nodefile.absolute().resolve().as_posix()
get_pbs_nodefile_of_active_job()
get_pbs_nodelist_from_jobid(jobid)
Get the nodelist for a given jobid.
Source code in src/ezpz/pbs.py
def get_pbs_nodelist_from_jobid(jobid: int | str) -> list[str]:
    """Get the nodelist for a given jobid.

    Args:
        jobid: Bare (``12345``) or full (``"12345.<server>"``) job id. Only
            the part before the first ``.`` is used, because
            ``get_pbs_running_jobs_for_user`` keys its result that way.
            Previously a full id could never be found.

    Returns:
        list[str]: Hosts assigned to the job, as reported by
        ``get_pbs_running_jobs_for_user``.

    Raises:
        AssertionError: If ``jobid`` is None or the job is not among the
            current user's running jobs.
    """
    assert jobid is not None, "No jobid provided and no active job found."
    short_jobid = str(jobid).split(".")[0]
    jobs = get_pbs_running_jobs_for_user()
    assert short_jobid in jobs, (
        f"Job ID {jobid} not found in running jobs for user {getuser()}"
    )
    return jobs[short_jobid]
get_pbs_running_jobs_for_user()
Get all running jobs for the current user.
Results are cached for up to 30 s to avoid redundant qstat calls during a single launch sequence. The cache is per-process, so each rank that calls this function runs qstat itself on a cache miss.
Source code in src/ezpz/pbs.py
def get_pbs_running_jobs_for_user() -> dict[str, list[str]]:
    """Get all running jobs for the current user.

    Results are cached in the module-level ``_pbs_jobs_cache`` for
    ``_PBS_JOBS_CACHE_TTL`` seconds (monotonic clock) to avoid redundant
    ``qstat`` calls during a single launch sequence. The cache is
    per-process and there is no rank coordination here. Every process
    that misses the cache runs ``qstat`` itself.

    Returns:
        dict[str, list[str]]: Maps each running job's id (server suffix
        stripped) to the hosts parsed from the last column of its ``qstat``
        row.

    Raises:
        Whatever ``from sh import qstat`` raises (logged at debug level
        first), e.g. ``ImportError`` when ``qstat`` is not on ``PATH``.
    """
    global _pbs_jobs_cache
    if _pbs_jobs_cache is not None:
        ts, cached = _pbs_jobs_cache
        if time.monotonic() - ts < _PBS_JOBS_CACHE_TTL:
            return cached
    try:
        from sh import qstat  # type:ignore
    except Exception as e:
        logger.debug("Error importing sh.qstat: %s", e)
        raise
    output = _run_qstat_with_retry(qstat, f"-fn1wru {getuser()}")
    jobs: dict[str, list[str]] = {}
    # splitlines()/split() also drop stray "\r" and tabs, which would
    # otherwise leak into the parsed job ids or host names.
    for row in output.splitlines():
        # Rows containing " R " are treated as running jobs.
        if " R " not in row:
            continue
        fields = row.split()
        jobid = fields[0].split(".")[0]
        # The last column is assumed to be the exec-host list
        # ("hostA/...+hostB/..."); keep only the host part of each entry.
        jobs[jobid] = [h.split("/")[0] for h in fields[-1].split("+")]
    _pbs_jobs_cache = (time.monotonic(), jobs)
    return jobs
get_running_jobs_from_qstat()
Get the running jobs from qstat.
Source code in src/ezpz/pbs.py
def get_running_jobs_from_qstat() -> list[int]:
    """Get the running jobs from qstat.

    Runs ``qstat -u <user>``. The first two output lines are skipped as
    headers, and the result contains the numeric ids of every remaining
    row that contains `` R ``.

    Returns:
        list[int]: Job ids (the part of each row before the first ``.``).

    Raises:
        Whatever ``from sh import qstat`` raises, e.g. ``ImportError`` when
        the ``qstat`` binary is not available.
    """
    from getpass import getuser

    from sh import qstat as shqstat  # type: ignore

    # Fall back to getpass when $USER is unset instead of passing None.
    user = os.environ.get("USER") or getuser()
    output = _run_qstat_with_retry(shqstat, "-u", user)
    # splitlines() rather than split("\n")[2:-1]: the old slice dropped the
    # final job row whenever the output had no trailing newline.
    return [
        int(line.split(".")[0])
        for line in output.splitlines()[2:]
        if " R " in line
    ]