# ezpz.launch

- See `ezpz/launch.py`

Launch a command on the current PBS or SLURM job.
By default, the command to be executed will be launched across all nodes.
## `build_executable(launch_cmd=None, cmd_to_launch=None, include_python=False, ngpus=None, nhosts=None, ngpu_per_host=None, hostfile=None, cpu_bind=None, extra_launch_args=None)`

Build the full executable command to launch.
**Parameters:**

| Name | Type | Description | Default |
|---|---|---|---|
| `launch_cmd` | `str` | The command to launch the job. If `None`, it will be built using `build_launch_cmd()`. | `None` |
| `cmd_to_launch` | `str` or `list` | The command to run on the job. If `None`, it will be taken from `sys.argv`. | `None` |
| `include_python` | `bool` | Whether to include the python executable in the command. Defaults to `False`. | `False` |
| `extra_launch_args` | `Sequence[str]` | Additional arguments to append to the scheduler/launcher invocation (e.g., `mpirun` flags). | `None` |
**Returns:**

| Type | Description |
|---|---|
| `list` | `list[str]`: The full command to launch the job. |
Source code in src/ezpz/launch.py
def build_executable(
    launch_cmd: Optional[str] = None,
    cmd_to_launch: Optional[str | list[str]] = None,
    include_python: bool = False,
    ngpus: Optional[int] = None,
    nhosts: Optional[int] = None,
    ngpu_per_host: Optional[int] = None,
    hostfile: Optional[str | os.PathLike | Path] = None,
    cpu_bind: Optional[str] = None,
    extra_launch_args: Optional[Sequence[str]] = None,
) -> list:
    """Build the full executable command to launch.

    The result is ``[*launcher_tokens, *extra_launch_args, *command_tokens]``.

    Args:
        launch_cmd (str, optional): The command to launch the job. If None,
            will be built using `build_launch_cmd()`.
        cmd_to_launch (str or list, optional): The command to run on the job.
            If None, will be taken from `sys.argv`. A string is tokenized
            with `shlex.split`; a list is copied (never modified in place)
            and each element is converted with `str()`.
        include_python (bool, optional): Whether to include the python
            executable in the command. Defaults to False. The interpreter is
            only prepended when no command token already contains "python".
        ngpus (int, optional): Forwarded to `build_launch_cmd()`; only used
            when `launch_cmd` is None.
        nhosts (int, optional): Forwarded to `build_launch_cmd()`; only used
            when `launch_cmd` is None.
        ngpu_per_host (int, optional): Forwarded to `build_launch_cmd()`;
            only used when `launch_cmd` is None.
        hostfile (str or PathLike, optional): Forwarded to
            `build_launch_cmd()`; only used when `launch_cmd` is None.
        cpu_bind (str, optional): Forwarded to `build_launch_cmd()`; only
            used when `launch_cmd` is None.
        extra_launch_args (Sequence[str], optional): Additional arguments to
            append to the scheduler/launcher invocation (e.g., mpirun flags).

    Returns:
        list[str]: The full command to launch the job.
    """
    extra_launch_args = list(extra_launch_args) if extra_launch_args else []
    if launch_cmd is None:
        # Imported lazily: only needed when the caller did not supply a
        # launcher command of their own.
        from ezpz.pbs import build_launch_cmd

        launch_cmd = build_launch_cmd(
            ngpus=ngpus,
            nhosts=nhosts,
            ngpu_per_host=ngpu_per_host,
            hostfile=hostfile,
            cpu_bind=cpu_bind,
        )
    if cmd_to_launch is None:
        cmd_to_launch = get_command_to_launch_from_argv()
    cmd_to_launch_list: list[str]
    if isinstance(cmd_to_launch, str):
        cmd_to_launch_list = shlex.split(cmd_to_launch)
    elif cmd_to_launch is None:
        cmd_to_launch_list = []
    else:
        # Copy (and stringify) so that inserting the interpreter below never
        # mutates the caller's list, and so `shlex.join` accepts Path/int
        # tokens.
        cmd_to_launch_list = [str(part) for part in cmd_to_launch]
    if include_python and not any(
        "python" in part for part in cmd_to_launch_list
    ):
        cmd_to_launch_list.insert(0, _resolve_launch_python())
    cmd_to_launch_str = shlex.join(cmd_to_launch_list)
    logger.info("Building command to execute by piecing together:")
    logger.info(f"(1.) launch_cmd: {launch_cmd}")
    logger.info(f"(2.) cmd_to_launch: {cmd_to_launch_str}")
    return [
        *shlex.split(launch_cmd),
        *extra_launch_args,
        *cmd_to_launch_list,
    ]
