Skip to content

ezpz.submit⚓︎

Job submission helpers for PBS and SLURM schedulers.

Job submission helpers for PBS (qsub) and SLURM (sbatch).

Generates scheduler-specific job scripts from a command and resource requirements, then submits them. Supports two modes:

  1. Wrap a command — auto-generates a job script that calls ezpz launch <command>.
  2. Submit an existing script — passes a user-written script to the scheduler, optionally overriding resource directives.

Usage::

from ezpz.submit import submit

job_id = submit(
    command=["python3", "-m", "ezpz.examples.test", "--model", "small"],
    nodes=2,
    queue="debug",
    time="01:00:00",
)

detect_env_setup() ⚓︎

Return shell commands that reproduce the current Python environment.

Checks (in order):

  1. EZPZ_SETUP_ENV — if it points to a file, sources it; otherwise used as inline shell commands.
  2. Falls back to the ezpz_setup_env helper fetched via curl.

Returns:

Type Description
str

A (possibly multi-line) string of shell commands.

Source code in src/ezpz/submit.py
def detect_env_setup() -> str:
    """Return shell commands that reproduce the current Python environment.

    Checks (in order):

    1. ``EZPZ_SETUP_ENV`` — if it points to a file, sources it; otherwise
       used as inline shell commands.
    2. Falls back to the ``ezpz_setup_env`` helper fetched via curl.

    Returns:
        A (possibly multi-line) string of shell commands.
    """
    setup_env = os.environ.get("EZPZ_SETUP_ENV")
    if setup_env:
        if Path(setup_env).is_file():
            return f"source {shlex.quote(setup_env)}"
        return setup_env

    return "source <(curl -fsSL https://bit.ly/ezpz-utils) && ezpz_setup_env"

generate_pbs_script(command, *, nodes=1, time='01:00:00', queue='debug', account=None, filesystems='home', job_name=None, working_dir=None, env_setup=None, wrap_with_launch=True) ⚓︎

Generate a PBS job script.

Parameters:

Name Type Description Default
command str

The command to execute (already formatted as a string).

required
nodes int

Number of compute nodes.

1
time str

Walltime in HH:MM:SS format.

'01:00:00'
queue str

PBS queue name.

'debug'
account str | None

PBS account/project. Falls back to PBS_ACCOUNT or PROJECT environment variables.

None
filesystems str

Comma-separated or colon-separated filesystem list.

'home'
job_name str | None

Job name (defaults to "ezpz").

None
working_dir str | None

Directory to cd into (defaults to cwd).

None
env_setup str | None

Shell commands for environment activation.

None
wrap_with_launch bool

If True, prefix command with ezpz launch.

True

Returns:

Type Description
str

The complete job script as a string.

Source code in src/ezpz/submit.py
def generate_pbs_script(
    command: str,
    *,
    nodes: int = 1,
    time: str = "01:00:00",
    queue: str = "debug",
    account: str | None = None,
    filesystems: str = "home",
    job_name: str | None = None,
    working_dir: str | None = None,
    env_setup: str | None = None,
    wrap_with_launch: bool = True,
) -> str:
    """Generate a PBS job script.

    Args:
        command: The command to execute (already formatted as a string).
        nodes: Number of compute nodes.
        time: Walltime in ``HH:MM:SS`` format.
        queue: PBS queue name.
        account: PBS account/project.  Falls back to ``PBS_ACCOUNT`` or
            ``PROJECT`` environment variables.
        filesystems: Comma-separated or colon-separated filesystem list.
        job_name: Job name (defaults to ``"ezpz"``).
        working_dir: Directory to ``cd`` into (defaults to cwd).
        env_setup: Shell commands for environment activation.
        wrap_with_launch: If ``True``, prefix *command* with ``ezpz launch``.

    Returns:
        The complete job script as a string.
    """
    account = account or os.environ.get(
        "PBS_ACCOUNT", os.environ.get("PROJECT", "")
    )
    job_name = job_name or "ezpz"
    working_dir = working_dir or os.getcwd()
    if env_setup is None:
        env_setup = detect_env_setup()

    fs = filesystems.replace(",", ":")

    lines = [
        "#!/bin/bash --login",
        f"#PBS -l select={nodes}",
        f"#PBS -l walltime={time}",
        f"#PBS -l filesystems={fs}",
    ]
    if account:
        lines.append(f"#PBS -A {account}")
    lines += [
        "#PBS -k doe",
        "#PBS -j oe",
        f"#PBS -q {queue}",
        f"#PBS -N {job_name}",
        "",
        "set -eo pipefail",
        f"cd {shlex.quote(working_dir)}",
    ]
    if env_setup:
        lines += ["", "# ── Environment setup ──", env_setup]

    run_cmd = f"ezpz launch {command}" if wrap_with_launch else command
    lines += ["", run_cmd, ""]
    return "\n".join(lines)

