ezpz.utils⚓︎
- See ezpz/utils/__init__.py
DistributedPdb
⚓︎
Bases: Pdb
Supports using PDB from inside a multiprocessing child process.
Usage: DistributedPdb().set_trace()
Source code in src/ezpz/utils/__init__.py
class DistributedPdb(pdb.Pdb):
    """
    Supports using PDB from inside a multiprocessing child process.

    While the debugger is interacting, ``sys.stdin`` is temporarily replaced
    by a freshly opened ``/dev/stdin`` and restored afterwards.

    Usage:
        DistributedPdb().set_trace()
    """

    def interaction(self, *args, **kwargs):
        """Run ``pdb.Pdb.interaction`` with ``sys.stdin`` bound to ``/dev/stdin``.

        All positional and keyword arguments are forwarded unchanged to
        ``pdb.Pdb.interaction``.
        """
        _stdin = sys.stdin
        try:
            # The context manager closes the handle once the session ends, so
            # repeated breakpoints do not accumulate open file objects.
            with open("/dev/stdin") as stdin:
                sys.stdin = stdin
                pdb.Pdb.interaction(self, *args, **kwargs)
        finally:
            # Always hand the original stream back, even if the session raised.
            sys.stdin = _stdin
DummyTqdmFile
⚓︎
Dummy file-like wrapper that forwards writes to tqdm.
Source code in src/ezpz/utils/__init__.py
class DummyTqdmFile:
    """File-like shim whose ``write`` goes through ``tqdm.tqdm.write``."""

    # Class-level default; every instance overrides it in ``__init__``.
    file = None

    def __init__(self, file):
        """Remember the stream that tqdm should print to."""
        self.file = file

    def write(self, text):
        """Forward ``text`` to tqdm unless it is empty or only whitespace."""
        if not text.rstrip():
            return
        tqdm.tqdm.write(text, file=self.file, end="\n")

    def flush(self):
        """Call the wrapped stream's ``flush`` if it has one, else do nothing."""
        try:
            flusher = self.file.flush
        except AttributeError:
            return None
        return flusher()
ForkedPdb
⚓︎
Bases: Pdb
PDB subclass for debugging multi-processed code.
Source code in src/ezpz/utils/__init__.py
breakpoint(rank=0)
⚓︎
Set a breakpoint, but only on a single rank. All other ranks will wait for you to be done with the breakpoint before continuing.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| rank | int | Which rank to break on. Default: `0`. | 0 |
Source code in src/ezpz/utils/__init__.py
def breakpoint(rank: int = 0):
    """
    Drop into the debugger on a single rank while every other rank waits.

    The selected rank opens a ``DistributedPdb`` session. All ranks then meet
    at ``ezpz.dist.barrier()``, so the others wait for you to be done with the
    breakpoint before continuing.

    Args:
        rank (int): Which rank to break on. Default: ``0``
    """
    on_target_rank = ezpz.get_rank() == rank
    if on_target_rank:
        debugger = DistributedPdb()
        banner = (
            "\n!!! ATTENTION !!!\n\n"
            f"Type 'up' to get to the frame that called dist.breakpoint(rank={rank})\n"
        )
        debugger.message(banner)
        debugger.set_trace()
    # Every rank, including the one that just debugged, synchronises here.
    ezpz.dist.barrier()