## `command_exists(cmd)`

## `configure_warnings()`

Silence noisy deprecation warnings for child processes.

## `get_active_jobid()`

Return the job identifier for the currently running PBS/SLURM job.
Source code in src/ezpz/launch.py
def get_active_jobid() -> str | None:
    """Return the job identifier for the currently running PBS/SLURM job.

    The scheduler is detected via ``ezpz.configs.get_scheduler()``; the
    lookup is then delegated to the matching scheduler module.

    Returns:
        The job ID reported by ``ezpz.pbs`` or ``ezpz.slurm``, or ``None``
        when the detected scheduler is neither PBS nor SLURM.
    """
    from ezpz.configs import get_scheduler

    scheduler_name = get_scheduler().lower()
    if scheduler_name not in ("pbs", "slurm"):
        return None
    if scheduler_name == "pbs":
        import ezpz.pbs

        return ezpz.pbs.get_pbs_jobid_of_active_job()
    import ezpz.slurm

    return ezpz.slurm.get_slurm_jobid_of_active_job()
## `get_aurora_filters(additional_filters=None)`

Return log filtering patterns tailored for Aurora clusters.
Source code in src/ezpz/launch.py
def get_aurora_filters(additional_filters: Optional[list] = None) -> list:
    """Return log filtering patterns tailored for Aurora clusters.

    Patterns are plain substrings (no regex), suitable for the
    substring-based line filtering done by ``run_command``.

    Args:
        additional_filters: Extra caller-supplied patterns. They are always
            kept, on every machine and at every log level.

    Returns:
        A new list: the caller's patterns first, followed (on Aurora, unless
        ``EZPZ_LOG_LEVEL == "DEBUG"``) by the built-in Aurora noise patterns.
        Duplicates are removed while preserving first-seen order.
    """
    filters = list(additional_filters) if additional_filters else []
    if ezpz.get_machine().lower() == "aurora":
        aurora_filters = [
            "cuda",
            "CUDA",
            "cuDNN",
            "cuBLAS",
            "[W501",
            "AttributeError: 'MessageFactory' object has no attribute 'GetPrototype'",
            " Overriding a previously registered kernel",
            "operator: aten::_cummax_helper",
            " registered at build",
            "dispatch key: XPU",
            "previous kernel: registered at",
            "pkg_resources is deprecated as an API",
            "import pkg_resources",
            "UserWarning: pkg_resources",
            "new kernel: registered at",
            "/build/pytorch/build/aten/src/ATen/RegisterSchema.cpp",
            "Setting ds_accelerator to xpu",
            "Trying to register 2 metrics with the same name",
            "TF-TRT Warning",
            "Warning only once",
            "measureDifference between two events",
            "AttributeError",
            "Initialized with serialization",
            "In file included from /var/run/palsd/",
            "/opt/aurora/26.26.0/oneapi/compiler/latest/include/sycl/sycl.hpp:41:1",
            '41 | __SYCL_WARNING("You are including <sycl/sycl.hpp> without -fsycl flag',
            "| ^",
            "/opt/aurora/26.26.0/oneapi/compiler/latest/include/sycl/sycl.hpp:34:29: note:",
            "34 | #define __SYCL_WARNING(msg) _Pragma(__SYCL_TOSTRING(GCC warning msg))",
            "<scratch space>:58:6: note: expanded from here",
            '58 | GCC warning "You are including <sycl/sycl.hpp> without -fsycl flag, which is errorenous for device code compilation.',
            "This warning can be disabled by setting SYCL_DISABLE_FSYCL_SYCLHPP_WARNING macro.",
            "1 warning generated.",
            # NOTE(review): the fragments below look like pieces of the SYCL
            # warning split/interleaved across concurrently writing ranks.
            '41 | __SYCL_WARNING("You are including <sycl/sycl.hpp> without -fsycl flcan be disabled by setting SYCL_DISABLE_FSYCL_SYCLHPP_WARNING macro."',
            "ag, \\",
            "34 | #define __SYCL_WARNING(msg) _Pragma(__SYCL_TOmacro.",
            "STRING(GCC warning msg))",
            '41 | __SYCL_WARNING("are including <sycl/sycl.hpp> without -fsycl flag, \\',
            "by setting SYCL_DISABLE_FSYCL_SYCLHPP_WARNING macro.",
        ]
        if EZPZ_LOG_LEVEL == "DEBUG":
            # DEBUG means "show me the noise": skip only the built-in Aurora
            # patterns (but still list them); caller-requested patterns stay.
            logger.debug(
                f"Aurora filters (not applied at DEBUG level): {aurora_filters}"
            )
        else:
            filters.extend(aurora_filters)
            logger.info(
                " ".join(
                    [
                        "Filtering for Aurora-specific messages.",
                        "To view list of filters, run with EZPZ_LOG_LEVEL=DEBUG",
                    ]
                )
            )
    # Order-preserving de-duplication (caller patterns may overlap built-ins).
    filters = list(dict.fromkeys(filters))
    logger.debug(f"Filters: {filters}")
    return filters