generate_slurm_script(command, *, nodes=1, time='01:00:00', queue='debug', account=None, job_name=None, working_dir=None, env_setup=None, wrap_with_launch=True) ⚓︎

Generate a SLURM job script.

Parameters:

Name Type Description Default
command str

The command to execute (already formatted as a string).

required
nodes int

Number of compute nodes.

1
time str

Walltime in HH:MM:SS format.

'01:00:00'
queue str

SLURM partition name.

'debug'
account str | None

SLURM account. Falls back to SLURM_ACCOUNT or PROJECT environment variables.

None
job_name str | None

Job name (defaults to "ezpz").

None
working_dir str | None

Directory to cd into (defaults to cwd).

None
env_setup str | None

Shell commands for environment activation.

None
wrap_with_launch bool

If True, prefix command with ezpz launch.

True

Returns:

Type Description
str

The complete job script as a string.

Source code in src/ezpz/submit.py
def generate_slurm_script(
    command: str,
    *,
    nodes: int = 1,
    time: str = "01:00:00",
    queue: str = "debug",
    account: str | None = None,
    job_name: str | None = None,
    working_dir: str | None = None,
    env_setup: str | None = None,
    wrap_with_launch: bool = True,
) -> str:
    """Generate a SLURM job script.

    Args:
        command: The command to execute (already formatted as a string).
        nodes: Number of compute nodes.
        time: Walltime in ``HH:MM:SS`` format.
        queue: SLURM partition name.
        account: SLURM account.  Falls back to ``SLURM_ACCOUNT`` or
            ``PROJECT`` environment variables.
        job_name: Job name (defaults to ``"ezpz"``).
        working_dir: Directory to ``cd`` into (defaults to cwd).
        env_setup: Shell commands for environment activation.
        wrap_with_launch: If ``True``, prefix *command* with ``ezpz launch``.

    Returns:
        The complete job script as a string.
    """
    account = account or os.environ.get(
        "SLURM_ACCOUNT", os.environ.get("PROJECT", "")
    )
    job_name = job_name or "ezpz"
    working_dir = working_dir or os.getcwd()
    if env_setup is None:
        env_setup = detect_env_setup()

    lines = [
        "#!/bin/bash --login",
        f"#SBATCH --nodes={nodes}",
        f"#SBATCH --time={time}",
    ]
    if account:
        lines.append(f"#SBATCH --account={account}")
    lines += [
        f"#SBATCH --partition={queue}",
        f"#SBATCH --job-name={job_name}",
        "",
        "set -eo pipefail",
        f"cd {shlex.quote(working_dir)}",
    ]
    if env_setup:
        lines += ["", "# ── Environment setup ──", env_setup]

    run_cmd = f"ezpz launch {command}" if wrap_with_launch else command
    lines += ["", run_cmd, ""]
    return "\n".join(lines)

submit(command=None, script=None, *, nodes=1, time='01:00:00', queue='debug', account=None, filesystems='home', job_name=None, working_dir=None, scheduler=None, wrap_with_launch=True, dry_run=False, env_setup=None) ⚓︎

Submit a job to the active scheduler.

Either command (a list of args to wrap) or script (path to an existing job script) must be provided.

Parameters:

Name Type Description Default
command list[str] | None

Command to wrap in a generated job script.

None
script str | Path | None

Path to an existing job script to submit directly.

None
nodes int

Number of compute nodes.

1
time str

Walltime in HH:MM:SS format.

'01:00:00'
queue str

Queue or partition name.

'debug'
account str | None

Project/account for billing.

None
filesystems str

PBS filesystems directive (ignored for SLURM).

'home'
job_name str | None

Job name.

None
working_dir str | None

Working directory for the job.

None
scheduler str | None

Override scheduler detection ("PBS" or "SLURM").

None
wrap_with_launch bool

Wrap command with ezpz launch.

True
dry_run bool

Print the script but do not submit.

False

Returns:

Type Description
str | None

The job ID string, or None on failure / dry-run.

