# Copyright 2025 - Oumi
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
from functools import cache
from typing import Any, Optional

import torch
import torch.distributed as dist
import torch.nn.functional as F

__all__ = ["update_out_and_lse", "RingComm", "get_default_args"]
# Derived from
# zhuzilin/ring-flash-attention/ring_flash_attn/utils.py
@cache
def get_default_args(func) -> dict[str, Any]:
"""Get the default arguments of a function."""
spec = inspect.getfullargspec(func)
defaults = spec.defaults if spec.defaults is not None else ()
padded_defaults = (None,) * (len(spec.args) - len(defaults)) + defaults
args: dict[str, Any] = dict(zip(spec.args, padded_defaults))
if "softcap" in args:
args["softcap"] = 0.0
return args
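
# Illustrative usage (a sketch, not part of this module). The kernel name
# ``_flash_attn_forward`` and its keyword arguments are assumptions borrowed from
# flash-attn style APIs; the point is that the cached defaults make it cheap to
# rebuild a complete kwargs dict on every ring step:
#
#   kwargs = dict(get_default_args(_flash_attn_forward))  # copy; the cached dict is shared
#   kwargs.update(dict(dropout_p=0.0, causal=True))
#   outputs = _flash_attn_forward(q, k, v, **kwargs)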
@torch.jit.script
def _update_out_and_lse(
out: torch.Tensor,
lse: torch.Tensor,
block_out: torch.Tensor,
block_lse: torch.Tensor,
) -> tuple[torch.Tensor, torch.Tensor]:
block_out = block_out.to(torch.float32)
block_lse = block_lse.transpose(-2, -1).unsqueeze(dim=-1)
    # Equivalent (but less numerically stable) reference formulation:
    #   new_lse = lse + torch.log(1 + torch.exp(block_lse - lse))
    #   out = torch.exp(lse - new_lse) * out + torch.exp(block_lse - new_lse) * block_out
    # The sigmoid/logsigmoid form below is algebraically identical, since
    #   sigmoid(block_lse - lse) == exp(block_lse - new_lse)  and
    #   -logsigmoid(lse - block_lse) == log(1 + exp(block_lse - lse)),
    # but avoids materializing new_lse and the potentially overflowing exponentials.
    # For additional context and discussion, please refer to:
    # https://github.com/zhuzilin/ring-flash-attention/pull/34#issuecomment-2076126795
out = out - F.sigmoid(block_lse - lse) * (out - block_out)
lse = lse - F.logsigmoid(lse - block_lse)
return out, lse
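
# Scalar sanity check (for intuition only, not executed): with two single-score
# "blocks" s1 and s2 attending to values v1 and v2, each block alone gives
# out_i = v_i and lse_i = s_i. Applying the update above yields
#   lse = logaddexp(s1, s2)
#   out = (exp(s1) * v1 + exp(s2) * v2) / (exp(s1) + exp(s2)),
# i.e. exactly the softmax-weighted combination over the concatenated scores.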
def update_out_and_lse(
out: Optional[torch.Tensor],
lse: Optional[torch.Tensor],
block_out: torch.Tensor,
block_lse: torch.Tensor,
slice_=None,
) -> tuple[torch.Tensor, torch.Tensor]:
"""Update the output and log-sum-exp of the attention."""
if out is None:
if slice_ is not None:
raise RuntimeError("first update_out_and_lse should not pass slice_ args")
out = block_out.to(torch.float32)
lse = block_lse.transpose(-2, -1).unsqueeze(dim=-1)
elif lse is None:
raise ValueError("`lse` can be None only if `out` is None")
elif slice_ is not None:
slice_out, slice_lse = out[slice_], lse[slice_]
slice_out, slice_lse = _update_out_and_lse(
slice_out, slice_lse, block_out, block_lse
)
out[slice_], lse[slice_] = slice_out, slice_lse
else:
out, lse = _update_out_and_lse(out, lse, block_out, block_lse)
return out, lse # type: ignore
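
# Typical accumulation pattern (a sketch; the tensor shapes follow flash-attn
# conventions and are assumptions here: ``block_out`` is
# [batch, seqlen, num_heads, head_dim] and ``block_lse`` is [batch, num_heads, seqlen]):
#
#   out, lse = None, None
#   for block_out, block_lse in per_step_results:
#       out, lse = update_out_and_lse(out, lse, block_out, block_lse)
#   out = out.to(torch.bfloat16)  # accumulation is in float32; cast back at the end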
@torch.jit.script
def flatten_varlen_lse(lse, cu_seqlens):
"""Flatten the log-sum-exp of the attention."""
new_lse = []
for i in range(len(cu_seqlens) - 1):
start, end = cu_seqlens[i], cu_seqlens[i + 1]
new_lse.append(lse[i, :, : end - start])
return torch.cat(new_lse, dim=1)
@torch.jit.script
def unflatten_varlen_lse(lse, cu_seqlens, max_seqlen: int):
"""Unflatten the log-sum-exp of the attention."""
num_seq = len(cu_seqlens) - 1
num_head = lse.shape[-2]
new_lse = torch.empty(
(num_seq, max_seqlen, num_head, 1), dtype=torch.float32, device=lse.device
)
for i in range(num_seq):
start, end = cu_seqlens[i], cu_seqlens[i + 1]
new_lse[i, : end - start] = lse[start:end]
return new_lse.squeeze(dim=-1).transpose(1, 2).contiguous()
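
# Shape sketch (inferred from how the two helpers above index their inputs; the
# concrete numbers are illustrative assumptions):
#
#   cu_seqlens = torch.tensor([0, 3, 7])  # two sequences of lengths 3 and 4
#   flatten_varlen_lse:   [num_seq, num_heads, max_seqlen] -> [num_heads, total_tokens]
#   unflatten_varlen_lse: [total_tokens, num_heads, 1]     -> [num_seq, num_heads, max_seqlen]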
class RingComm:
"""Ring communication."""
    def __init__(self, process_group: Optional[dist.ProcessGroup]):
"""Initialize the ring communication."""
self._process_group = process_group
self._ops = []
self.rank = dist.get_rank(self._process_group)
self.world_size = dist.get_world_size(self._process_group)
self._reqs = None
self.send_rank = (self.rank + 1) % self.world_size
self.recv_rank = (self.rank - 1) % self.world_size
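        # The neighbor indices above are group-local; convert them to global
        # ranks, which is how the peers are passed to ``dist.P2POp`` below.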
if process_group is not None:
self.send_rank = dist.get_global_rank(self._process_group, self.send_rank)
self.recv_rank = dist.get_global_rank(self._process_group, self.recv_rank)
def send_recv(
self, to_send: torch.Tensor, recv_tensor: Optional[torch.Tensor] = None
) -> torch.Tensor:
"""Send and receive a tensor."""
if recv_tensor is None:
res = torch.empty_like(to_send)
else:
res = recv_tensor
send_op = dist.P2POp(
dist.isend, to_send, self.send_rank, group=self._process_group
)
recv_op = dist.P2POp(dist.irecv, res, self.recv_rank, group=self._process_group)
self._ops.append(send_op)
self._ops.append(recv_op)
return res
def commit(self):
"""Commit the operations."""
if self._reqs is not None:
raise RuntimeError("commit called twice")
self._reqs = dist.batch_isend_irecv(self._ops)
def wait(self):
"""Wait for the operations to complete."""
if self._reqs is None:
raise RuntimeError("wait called before commit")
for req in self._reqs:
req.wait()
self._reqs = None
self._ops = []
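
# Illustrative ring loop (a sketch; ``process_group``, ``k``, ``v``, and the
# per-step attention/merge calls are assumptions for the example):
#
#   comm = RingComm(process_group)
#   next_k = next_v = None
#   for step in range(comm.world_size):
#       if step + 1 != comm.world_size:
#           next_k = comm.send_recv(k)
#           next_v = comm.send_recv(v)
#           comm.commit()
#       # ... compute block attention with the current k, v and merge the partial
#       # result via update_out_and_lse(out, lse, block_out, block_lse) ...
#       if step + 1 != comm.world_size:
#           comm.wait()
#           k, v = next_k, next_v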