Source code for opacus.privacy_engine

#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import warnings
from itertools import chain
from typing import IO, Any, BinaryIO, Dict, List, Optional, Tuple, Union

import torch
from opacus.accountants import create_accountant
from opacus.accountants.utils import get_noise_multiplier
from opacus.data_loader import DPDataLoader, switch_generator
from opacus.distributed import DifferentiallyPrivateDistributedDataParallel as DPDDP
from opacus.grad_sample import (
    AbstractGradSampleModule,
    GradSampleModule,
    get_gsm_class,
    wrap_model,
)
from opacus.optimizers import DPOptimizer, get_optimizer_class
from opacus.schedulers import _GradClipScheduler, _NoiseScheduler
from opacus.utils.fast_gradient_clipping_utils import DPLossFastGradientClipping
from opacus.validators.module_validator import ModuleValidator
from torch import nn, optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader


[docs] class PrivacyEngine: """ Main entry point to the Opacus API - use ``PrivacyEngine`` to enable differential privacy for your model training. ``PrivacyEngine`` object encapsulates current privacy state (privacy budget + method it's been calculated) and exposes ``make_private`` method to wrap your PyTorch training objects with their private counterparts. Example: >>> dataloader = demo_dataloader >>> model = MyCustomModel() >>> optimizer = torch.optim.SGD(model.parameters(), lr=0.05) >>> privacy_engine = PrivacyEngine() >>> >>> model, optimizer, dataloader = privacy_engine.make_private( ... module=model, ... optimizer=optimizer, ... data_loader=dataloader, ... noise_multiplier=1.0, ... max_grad_norm=1.0, ... ) >>> # continue training as normal """ def __init__(self, *, accountant: str = "prv", secure_mode: bool = False): """ Args: accountant: Accounting mechanism. Currently supported: - rdp (:class:`~opacus.accountants.RDPAccountant`) - gdp (:class:`~opacus.accountants.GaussianAccountant`) - prv (:class`~opacus.accountants.PRVAccountant`) secure_mode: Set to ``True`` if cryptographically strong DP guarantee is required. ``secure_mode=True`` uses secure random number generator for noise and shuffling (as opposed to pseudo-rng in vanilla PyTorch) and prevents certain floating-point arithmetic-based attacks. See :meth:`~opacus.optimizers.optimizer._generate_noise` for details. When set to ``True`` requires ``torchcsprng`` to be installed """ self.accountant = create_accountant(mechanism=accountant) self.secure_mode = secure_mode self.secure_rng = None self.dataset = None # only used to detect switching to a different dataset if self.secure_mode: try: import torchcsprng as csprng except ImportError as e: msg = ( "To use secure RNG, you must install the torchcsprng package! " "Check out the instructions here: https://github.com/pytorch/csprng#installation" ) raise ImportError(msg) from e self.secure_rng = csprng.create_random_device_generator("/dev/urandom") else: warnings.warn( "Secure RNG turned off. This is perfectly fine for experimentation as it allows " "for much faster training performance, but remember to turn it on and retrain " "one last time before production with ``secure_mode`` turned on." ) def _prepare_optimizer( self, *, optimizer: optim.Optimizer, noise_multiplier: float, max_grad_norm: Union[float, List[float]], expected_batch_size: int, loss_reduction: str = "mean", distributed: bool = False, clipping: str = "flat", noise_generator=None, grad_sample_mode="hooks", **kwargs, ) -> DPOptimizer: if isinstance(optimizer, DPOptimizer): optimizer = optimizer.original_optimizer generator = None if self.secure_mode: generator = self.secure_rng elif noise_generator is not None: generator = noise_generator optim_class = get_optimizer_class( clipping=clipping, distributed=distributed, grad_sample_mode=grad_sample_mode, ) return optim_class( optimizer=optimizer, noise_multiplier=noise_multiplier, max_grad_norm=max_grad_norm, expected_batch_size=expected_batch_size, loss_reduction=loss_reduction, generator=generator, secure_mode=self.secure_mode, **kwargs, ) def _prepare_data_loader( self, data_loader: DataLoader, *, poisson_sampling: bool, distributed: bool, ) -> DataLoader: if self.dataset is None: self.dataset = data_loader.dataset elif self.dataset != data_loader.dataset: warnings.warn( f"PrivacyEngine detected new dataset object. " f"Was: {self.dataset}, got: {data_loader.dataset}. " f"Privacy accounting works per dataset, please initialize " f"new PrivacyEngine if you're using different dataset. " f"You can ignore this warning if two datasets above " f"represent the same logical dataset" ) if poisson_sampling: return DPDataLoader.from_data_loader( data_loader, generator=self.secure_rng, distributed=distributed ) elif self.secure_mode: return switch_generator(data_loader=data_loader, generator=self.secure_rng) else: return data_loader def _prepare_model( self, module: nn.Module, *, batch_first: bool = True, max_grad_norm: Union[float, List[float]] = 1.0, loss_reduction: str = "mean", grad_sample_mode: str = "hooks", ) -> AbstractGradSampleModule: # Ideally, validation should have been taken care of by calling # `get_compatible_module()` self.validate(module=module, optimizer=None, data_loader=None) # wrap if isinstance(module, AbstractGradSampleModule): if ( module.batch_first != batch_first or module.loss_reduction != loss_reduction or type(module) is not get_gsm_class(grad_sample_mode) ): raise ValueError( f"Pre-existing GradSampleModule doesn't match new arguments." f"Got: module.batch_first: {module.batch_first}, module.loss_reduction: {module.loss_reduction}, type(module): {type(module)}" f"Requested: batch_first:{batch_first}, loss_reduction: {loss_reduction}, grad_sample_mode: {grad_sample_mode} " f"Please pass vanilla nn.Module instead" ) return module else: if grad_sample_mode == "ghost": return wrap_model( module, grad_sample_mode=grad_sample_mode, batch_first=batch_first, loss_reduction=loss_reduction, max_grad_norm=max_grad_norm, ) else: return wrap_model( module, grad_sample_mode=grad_sample_mode, batch_first=batch_first, loss_reduction=loss_reduction, )
[docs] def is_compatible( self, *, module: nn.Module, optimizer: Optional[optim.Optimizer], data_loader: Optional[DataLoader], ) -> bool: """ Check if task components are compatible with DP. Args: module: module to be checked optimizer: optimizer to be checked data_loader: data_loader to be checked Returns: ``True`` if compatible, ``False`` otherwise """ return ModuleValidator.is_valid(module)
[docs] def validate( self, *, module: nn.Module, optimizer: Optional[optim.Optimizer], data_loader: Optional[DataLoader], ): """ Validate that task components are compatible with DP. Same as ``is_compatible()``, but raises error instead of returning bool. Args: module: module to be checked optimizer: optimizer to be checked data_loader: data_loader to be checked Raises: UnsupportedModuleError If one or more modules found to be incompatible """ ModuleValidator.validate(module, strict=True)
[docs] @classmethod def get_compatible_module(cls, module: nn.Module) -> nn.Module: """ Return a privacy engine compatible module. Also validates the module after running registered fixes. Args: module: module to be modified Returns: Module with some submodules replaced for their deep copies or close equivalents. See :class:`~opacus.validators.module_validator.ModuleValidator` for more details """ module = ModuleValidator.fix(module) ModuleValidator.validate(module, strict=True) return module
[docs] def make_private( self, *, module: nn.Module, optimizer: optim.Optimizer, criterion=nn.CrossEntropyLoss(), # Added deafult for backward compatibility data_loader: DataLoader, noise_multiplier: float, max_grad_norm: Union[float, List[float]], batch_first: bool = True, loss_reduction: str = "mean", poisson_sampling: bool = True, clipping: str = "flat", noise_generator=None, grad_sample_mode: str = "hooks", **kwargs, ) -> Tuple[GradSampleModule, DPOptimizer, DataLoader]: """ Add privacy-related responsibilities to the main PyTorch training objects: model, optimizer, and the data loader. All of the returned objects act just like their non-private counterparts passed as arguments, but with added DP tasks. - Model is wrapped to also compute per sample gradients. - Optimizer is now responsible for gradient clipping and adding noise to the gradients. - DataLoader is updated to perform Poisson sampling. Notes: Using any other models, optimizers, or data sources during training will invalidate stated privacy guarantees. Args: module: PyTorch module to be used for training optimizer: Optimizer to be used for training data_loader: DataLoader to be used for training noise_multiplier: The ratio of the standard deviation of the Gaussian noise to the L2-sensitivity of the function to which the noise is added (How much noise to add) max_grad_norm: The maximum norm of the per-sample gradients. Any gradient with norm higher than this will be clipped to this value. batch_first: Flag to indicate if the input tensor to the corresponding module has the first dimension representing the batch. If set to True, dimensions on input tensor are expected be ``[batch_size, ...]``, otherwise ``[K, batch_size, ...]`` loss_reduction: Indicates if the loss reduction (for aggregating the gradients) is a sum or a mean operation. Can take values "sum" or "mean" poisson_sampling: ``True`` if you want to use standard sampling required for DP guarantees. Setting ``False`` will leave provided data_loader unchanged. Technically this doesn't fit the assumptions made by privacy accounting mechanism, but it can be a good approximation when using Poisson sampling is unfeasible. clipping: Per sample gradient clipping mechanism ("flat" or "per_layer" or "adaptive"). Flat clipping calculates the norm of the entire gradient over all parameters, per layer clipping sets individual norms for every parameter tensor, and adaptive clipping updates clipping bound per iteration. Flat clipping is usually preferred, but using per layer clipping in combination with distributed training can provide notable performance gains. noise_generator: torch.Generator() object used as a source of randomness for the noise grad_sample_mode: mode for computing per sample gradients. Determines the implementation class for the wrapped ``module``. See :class:`~opacus.grad_sample.gsm_base.AbstractGradSampleModule` for more details Returns: Tuple of (model, optimizer, data_loader). Model is a wrapper around the original model that also computes per sample gradients Optimizer is a wrapper around the original optimizer that also does gradient clipping and noise addition to the gradients DataLoader is a brand new DataLoader object, constructed to behave as equivalent to the original data loader, possibly with updated sampling mechanism. Points to the same dataset object. """ if noise_generator and self.secure_mode: raise ValueError("Passing seed is prohibited in secure mode") # compare module parameter with optimizer parameters model_parameters = set(module.parameters()) for p in chain.from_iterable( [param_group["params"] for param_group in optimizer.param_groups] ): if p not in model_parameters: raise ValueError( "Module parameters are different than optimizer Parameters" ) distributed = isinstance(module, (DPDDP, DDP)) module = self._prepare_model( module, batch_first=batch_first, max_grad_norm=max_grad_norm, loss_reduction=loss_reduction, grad_sample_mode=grad_sample_mode, ) if poisson_sampling: module.forbid_grad_accumulation() data_loader = self._prepare_data_loader( data_loader, distributed=distributed, poisson_sampling=poisson_sampling ) sample_rate = 1 / len(data_loader) expected_batch_size = int(len(data_loader.dataset) * sample_rate) # expected_batch_size is the *per worker* batch size if distributed: world_size = torch.distributed.get_world_size() expected_batch_size /= world_size optimizer = self._prepare_optimizer( optimizer=optimizer, noise_multiplier=noise_multiplier, max_grad_norm=max_grad_norm, expected_batch_size=expected_batch_size, loss_reduction=loss_reduction, noise_generator=noise_generator, distributed=distributed, clipping=clipping, grad_sample_mode=grad_sample_mode, **kwargs, ) optimizer.attach_step_hook( self.accountant.get_optimizer_hook_fn(sample_rate=sample_rate) ) if grad_sample_mode == "ghost": criterion = DPLossFastGradientClipping( module, optimizer, criterion, loss_reduction ) return module, optimizer, criterion, data_loader return module, optimizer, data_loader
[docs] def make_private_with_epsilon( self, *, module: nn.Module, optimizer: optim.Optimizer, criterion=nn.CrossEntropyLoss(), # Added deafult for backward compatibility data_loader: DataLoader, target_epsilon: float, target_delta: float, epochs: int, max_grad_norm: Union[float, List[float]], batch_first: bool = True, loss_reduction: str = "mean", poisson_sampling: bool = True, clipping: str = "flat", noise_generator=None, grad_sample_mode: str = "hooks", **kwargs, ): """ Version of :meth:`~opacus.privacy_engine.PrivacyEngine.make_private`, that calculates privacy parameters based on a given privacy budget. For the full documentation see :meth:`~opacus.privacy_engine.PrivacyEngine.make_private` Args: module: PyTorch module to be used for training optimizer: Optimizer to be used for training data_loader: DataLoader to be used for training target_epsilon: Target epsilon to be achieved, a metric of privacy loss at differential changes in data. target_delta: Target delta to be achieved. Probability of information being leaked. epochs: Number of training epochs you intend to perform; noise_multiplier relies on this to calculate an appropriate sigma to ensure privacy budget of (target_epsilon, target_delta) at the end of epochs. max_grad_norm: The maximum norm of the per-sample gradients. Any gradient with norm higher than this will be clipped to this value. batch_first: Flag to indicate if the input tensor to the corresponding module has the first dimension representing the batch. If set to True, dimensions on input tensor are expected be ``[batch_size, ...]``, otherwise ``[K, batch_size, ...]`` loss_reduction: Indicates if the loss reduction (for aggregating the gradients) is a sum or a mean operation. Can take values "sum" or "mean" poisson_sampling: ``True`` if you want to use standard sampling required for DP guarantees. Setting ``False`` will leave provided data_loader unchanged. Technically this doesn't fit the assumptions made by privacy accounting mechanism, but it can be a good approximation when using Poisson sampling is unfeasible. clipping: Per sample gradient clipping mechanism ("flat" or "per_layer" or "adaptive"). Flat clipping calculates the norm of the entire gradient over all parameters, per layer clipping sets individual norms for every parameter tensor, and adaptive clipping updates clipping bound per iteration. Flat clipping is usually preferred, but using per layer clipping in combination with distributed training can provide notable performance gains. noise_generator: torch.Generator() object used as a source of randomness for the noise grad_sample_mode: mode for computing per sample gradients. Determines the implementation class for the wrapped ``module``. See :class:`~opacus.grad_sample.gsm_base.AbstractGradSampleModule` for more details Returns: Tuple of (model, optimizer, data_loader). Model is a wrapper around the original model that also computes per sample gradients Optimizer is a wrapper around the original optimizer that also does gradient clipping and noise addition to the gradients DataLoader is a brand new DataLoader object, constructed to behave as equivalent to the original data loader, possibly with updated sampling mechanism. Points to the same dataset object. """ sample_rate = 1 / len(data_loader) if len(self.accountant) > 0: warnings.warn( "You're calling make_private_with_epsilon with non-zero privacy budget " "already spent. Returned noise_multiplier assumes zero starting point, " "so your overall privacy budget will be higher." ) return self.make_private( module=module, optimizer=optimizer, data_loader=data_loader, criterion=criterion, noise_multiplier=get_noise_multiplier( target_epsilon=target_epsilon, target_delta=target_delta, sample_rate=sample_rate, epochs=epochs, accountant=self.accountant.mechanism(), **kwargs, ), max_grad_norm=max_grad_norm, batch_first=batch_first, loss_reduction=loss_reduction, noise_generator=noise_generator, grad_sample_mode=grad_sample_mode, poisson_sampling=poisson_sampling, clipping=clipping, **kwargs, )
[docs] def get_epsilon(self, delta): """ Computes the (epsilon, delta) privacy budget spent so far. Args: delta: The target delta. Returns: Privacy budget (epsilon) expended so far. """ return self.accountant.get_epsilon(delta)
[docs] def save_checkpoint( self, *, path: Union[str, os.PathLike, BinaryIO, IO[bytes]], module: GradSampleModule, optimizer: Optional[DPOptimizer] = None, noise_scheduler: Optional[_NoiseScheduler] = None, grad_clip_scheduler: Optional[_GradClipScheduler] = None, checkpoint_dict: Optional[Dict[str, Any]] = None, module_state_dict_kwargs: Optional[Dict[str, Any]] = None, torch_save_kwargs: Optional[Dict[str, Any]] = None, ): """ Saves the state_dict of module, optimizer, and accountant at path. Args: path: Path to save the state dict objects. module: GradSampleModule to save; wrapped module's state_dict is saved. optimizer: DPOptimizer to save; wrapped optimizer's state_dict is saved. noise_scheduler: _NoiseScheduler whose state we should save. grad_clip_scheduler: _GradClipScheduler whose state we should save. checkpoint_dict: Dict[str, Any]; an already-filled checkpoint dict. module_state_dict_kwargs: dict of kwargs to pass to ``module.state_dict()`` torch_save_kwargs: dict of kwargs to pass to ``torch.save()`` """ checkpoint_dict = checkpoint_dict or {} checkpoint_dict["module_state_dict"] = module.state_dict( **(module_state_dict_kwargs or {}) ) checkpoint_dict["privacy_accountant_state_dict"] = self.accountant.state_dict() if optimizer is not None: checkpoint_dict["optimizer_state_dict"] = optimizer.state_dict() if noise_scheduler is not None: checkpoint_dict["noise_scheduler_state_dict"] = noise_scheduler.state_dict() if grad_clip_scheduler is not None: checkpoint_dict["grad_clip_scheduler_state_dict"] = ( grad_clip_scheduler.state_dict() ) torch.save(checkpoint_dict, path, **(torch_save_kwargs or {}))
def load_checkpoint( self, *, path: Union[str, os.PathLike, BinaryIO, IO[bytes]], module: GradSampleModule, optimizer: Optional[DPOptimizer] = None, noise_scheduler: Optional[_NoiseScheduler] = None, grad_clip_scheduler: Optional[_GradClipScheduler] = None, module_load_dict_kwargs: Optional[Dict[str, Any]] = None, torch_load_kwargs: Optional[Dict[str, Any]] = None, ) -> Dict: checkpoint = torch.load(path, **(torch_load_kwargs or {})) module.load_state_dict( checkpoint["module_state_dict"], **(module_load_dict_kwargs or {}) ) self.accountant.load_state_dict(checkpoint["privacy_accountant_state_dict"]) optimizer_state_dict = checkpoint.pop("optimizer_state_dict", {}) if optimizer is not None and len(optimizer_state_dict) > 0: optimizer.load_state_dict(optimizer_state_dict) elif (optimizer is not None) ^ (len(optimizer_state_dict) > 0): # warn if only one of them is available warnings.warn( f"optimizer_state_dict has {len(optimizer_state_dict)} items" f" but optimizer is {'' if optimizer else 'not'} provided." ) noise_scheduler_state_dict = checkpoint.pop("noise_scheduler_state_dict", {}) if noise_scheduler is not None and len(noise_scheduler_state_dict) > 0: noise_scheduler.load_state_dict(noise_scheduler_state_dict) grad_clip_scheduler_state_dict = checkpoint.pop( "grad_clip_scheduler_state_dict", {} ) if grad_clip_scheduler is not None and len(grad_clip_scheduler_state_dict) > 0: grad_clip_scheduler.load_state_dict(grad_clip_scheduler_state_dict) return checkpoint