format_pair(k, v, precision=6)
⚓︎
Format a key-value pair (supports nested dict/list/tuple/set).
Nested dicts become dotted keys: key.subkey=value. Sequences become indexed keys: key[0]=value.
Returns a newline-joined string if multiple leaf pairs are produced.
Source code in src/ezpz/utils/__init__.py
def format_pair(k: str, v: Any, precision: int = 6) -> str:
    """Format a key-value pair (supports nested dict/list/tuple/set).

    Nested dicts become dotted keys: ``key.subkey=value``
    Sequences become indexed keys: ``key[0]=value``

    Args:
        k: Key used as the prefix of every emitted ``key=value`` pair.
        v: Value to format; dicts, lists, tuples and sets are flattened
            recursively, anything else is rendered as a single pair.
        precision: Number of decimal places used for finite floats.

    Returns:
        A newline-joined string if multiple leaf pairs are produced. An empty
        container produces an empty string.
    """

    def _scalar_str(key: str, val: Any) -> str:
        # numpy scalar -> python scalar so the isinstance checks below apply
        if isinstance(val, np.generic):
            val = val.item()
        # bool is tested before int because bool is a subclass of int
        if isinstance(val, (bool, np.bool_)):
            return f"{key}={bool(val)}"
        if isinstance(val, (int, np.integer)):
            return f"{key}={int(val)}"
        if isinstance(val, float) and math.isfinite(val):
            return f"{key}={val:.{precision}f}"
        # fallback: non-finite floats, strings, None, objects, etc.
        return f"{key}={val}"

    def _flatten(key: str, val: Any) -> list[str]:
        # numpy scalar -> python scalar early
        if isinstance(val, np.generic):
            val = val.item()
        if isinstance(val, dict):
            children = [(f"{key}.{kk}", vv) for kk, vv in val.items()]
        elif isinstance(val, (list, tuple)):
            children = [(f"{key}[{i}]", vv) for i, vv in enumerate(val)]
        elif isinstance(val, set):
            # sets are unordered; sorting by repr makes the output deterministic
            ordered = sorted(val, key=repr)
            children = [(f"{key}[{i}]", vv) for i, vv in enumerate(ordered)]
        else:
            return [_scalar_str(key, val)]
        out: list[str] = []
        for child_key, child_val in children:
            out.extend(_flatten(child_key, child_val))
        return out

    return "\n".join(_flatten(k, v))
get_bf16_config_json(enabled=True)
⚓︎
get_deepspeed_adamw_optimizer_config_json(auto_config=True)
⚓︎
Get the deepspeed adamw optimizer config json.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| auto_config | bool | Whether to use the auto config. Default: `True`. | True |
Returns:
| Name | Type | Description |
|---|---|---|
| dict | dict | Deepspeed adamw optimizer config. |
Source code in src/ezpz/utils/__init__.py
def get_deepspeed_adamw_optimizer_config_json(
    auto_config: Optional[bool] = True,
) -> dict:
    """
    Build the DeepSpeed AdamW optimizer config section.

    Args:
        auto_config (bool): When truthy, include a ``params`` section whose
            ``lr`` and ``weight_decay`` are ``"auto"``; otherwise only the
            optimizer type is returned. Default: ``True``.

    Returns:
        dict: Deepspeed adamw optimizer config.
    """
    config: dict = {"type": "AdamW"}
    if auto_config:
        config["params"] = {
            "lr": "auto",
            "weight_decay": "auto",
            "torch_adam": True,
            "adam_w_mode": True,
        }
    return config
