# ezpz.pbs ⚓︎

See `ezpz/pbs.py`.
Contains helper functions for working with the PBS Pro scheduler @ ALCF
See: docs/pbs.md for more information.
build_launch_cmd(ngpus=None, nhosts=None, ngpu_per_host=None, hostfile=None)
⚓︎
Build the launch command for the current job.
Returns:

| Name | Type | Description |
|---|---|---|
| `str` | `str` | The launch command. |
Source code in src/ezpz/pbs.py
def build_launch_cmd(
    ngpus: Optional[int] = None,
    nhosts: Optional[int] = None,
    ngpu_per_host: Optional[int] = None,
    hostfile: Optional[Union[str, Path, os.PathLike]] = None,
) -> str:
    """Build the launch command for the current job.

    Dispatches on the scheduler reported by ``ezpz.configs.get_scheduler``:
    PBS jobs go through :func:`get_pbs_launch_cmd`, Slurm jobs through
    ``ezpz.slurm.build_launch_cmd``.

    Returns:
        str: The launch command.

    Raises:
        ValueError: If the detected scheduler is neither PBS nor Slurm.
    """
    from ezpz.configs import get_scheduler

    which = get_scheduler().lower()
    if which == "pbs":
        return get_pbs_launch_cmd(
            ngpus=ngpus,
            nhosts=nhosts,
            ngpu_per_host=ngpu_per_host,
            hostfile=hostfile,
        )
    if which == "slurm":
        import ezpz.slurm

        return ezpz.slurm.build_launch_cmd()
    raise ValueError(f"Unsupported scheduler: {which}")
get_pbs_env(hostfile=None, jobid=None, verbose=None)
⚓︎
Get the PBS environment variables
Source code in src/ezpz/pbs.py
def get_pbs_env(
    hostfile: Optional[Union[str, Path]] = None,
    jobid: Optional[Union[int, str]] = None,
    verbose: Optional[bool] = None,
) -> dict[str, str]:
    """Get the PBS environment variables.

    Collects every ``PBS*`` variable from the environment, augments it
    with launch info derived from the hostfile, writes the result back
    into ``os.environ``, and returns it.
    """
    from ezpz.configs import get_scheduler

    assert get_scheduler() == "PBS"
    pbsenv = {key: val for key, val in os.environ.items() if "PBS" in key}
    if hostfile is None:
        hostfile = os.environ.get("PBS_NODEFILE")
    if hostfile is None:
        hostfile = get_pbs_nodefile(jobid=jobid)
    assert hostfile is not None
    hostfile_path = Path(hostfile)
    if hostfile_path.is_file():
        # Launch-info keys are upper-cased; values coerced to str.
        launch_info = get_pbs_launch_info(hostfile_path)
        pbsenv.update({k.upper(): str(v) for k, v in launch_info.items()})
        pbsenv["LAUNCH_CMD"] = get_pbs_launch_cmd(hostfile=hostfile)
    # Export everything collected so child processes inherit it.
    os.environ.update(pbsenv)
    if verbose and ezpz.dist.get_rank() == 0:
        ezpz.dist.log_dict_as_bulleted_list(pbsenv, name="pbsenv")
    return pbsenv
get_pbs_jobid_of_active_job()
⚓︎
Get the jobid of the currently active job.
Source code in src/ezpz/pbs.py
def get_pbs_jobid_of_active_job() -> str | None:
    """Get the jobid of the currently active job.

    Scans the user's currently-running PBS jobs (a mapping of
    ``jobid -> [hosts]``) and returns the jobid whose host list
    contains this machine, or ``None`` when no job matches.
    """
    # The FQDN looks like `x[0-9]+.cm.aurora.alcf.anl.gov`; the qstat
    # node lists use only the short hostname, i.e. the part before the
    # first '.'.
    this_host = socket.getfqdn().split(".")[0]
    for jobid, hosts in get_pbs_running_jobs_for_user().items():
        if this_host in hosts:
            return jobid
    return None