Source code in src/ezpz/submit.py
def submit(
    command: list[str] | None = None,
    script: str | Path | None = None,
    *,
    nodes: int = 1,
    time: str = "01:00:00",
    queue: str = "debug",
    account: str | None = None,
    filesystems: str = "home",
    job_name: str | None = None,
    working_dir: str | None = None,
    scheduler: str | None = None,
    wrap_with_launch: bool = True,
    dry_run: bool = False,
    env_setup: str | None = None,
) -> str | None:
    """Submit a job to the active scheduler.

    Either *command* (a list of args to wrap) or *script* (path to an
    existing job script) must be provided.

    Args:
        command: Command to wrap in a generated job script.
        script: Path to an existing job script to submit directly.
        nodes: Number of compute nodes.
        time: Walltime in ``HH:MM:SS`` format.
        queue: Queue or partition name.
        account: Project/account for billing.
        filesystems: PBS filesystems directive (ignored for SLURM).
        job_name: Job name.
        working_dir: Working directory for the job.
        scheduler: Override scheduler detection (``"PBS"`` or ``"SLURM"``).
        wrap_with_launch: Wrap *command* with ``ezpz launch``.
        dry_run: Print the script but do not submit.

    Returns:
        The job ID string, or ``None`` on failure / dry-run.
    """
    from ezpz.configs import get_scheduler

    if scheduler is None:
        scheduler = get_scheduler()

    if scheduler.upper() not in ("PBS", "SLURM"):
        print(
            f"No supported scheduler detected (got {scheduler!r}).\n"
            "Pass --scheduler PBS or --scheduler SLURM explicitly.",
            file=sys.stderr,
        )
        return None

    # ── Mode 2: submit existing script ───────────────────────────────────
    if script is not None:
        script_path = Path(script)
        if not script_path.is_file():
            print(f"Script not found: {script_path}", file=sys.stderr)
            return None
        if dry_run:
            print(script_path.read_text())
            print(f"[dry-run] Would submit {script_path} via {scheduler}")
            return None
        job_id = submit_job(script_path, scheduler)
        if job_id:
            print(f"Submitted job {job_id}")
        return job_id

    # ── Mode 1: generate script from command ─────────────────────────────
    if command is None or not command:
        print("No command or script provided.", file=sys.stderr)
        return None

    cmd_str = shlex.join(command)

    if job_name is None:
        # Derive from command: "python3 -m ezpz.examples.test" → "ezpz.examples.test"
        for i, arg in enumerate(command):
            if arg == "-m" and i + 1 < len(command):
                job_name = command[i + 1]
                break
        if job_name is None:
            job_name = Path(command[0]).stem

    if scheduler.upper() == "PBS":
        script_text = generate_pbs_script(
            cmd_str,
            nodes=nodes,
            time=time,
            queue=queue,
            account=account,
            filesystems=filesystems,
            job_name=job_name,
            working_dir=working_dir,
            wrap_with_launch=wrap_with_launch,
            env_setup=env_setup,
        )
    else:
        script_text = generate_slurm_script(
            cmd_str,
            nodes=nodes,
            time=time,
            queue=queue,
            account=account,
            job_name=job_name,
            working_dir=working_dir,
            wrap_with_launch=wrap_with_launch,
            env_setup=env_setup,
        )

    # Print for transparency
    print("Generated job script:")
    print("-" * 60)
    print(script_text)
    print("-" * 60)

    if dry_run:
        print(f"[dry-run] Would submit via {scheduler.lower()}")
        return None

    # Write to a file so the user can inspect/resubmit later
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    script_dir = Path(working_dir or os.getcwd())
    script_path = script_dir / f".ezpz-submit-{timestamp}.sh"
    script_path.write_text(script_text, encoding="utf-8")
    script_path.chmod(0o755)

    job_id = submit_job(script_path, scheduler)
    if job_id:
        print(f"Submitted job {job_id}")
        print(f"Script saved to {script_path}")
    return job_id

submit_job(script_path, scheduler) ⚓︎

Submit a job script and return the job ID.

Parameters:

Name Type Description Default
script_path str | Path

Path to the job script.

required
scheduler str

"PBS" or "SLURM".

required

Returns:

Type Description
str | None

The job ID string, or None if submission failed.

Source code in src/ezpz/submit.py
def submit_job(script_path: str | Path, scheduler: str) -> str | None:
    """Submit a job script and return the job ID.

    Args:
        script_path: Path to the job script.
        scheduler: ``"PBS"`` or ``"SLURM"``.

    Returns:
        The job ID string, or ``None`` if submission failed.
    """
    script_path = str(script_path)
    if scheduler.upper() == "PBS":
        cmd = ["qsub", script_path]
    elif scheduler.upper() == "SLURM":
        cmd = ["sbatch", script_path]
    else:
        logger.error("Unknown scheduler %r — cannot submit", scheduler)
        return None

    logger.info("Submitting: %s", " ".join(cmd))
    try:
        result = subprocess.run(
            cmd, capture_output=True, text=True, check=True
        )
        job_id = result.stdout.strip()
        return job_id
    except FileNotFoundError:
        logger.error(
            "%s not found — is the scheduler available on this system?",
            cmd[0],
        )
        return None
    except subprocess.CalledProcessError as exc:
        logger.error("Submission failed (exit %d): %s", exc.returncode, exc.stderr.strip())
        return None