get_deepspeed_config_json(auto_config=True, gradient_accumulation_steps=1, gradient_clipping='auto', steps_per_print=10, train_batch_size='auto', train_micro_batch_size_per_gpu='auto', wall_clock_breakdown=False, wandb=True, bf16=True, fp16=None, flops_profiler=None, optimizer=None, scheduler=None, zero_optimization=None, stage=0, allgather_partitions=None, allgather_bucket_size=int(500000000.0), overlap_comm=None, reduce_scatter=True, reduce_bucket_size=int(500000000.0), contiguous_gradients=None, offload_param=None, offload_optimizer=None, stage3_max_live_parameters=int(1000000000.0), stage3_max_reuse_distance=int(1000000000.0), stage3_prefetch_bucket_size=int(500000000.0), stage3_param_persistence_threshold=int(1000000.0), sub_group_size=None, elastic_checkpoint=None, stage3_gather_16bit_weights_on_model_save=None, ignore_unused_parameters=None, round_robin_gradients=None, zero_hpz_partition_size=None, zero_quantized_weights=None, zero_quantized_gradients=None, log_trace_cache_warnings=None, save_config=True, output_file=None, output_dir=None)
⚓︎
Build a DeepSpeed config dictionary and, when save_config is true, write it as JSON to the output file or directory.
Source code in src/ezpz/utils/__init__.py
def get_deepspeed_config_json(
auto_config: Optional[bool] = True,
gradient_accumulation_steps: int = 1,
gradient_clipping: Optional[str | float] = "auto",
steps_per_print: Optional[int] = 10,
train_batch_size: str = "auto",
train_micro_batch_size_per_gpu: str = "auto",
wall_clock_breakdown: bool = False,
wandb: bool = True, # NOTE: Opinionated, W&B is enabled by default
bf16: bool = True, # NOTE: Opinionated, BF16 is enabled by default
fp16: Optional[bool] = None,
flops_profiler: Optional[dict] = None,
optimizer: Optional[dict] = None,
scheduler: Optional[dict] = None,
zero_optimization: Optional[dict] = None,
stage: Optional[int] = 0,
allgather_partitions: Optional[bool] = None,
allgather_bucket_size: Optional[int] = int(5e8),
overlap_comm: Optional[bool] = None,
reduce_scatter: Optional[bool] = True,
reduce_bucket_size: Optional[int] = int(5e8),
contiguous_gradients: Optional[bool] = None,
offload_param: Optional[dict] = None,
offload_optimizer: Optional[dict] = None,
stage3_max_live_parameters: Optional[int] = int(1e9),
stage3_max_reuse_distance: Optional[int] = int(1e9),
stage3_prefetch_bucket_size: Optional[int] = int(5e8),
stage3_param_persistence_threshold: Optional[int] = int(1e6),
sub_group_size: Optional[int] = None,
elastic_checkpoint: Optional[dict] = None,
stage3_gather_16bit_weights_on_model_save: Optional[bool] = None,
ignore_unused_parameters: Optional[bool] = None,
round_robin_gradients: Optional[bool] = None,
zero_hpz_partition_size: Optional[int] = None,
zero_quantized_weights: Optional[bool] = None,
zero_quantized_gradients: Optional[bool] = None,
log_trace_cache_warnings: Optional[bool] = None,
save_config: bool = True,
output_file: Optional[str] = None,
output_dir: Optional[PathLike] = None,
) -> dict[str, Any]:
"""
Write a deepspeed config to the output directory.
"""
import json
wandb_config = {"enabled": wandb}
bf16_config = {"enabled": bf16}
fp16_config = {"enabled": fp16}
flops_profiler_config = (
get_flops_profiler_config_json()
if flops_profiler is None
else flops_profiler
)
optimizer = (
get_deepspeed_adamw_optimizer_config_json()
if optimizer is None
else optimizer
)
scheduler = (
get_deepspeed_warmup_decay_scheduler_config_json()
if scheduler is None
else scheduler
)
if stage is not None and int(stage) > 0:
zero_optimization = (
get_deepspeed_zero_config_json(
stage=stage,
allgather_partitions=allgather_partitions,
allgather_bucket_size=allgather_bucket_size,
overlap_comm=overlap_comm,
reduce_scatter=reduce_scatter,
reduce_bucket_size=reduce_bucket_size,
contiguous_gradients=contiguous_gradients,
offload_param=offload_param,
offload_optimizer=offload_optimizer,
stage3_max_live_parameters=stage3_max_live_parameters,
stage3_max_reuse_distance=stage3_max_reuse_distance,
stage3_prefetch_bucket_size=stage3_prefetch_bucket_size,
stage3_param_persistence_threshold=stage3_param_persistence_threshold,
sub_group_size=sub_group_size,
elastic_checkpoint=elastic_checkpoint,
stage3_gather_16bit_weights_on_model_save=stage3_gather_16bit_weights_on_model_save,
ignore_unused_parameters=ignore_unused_parameters,
round_robin_gradients=round_robin_gradients,
zero_hpz_partition_size=zero_hpz_partition_size,
zero_quantized_weights=zero_quantized_weights,
zero_quantized_gradients=zero_quantized_gradients,
log_trace_cache_warnings=log_trace_cache_warnings,
)
if zero_optimization is None
else zero_optimization
)
else:
zero_optimization = None
ds_config = {
"gradient_accumulation_steps": gradient_accumulation_steps,
"gradient_clipping": gradient_clipping,
"steps_per_print": steps_per_print,
"train_batch_size": train_batch_size,
"train_micro_batch_size_per_gpu": train_micro_batch_size_per_gpu,
"wall_clock_breakdown": wall_clock_breakdown,
"wandb": wandb,
"bf16": bf16,
"fp16": fp16,
"flops_profiler": flops_profiler,
"optimizer": optimizer,
"scheduler": scheduler,
"zero_optimization": zero_optimization,
}
if save_config:
if output_file is None:
if output_dir is None:
output_dir = Path(os.getcwd()).joinpath("ds_configs")
output_dir = Path(output_dir)
output_dir.mkdir(exist_ok=True, parents=True)
outfile = output_dir.joinpath("deepspeed_config.json")
else:
outfile = Path(output_file)
logger.info(f"Saving DeepSpeed config to: {outfile.as_posix()}")
logger.info(json.dumps(ds_config, indent=4))
with outfile.open("w") as f:
json.dump(
ds_config,
fp=f,
indent=4,
)
return ds_config
get_deepspeed_warmup_decay_scheduler_config_json(auto_config=True)
⚓︎
Get the deepspeed warmup decay scheduler config json.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| auto_config | bool | Whether to use the auto config. Default: `True`. | True |
Returns:
| Name | Type | Description |
|---|---|---|
| dict | dict | Deepspeed warmup decay scheduler config. |
Source code in src/ezpz/utils/__init__.py
def get_deepspeed_warmup_decay_scheduler_config_json(
    auto_config: Optional[bool] = True,
) -> dict:
    """
    Build the DeepSpeed WarmupDecayLR scheduler config section.

    Args:
        auto_config (bool): When truthy, include a ``params`` section with
            every value set to ``"auto"``; otherwise only the scheduler type
            is returned. Default: ``True``.

    Returns:
        dict: Deepspeed warmup decay scheduler config.
    """
    config: dict = {"type": "WarmupDecayLR"}
    if auto_config:
        config["params"] = {
            "warmup_min_lr": "auto",
            "warmup_max_lr": "auto",
            "warmup_num_steps": "auto",
            "total_num_steps": "auto",
        }
    return config