get_pbs_launch_cmd(ngpus=None, nhosts=None, ngpu_per_host=None, hostfile=None, *, verbose=False)
⚓︎
Get the PBS launch command.
Parameters⚓︎
ngpus : int, optional
Total number of GPUs to use. If None, inferred from other args or max.
nhosts : int, optional
Number of hosts (nodes). If None, defaults to max available.
ngpu_per_host : int, optional
GPUs per host. If None, inferred when possible.
hostfile : path-like, optional
Hostfile to use. If None, uses a fallback from get_hostfile_with_fallback.
verbose : bool, keyword-only
If True, log more and pass --verbose (where applicable).
Source code in src/ezpz/pbs.py
def get_pbs_launch_cmd(
    ngpus: Optional[int] = None,
    nhosts: Optional[int] = None,
    ngpu_per_host: Optional[int] = None,
    hostfile: Optional[Pathish] = None,
    *,
    verbose: bool = False,
) -> str:
    """Get the PBS launch command.

    Parameters
    ----------
    ngpus : int, optional
        Total number of GPUs to use. If None, inferred from other args or max.
    nhosts : int, optional
        Number of hosts (nodes). If None, defaults to max available.
    ngpu_per_host : int, optional
        GPUs per host. If None, inferred when possible.
    hostfile : path-like, optional
        Hostfile to use. If None, uses a fallback from `get_hostfile_with_fallback`.
    verbose : bool, keyword-only
        If True, log more and pass `--verbose` (where applicable).
    """
    if not hostfile:
        hostfile = get_hostfile_with_fallback(hostfile)
    hfp = Path(hostfile)
    ngpus, nhosts, ngpu_per_host = _infer_topology(
        ngpus=ngpus,
        nhosts=nhosts,
        ngpu_per_host=ngpu_per_host,
        hostfile=hfp,
    )
    total_available = ezpz.get_world_size(total=True)
    status = "✅" if ngpus == total_available else "⚠️"
    logger.info(
        f"{status} Using [{ngpus}/{total_available}] GPUs "
        f"[{nhosts} hosts] x [{ngpu_per_host} GPU/host]"
    )
    if ngpus != total_available:
        logger.warning(
            f"[🚧 WARNING] Using only [{ngpus}/{total_available}] available GPUs!!"
        )
    machine = ezpz.get_machine().lower()
    hostfile_arg = str(hfp)
    if machine == "sophia":
        # Sophia uses mpirun rather than the default mpiexec.
        parts = [
            "mpirun",
            f"-n={ngpus}",
            f"-N={ngpu_per_host}",
            f"--hostfile={hostfile_arg}",
            "-x PATH",
            "-x LD_LIBRARY_PATH",
        ]
        if verbose:
            parts.append("--verbose")
        return " ".join(parts)
    # Everywhere else: mpiexec.
    parts = [
        "mpiexec",
        "--envall",
        f"--np={ngpus}",
        f"--ppn={ngpu_per_host}",
        f"--hostfile={hostfile_arg}",
    ]
    if verbose:
        parts.append("--verbose")
    # Verbose CPU-bind output is suppressed at very large scale (>=1024 GPUs).
    bind_prefix = "--cpu-bind=verbose," if ngpus < 1024 else "--cpu-bind="
    env_bind = os.environ.get("CPU_BIND")
    if env_bind:
        # An explicit CPU_BIND in the environment wins over machine defaults.
        logger.warning(f"Detected CPU_BIND from environment: {env_bind}")
        parts.append(f"{bind_prefix}{env_bind.replace('--cpu-bind=', '')}")
    elif machine in {"aurora", "sunspot"}:
        # Intel XPU machines get an explicit per-GPU CPU list.
        xpu_bind_list = (
            "list:2-4:10-12:18-20:26-28:34-36:42-44:"
            "54-56:62-64:70-72:78-80:86-88:94-96"
        )
        parts.extend(["--no-vni", f"{bind_prefix}{xpu_bind_list}"])
    else:
        # generic CPU binding
        parts.extend(["--cpu-bind=depth", "--depth=8"])
    return " ".join(parts)
get_pbs_launch_info(hostfile=None, jobid=None)
⚓︎
Get the PBS launch info
Source code in src/ezpz/pbs.py
def get_pbs_launch_info(
    hostfile: Optional[str | Path] = None,
    jobid: Optional[int | str] = None,
) -> dict[str, str]:
    """Get the PBS launch info.

    Builds a string-valued mapping describing the current PBS allocation
    (hosts, GPU counts, machine/device/backend names, and launch command).
    """
    from ezpz.configs import get_scheduler

    assert get_scheduler() == "PBS"
    if hostfile is None:
        hostfile = get_pbs_nodefile(jobid=jobid)
    assert hostfile is not None
    hfp = Path(hostfile)
    # Hostnames are shortened to the part before the first '.'.
    short_hosts = [
        name.split(".")[0] for name in ezpz.dist.get_nodes_from_hostfile(hfp)
    ]
    num_hosts = len(short_hosts)
    gpus_per_host = ezpz.dist.get_gpus_per_node()
    gpus_available = ezpz.dist.get_world_size(total=True)
    total_gpus = num_hosts * gpus_per_host
    world_size_total = ezpz.dist.get_world_size_total()
    launch_cmd = get_pbs_launch_cmd(hostfile=hostfile)
    # Avoid an enormous HOSTS string on very large allocations.
    hosts_repr = (
        f"[{', '.join(short_hosts)}]"
        if num_hosts < 1000
        else "[truncated (>1000 nodes)]"
    )
    return {
        "HOSTFILE": hfp.as_posix(),
        "HOSTS": hosts_repr,
        "NHOSTS": str(num_hosts),
        "NGPU_PER_HOST": str(gpus_per_host),
        "NGPUS": str(total_gpus),
        "NGPUS_AVAILABLE": str(gpus_available),
        "MACHINE": ezpz.dist.get_machine(),
        "DEVICE": ezpz.dist.get_torch_device_type(),
        "BACKEND": ezpz.dist.get_torch_backend(),
        "LAUNCH_CMD": launch_cmd,
        "world_size_total": str(world_size_total),
    }