## `get_command_to_launch_from_argv()`

Return the command specified on `sys.argv`, or `None` if absent.

Source code in src/ezpz/launch.py

## `get_hostfile_of_active_job()`

Get the hostfile of the active job.
Source code in src/ezpz/launch.py
def get_hostfile_of_active_job():
    """Get hostfile of active job.

    Dispatches on the scheduler reported by ``ezpz.configs.get_scheduler()``:
    PBS uses ``ezpz.pbs.get_pbs_nodefile_of_active_job()`` and SLURM uses
    ``ezpz.slurm.get_slurm_nodefile_of_active_job()``.

    Returns:
        Whatever the scheduler-specific helper returns, or ``None`` for any
        other scheduler.
    """
    from ezpz.configs import get_scheduler

    kind = get_scheduler().lower()
    if kind == "slurm":
        import ezpz.slurm

        return ezpz.slurm.get_slurm_nodefile_of_active_job()
    if kind != "pbs":
        return None
    import ezpz.pbs

    return ezpz.pbs.get_pbs_nodefile_of_active_job()
## `get_nodelist_of_active_job()`

Get the nodelist of the active job.
Source code in src/ezpz/launch.py
def get_nodelist_of_active_job() -> list[str] | None:
    """Get nodelist of active job.

    Resolves the active job ID with the scheduler-specific helper, then
    asks the same scheduler module for that job's node list.

    Returns:
        The node list, or ``None`` when there is no active job ID or the
        detected scheduler is neither PBS nor SLURM.
    """
    from ezpz.configs import get_scheduler

    scheduler_name = get_scheduler().lower()
    if scheduler_name == "pbs":
        import ezpz.pbs

        pbs_jobid = ezpz.pbs.get_pbs_jobid_of_active_job()
        return (
            None
            if pbs_jobid is None
            else ezpz.pbs.get_pbs_nodelist_from_jobid(pbs_jobid)
        )
    if scheduler_name == "slurm":
        import ezpz.slurm

        slurm_jobid = ezpz.slurm.get_slurm_jobid_of_active_job()
        return (
            None
            if slurm_jobid is None
            else ezpz.slurm.get_nodelist_from_slurm_jobid(slurm_jobid)
        )
    return None
## `get_scheduler(_scheduler=None)`

Delegate scheduler detection to the configs module.