get_deepspeed_zero_config_json(zero_config)
⚓︎
get_flops_profiler_config_json(enabled=True, profile_step=1, module_depth=-1, top_modules=1, detailed=True)
⚓︎
Get the deepspeed flops profiler config json.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| enabled | bool | Whether to use the flops profiler. Default: `True`. | True |
| profile_step | int | The step to profile. Default: `1`. | 1 |
| module_depth | int | The depth of the module. Default: `-1`. | -1 |
| top_modules | int | The number of top modules to show. Default: `1`. | 1 |
| detailed | bool | Whether to show detailed profiling. Default: `True`. | True |
Returns:
| Name | Type | Description |
|---|---|---|
| dict | dict | Deepspeed flops profiler config. |
Source code in src/ezpz/utils/__init__.py
def get_flops_profiler_config_json(
    enabled: bool = True,
    profile_step: int = 1,
    module_depth: int = -1,
    top_modules: int = 1,
    detailed: bool = True,
) -> dict:
    """
    Build the DeepSpeed flops profiler config section.

    Each argument is copied verbatim into the returned mapping under a key of
    the same name.

    Args:
        enabled (bool): Whether to use the flops profiler. Default: ``True``.
        profile_step (int): The step to profile. Default: ``1``.
        module_depth (int): The depth of the module. Default: ``-1``.
        top_modules (int): The number of top modules to show. Default: ``1``.
        detailed (bool): Whether to show detailed profiling. Default: ``True``.

    Returns:
        dict: Deepspeed flops profiler config.
    """
    return dict(
        enabled=enabled,
        profile_step=profile_step,
        module_depth=module_depth,
        top_modules=top_modules,
        detailed=detailed,
    )
