Source code for opacus.optimizers.ddp_perlayeroptimizer

# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from functools import partial
from typing import Callable, List, Optional

import torch
from torch import nn
from torch.optim import Optimizer

from .ddpoptimizer import DistributedDPOptimizer
from .optimizer import DPOptimizer, _generate_noise
from .perlayeroptimizer import DPPerLayerOptimizer


def _clip_and_accumulate_parameter(p: nn.Parameter, max_grad_norm: float):
    per_sample_norms = p.grad_sample.view(len(p.grad_sample), -1).norm(2, dim=-1)
    per_sample_clip_factor = (max_grad_norm / (per_sample_norms + 1e-6)).clamp(max=1.0)

    grad = torch.einsum("i,i...", per_sample_clip_factor, p.grad_sample)
    if p.summed_grad is not None:
        p.summed_grad += grad
    else:
        p.summed_grad = grad
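
# Illustration (hypothetical, not part of the module): a minimal sketch of
# what _clip_and_accumulate_parameter does, using manually attached
# per-sample gradients. In real training, p.grad_sample is populated by
# opacus' GradSampleModule during the backward pass; the shapes and values
# below are made up for the example.
def _demo_clip_and_accumulate():
    batch_size, dim = 4, 3
    p = nn.Parameter(torch.zeros(dim))
    p.grad_sample = torch.randn(batch_size, dim)  # per-sample gradients
    p.summed_grad = None
    max_grad_norm = 1.0

    _clip_and_accumulate_parameter(p, max_grad_norm)

    # Each per-sample gradient is rescaled to have L2 norm at most
    # max_grad_norm before summation, so the accumulated gradient's norm is
    # bounded by batch_size * max_grad_norm.
    assert p.summed_grad.shape == p.shape
    assert p.summed_grad.norm() <= batch_size * max_grad_norm + 1e-4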


class SimpleDistributedPerLayerOptimizer(DPPerLayerOptimizer, DistributedDPOptimizer):
    def __init__(
        self,
        optimizer: Optimizer,
        *,
        noise_multiplier: float,
        max_grad_norm: float,
        expected_batch_size: Optional[int],
        loss_reduction: str = "mean",
        generator=None,
        secure_mode: bool = False,
    ):
        self.rank = torch.distributed.get_rank()
        self.world_size = torch.distributed.get_world_size()

        super().__init__(
            optimizer,
            noise_multiplier=noise_multiplier,
            max_grad_norm=max_grad_norm,
            expected_batch_size=expected_batch_size,
            loss_reduction=loss_reduction,
            generator=generator,
            secure_mode=secure_mode,
        )

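# Illustration (hypothetical, not part of the module): both distributed
# optimizers in this module read the process rank and world size from
# torch.distributed, so a process group must be initialized before either is
# constructed. A minimal single-process setup for local experimentation could
# look like the sketch below; the "gloo" backend and the localhost address are
# assumptions for the example, not requirements of the optimizers.
def _demo_init_process_group():
    import os

    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    torch.distributed.init_process_group(backend="gloo", rank=0, world_size=1)
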
class DistributedPerLayerOptimizer(DPOptimizer):
    """
    :class:`~opacus.optimizers.optimizer.DPOptimizer` that implements the
    per-layer clipping strategy and is compatible with distributed data
    parallel training.
    """

    def __init__(
        self,
        optimizer: Optimizer,
        *,
        noise_multiplier: float,
        max_grad_norm: List[float],
        expected_batch_size: Optional[int],
        loss_reduction: str = "mean",
        generator=None,
        secure_mode: bool = False,
    ):
        self.rank = torch.distributed.get_rank()
        self.world_size = torch.distributed.get_world_size()
        self.max_grad_norms = max_grad_norm
        max_grad_norm = torch.norm(torch.Tensor(self.max_grad_norms), p=2).item()
        super().__init__(
            optimizer,
            noise_multiplier=noise_multiplier,
            max_grad_norm=max_grad_norm,
            expected_batch_size=expected_batch_size,
            loss_reduction=loss_reduction,
            generator=generator,
            secure_mode=secure_mode,
        )
        self._register_hooks()

    def _add_noise_parameter(self, p: nn.Parameter):
        """
        Defined as a method (rather than a free function) because noise
        generation depends on the instance's secure_mode setting.
        """
        noise = _generate_noise(
            std=self.noise_multiplier * self.max_grad_norm,
            reference=p.summed_grad,
            generator=None,
            secure_mode=self.secure_mode,
        )
        p.grad = p.summed_grad + noise

    @property
    def accumulated_iterations(self) -> int:
        return max([p.accumulated_iterations for p in self.params])

    def _scale_grad_parameter(self, p: nn.Parameter):
        if not hasattr(p, "accumulated_iterations"):
            p.accumulated_iterations = 0
        p.accumulated_iterations += 1

        if self.loss_reduction == "mean":
            p.grad /= (
                self.expected_batch_size
                * p.accumulated_iterations
                * self.world_size
            )

    def clip_and_accumulate(self):
        raise NotImplementedError(
            "Clip and accumulate is added per layer in DPDDP Per Layer."
        )

    def add_noise(self):
        raise NotImplementedError("Noise is added per layer in DPDDP Per Layer.")

    def pre_step(
        self, closure: Optional[Callable[[], float]] = None
    ) -> Optional[float]:
        if self._check_skip_next_step():
            self._is_last_step_skipped = True
            return False

        if self.step_hook:
            self.step_hook(self)

        for p in self.params:
            p.accumulated_iterations = 0

        self._is_last_step_skipped = False
        return True

    def _ddp_per_layer_hook(
        self, p: nn.Parameter, max_grad_norm: float, _: torch.Tensor
    ):
        _clip_and_accumulate_parameter(p, max_grad_norm)
        # Equivalent to _check_skip_next_step, but without popping because it
        # has to be done for every parameter p
        if self._check_skip_next_step(pop_next=False):
            return

        if self.rank == 0:
            self._add_noise_parameter(p)
        else:
            p.grad = p.summed_grad
        self._scale_grad_parameter(p)
        return p.grad

    def _register_hooks(self):
        for p, max_grad_norm in zip(self.params, self.max_grad_norms):
            if not p.requires_grad:
                continue

            if not hasattr(p, "ddp_hooks"):
                p.ddp_hooks = []

            p.ddp_hooks.append(
                p.register_hook(partial(self._ddp_per_layer_hook, p, max_grad_norm))
            )
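
# Illustration (hypothetical usage, not executed on import): constructing a
# DistributedPerLayerOptimizer by hand. This assumes torch.distributed has
# already been initialized and that the model is wrapped so that per-sample
# gradients (p.grad_sample) are populated during backward (e.g. by opacus'
# GradSampleModule); in a typical training script the PrivacyEngine builds
# the DP optimizer rather than user code.
def _demo_distributed_per_layer(model: nn.Module) -> DistributedPerLayerOptimizer:
    trainable = [p for p in model.parameters() if p.requires_grad]
    base_optimizer = torch.optim.SGD(trainable, lr=0.05)
    return DistributedPerLayerOptimizer(
        base_optimizer,
        noise_multiplier=1.0,
        # One clipping threshold per trainable parameter tensor; the effective
        # flat clipping norm used for noise calibration is the L2 norm of this
        # list (see __init__ above).
        max_grad_norm=[1.0] * len(trainable),
        expected_batch_size=64,  # hypothetical per-rank batch size
        loss_reduction="mean",
    )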