## `kill_existing_processes(filters=None, additional_filters=None)`

Kill existing processes that match the filters.
Source code in src/ezpz/launch.py
def kill_existing_processes(
    filters: Optional[list] = None,
    additional_filters: Optional[list] = None,
) -> int:
    """Kill existing processes that match the filters.

    Args:
        filters: Patterns to match against process command lines. The
            caller's list is copied, never modified in place.
        additional_filters: Extra patterns forwarded to
            ``get_aurora_filters`` (only consulted on Aurora).

    Returns:
        The exit code of the ``pkill`` invocation (via ``run_command``), or
        0 when there are no patterns and nothing is run.
    """
    # TODO: Run this as preamble to launching
    # Copy first: `+=` on the caller's list would otherwise grow it on
    # every call.
    patterns = list(filters) if filters is not None else []
    if ezpz.get_machine().lower() == "aurora":
        patterns += get_aurora_filters(additional_filters=additional_filters)
    if not patterns:
        logger.info("No filters provided; skipping process cleanup.")
        return 0
    logger.info(f"Killing existing processes with filters: {patterns}")
    # NOTE(review): `pkill -f` treats its argument as a single extended
    # regex matched against full command lines. Joining with spaces only
    # matches command lines containing the whole space-separated sequence,
    # and some Aurora log filters contain regex metacharacters (e.g.
    # "[W501"). Confirm the intended matching before relying on this.
    filter_pattern = " ".join(patterns)
    cmd = ["pkill", "-f", filter_pattern]
    return run_command(cmd, filters=patterns)
## `launch(launch_cmd=None, cmd_to_launch=None, include_python=False, ngpus=None, nhosts=None, ngpu_per_host=None, hostfile=None, cpu_bind=None, filters=None, launcher_args=None, idle_timeout_s=None, retries=0, auto_retry=False, spare_nodes=None, max_failover_retries=None)`