get_pbs_nodefile(jobid=None)
⚓︎
Get the nodefile for a given jobid.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `jobid` | `int \| str` | The jobid to get the nodefile for. | `None` |

Returns:

| Name | Type | Description |
|---|---|---|
| `str` | `str \| None` | The path to the nodefile. |
Source code in src/ezpz/pbs.py
def get_pbs_nodefile(jobid: Optional[int | str] = None) -> str | None:
    """Get the nodefile for a given jobid.

    Args:
        jobid (int | str, optional): The jobid to get the nodefile for.
            Defaults to None, in which case the currently active job
            (if any) is used.

    Returns:
        str: The path to the nodefile, or None when no job can be found.
    """
    target = jobid if jobid is not None else get_pbs_jobid_of_active_job()
    if target is None:
        logger.warning("No active job found.")
        return None
    return get_pbs_nodefile_from_jobid(target)
get_pbs_nodefile_from_jobid(jobid)
⚓︎
Get the nodefile for a given jobid.
Source code in src/ezpz/pbs.py
def get_pbs_nodefile_from_jobid(jobid: int | str) -> str:
"""Get the nodefile for a given jobid."""
assert jobid is not None, "No jobid provided and no active job found."
pbs_parent = Path("/var/spool/pbs/aux")
pfiles = [
Path(pbs_parent).joinpath(f)
for f in os.listdir(pbs_parent)
if str(jobid) in f
]
assert len(pfiles) == 1, (
f"Found {len(pfiles)} files matching {jobid} in {pbs_parent}"
)
pbs_nodefile = pfiles[0]
assert pbs_nodefile.is_file(), f"Nodefile {pbs_nodefile} does not exist."
return pbs_nodefile.absolute().resolve().as_posix()
get_pbs_nodefile_of_active_job()
⚓︎
get_pbs_nodelist_from_jobid(jobid)
⚓︎
Get the nodelist for a given jobid.
Source code in src/ezpz/pbs.py
def get_pbs_nodelist_from_jobid(jobid: int | str) -> list[str]:
    """Get the nodelist for a given jobid.

    Looks the jobid up in the user's currently-running PBS jobs and
    returns its list of (short) hostnames.
    """
    assert jobid is not None, "No jobid provided and no active job found."
    running = get_pbs_running_jobs_for_user()
    key = str(jobid)
    assert key in running, (
        f"Job ID {jobid} not found in running jobs for user {getuser()}"
    )
    return running[key]
get_pbs_running_jobs_for_user()
⚓︎
Get all running jobs for the current user.
Source code in src/ezpz/pbs.py
def get_pbs_running_jobs_for_user() -> dict[str, list[str]]:
    """Get all running jobs for the current user.

    Invokes ``qstat -fn1wru <user>`` (via the `sh` package) and parses
    rows in the running (" R ") state into a mapping of
    ``jobid -> [short hostnames]``.

    Returns:
        dict[str, list[str]]: Jobid (numeric part only) mapped to the
        list of hostnames in that job's node assignment.
    """
    try:
        from sh import qstat  # type:ignore
    except Exception:
        # Use the module logger (consistent with the rest of this file)
        # and a bare `raise` to preserve the original traceback.
        logger.error("Error importing sh.qstat; is the `sh` package installed?")
        raise
    # NOTE(review): the flags and username are passed as a single argv
    # element; qstat's option parsing appears to tolerate this — confirm.
    running_rows = [
        line
        for line in qstat(f"-fn1wru {getuser()}").split("\n")
        if " R " in line
    ]
    jobs: dict[str, list[str]] = {}
    for row in running_rows:
        fields = [tok for tok in row.split(" ") if tok]
        # First field is "<jobid>.<server>"; the last field is the node
        # assignment, e.g. "host0/0*8+host1/0*8" -> ["host0", "host1"].
        jobid = fields[0].split(".")[0]
        jobs[jobid] = [h.split("/")[0] for h in fields[-1].split("+")]
    return jobs
get_running_jobs_from_qstat()
⚓︎
Get the running jobs from qstat