get_fp16_config_json(enabled=True)
⚓︎
get_max_memory_allocated(device)
⚓︎
Get the maximum memory allocated on the specified device.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| device | torch.device | The device to check memory allocation for. | required |
Source code in src/ezpz/utils/__init__.py
def get_max_memory_allocated(device: torch.device) -> float:
    """
    Get the maximum memory allocated on the specified device.

    Args:
        device (torch.device): The device to check memory allocation for.

    Returns:
        With CUDA available, the result of
        ``torch.cuda.max_memory_allocated(device)``. With XPU available, the
        result of ``ipex.xpu.max_memory_allocated(device)``; if
        ``intel_extension_for_pytorch`` cannot be imported, the result of
        ``torch.xpu.max_memory_allocated(device)`` when that function exists,
        otherwise ``-1.0``.

    Raises:
        RuntimeError: If neither CUDA nor XPU is available.
    """
    if torch.cuda.is_available():
        return torch.cuda.max_memory_allocated(device)
    # Not every torch build exposes ``torch.xpu``; look it up defensively so a
    # CPU-only install reaches the RuntimeError below instead of AttributeError.
    xpu = getattr(torch, "xpu", None)
    if xpu is not None and xpu.is_available():
        try:
            import intel_extension_for_pytorch as ipex
        except ImportError:
            # Without ipex, use torch's own XPU statistic when it exists and
            # keep the historical -1.0 sentinel otherwise.
            native = getattr(xpu, "max_memory_allocated", None)
            return native(device) if callable(native) else -1.0
        return ipex.xpu.max_memory_allocated(device)
    raise RuntimeError(f"Memory allocation not available for {device=}")
get_timestamp(fstr=None)
⚓︎
Get formatted timestamp.
Returns the current date and time as a formatted string. By default, returns a timestamp in the format 'YYYY-MM-DD-HHMMSS'. A custom format string can be provided to change the output format.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| fstr | str | Format string for strftime. If None, uses default format '%Y-%m-%d-%H%M%S'. Defaults to None. | None |
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | Formatted timestamp string. |
Examples:
>>> get_timestamp() # Returns something like '2023-12-01-143022'
>>> get_timestamp("%Y-%m-%d") # Returns something like '2023-12-01'
Source code in src/ezpz/utils/__init__.py
def get_timestamp(fstr: Optional[str] = None) -> str:
    """Return the current local date and time as a formatted string.

    Args:
        fstr (str, optional): ``strftime`` format string. When ``None``, the
            format ``'%Y-%m-%d-%H%M%S'`` is used. Defaults to None.

    Returns:
        str: Formatted timestamp string.

    Examples:
        >>> get_timestamp()  # Returns something like '2023-12-01-143022'
        >>> get_timestamp("%Y-%m-%d")  # Returns something like '2023-12-01'
    """
    import datetime

    pattern = "%Y-%m-%d-%H%M%S" if fstr is None else fstr
    return datetime.datetime.now().strftime(pattern)