Launch a command on the current {PBS, SLURM} job.
Source code in src/ezpz/launch.py
def launch(
    launch_cmd: Optional[str] = None,
    cmd_to_launch: Optional[str | list[str]] = None,
    include_python: bool = False,
    ngpus: Optional[int] = None,
    nhosts: Optional[int] = None,
    ngpu_per_host: Optional[int] = None,
    hostfile: Optional[str | os.PathLike | Path] = None,
    cpu_bind: Optional[str] = None,
    filters: Optional[list[str]] = None,
    launcher_args: Optional[Sequence[str]] = None,
    idle_timeout_s: Optional[int] = None,
    retries: int = 0,
    auto_retry: bool = False,
    spare_nodes: "int | str | None" = None,
    max_failover_retries: Optional[int] = None,
) -> int:
    """Launch a command on the current {PBS, SLURM} job.

    Args:
        launch_cmd: Launcher command; when None, ``build_executable`` builds
            one from the topology arguments.
        cmd_to_launch: Command to run (string or token list); when None it
            is taken from ``sys.argv`` by ``build_executable``.
        include_python: Forwarded to ``build_executable``.
        ngpus: Total rank count. Required when ``auto_retry`` is set.
        nhosts: Host count; replaced by the active-host count when
            ``auto_retry`` is set.
        ngpu_per_host: Ranks per host; with ``auto_retry`` it falls back to
            ``ezpz.get_gpus_per_node()`` (or 1).
        hostfile: Explicit hostfile; overrides the active job's hostfile. A
            path that does not exist is dropped with a warning.
        cpu_bind: Forwarded to ``build_executable``.
        filters: NOTE(review): accepted but not used by this function's
            body — confirm whether output filtering was meant to apply here.
        launcher_args: Extra launcher flags (e.g. mpirun options).
        idle_timeout_s: Idle watchdog timeout. With ``auto_retry`` it
            defaults to ``DEFAULT_AUTO_RETRY_IDLE_TIMEOUT_S``.
        retries: Retry count for ``_run_with_retries`` (non-auto-retry path).
        auto_retry: Split the node pool into active + spare hosts and run
            via ``run_with_auto_retry``.
        spare_nodes: Forwarded to ``_resolve_auto_retry_allocation``.
        max_failover_retries: Forwarded to ``AutoRetryConfig``.

    Returns:
        The return code of the launched command.

    Raises:
        AssertionError: If no active PBS/SLURM job is found.
        ValueError: If ``auto_retry`` is set without ``ngpus``.
    """
    start = time.perf_counter()
    if ezpz.get_rank() == 0:
        print("\n")
    logger.info(f"----[🍋 ezpz.launch][started][{ezpz.get_timestamp()}]----")
    _log_json_log_file(logger)
    jobid = get_active_jobid()
    if jobid is None:
        # Explicit raise instead of `assert` so the check survives
        # `python -O`; the exception type is unchanged for existing callers.
        raise AssertionError("No active job found.")
    nodelist = get_nodelist_of_active_job()
    active_hostfile = get_hostfile_of_active_job()
    selected_hostfile: Optional[Path]
    if hostfile is not None:
        selected_hostfile = Path(hostfile).expanduser()
    else:
        selected_hostfile = (
            Path(active_hostfile).expanduser()
            if active_hostfile is not None
            else None
        )
    if selected_hostfile is not None and not selected_hostfile.exists():
        logger.warning(
            "Hostfile %s does not exist; continuing without explicit hostfile.",
            selected_hostfile,
        )
        selected_hostfile = None
    # --auto-retry path: split the full nodelist into active + spare
    # BEFORE building the launcher command, so the launcher's
    # topology inference sees the (smaller) active hostfile and we
    # don't trip _infer_topology's "ngpus > N_active*ppn" check on
    # the unused spare hosts.
    autoretry_allocation = None
    autoretry_log_dir: Optional[Path] = None
    if auto_retry:
        # CLI entrypoints (parse_args) already enforce this; the
        # check here is the library-level precondition for direct
        # `launch(auto_retry=True, ngpus=None)` callers.
        if ngpus is None:
            raise ValueError(
                "launch(auto_retry=True) requires ngpus to be set "
                "explicitly. The auto-retry loop needs the training "
                "rank count to split the node pool into active + "
                "spare. (CLI: pass --nproc / -n / --np.)"
            )
        # Source the candidate node pool. Explicit --hostfile beats
        # the scheduler nodelist — see _resolve_auto_retry_node_pool.
        autoretry_pool = _resolve_auto_retry_node_pool(
            selected_hostfile, nodelist
        )
        # `--ppn` (== ngpu_per_host) sets ranks-per-node when given;
        # otherwise fall back to the cluster's GPU count. `or 1` covers
        # get_gpus_per_node() returning 0 on a non-GPU node.
        ranks_per_node = ngpu_per_host or ezpz.get_gpus_per_node() or 1
        nhosts_active = _ranks_to_hosts(ngpus, ranks_per_node)
        autoretry_log_dir = _auto_retry_log_dir(jobid)
        autoretry_allocation, autoretry_hostfile = (
            _resolve_auto_retry_allocation(
                autoretry_pool,
                nhosts_active,
                spare_nodes,
                autoretry_log_dir,
            )
        )
        # Only the active subset goes to the launcher; the spare hosts
        # stay with the auto-retry allocation.
        selected_hostfile = autoretry_hostfile
        nhosts = nhosts_active
    logger.info(f"Job ID: {jobid}")
    logger.info(f"nodelist: {nodelist}")
    logger.info(f"hostfile: {selected_hostfile}")
    cmd_list = build_executable(
        launch_cmd=launch_cmd,
        cmd_to_launch=cmd_to_launch,
        ngpus=ngpus,
        ngpu_per_host=ngpu_per_host,
        nhosts=nhosts,
        include_python=include_python,
        hostfile=selected_hostfile,
        cpu_bind=cpu_bind,
        extra_launch_args=launcher_args,
    )
    # shlex.split(shlex.join(tokens)) == tokens for any list of strings, so
    # a plain str() conversion is equivalent to that round-trip.
    cmd = [f"{part}" for part in cmd_list]
    logger.info(
        f"Took: {time.perf_counter() - start:.2f} seconds to build command."
    )
    logger.info("Executing:\n" + "\n ".join([f"{i}" for i in cmd_list]))
    t0 = time.perf_counter()
    os.environ["EZPZ_RUN_COMMAND"] = str(cmd)
    logger.info(f"Execution started @ {ezpz.get_timestamp()}...")
    cmd_start = time.perf_counter()
    if autoretry_allocation is not None:
        from ezpz.launch_autoretry import (
            AutoRetryConfig,
            DEFAULT_AUTO_RETRY_IDLE_TIMEOUT_S,
            run_with_auto_retry,
        )

        if autoretry_log_dir is None:  # set above whenever auto_retry
            raise AssertionError("auto-retry log dir was not initialized")
        # Default --auto-retry's idle watchdog if the caller didn't pass
        # --timeout, so a hung collective cannot burn the whole walltime.
        effective_timeout = (
            idle_timeout_s
            if idle_timeout_s is not None
            else DEFAULT_AUTO_RETRY_IDLE_TIMEOUT_S
        )
        ar_config = AutoRetryConfig(
            cmd=list(cmd),
            log_dir=autoretry_log_dir,
            idle_timeout_s=effective_timeout,
            max_failover_retries=max_failover_retries,
        )
        retcode = run_with_auto_retry(ar_config, autoretry_allocation)
    else:
        retcode = _run_with_retries(
            cmd, idle_timeout_s=idle_timeout_s, retries=retries
        )
    cmd_finish = time.perf_counter()
    _log_json_log_file(logger)
    logger.info(f"----[🍋 ezpz.launch][stop][{ezpz.get_timestamp()}]----")
    logger.info(f"Execution finished with {retcode}.")
    logger.info(f"Executing finished in {cmd_finish - cmd_start:.2f} seconds.")
    logger.info(
        f"Took {time.perf_counter() - t0:.2f} seconds to run. Exiting."
    )
    return retcode
