Commit 4158816

Gavin Zhang authored and facebook-github-bot committed
refactor the total norm computation in grad clipping in APS (#3243)
Summary:
Pull Request resolved: #3243

Refactored the previous code for applying gradient clipping across DDP and FSDP parameters. Added a new function _compute_total_norm() that takes in the replicated and sharded params provided in the GradientClippingOptimizer class and computes the total gradient norm of the given parameters.

Differential Revision: D79128843
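For intuition, the refactor leans on how p-norms compose: _batch_cal_norm returns each group's norm raised to the p-th power (which is why the old code applied ** (1.0 / norm_type) at the end), so for finite p the per-group results add and the total is the sum taken to the 1/p power, while for the infinity norm the per-group results combine with a max. A minimal standalone sketch of that combination rule (toy code with hypothetical names, not the torchrec API):

import torch

def combine_group_norms(powered_group_norms: list, p: float) -> torch.Tensor:
    # Each entry is assumed to be ||g_group||_p ** p for finite p,
    # or max(|g_group|) for p = inf (mirroring what _batch_cal_norm returns).
    stacked = torch.stack(powered_group_norms)
    if p == torch.inf:
        # The overall max is the max of the per-group maxima.
        return stacked.max()
    # Summing the p-th powers and taking the 1/p root yields the global p-norm.
    return stacked.sum().pow(1.0 / p)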
1 parent 6ecb3ee commit 4158816

File tree

1 file changed: +79 −77 lines changed

torchrec/optim/clipping.py

Lines changed: 79 additions & 77 deletions
@@ -135,98 +135,101 @@ def step(self, closure: Any = None) -> None:
         super().step(closure)
         self._step_num += 1

-    @torch.no_grad()
     def clip_grad_norm_(self) -> Optional[Union[float, torch.Tensor]]:
         """Clip the gradient norm of all parameters."""
-        max_norm = self._max_gradient
-        norm_type = float(self._norm_type)
+
+        # converts self._norm_type to a float if it's a string. Used in the case where self._norm_type is 'inf'.
+        norm_type_float = float(self._norm_type)
         all_grads = []
+        sharded_grads = {}
         total_grad_norm = None

-        # Process distributed parameters and gradients
-        for pgs, dist_params in self._sharded_params.items():
-            sharded_grads = [
-                p.grad._local_tensor if isinstance(p.grad, DTensor) else p.grad
-                for p in dist_params
-                if p.grad is not None and p.grad.numel() > 0
-            ]
-            if len(sharded_grads) == 0:
-                continue
-            all_grads.extend(sharded_grads)
-
-            sharded_grad_norm = _batch_cal_norm(
-                sharded_grads,
-                max_norm,
-                norm_type,
-                pgs,
-            )
-            total_grad_norm = (
-                sharded_grad_norm
-                if total_grad_norm is None
-                else (
-                    torch.maximum(total_grad_norm, sharded_grad_norm)
-                    if norm_type == torch.inf
-                    else total_grad_norm + sharded_grad_norm
-                )
-            )
+        sharded_params = self._sharded_params
+        replicate_params = self._replicate_params

-        square_sharded_grad_norm = total_grad_norm if total_grad_norm is not None else 0
+        # Process distributed parameters and gradients
+        sharded_grads = {
+            pgs: _get_grads(dist_params) for pgs, dist_params in sharded_params.items()
+        }
+        all_grads.extend(*sharded_grads.values())

         # Process replicated parameters and gradients
-        if self._replicate_params:
-            replicated_grads = [
-                p.grad._local_tensor if isinstance(p.grad, DTensor) else p.grad
-                for p in self._replicate_params
-                if p.grad is not None and p.grad.numel() > 0
-            ]
-            all_grads.extend(replicated_grads)
-
-            replicated_grad_norm = _batch_cal_norm(
-                replicated_grads,
-                max_norm,
-                norm_type,
-                None,
-            )
-            total_grad_norm = (
-                replicated_grad_norm
-                if total_grad_norm is None
-                else (
-                    torch.maximum(total_grad_norm, replicated_grad_norm)
-                    if norm_type == torch.inf
-                    else total_grad_norm + replicated_grad_norm
-                )
-            )
-            square_replicated_grad_norm = replicated_grad_norm
-        else:
-            square_replicated_grad_norm = 0
-
-        global log_grad_norm
-        if log_grad_norm:
-            if total_grad_norm is not None and norm_type != torch.inf:
-                # pyre-ignore[58]
-                grad_norm = total_grad_norm ** (1.0 / norm_type)
-            else:
-                grad_norm = total_grad_norm
+        replicate_grads = _get_grads(replicate_params)
+        all_grads.extend(replicate_grads)

-            rank = dist.get_rank()
-            logger.info(
-                f"Clipping [rank={rank}, step={self._step_num}]: square_sharded_grad_norm = {square_sharded_grad_norm}, square_replicated_grad_norm = {square_replicated_grad_norm}, total_grad_norm = {grad_norm}"
-            )
-
-        # Aggregation
-        if total_grad_norm is None:
-            return
+        total_grad_norm = _compute_total_norm(
+            replicate_grads, sharded_grads, norm_type_float, self._max_gradient
+        )

-        if norm_type != torch.inf:
-            # pyre-ignore [58]: ** is not supported for operand types torch._tensor.Tensor and float.
-            total_grad_norm = total_grad_norm ** (1.0 / norm_type)
         # pyre-ignore [58]: / is not supported for operand types float and Union[float, torch._tensor.Tensor].
-        clip_coef = cast(torch.Tensor, max_norm / (total_grad_norm + 1e-6))
+        clip_coef = cast(torch.Tensor, self._max_gradient / (total_grad_norm + 1e-6))
         clip_coef_clamped = torch.clamp(clip_coef, max=1.0)
         torch._foreach_mul_(all_grads, clip_coef_clamped)
         return total_grad_norm


+def _get_grads(
+    param_list: List[torch.Tensor],
+) -> List[torch.Tensor]:
+    """Get the gradients of a list of parameters. Converts DTensors to local tensors if needed."""
+    grads = [
+        p.grad._local_tensor if isinstance(p.grad, DTensor) else p.grad
+        for p in param_list
+        if p.grad is not None and p.grad.numel() > 0
+    ]
+    return grads
+
+
+def _compute_total_norm(
+    replicate_grads: List[torch.Tensor],
+    sharded_grads: Dict[Tuple[dist.ProcessGroup], List[torch.Tensor]],
+    norm_type: float = 2.0,  # can be a normal float, or torch.inf
+    max_grad_norm: float = 1.0,
+) -> torch.Tensor:
+    """
+    Given both replicate grads and sharded grads, compute the total norm of the gradients of the full
+    replicate params and the full sharded params (parameters with a process group).
+
+    Args:
+        replicate_grads (List[torch.Tensor]): list of gradients for replicate params
+        sharded_grads (Dict[Tuple[dist.ProcessGroup], List[torch.Tensor]]): dict that maps each process group to a list of gradients for sharded params
+        norm_type (float): type of the used p-norm. Can be torch.inf for infinity norm.
+        max_grad_norm (float): max gradient norm.
+    """
+
+    # compute the norm |W|^p corresponding to all sharded params W
+    sharded_grad_norm: torch.Tensor = torch.tensor(0.0)
+    combine_sharded_norm_operator = (
+        torch.maximum if norm_type == torch.inf else torch.add
+    )
+
+    # We need to move sharded_grad_norm to the same device as the first shard so that we can do
+    # addition (or take the max). This matters specifically when sharded_grad_norm is 0 and
+    # replicate_grad_norm is not, because by default torch.tensor(0.0) is on CPU while
+    # replicate_grad_norm is on GPU. On MTIA in particular, adding a CPU tensor to a GPU tensor
+    # results in an error.
+    for pgs, dist_params in sharded_grads.items():
+        current_shard_norm = _batch_cal_norm(dist_params, max_grad_norm, norm_type, pgs)
+        sharded_grad_norm = combine_sharded_norm_operator(
+            sharded_grad_norm.to(current_shard_norm.device), current_shard_norm
+        )
+    # compute |W|^p corresponding to all replicate params W
+    # Similar to the case above, we move replicate_grad_norm to the same device as sharded_grad_norm so that we can do addition.
+    replicate_grad_norm: torch.Tensor = (
+        _batch_cal_norm(replicate_grads, max_grad_norm, norm_type)
+        if replicate_grads
+        else torch.tensor(0.0)
+    ).to(sharded_grad_norm.device)
+
+    combine_norm_operator = (
+        torch.maximum
+        if norm_type == torch.inf
+        else lambda a, b: torch.add(a, b).pow(1.0 / norm_type)
+    )
+
+    total_grad_norm = combine_norm_operator(replicate_grad_norm, sharded_grad_norm)
+    return total_grad_norm
+
+
 def _batch_cal_norm(
     grad_list: List[torch.Tensor],
     max_norm: float,
@@ -236,7 +239,6 @@ def _batch_cal_norm(
     """Helper function that calculates the norm of a list of gradients in batches. If process_groups
     are passed in, the norm will be aggregated across all ranks in the process group.
     """
-
     global use_64bit_grad_norm
     if use_64bit_grad_norm:
         grad_norms = torch.linalg.vector_norm(