grab_tensor(x, force=False)
⚓︎
Convert various tensor/array-like objects to numpy arrays.
This function converts different types of array-like objects (tensors, lists, etc.) to numpy arrays for consistent handling. Supports PyTorch tensors, numpy arrays, and nested lists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| x | Any | The object to convert to a numpy array. Can be None, scalar values, lists, numpy arrays, or PyTorch tensors. | required |
| force | bool | Force conversion even if it requires copying data. Defaults to False. | False |
Returns:
| Type | Description |
|---|---|
| Union[ndarray, ScalarLike, None] | Numpy array representation of the input, or the original scalar value, or None if input was None. |
Raises:
| Type | Description |
|---|---|
| ValueError | If unable to convert a list to array. |
Examples:
>>> import torch
>>> import numpy as np
>>> grab_tensor([1, 2, 3])
array([1, 2, 3])
>>> grab_tensor(torch.tensor([1, 2, 3]))
array([1, 2, 3])
>>> grab_tensor(np.array([1, 2, 3]))
array([1, 2, 3])
Source code in src/ezpz/utils/__init__.py
def grab_tensor(
    x: Any, force: bool = False
) -> Union[np.ndarray, ScalarLike, None]:
    """Convert various tensor/array-like objects to numpy arrays.

    Args:
        x (Any): The object to convert. Can be None, a Python or numpy
            scalar, a list/tuple, a numpy array, a PyTorch tensor, or any
            object exposing a callable ``numpy`` attribute.
        force (bool, optional): Passed as ``force=`` to ``.numpy(...)``,
            including when a list of tensors is stacked first.
            Defaults to False.

    Returns:
        Union[np.ndarray, ScalarLike, None]:
            - ``None`` when ``x`` is ``None`` or of an unsupported type.
            - ``x`` itself for Python/numpy numeric or boolean scalars and
              for numpy arrays.
            - A numpy array for lists, tuples, tensors and objects with a
              ``numpy`` method.

    Raises:
        ValueError: If a non-empty list/tuple starts with an element that is
            not a tensor, array, scalar, list or tuple.

    Examples:
        >>> import torch
        >>> import numpy as np
        >>> grab_tensor([1, 2, 3])
        array([1, 2, 3])
        >>> grab_tensor(torch.tensor([1, 2, 3]))
        array([1, 2, 3])
        >>> grab_tensor(np.array([1, 2, 3]))
        array([1, 2, 3])
    """
    # ``np.number`` covers numpy integers as well as floats; numpy booleans
    # are a separate hierarchy, hence ``np.bool_``.
    scalar_types = (int, float, bool, np.number, np.bool_)
    if x is None:
        return None
    if isinstance(x, scalar_types):
        return x
    if isinstance(x, np.ndarray):
        return x
    if isinstance(x, torch.Tensor):
        return x.numpy(force=force)
    if isinstance(x, (list, tuple)):
        x = list(x)
        if not x:
            return np.array([])
        # Dispatch on the first element only.
        first = x[0]
        if isinstance(first, torch.Tensor):
            # Propagate ``force`` so stacked tensors that require grad or
            # live off-CPU convert the same way a single tensor would.
            return grab_tensor(torch.stack(x), force=force)
        if isinstance(first, np.ndarray):
            return np.stack(x)
        if isinstance(first, scalar_types + (tuple, list)):
            return np.array(x)
        raise ValueError(f"Unable to convert list: \n {x=}\n to array")
    to_numpy = getattr(x, "numpy", None)
    if callable(to_numpy):
        return to_numpy(force=force)
    # Unsupported input: keep the historical behaviour of returning None.
    return None
model_summary(model, verbose=False, depth=1, input_size=None)
⚓︎
Print a summary of the model using torchinfo.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| model | Any | The model to summarize. | required |
| verbose | bool | Whether to print the summary. Default: `False`. | False |
| depth | int | The depth of the summary. Default: `1`. | 1 |
| input_size | Optional[Sequence[int]] | The input size for the model. Default: `None`. | None |
Returns:
| Type | Description |
|---|---|
| ModelStatistics \| None | The model summary if torchinfo is available, otherwise None. |
Source code in src/ezpz/utils/__init__.py
def model_summary(
    model: Any,
    verbose: bool = False,
    depth: int = 1,
    input_size: Optional[Sequence[int]] = None,
) -> ModelStatistics | None:
    """
    Summarize a model with ``torchinfo.summary``.

    Args:
        model: The model to summarize.
        verbose (bool): Forwarded as torchinfo's ``verbose`` argument.
            Default: ``False``.
        depth (int): The depth of the summary. Default: ``1``.
        input_size (Optional[Sequence[int]]): The input size for the model.
            Default: ``None``.

    Returns:
        ModelStatistics | None: The value returned by ``torchinfo.summary``,
        or None (after logging a warning) when an ImportError is raised.
    """
    try:
        from torchinfo import summary as torchinfo_summary

        stats = torchinfo_summary(
            model,
            input_size=input_size,
            depth=depth,
            verbose=verbose,
        )
    except (ImportError, ModuleNotFoundError):
        logger.warning(
            "torchinfo not installed, unable to print model summary!"
        )
        return None
    return stats
