oumi.performance#

Submodules#

oumi.performance.mfu module#

Based on MFU from PaLM paper: https://arxiv.org/pdf/2204.02311.

oumi.performance.mfu.calculate_mfu(device_name: str, num_devices: int, dtype: dtype, num_params: int, num_tokens: int, delta_time_seconds: float, num_layers: int | None = None, num_attention_heads: int | None = None, attention_head_size: int | None = None, sequence_length: int | None = None, add_rematerialization: bool = False) float[source]#

Returns the number of MFU for the given model configuration.

oumi.performance.mfu.calculate_mfu_from_model_flops_per_second(device_name: str, num_devices: int, dtype: dtype, model_flops_per_second_on_all_devices: float) float[source]#

Returns the number of MFU for the given model flops per second.

oumi.performance.telemetry module#

class oumi.performance.telemetry.CudaTimerContext(name: str, measurements: list[float] | None = None)[source]#

Bases: ContextDecorator

A context manager and decorator for timing CUDA operations.

__enter__() CudaTimerContext[source]#

Starts the CUDA timer.

__exit__(*exc) bool[source]#

Stops the CUDA timer and records the elapsed time.

class oumi.performance.telemetry.TelemetryState(*, start_time: float = None, hostname: str = None, measurements: dict[str, list[float]] = None, cuda_measurements: dict[str, list[float]] = None, gpu_memory: list[dict[str, float]] = None, gpu_temperature: list[float] = None)[source]#

Bases: BaseModel

cuda_measurements: dict[str, list[float]]#
gpu_memory: list[dict[str, float]]#
gpu_temperature: list[float]#
hostname: str#
measurements: dict[str, list[float]]#
model_computed_fields: ClassVar[Dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[Dict[str, FieldInfo]] = {'cuda_measurements': FieldInfo(annotation=dict[str, list[float]], required=False, default_factory=dict), 'gpu_memory': FieldInfo(annotation=list[dict[str, float]], required=False, default_factory=list), 'gpu_temperature': FieldInfo(annotation=list[float], required=False, default_factory=list), 'hostname': FieldInfo(annotation=str, required=False, default_factory=builtin_function_or_method), 'measurements': FieldInfo(annotation=dict[str, list[float]], required=False, default_factory=dict), 'start_time': FieldInfo(annotation=float, required=False, default_factory=builtin_function_or_method)}#

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo] objects.

This replaces Model.__fields__ from Pydantic V1.

start_time: float#
class oumi.performance.telemetry.TelemetryTracker[source]#

Bases: object

A class for tracking various telemetry metrics.

compute_cross_rank_summaries(rank_summaries: list[dict[str, Any]], *, measurement_names: set[str] | dict[str, Any]) dict[str, Any][source]#

Computes a cross-rank summary from summaries produced by individual ranks.

For example, it can be used to compute distribution of {“gpu_temperature”: {“max”}} over ranks.

Parameters:
  • rank_summaries – An array of summaries indexed by rank e.g., returned by the get_summaries_from_all_ranks() method.

  • measurement_names

    A hierarchy of measurment names of interest, which must match the hierarchical naming structure in rank_summaries.

    For example:

    • 1 level: {"total_time"}

    • 2 levels: {"gpu_temperature": {"max", "median"}}

    • 3 levels: {"timers": { "compile": {"mean"}, "forward": {"max", "min"}}}

Returns:

A dictionary containing the statistics specified in measurement_names, and aggregated across ranks. The returned object can be nested (e.g., a dictionary of dictionaries) with potentially multiple levels of nesting, forming a tree whose structure mimics the structure of measurement_names with one additional layer containing cross-rank stats.

For example, if input measurement_names is {"gpu_temperature": {"max", "median"}} then the returned value will look as follows:

{
    "gpu_temperature":{
        "max": { "count": 7, "max": 75, ... },
        "median": { "count": 7, "max": 68, ... }
    }
}

cuda_timer(name: str) CudaTimerContext[source]#

Creates a CUDA benchmark with the given name.

Parameters:

name – The name of the benchmark.

Returns:

A CudaTimerContext object.

get_state_dicts_from_all_ranks() list[dict][source]#

Returns an array of state_dict-s from all ranks.

To work correctly in distributed environment, the method must be called by all ranks. If distributed training is not used then returns an array with 1 element (the current rank’s state_dict).

Returns:

A list of state_dict-s indexed by rank.

get_summaries_from_all_ranks() list[dict[str, Any]][source]#

Returns an array of telemetry summaries from all ranks.

To work correctly in distributed environment, the method must be called by all ranks. If distributed training is not used then returns an array with 1 element (the current rank’s summary).

Returns:

A list of telemetry summaries indexed by rank.

get_summary() dict[str, Any][source]#

Returns a summary of the telemetry statistics.

Returns:

A dictionary containing the summary statistics.

load_state_dict(state_dict: dict) None[source]#

Loads TelemetryState from state_dict.

log_gpu_memory(custom_logger: Callable | None = None) None[source]#

Logs the GPU memory usage.

Parameters:

custom_logger – A custom logging function. If None, store in self.gpu_memory.

print_summary() None[source]#

Prints a summary of the telemetry statistics.

record_gpu_temperature() float[source]#

Records the current GPU temperature.

Returns:

GPU temperature, in degrees Celsius.

state_dict() dict[source]#

Returns the TelemetryState as a dict.

timer(name: str) TimerContext[source]#

Creates a timer with the given name.

Parameters:

name – The name of the timer.

Returns:

A TimerContext object.

class oumi.performance.telemetry.TimerContext(name: str, measurements: list[float] | None = None)[source]#

Bases: ContextDecorator

A context manager and decorator for timing CPU code execution.

__enter__() TimerContext[source]#

Starts the timer.

__exit__(*exc) bool[source]#

Stops the timer and records the elapsed time.

oumi.performance.telemetry.gpu_memory_logger(user_function: Callable, synchronize: bool = True) Callable[source]#

Decorator function that logs the GPU memory usage of a given function.

Parameters:
  • user_function – The function to be decorated.

  • synchronize – Flag indicating whether to synchronize GPU operations before measuring memory usage. Defaults to True.

Returns:

The decorated function.

oumi.performance.torch_profiler_utils module#

oumi.performance.torch_profiler_utils.torch_profile(params: ProfilerParams, *, training_output_dir: str | None, record_function_name: str = 'oumi.train')[source]#

Creates PyTorch Profiler context manager.

Parameters:
  • params – Profiler config.

  • training_output_dir – If ProfilerParams.save_dir is not specified, then a “profiler” sub-directory will be created under training_output_dir, and used to save profiler traces.

  • record_function_name – The name to use with torch.profiler.record_function() for top-level train() operation.

Yields:

torch.profiler.profile or None

The newly-created Profiler object if profiling

is enabled, or None otherwise.

Example

To profile a training loop:

with torch_profile(params, record_function_name="oumi.train") as prof:
    for i in range(n):
        training_step()
        if prof is not None:
            prof.step()