## `main()`

## `parse_args(argv=None)`

Parse command line arguments.
Source code in src/ezpz/launch.py
def parse_args(argv: Optional[Sequence[str]] = None):
    """Parse the ``ezpz launch`` command line.

    Tokens before a ``--`` separator are parsed as launcher options and the
    tokens after it form the command to run. Without a separator, every
    token the parser does not recognise is treated as the command.

    Args:
        argv: Command-line tokens; ``None`` is treated as an empty list.

    Returns:
        The parsed namespace, extended with ``command`` (command tokens) and
        ``launcher_args`` (unrecognised options forwarded to the launcher;
        only populated when a ``--`` separator was used).

    Raises:
        SystemExit: When ``--auto-retry`` is combined with a non-zero
            ``--retries`` or used without an explicit ``--nproc``, or when
            the parser itself exits (e.g. for ``-h``/``--help``).
    """
    tokens = list(argv) if argv is not None else []
    launch_argv, command_from_sep = _split_launch_and_command(tokens)
    if "-h" in launch_argv or "--help" in launch_argv:
        # Use the parser variant that documents the positional command so
        # the help text shows it.
        build_launch_parser(include_command=True).parse_args(launch_argv)
    args, unknown = build_launch_parser(
        include_command=False
    ).parse_known_args(launch_argv)
    if command_from_sep:
        args.command = command_from_sep
        # Unknown flags before ``--`` are forwarded to the underlying
        # launcher (e.g., mpirun -x FOO=bar -- python ...).
        args.launcher_args = unknown
    else:
        args.command = unknown
        args.launcher_args = []
    # Cross-flag checks argparse cannot express: --retries defaults to 0,
    # and whether -n was passed at all is not observable inside the parser.
    if not getattr(args, "auto_retry", False):
        return args
    if getattr(args, "retries", 0):
        raise SystemExit(
            "--auto-retry is mutually exclusive with --retries. "
            "--retries is bounded per-process retry; --auto-retry "
            "is unbounded node-level failover. Pick one."
        )
    if getattr(args, "nproc", -1) <= 0:
        raise SystemExit(
            "--auto-retry requires --nproc (-n/--np) to be set "
            "explicitly. The auto-retry loop needs the training "
            "rank count to split the PBS allocation into active "
            "+ spare hosts."
        )
    return args