summarize_dict(d, precision=6, keys_to_skip=None)
⚓︎
Summarize a dictionary into a string with formatted key-value pairs.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| d | dict | The dictionary to summarize. | required |
| precision | int | The precision for floating point values. Default: `6`. | 6 |
| keys_to_skip | Iterable \| None | Keys to leave out of the summary. | None |
Returns:
| Name | Type | Description |
|---|---|---|
| str | str | A string representation of the dictionary with formatted key-value pairs. |
Source code in src/ezpz/utils/__init__.py
def summarize_dict(
d: dict,
precision: int = 6,
keys_to_skip: Iterable | None = None,
) -> str:
"""
Summarize a dictionary into a string with formatted key-value pairs.
Args:
d (dict): The dictionary to summarize.
precision (int): The precision for floating point values. Default: ``6``.
Returns:
str: A string representation of the dictionary with formatted key-value pairs.
"""
keys_to_skip = [] if keys_to_skip is None else keys_to_skip
return " ".join(
[
format_pair(k, v, precision=precision)
for k, v in d.items()
if k not in keys_to_skip
]
)
write_deepspeed_zero12_auto_config(zero_stage=1, output_dir=None)
⚓︎
Write a DeepSpeed ZeRO stage 1/2 auto config to the output directory.
Source code in src/ezpz/utils/__init__.py
def write_deepspeed_zero12_auto_config(
    zero_stage: int = 1, output_dir: Optional[PathLike] = None
) -> dict:
    """
    Write a DeepSpeed ZeRO stage 1/2 auto config to the output directory.

    Args:
        zero_stage (int): Value stored under ``zero_optimization.stage`` and
            used in the output file name. Default: ``1``.
        output_dir (Optional[PathLike]): Directory receiving
            ``deepspeed_zero{zero_stage}_auto_config.json``. Created if
            missing; defaults to ``<cwd>/ds_configs``.

    Returns:
        dict: The config that was written.
    """
    import json

    ds_config = {
        "gradient_accumulation_steps": 1,
        "gradient_clipping": "auto",
        "steps_per_print": 1,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": True,
        "wandb": {"enabled": True},
        "bf16": {"enabled": True},
        "flops_profiler": {
            "enabled": True,
            "profile_step": 1,
            "module_depth": -1,
            "top_modules": 1,
            "detailed": True,
        },
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": "auto",
                "weight_decay": "auto",
                "torch_adam": True,
                "adam_w_mode": True,
            },
        },
        "scheduler": {
            "type": "WarmupDecayLR",
            "params": {
                "warmup_min_lr": "auto",
                "warmup_max_lr": "auto",
                "warmup_num_steps": "auto",
                "total_num_steps": "auto",
            },
        },
        "zero_optimization": {
            "stage": zero_stage,
            "allgather_partitions": True,
            # Stored as an int so the JSON holds 200000000, not 200000000.0.
            "allgather_bucket_size": int(2e8),
            "overlap_comm": True,
            "reduce_scatter": True,
            "reduce_bucket_size": "auto",
            "contiguous_gradients": True,
        },
    }
    if output_dir is None:
        output_dir = Path(os.getcwd()).joinpath("ds_configs")
    output_dir = Path(output_dir)
    output_dir.mkdir(exist_ok=True, parents=True)
    outfile = output_dir.joinpath(
        f"deepspeed_zero{zero_stage}_auto_config.json"
    )
    logger.info(
        f"Saving DeepSpeed ZeRO Stage {zero_stage} "
        f"auto config to: {outfile.as_posix()}"
    )
    with outfile.open("w") as f:
        json.dump(
            ds_config,
            fp=f,
            indent=4,
        )
    return ds_config
