# Copyright 2025 - Oumi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import gc
import math
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any, NamedTuple, Optional, TypeVar, Union, cast

import numpy as np
import numpy.typing as npt
import torch

from oumi.utils.device_utils import get_nvidia_gpu_memory_utilization
from oumi.utils.logging import logger
from oumi.utils.str_utils import compute_utf8_len

def device_cleanup() -> None:
    """Empties the GPU cache; good to do before and after training for cleanup."""
    logger.debug("Running garbage collection.")
    gc.collect()

    if torch.cuda.is_available():
        logger.debug("Cleaning up GPU memory.")
        logger.debug(
            "GPU memory occupied before cleanup: "
            f"{get_nvidia_gpu_memory_utilization()} MiB"
        )
        torch.cuda.empty_cache()
        logger.debug(
            f"Memory after cleanup: {get_nvidia_gpu_memory_utilization()} MiB"
        )
    elif torch.backends.mps.is_available():
        logger.debug("Cleaning up MPS memory.")
        torch.mps.empty_cache()

def limit_per_process_memory(percent: float = 0.95) -> None:
    """Limits process memory by a certain percentage.

    On Windows and WSL, there's a pool of 'shared GPU memory'. This pool uses
    the machine's RAM (slow) rather than actual VRAM (fast). Setting this value
    ensures your machine never uses the slow memory and OOMs instead.
    Note that this may not be needed on Linux machines since this is
    an OS-level feature.
    """
    if torch.cuda.is_available():
        torch.cuda.set_per_process_memory_fraction(percent)

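# Illustrative usage (a sketch, not part of this module's API): cap each process
# at 90% of VRAM before training so allocations fail fast instead of spilling
# into the slow shared pool on Windows/WSL.
#
#   limit_per_process_memory(0.9)
#   device_cleanup()  # start training from a clean slate
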
def format_cudnn_version(v: Optional[int]) -> str:
    """Formats the cuDNN version number.

    Args:
        v: The cuDNN version number.

    Returns:
        A formatted string.
    """
    if v is None:
        return ""
    return ".".join(map(str, (v // 1000, v // 100 % 10, v % 100)))

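# For example (a sketch of the integer encoding this function assumes): cuDNN
# reports versions as a single integer such as 8902, which decodes to
# major = v // 1000, minor = v // 100 % 10, patch = v % 100:
#
#   assert format_cudnn_version(8902) == "8.9.2"
#   assert format_cudnn_version(None) == ""
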
def log_versioning_info() -> None:
    """Logs misc versioning information."""
    logger.info(
        f"Torch version: {torch.__version__}. NumPy version: {np.__version__}"
    )
    if not torch.cuda.is_available():
        logger.info("CUDA is not available!")
        return

    # pyright seems to have an issue with torch==2.5.1
    # torch.version is always available, but pyright doesn't know that
    if hasattr(torch, "version"):
        logger.info(f"CUDA version: {torch.version.cuda} ")  # type: ignore

    # For AMD GPUs, these functions return the ROCm and MIOpen versions,
    # respectively.
    logger.info(
        f"cuDNN version: {format_cudnn_version(torch.backends.cudnn.version())}"
    )

def log_devices_info(filepath: Optional[Path] = None) -> None:
    """Logs high-level info about all available accelerator devices."""
    if not torch.cuda.is_available():
        return

    ncpus = os.cpu_count()
    num_devices = torch.cuda.device_count()
    log_lines = [f"CPU cores: {ncpus} CUDA devices: {num_devices}"]

    def _mem_to_gib(x):
        return round(float(x) / 1024**3, 2)

    for i in range(num_devices):
        device_name = torch.cuda.get_device_name(i)
        mem_free, mem_total = torch.cuda.mem_get_info(i)
        mem_allocated = torch.cuda.memory_allocated(i)
        mem_reserved = torch.cuda.memory_reserved(i)
        capability = torch.cuda.get_device_capability(i)
        log_lines.append(
            f"device({i})='{device_name}' "
            f"Capability: {capability} "
            f"Memory: [Total: {_mem_to_gib(mem_total)}GiB "
            f"Free: {_mem_to_gib(mem_free)}GiB "
            f"Allocated: {_mem_to_gib(mem_allocated)}GiB "
            f"Cached: {_mem_to_gib(mem_reserved)}GiB]"
        )
    all_text = "\n".join(log_lines)
    logger.info(all_text)
    if filepath:
        with filepath.open("w", encoding="utf-8") as f:
            f.write(all_text)

def log_peak_gpu_memory():
    """Logs the peak GPU memory usage."""
    if torch.cuda.is_available():
        peak_memory = torch.cuda.max_memory_allocated() / 1024**3  # Convert to GB
        logger.info(f"Peak GPU memory usage: {peak_memory:.2f} GB")

def create_model_summary(model: Any) -> str:
    """Creates a model summary as a free-formed string."""
    lines = ["Model summary:", repr(model), ""]

    module_lines = [
        f"{name} ({type(layer)})" for name, layer in model.named_modules()
    ]
    lines.append(f"Modules ({len(module_lines)}):")
    lines.extend(module_lines)
    lines.append("")

    # TODO: Consider whether to use `torchsummary` library here.
    # Caveat: it may require sample inputs/shapes, and other aux info.
    return "\n".join(lines)

def log_model_summary(model, filepath: Optional[Path] = None) -> None:
    """Logs a model summary."""
    model_summary = create_model_summary(model)
    logger.info(model_summary)

    if filepath:
        with filepath.open("w", encoding="utf-8") as f:
            f.write(model_summary)

def get_device_name() -> str:
    """Returns the name of the device, assuming all are identical."""
    device_name = "CPU"
    if torch.cuda.is_available():
        # Assume all devices are identical.
        device_name = torch.cuda.get_device_name(0)
    elif torch.backends.mps.is_available():
        device_name = "MPS"
    return device_name

@dataclass
class ModelParameterCount:
    """Parameter counts of a model: total, trainable, and embedding parameters."""

    all_params: int
    trainable_params: int
    embedding_params: int

    def __post_init__(self):
        """Ensure that the parameters are valid."""
        for name, value in [
            ("all_params", self.all_params),
            ("trainable_params", self.trainable_params),
            ("embedding_params", self.embedding_params),
        ]:
            if value < 0:
                raise ValueError(f"`{name}` ({value}) must be >= 0.")
        if self.trainable_params > self.all_params:
            raise ValueError(
                f"`trainable_params` ({self.trainable_params}) cannot be "
                f"greater than `all_params` ({self.all_params})."
            )
        if self.embedding_params > self.all_params:
            raise ValueError(
                f"`embedding_params` ({self.embedding_params}) cannot be "
                f"greater than `all_params` ({self.all_params})."
            )
    @property
    def trainable_params_percent(self) -> float:
        """Percentage of trainable parameters [0.0, 100.0]."""
        if self.all_params == 0:
            return 0.0
        return 100 * self.trainable_params / self.all_params

    @property
    def frozen_params_percent(self) -> float:
        """Percentage of frozen parameters [0.0, 100.0]."""
        return 100.0 - self.trainable_params_percent

def _get_parameter_names(
    model: torch.nn.Module, forbidden_layer_types: list[Any]
) -> list[str]:
    """Returns the names of the model parameters that are not inside a forbidden layer.

    Borrowed from
    https://github.com/huggingface/transformers/blob/main/src/transformers/trainer.py.
    """
    result = []
    for name, child in model.named_children():
        result += [
            f"{name}.{n}"
            for n in _get_parameter_names(child, forbidden_layer_types)
            if not isinstance(child, tuple(forbidden_layer_types))
        ]
    # Add model specific parameters (defined with nn.Parameter) since they are not in
    # any child.
    result += list(model._parameters.keys())
    return result

def count_model_parameters(model: torch.nn.Module) -> ModelParameterCount:
    """Creates a basic counter of the parameters in a neural model.

    Args:
        model: The torch-implemented neural network.

    Returns:
        ModelParameterCount: A ModelParameterCount for the underlying model.
    """
    trainable_params = 0
    all_params = 0
    embedding_params = 0
    embedding_layer_names = []

    for name, module in model.named_modules():
        if isinstance(module, torch.nn.Embedding):
            # Embedding layers appear in named_parameters with ".weight" at the end.
            param_name = f"{name}.weight" if name else "weight"
            embedding_layer_names.append(param_name)

    for name, param in model.named_parameters():
        param_count = param.numel()
        all_params += param_count
        if param.requires_grad:
            trainable_params += param_count
        if name in embedding_layer_names:
            embedding_params += param_count

    return ModelParameterCount(
        all_params=all_params,
        trainable_params=trainable_params,
        embedding_params=embedding_params,
    )

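# Minimal usage sketch (with a hypothetical toy model, not part of this module):
# a `Linear(8, 4)` layer has 8*4 weights + 4 biases = 36 parameters, all
# trainable and none of them embeddings.
#
#   m = torch.nn.Linear(8, 4)
#   counts = count_model_parameters(m)
#   assert counts.all_params == 36
#   assert counts.trainable_params == 36
#   assert counts.embedding_params == 0
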
def log_number_of_model_parameters(
    model: torch.nn.Module, use_icons: bool = True
) -> None:
    """Logs the number of parameters of the model.

    Args:
        model: The torch-implemented neural network.
        use_icons: Whether to display emojis/icons in the log output.
    """
    params = count_model_parameters(model)

    # Icons if enabled, else fall back to plain text.
    total_label = "🔢 Total" if use_icons else "Total"
    embedding_label = "🔗 Embedding" if use_icons else "Embedding"
    trainable_label = "🎯 Trainable" if use_icons else "Trainable"
    frozen_label = "🔒 Frozen" if use_icons else "Frozen"
    n_space = 11 if use_icons else 9

    logger.info(
        f"\nModel Parameters Summary:\n"
        f"{total_label:<{n_space}} parameters: {params.all_params:,}\n"
        f"{embedding_label:<{n_space}} parameters: {params.embedding_params:,}\n"
        f"{trainable_label:<{n_space}} parameters: {params.trainable_params:,}\n"
        f"{frozen_label:<{n_space}} parameters: "
        f"{params.all_params - params.trainable_params:,} "
        f"({params.frozen_params_percent:.2f}%)\n"
    )

def get_torch_dtype(torch_dtype_str: str) -> torch.dtype:
    """Converts a string dtype to `torch.dtype`."""
    torch_dtype_str = torch_dtype_str.lower()
    if torch_dtype_str in ["f64", "float64", "double"]:
        return torch.float64
    elif torch_dtype_str in ["f32", "float32", "float"]:
        return torch.float32
    elif torch_dtype_str in ["bf16", "bfloat16"]:
        return torch.bfloat16
    elif torch_dtype_str in ["f16", "float16", "half"]:
        return torch.float16
    elif torch_dtype_str in ["uint8"]:
        return torch.uint8
    else:
        raise ValueError(f"Unsupported torch dtype: {torch_dtype_str}")

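# A few illustrative mappings (a sketch; the aliases come from the branches above):
#
#   assert get_torch_dtype("bf16") == torch.bfloat16
#   assert get_torch_dtype("HALF") == torch.float16  # input is lowercased first
#   get_torch_dtype("int4")  # raises ValueError: unsupported
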
def get_dtype_size_in_bytes(
    dtype: Union[str, torch.dtype, npt.DTypeLike],
) -> int:
    """Returns the size of this dtype in bytes."""
    if isinstance(dtype, torch.dtype):
        return dtype.itemsize
    elif isinstance(dtype, str):
        if not dtype:
            raise ValueError("Empty string is not a valid dtype")
        try:
            # Try to parse using non-standard names like "f64".
            return get_torch_dtype(dtype).itemsize
        except ValueError:
            return np.dtype(dtype).itemsize
    return np.dtype(dtype).itemsize

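# Illustrative sketch of the three accepted input kinds (torch dtype, alias
# string, numpy-style string):
#
#   assert get_dtype_size_in_bytes(torch.float16) == 2
#   assert get_dtype_size_in_bytes("f32") == 4    # parsed via get_torch_dtype()
#   assert get_dtype_size_in_bytes("int64") == 8  # falls back to np.dtype()
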
def estimate_sample_dict_size_in_bytes(sample: dict[str, Any]) -> int:
    """Estimates the approximate total number of bytes in a provided sample.

    A training sample is expected to be a dictionary, where a value is
    a list, tensor, or a numpy array.

    The function works in best-effort mode, i.e., 100% accuracy isn't guaranteed.
    The implementation is slow, and shouldn't be called in performance-sensitive
    code.
    """
    result = 0
    for key, val in sample.items():
        result += compute_utf8_len(key)
        result += _estimate_item_size_in_bytes(val)
    return result

def coerce_model_to_dtype(model: torch.nn.Module, dtype: torch.dtype) -> None:
    """Coerces the model to the desired dtype.

    This is needed as a temporary workaround to support QLoRA FSDP training. See:
    https://github.com/huggingface/accelerate/issues/1620#issuecomment-2407102051
    """
    for name, module in model.named_modules():
        try:
            module.to(dtype)
        except Exception as e:
            logger.warning(
                f"Failed to coerce module {name} to dtype {dtype}. Error: {e}"
            )

T = TypeVar("T")

def convert_to_list_of_tensors(values: list[T]) -> list[torch.Tensor]:
    """Converts a list of array-like objects into a list of torch tensors."""
    if len(values) == 0:
        return []

    first_item = values[0]
    if isinstance(first_item, torch.Tensor):
        return [cast(torch.Tensor, item) for item in values]
    elif isinstance(first_item, np.ndarray):
        return [torch.from_numpy(item) for item in values]
    elif isinstance(first_item, list):
        return [torch.from_numpy(np.asarray(item)) for item in values]

    raise ValueError(
        f"Unsupported element type: {type(first_item)}. "
        "Must be numpy array, torch tensor, or Python list."
    )

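# Sketch: variable-length Python lists become a list of 1D tensors (the dtype is
# inferred by numpy, typically int64 for plain integer lists):
#
#   tensors = convert_to_list_of_tensors([[1, 2, 3], [4]])
#   assert [t.shape for t in tensors] == [torch.Size([3]), torch.Size([1])]
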
def pad_sequences_right_side(
    sequences: list[T], *, padding_value: float = 0
) -> torch.Tensor:
    """Pads a list of variable-length tensors to a single tensor.

    Appends `padding_value` to the right side of each sequence
    to expand to the longest length.

    Args:
        sequences: list of variable length sequences.
        padding_value: value for padded elements. Default: 0.

    Returns:
        A tensor with shape (B, L, ...), where B is a batch size
        (`len(sequences)`), L is the longest length (`max(len(sequences[i]))`)
    """
    return pad_sequences(sequences, padding_value=padding_value, padding_side="right")

def pad_sequences_left_side(
    sequences: list[T], *, padding_value: float = 0
) -> torch.Tensor:
    """Pads a list of variable-length tensors to a single tensor.

    Prepends `padding_value` to the left side of each sequence
    to expand to the longest length.

    Args:
        sequences: list of variable length sequences.
        padding_value: value for padded elements. Default: 0.

    Returns:
        A tensor with shape (B, L, ...), where B is a batch size
        (`len(sequences)`), L is the longest length (`max(len(sequences[i]))`)
    """
    return pad_sequences(sequences, padding_value=padding_value, padding_side="left")

def pad_sequences(
    sequences: list[T], *, padding_value: float = 0, padding_side: Optional[str] = None
) -> torch.Tensor:
    """Pads a list of variable-length tensors to a single tensor.

    Args:
        sequences: list of variable length sequences.
        padding_value: value for padded elements. Default: 0.
        padding_side: side to apply padding to. Valid values: 'right', 'left'.
            If unspecified (`None`), defaults to `right`.

    Returns:
        A tensor with shape (B, L, ...), where B is a batch size
        (`len(sequences)`), L is the longest length (`max(len(sequences[i]))`)
    """
    if not padding_side:
        padding_side = "right"
    if padding_side not in ("right", "left"):
        raise ValueError(
            f"Unsupported padding side: '{padding_side}'. "
            "Valid values: 'right', 'left'."
        )
    if len(sequences) == 0:
        raise ValueError("Empty list is not allowed.")

    tensor_sequences = convert_to_list_of_tensors(sequences)
    try:
        return torch.nn.utils.rnn.pad_sequence(
            tensor_sequences,
            batch_first=True,
            padding_value=padding_value,
            padding_side=padding_side,
        )
    except RuntimeError:
        logger.error(
            "Failed to pad sequences with the shapes: "
            + ", ".join([f"{t.shape}" for t in tensor_sequences])
        )
        raise

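# Usage sketch: right padding appends the fill value, left padding prepends it.
# (Left padding assumes a PyTorch version whose `pad_sequence` supports the
# `padding_side` argument.)
#
#   right = pad_sequences([[1, 2, 3], [4]])
#   # tensor([[1, 2, 3],
#   #         [4, 0, 0]])
#   left = pad_sequences([[1, 2, 3], [4]], padding_side="left")
#   # tensor([[1, 2, 3],
#   #         [0, 0, 4]])
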
class _DimMinMaxSizes(NamedTuple):
    dim_index: int
    min_size: int
    max_size: int

    @property
    def has_variable_sizes(self) -> bool:
        return self.min_size != self.max_size


def _get_dims_min_max_size(tensors_list: list[torch.Tensor]) -> list[_DimMinMaxSizes]:
    num_tensors = len(tensors_list)
    if num_tensors <= 0:
        return []

    first_shape = tensors_list[0].shape
    min_dim_sizes = list(first_shape)
    max_dim_sizes = list(first_shape)
    num_dims = len(min_dim_sizes)

    for tensor_idx in range(num_tensors - 1):
        curr_shape = tensors_list[tensor_idx + 1].shape
        if num_dims != len(curr_shape):
            raise ValueError(
                "Tensors have different number of dimensions: "
                f"{num_dims} vs {len(curr_shape)}! "
                f"Shapes: {first_shape}, {curr_shape}"
            )
        for idx in range(num_dims):
            min_dim_sizes[idx] = min(min_dim_sizes[idx], curr_shape[idx])
            max_dim_sizes[idx] = max(max_dim_sizes[idx], curr_shape[idx])

    return [
        _DimMinMaxSizes(
            dim_index=idx, min_size=min_dim_sizes[idx], max_size=max_dim_sizes[idx]
        )
        for idx in range(num_dims)
    ]


def _format_dims_min_max_sizes(dim_sizes: list[_DimMinMaxSizes]) -> str:
    result: list[str] = [""] * len(dim_sizes)
    for idx, item in enumerate(dim_sizes):
        result[idx] = (
            f"{item.min_size}...{item.max_size}"
            if item.has_variable_sizes
            else f"{item.min_size}"
        )
    return "[" + ", ".join(result) + "]"


def _pad_to_max_dim_and_stack_impl(
    tensors_list: list[torch.Tensor],
    *,
    max_variable_sized_dims: int,
    padding_value: float,
    pad_on_left_side: bool,
) -> torch.Tensor:
    num_tensors = len(tensors_list)
    if num_tensors == 0:
        raise ValueError("Empty list of tensors is not allowed.")

    dim_sizes: list[_DimMinMaxSizes] = _get_dims_min_max_size(tensors_list)
    num_variable_size_dims = sum(
        (1 if item.has_variable_sizes else 0) for item in dim_sizes
    )
    if (
        max_variable_sized_dims >= 0
        and num_variable_size_dims > max_variable_sized_dims
    ):
        raise ValueError(
            "Too many dimensions with variable size. "
            f"Got: {num_variable_size_dims} variable size dimensions. "
            f"Maximum allowed: {max_variable_sized_dims}. "
            f"Dimension sizes: {_format_dims_min_max_sizes(dim_sizes)}."
        )

    if num_variable_size_dims == 0:
        # No need to pad anything, just `stack()`.
        return torch.stack(tensors_list)
    elif num_variable_size_dims == 1 and dim_sizes[0].has_variable_sizes:
        # Use pad_sequences provided by PyTorch, which should be equivalent
        # for the common case.
        if pad_on_left_side:
            return pad_sequences_left_side(tensors_list, padding_value=padding_value)
        return pad_sequences_right_side(tensors_list, padding_value=padding_value)

    max_dim_sizes = [item.max_size for item in dim_sizes]
    result_shape = torch.Size([num_tensors] + max_dim_sizes)
    result = torch.full(
        result_shape,
        padding_value,
        dtype=tensors_list[0].dtype,
        device=tensors_list[0].device,
    )
    max_dim_sizes = torch.Size(max_dim_sizes)
    for i, input_tensor in enumerate(tensors_list):
        target = result.select(0, i)
        if max_dim_sizes == input_tensor.shape:
            target[...] = input_tensor
            continue
        target_view = target[...]
        for dim_idx, curr_size in enumerate(input_tensor.shape):
            max_size = max_dim_sizes[dim_idx]
            if curr_size < max_size:
                start_idx = (max_size - curr_size) if pad_on_left_side else 0
                target_view = target_view.narrow(
                    dim_idx, start=start_idx, length=curr_size
                )
        assert target_view.shape == input_tensor.shape
        target_view[...] = input_tensor
    return result

def pad_to_max_dim_and_stack(
    tensors_list: list[T],
    *,
    max_variable_sized_dims: int = -1,
    padding_value: float = 0,
    padding_side: Optional[str] = None,
) -> torch.Tensor:
    """Stacks variable-length tensors to a single tensor with dimension expansion.

    Some examples:
    1) Two tensors with shapes [24,8], [32,8] are combined to [2,32,8].
    2) Two tensors with shapes [24,1,8], [32,4,8] are combined to [2,32,4,8].
    3) Three tensors with shapes [7,3,5], [8,2,6], [9,1,7] are combined
       to [3,9,3,7].

    For 1D input tensors, the function is equivalent to `pad_sequences()`.
    If all tensors have the same shape and no padding is required,
    then the function is equivalent to `torch.stack()`.

    Args:
        tensors_list: list of tensors with potentially different dimension sizes.
        max_variable_sized_dims: Maximum number of variable-sized dimensions.
            Negative values mean `Unlimited`.
            If you know that your tensors have a pre-defined number `N`
            of variable-sized dimensions (e.g., 1 for `sequence_length`)
            then it's a good idea to set this parameter to catch abnormal
            inputs (`ValueError` will be raised in such cases).
        padding_value: value for padded elements. Default: 0.
        padding_side: side to apply padding to. Valid values: 'right', 'left'.
            If unspecified (`None`), defaults to `right`.

    Returns:
        A tensor with shape (B, L, ...), where B is a batch size
        (`len(sequences)`), L is the longest length (`max(len(sequences[i]))`)
    """
    pad_on_left_side: bool = False
    if not padding_side or padding_side == "right":
        pad_on_left_side = False
    elif padding_side == "left":
        pad_on_left_side = True
    else:
        raise ValueError(
            f"Unsupported padding side: '{padding_side}'. "
            "Valid values: 'right', 'left'."
        )

    input_tensors = convert_to_list_of_tensors(tensors_list)
    try:
        return _pad_to_max_dim_and_stack_impl(
            input_tensors,
            max_variable_sized_dims=max_variable_sized_dims,
            padding_value=padding_value,
            pad_on_left_side=pad_on_left_side,
        )
    except RuntimeError:
        logger.error(
            "Failed to pad and stack tensors with the shapes: "
            + ", ".join([f"{t.shape}" for t in input_tensors])
        )
        raise

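# Usage sketch mirroring example (2) from the docstring: dimensions 0 and 1 vary
# across the inputs, so allow (at most) two variable-sized dimensions.
#
#   batch = pad_to_max_dim_and_stack(
#       [torch.zeros(24, 1, 8), torch.zeros(32, 4, 8)],
#       max_variable_sized_dims=2,
#   )
#   assert batch.shape == torch.Size([2, 32, 4, 8])
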
def create_ones_like(
    values: T,
) -> T:
    """Converts an array-like object into an object of the same type filled with 1-s.

    Supports nested lists, in which case all elements must be of the same type.
    """
    if isinstance(values, torch.Tensor):
        return torch.ones_like(values)
    elif isinstance(values, np.ndarray):
        return np.ones_like(values)
    elif not isinstance(values, list):
        raise ValueError(
            f"Unsupported type: {type(values)}. "
            "Must be numpy array, torch tensor, or Python list."
        )

    if len(values) == 0:
        return cast(T, [])

    first_item = values[0]
    if isinstance(first_item, (int, float)):
        result = list(np.ones_like(values))
    else:
        # Nested list.
        first_item_type = type(first_item)
        result = []
        for idx, item in enumerate(values):
            if idx > 0 and not isinstance(item, first_item_type):
                raise ValueError(
                    "Sequence contains elements of different types: "
                    f"{first_item_type} and {type(item)}."
                )
            result.append(create_ones_like(item))
    return cast(T, result)

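# Sketch: the result mirrors the input's type, including nested lists (leaf
# numbers come back as numpy scalars when the input is a plain list):
#
#   assert create_ones_like(torch.tensor([3, 5])).tolist() == [1, 1]
#   nested = create_ones_like([[1, 2], [3]])  # -> [[1, 1], [1]]
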
def get_first_dim_len(x: Any) -> int:
    """Returns the length of the first dimension."""
    if isinstance(x, (torch.Tensor, np.ndarray)):
        return int(x.shape[0])
    elif isinstance(x, list):
        return len(x)
    raise ValueError(
        f"Unsupported type: {type(x)}. "
        "Must be numpy array, torch tensor, or Python list."
    )

def get_shape_as_list(x: Any) -> list[int]:
    """Returns the shape of an object (tensor or numpy array) as a Python list."""
    if isinstance(x, (torch.Tensor, np.ndarray)):
        return list(x.shape)
    raise ValueError(
        f"Unsupported type: {type(x)}. Must be numpy array or torch tensor."
    )

class _FreezeModelLayer:
    def __init__(self, name: str, freeze_it: bool):
        self.name: str = name
        self.freeze_it: bool = freeze_it
        self.children: list[_FreezeModelLayer] = []


def _freeze_model_layers_impl(
    module: torch.nn.Module, freeze_layers: list[_FreezeModelLayer], parent_path: str
) -> int:
    result: int = 0
    for model_layer in freeze_layers:
        full_layer_path = (
            (parent_path + "." + model_layer.name)
            if parent_path
            else model_layer.name
        )
        if hasattr(module, model_layer.name):
            child_module = getattr(module, model_layer.name)
            if model_layer.freeze_it:
                logger.info(f"Freezing layer '{full_layer_path}'...")
                for param in child_module.parameters(recurse=True):
                    param.requires_grad_(False)
                result += 1
            elif len(model_layer.children) > 0:
                result += _freeze_model_layers_impl(
                    child_module, model_layer.children, full_layer_path
                )
        else:
            logger.warning(f"Layer '{full_layer_path}' not found in model.")
    return result


def _group_freeze_model_layers(freeze_layers: list[str]) -> list[_FreezeModelLayer]:
    dummy_root: _FreezeModelLayer = _FreezeModelLayer(name="", freeze_it=False)

    # Build a tree of nested layers.
    for layer_name in freeze_layers:
        layer: _FreezeModelLayer = dummy_root
        all_parts = list(layer_name.split("."))
        for idx, curr_part in enumerate(all_parts):
            next_layer = next(
                (x for x in layer.children if x.name == curr_part), None
            )
            # If it's the last part, let's freeze this layer.
            freeze_it = idx + 1 >= len(all_parts)
            if next_layer is None:
                next_layer = _FreezeModelLayer(name=curr_part, freeze_it=freeze_it)
                layer.children.append(next_layer)
            elif freeze_it:
                next_layer.freeze_it = True
            layer = next_layer

    return dummy_root.children

def freeze_model_layers(model: torch.nn.Module, freeze_layers: list[str]) -> int:
    """Recursively freezes model layers.

    Args:
        model: A model to freeze layers in.
        freeze_layers: A list of layer names to freeze.
            Nested layers can be specified using a dot ('.') separator.
            For example, "visual.child.grandchild".
            Layer names not found in the model are ignored.

    Returns:
        The total number of layers successfully frozen.
    """
    root_freeze_layers = _group_freeze_model_layers(freeze_layers)
    return _freeze_model_layers_impl(model, root_freeze_layers, "")

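# Usage sketch with a hypothetical toy model (`visual` and `head` are made-up
# attribute names, not anything this module defines). The missing layer name is
# ignored with a warning, so only one layer is counted as frozen:
#
#   class Toy(torch.nn.Module):
#       def __init__(self):
#           super().__init__()
#           self.visual = torch.nn.Linear(4, 4)
#           self.head = torch.nn.Linear(4, 2)
#
#   model = Toy()
#   assert freeze_model_layers(model, ["visual", "missing.layer"]) == 1
#   assert all(not p.requires_grad for p in model.visual.parameters())
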