## `run(argv=None)`

CLI entry point for launching commands with scheduler fallback.
Source code in src/ezpz/launch.py
def run(argv: Sequence[str] | None = None) -> int:
    """CLI entry point for launching commands with scheduler fallback.

    Inside an active PBS/SLURM job the command is handed to ``launch``.
    Otherwise (no PBS/SLURM scheduler detected, or no active job) a local
    ``mpirun`` command is built from the parsed topology arguments and run.

    Args:
        argv: Command-line tokens; ``None`` is treated as an empty list.

    Returns:
        The exit code of the launched command, or 0 after printing the CLI
        source path when no command is given and ``print_source`` is set.

    Raises:
        SystemExit: If no command is provided (and ``print_source`` is not
            set), or when ``parse_args`` rejects the arguments.
    """
    import ezpz.distributed

    configure_warnings()
    argv = [] if argv is None else list(argv)
    args = parse_args(argv)
    command_parts = [part for part in args.command if part]
    if not command_parts:
        if getattr(args, "print_source", False):
            from importlib import import_module

            launch_cli_mod = import_module("ezpz.cli.launch_cmd")
            source_path = Path(
                getattr(launch_cli_mod, "__file__", "")
            ).resolve()
            print(source_path)
            return 0
        raise SystemExit("No command provided to ezpz launch")

    scheduler = get_scheduler().lower()
    cli_cpu_bind = _normalize_cpu_bind_value(getattr(args, "cpu_bind", None))
    env_cpu_bind = _normalize_cpu_bind_value(os.environ.get("CPU_BIND"))
    selected_cpu_bind = cli_cpu_bind or env_cpu_bind
    if cli_cpu_bind is not None and env_cpu_bind is not None:
        logger.warning(
            "Both --cpu-bind and CPU_BIND are specified. "
            "Precedence order is: --cpu-bind > CPU_BIND. "
            "Using --cpu-bind=%s.",
            cli_cpu_bind,
        )

    if scheduler in {"pbs", "slurm"}:
        jobid = get_active_jobid()
        if jobid is not None:
            launcher_args = list(getattr(args, "launcher_args", []))
            if scheduler != "pbs":
                # Non-PBS launchers receive CPU binding as launcher flags;
                # on PBS only the CLI value is passed via `cpu_bind=`.
                launcher_args.extend(
                    _cpu_bind_launcher_args(selected_cpu_bind)
                )
            try:
                rc = launch(
                    cmd_to_launch=command_parts,
                    include_python=False,
                    ngpus=(args.nproc if args.nproc > -1 else None),
                    nhosts=(args.nhosts if args.nhosts > -1 else None),
                    ngpu_per_host=(
                        args.nproc_per_node
                        if args.nproc_per_node > -1
                        else None
                    ),
                    hostfile=args.hostfile,
                    cpu_bind=cli_cpu_bind if scheduler == "pbs" else None,
                    filters=args.filter,
                    launcher_args=launcher_args,
                    idle_timeout_s=getattr(args, "idle_timeout_s", None),
                    retries=getattr(args, "retries", 0),
                    auto_retry=getattr(args, "auto_retry", False),
                    spare_nodes=getattr(args, "spare_nodes", None),
                    max_failover_retries=getattr(
                        args, "max_failover_retries", None
                    ),
                )
            finally:
                # Release distributed state even if launch() raises.
                ezpz.distributed.cleanup()
            return rc

    # No active scheduler job: build a local mpirun invocation.
    requested_nproc = args.nproc if args.nproc > -1 else None
    requested_ppn = args.nproc_per_node if args.nproc_per_node > -1 else None
    requested_nhosts = args.nhosts if args.nhosts > -1 else None
    if (
        requested_nproc is None
        and requested_ppn is not None
        and requested_nhosts is not None
    ):
        requested_nproc = requested_ppn * requested_nhosts
    if requested_nproc is None:
        requested_nproc = int(os.environ.get("WORLD_SIZE", "2"))
    env_flags = _get_mpirun_env_flags()
    fallback_cmd = ["mpirun", *env_flags, "-np", str(requested_nproc)]
    if args.hostfile:
        # str() keeps the command list homogeneous if a Path slips through.
        fallback_cmd.extend(["--hostfile", str(args.hostfile)])
    if requested_ppn is not None and requested_nhosts is not None:
        fallback_cmd.extend(["--map-by", f"ppr:{requested_ppn}:node"])
    fallback_cmd.extend(_cpu_bind_launcher_args(selected_cpu_bind))
    fallback_cmd.extend(getattr(args, "launcher_args", []))
    fallback_cmd.extend(command_parts)
    if ezpz.get_rank() == 0:
        print("\n")
    logger.info(f"----[🍋 ezpz.launch][started][{ezpz.get_timestamp()}]----")
    _log_json_log_file(logger)
    if getattr(args, "auto_retry", False):
        # The fallback path only supports bounded per-process retries.
        logger.warning(
            "--auto-retry requires an active PBS/SLURM job; "
            "it is ignored by the local mpirun fallback."
        )
    logger.info(
        "No active scheduler detected; falling back to local mpirun: %s",
        shlex.join(fallback_cmd),
    )
    os.environ["EZPZ_RUN_COMMAND"] = " ".join(fallback_cmd)
    logger.info(f"Execution started @ {ezpz.get_timestamp()}...")
    cmd_start = time.perf_counter()
    try:
        retcode = _run_with_retries(
            fallback_cmd,
            idle_timeout_s=getattr(args, "idle_timeout_s", None),
            retries=getattr(args, "retries", 0),
        )
        cmd_finish = time.perf_counter()
        _log_json_log_file(logger)
        logger.info(f"----[🍋 ezpz.launch][stop][{ezpz.get_timestamp()}]----")
        logger.info(f"Execution finished with {retcode}.")
        logger.info(
            f"Executing finished in {cmd_finish - cmd_start:.2f} seconds."
        )
    finally:
        # Release distributed state even if the fallback run raises.
        ezpz.distributed.cleanup()
    return retcode