write_deepspeed_zero3_auto_config(zero_stage=3, output_dir=None)
⚓︎
Write a DeepSpeed ZeRO stage 3 auto config to the output directory.
Source code in src/ezpz/utils/__init__.py
def write_deepspeed_zero3_auto_config(
    zero_stage: int = 3, output_dir: Optional[PathLike] = None
) -> dict:
    """
    Write a DeepSpeed ZeRO stage 3 auto config to the output directory.

    Args:
        zero_stage (int): Value stored under ``zero_optimization.stage`` and
            used in the output file name. Default: ``3``.
        output_dir (Optional[PathLike]): Directory receiving
            ``deepspeed_zero{zero_stage}_auto_config.json``. Created if
            missing; defaults to ``<cwd>/ds_configs``.

    Returns:
        dict: The config that was written.
    """
    import json

    ds_config = {
        "gradient_accumulation_steps": 1,
        "gradient_clipping": "auto",
        "steps_per_print": 1,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": True,
        "wandb": {"enabled": True},
        "bf16": {"enabled": True},
        "flops_profiler": {
            "enabled": True,
            "profile_step": 1,
            "module_depth": -1,
            "top_modules": 1,
            "detailed": True,
        },
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": "auto",
                "weight_decay": "auto",
                "torch_adam": True,
                "adam_w_mode": True,
            },
        },
        "scheduler": {
            "type": "WarmupDecayLR",
            "params": {
                "warmup_min_lr": "auto",
                "warmup_max_lr": "auto",
                "warmup_num_steps": "auto",
                "total_num_steps": "auto",
            },
        },
        "zero_optimization": {
            "stage": zero_stage,
            "allgather_partitions": True,
            # Stored as an int so the JSON holds 200000000, not 200000000.0.
            "allgather_bucket_size": int(2e8),
            "overlap_comm": True,
            "reduce_scatter": True,
            "reduce_bucket_size": "auto",
            "contiguous_gradients": True,
        },
    }
    if output_dir is None:
        output_dir = Path(os.getcwd()).joinpath("ds_configs")
    output_dir = Path(output_dir)
    output_dir.mkdir(exist_ok=True, parents=True)
    outfile = output_dir.joinpath(
        f"deepspeed_zero{zero_stage}_auto_config.json"
    )
    logger.info(
        f"Saving DeepSpeed ZeRO Stage {zero_stage} "
        f"auto config to: {outfile.as_posix()}"
    )
    with outfile.open("w") as f:
        json.dump(
            ds_config,
            fp=f,
            indent=4,
        )
    return ds_config
write_generic_deepspeed_config(gradient_accumulation_steps=1, gradient_clipping='auto', steps_per_print=10, train_batch_size='auto', train_micro_batch_size_per_gpu='auto', wall_clock_breakdown=False, wandb=None, bf16=None, fp16=None, flops_profiler=None, optimizer=None, scheduler=None, zero_optimization=None)
⚓︎
Build a generic DeepSpeed config dictionary from the given sections and return it; nothing is written to disk.
Source code in src/ezpz/utils/__init__.py
def write_generic_deepspeed_config(
gradient_accumulation_steps: int = 1,
gradient_clipping: str | float = "auto",
steps_per_print: int = 10,
train_batch_size: str = "auto",
train_micro_batch_size_per_gpu: str = "auto",
wall_clock_breakdown: bool = False,
wandb: Optional[dict] = None,
bf16: Optional[dict] = None,
fp16: Optional[dict] = None,
flops_profiler: Optional[dict] = None,
optimizer: Optional[dict] = None,
scheduler: Optional[dict] = None,
zero_optimization: Optional[dict] = None,
):
"""
Write a generic deepspeed config to the output directory.
"""
ds_config = {
"gradient_accumulation_steps": gradient_accumulation_steps,
"gradient_clipping": gradient_clipping,
"steps_per_print": steps_per_print,
"train_batch_size": train_batch_size,
"train_micro_batch_size_per_gpu": train_micro_batch_size_per_gpu,
"wall_clock_breakdown": wall_clock_breakdown,
"wandb": wandb,
"bf16": bf16,
"fp16": fp16,
"flops_profiler": flops_profiler,
"optimizer": optimizer,
"scheduler": scheduler,
"zero_optimization": zero_optimization,
}
return ds_config