## `run_bash_command(command)`

## `run_command(command, filters=None)`

Run a command and print its output line by line.

**Args:**

- `command` (`str` or `list`): The command to run. If a string, it will be split into a list.
- `filters` (`list`, optional): A list of strings used to filter the output lines.
Source code in src/ezpz/launch.py
def run_command(
    command: Sequence[str] | str, filters: Optional[Sequence[str]] = None
) -> int:
    """Run a command and print its output line by line.

    stdout and stderr are merged into one stream. Each line is printed with
    trailing whitespace stripped, unless it contains one of ``filters``.

    Args:
        command (str or list): The command to run. If a string, it will be
            split into a list.
        filters (list, optional): Substrings; any output line containing one
            of them is not printed.

    Returns:
        int: The command's exit status as returned by ``Popen.wait()``.
    """
    cmd_list = _normalize_command(command)
    active_filters = list(filters) if filters else []
    if active_filters:
        logger.info(f"Caught {len(active_filters)} filters")
    logger.info(
        " ".join(
            [
                "Running command:\n",
                shlex.join(cmd_list),
            ]
        )
    )
    os.environ["EZPZ_RUN_COMMAND"] = str(cmd_list)
    with subprocess.Popen(
        cmd_list,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        bufsize=1,
        close_fds=True,
    ) as process:
        assert process.stdout is not None
        for line in process.stdout:
            if not any(pattern in line for pattern in active_filters):
                print(line.rstrip())
        # Reading stdout to EOF does not populate `returncode`; only
        # poll()/wait() does. Without this, a failing command would be
        # reported as `None or 0` == 0.
        returncode = process.wait()
    return returncode