# Data Parallelism

> Source: https://aiwiki.ai/wiki/data_parallelism
> Updated: 2026-06-21
> Categories: AI Infrastructure, Deep Learning, Machine Learning
> From AI Wiki (https://aiwiki.ai), a free encyclopedia of artificial intelligence. Quote with attribution.

Data parallelism is a distributed training technique in which the same neural network model is replicated across multiple processing units (typically GPUs), each device trains on a different shard of the input data, and the resulting gradients are averaged across all devices before every weight update so that the replicas stay identical. It is the most widely used form of parallelism in deep learning, the default starting point for any multi-GPU workload, and the foundation of the PyTorch DistributedDataParallel (DDP), DeepSpeed ZeRO, and PyTorch FSDP systems used to train large language models. Because every replica begins and ends each step with the same parameters, data parallelism is mathematically equivalent to single-device training on the full global batch, while running far faster in wall-clock time. The landmark demonstration of its scale, Goyal et al. (2017), trained [ResNet-50](/wiki/resnet) on [ImageNet](/wiki/imagenet) in one hour using a batch size of 8,192 across 256 GPUs at roughly 90% scaling efficiency. [2]

*See also: [Machine learning terms](/wiki/machine_learning_terms), [Model parallelism](/wiki/model_parallelism), [Pipeline parallelism](/wiki/pipeline_parallelism), [GPU computing](/wiki/gpu_computing), [Deep learning](/wiki/deep_learning), [Distributed computing](/wiki/distributed_computing)*

## Overview

Data parallelism is a distributed [training](/wiki/training) technique in [machine learning](/wiki/machine_learning) in which the input data is split across multiple processing units (typically GPUs), each holding a complete replica of the [neural network](/wiki/neural_network) model. Each device independently computes forward and backward passes on its local data partition, and the resulting gradients are aggregated across all devices before updating the model weights. Because every replica begins and ends each training step with identical parameters, data parallelism preserves the mathematical equivalence of single-device training while dramatically reducing wall-clock time.

Data parallelism is particularly useful when dealing with large-scale datasets and computationally-intensive models, such as [deep neural networks](/wiki/deep_learning) and other complex machine learning architectures. By distributing the workload across multiple resources, data parallelism accelerates the training process and enables the construction of more accurate and robust models.

Data parallelism is the most widely used form of parallelism in deep learning. Its conceptual simplicity, strong scaling properties, and broad framework support make it the default starting point for any multi-GPU training workload. It has been used at every scale, from two-GPU desktop setups to clusters of thousands of accelerators training [large language models](/wiki/large_language_model) such as GPT-4 and LLaMA. Frameworks such as [PyTorch](/wiki/pytorch), [TensorFlow](/wiki/tensorflow), and [JAX](/wiki/jax) all provide built-in primitives for data-parallel training.

## How data parallelism works

The training loop under data parallelism proceeds through the following stages in each iteration:

1. **Initialization.** The model parameters are broadcast from one designated rank (usually rank 0) to all workers so that every replica starts from the same state. [6]
2. **Data partitioning.** The global mini-batch of size B is divided into N equal shards, one for each of the N participating devices. A distributed sampler partitions each mini-batch into non-overlapping sub-batches (one per worker), and each worker processes a local micro-batch of size B/N.
3. **Independent forward and backward passes.** Every device runs the full forward pass on its micro-batch, computes the loss, and performs [backpropagation](/wiki/backpropagation) to obtain local gradients.
4. **Gradient aggregation.** The local gradients are combined (averaged or summed) across all N devices using a collective communication operation, most commonly all-reduce. After this step every device holds the same averaged gradient.
5. **Weight update.** Each device applies the identical optimizer step (for example, [SGD](/wiki/stochastic_gradient_descent_sgd) or [Adam](/wiki/adam_optimizer)) using the averaged gradient. Because all devices started with the same parameters and applied the same update, their models remain in sync without any additional coordination.

This cycle repeats for every mini-batch until training completes. The net effect is that the effective [batch size](/wiki/batch_size) scales linearly with the number of devices: training on 8 GPUs with a per-GPU batch of 32 yields an effective batch of 256.

## ELI5: data parallelism in plain language

Imagine a teacher needs to grade 1,000 exams. Instead of grading them all alone, the teacher makes 8 exact copies of the answer key and hands 125 exams to each of 8 assistants. Every assistant grades their stack independently. When they finish, they compare notes to make sure they all agree on the grading criteria. Then they repeat with the next 1,000 exams. That is data parallelism: the "answer key" is the model, the exams are the training data, and "comparing notes" is the gradient aggregation step.

Another way to picture it is a group of friends working together to complete a large puzzle. Each person takes a portion of the puzzle pieces and works on their section at the same time, making the whole process faster. Sometimes the friends stop and share their progress with each other, so everyone knows what the others are doing (synchronous approach). Other times they work independently and only occasionally check in with each other (asynchronous approach). Data parallelism helps to solve big machine learning problems more quickly by dividing the work among many "friends" (computational resources).

## Synchronous vs. asynchronous data parallelism

The gradient aggregation step can be performed in two fundamentally different ways.

### Synchronous data parallelism

In synchronous training, all workers must finish computing their local gradients before any worker can proceed to the weight update. The gradients are averaged using a blocking all-reduce operation, and every worker applies the same update at the same time. In synchronous [stochastic gradient descent](/wiki/stochastic_gradient_descent_sgd) (Sync-SGD), because no worker ever uses stale parameters, the optimization trajectory is mathematically equivalent to training on a single device with the full global batch size.

**Advantages:**

- Mathematically equivalent to single-device training with the same effective batch size.
- Deterministic convergence behavior that is straightforward to debug and reproduce.
- Standard approach in virtually all modern large-scale training systems.

**Disadvantages:**

- The speed of each step is limited by the slowest worker (the **straggler problem**). If one GPU is slightly slower due to hardware variance, thermal throttling, or network congestion, all other GPUs must wait at the synchronization barrier. Techniques such as backup workers and bounded staleness have been proposed to mitigate this issue.
- Requires high-bandwidth, low-latency interconnects to keep synchronization overhead manageable.

Synchronous data parallelism is the dominant paradigm today. High-speed interconnects like NVLink and InfiniBand have made communication overhead tolerable even at large scale, and the predictable convergence behavior makes synchronous training more reliable for production workloads.

### Asynchronous data parallelism

In asynchronous training, workers do not wait for one another. Each worker reads the current parameters from a central parameter server, computes gradients on its local data, and pushes those gradients back to the server, which applies the update immediately. There is no global synchronization barrier. [1]

**Advantages:**

- Eliminates idle time caused by stragglers; hardware utilization is maximized.
- Can be effective when workers have heterogeneous compute capabilities.

**Disadvantages:**

- Gradients may become **stale** because they were computed on an older version of the parameters. The staleness of a gradient is defined as the number of parameter updates that occurred between the time the worker read the parameters and the time it submitted its gradient. Applying stale gradients introduces noise and can harm convergence, particularly with large learning rates.
- Harder to reproduce results and tune hyperparameters.
- Can fail to match the final accuracy of synchronous methods, especially at large scale.

Researchers have proposed several remedies for gradient staleness, including dividing gradients by their staleness factor, using delay-compensated updates, and employing bounded-staleness protocols that allow limited asynchrony while capping the maximum staleness.

Asynchronous SGD was explored extensively in early work (Dean et al., 2012; Recht et al., 2011), but synchronous methods have largely superseded it for state-of-the-art training because the practical throughput gains rarely compensate for the statistical efficiency loss. [1][10]

### Hybrid approaches

Some systems use a hybrid strategy: workers are organized into groups that synchronize internally (using all-reduce within the group) while groups communicate asynchronously with one another. For instance, workers within the same node may synchronize synchronously over NVLink, while nodes communicate asynchronously over a slower Ethernet network. This can balance the consistency of synchronous training with some of the fault tolerance and throughput benefits of asynchronous methods.

| Property | Synchronous | Asynchronous | Hybrid |
|---|---|---|---|
| Gradient freshness | Always current | May be stale | Current within group |
| Convergence behavior | Predictable, reproducible; equivalent to large-batch single-device SGD | Noisier, harder to tune; weaker, depends on staleness bounds | Intermediate |
| Hardware utilization | Bottlenecked by slowest worker | Near-maximum | High |
| Communication pattern | All-reduce (blocking) | Parameter server (non-blocking) | Mixed |
| Implementation complexity | Moderate (all-reduce collectives) | Higher (parameter server, staleness management) | Variable |
| Hyperparameter tuning | Straightforward | Requires tuning for staleness | Intermediate |
| Dominant use today | Yes (PyTorch DDP, Horovod) | Rare; large heterogeneous clusters | Occasional |

## All-reduce communication

The all-reduce operation is the backbone of synchronous data parallelism. It takes a tensor from each of N workers, applies a reduction (typically summation), and distributes the result back so that every worker holds the identical reduced tensor. Several algorithms exist, each with different trade-offs.

### Ring all-reduce

Ring all-reduce, popularized in the deep learning community by Baidu's 2017 work, arranges workers in a logical ring. [9] The algorithm proceeds in two phases:

1. **Reduce-scatter phase.** Each worker's gradient tensor is split into N chunks. Over N-1 communication steps, chunks circulate around the ring, and each worker accumulates (sums) incoming chunks with its own. At the end of this phase, each worker holds the fully reduced version of exactly one chunk.
2. **All-gather phase.** Over another N-1 steps, the fully reduced chunks circulate again so that every worker ends up with the complete reduced tensor.

The total data transmitted per worker is 2(N-1)/N times the tensor size, which approaches 2x the tensor size for large N. Crucially, this communication volume is independent of the number of workers, making ring all-reduce **bandwidth-optimal**. [9] The bandwidth cost is determined only by the slowest link in the ring. However, the algorithm requires 2(N-1) sequential steps, which means its latency scales linearly with N. For clusters with many nodes, latency can become a bottleneck.

### Tree all-reduce and recursive halving-doubling

Tree all-reduce organizes workers in a binary (or k-ary) tree. The reduce phase flows from leaves to root, and the broadcast phase flows from root to leaves. Tree all-reduce has lower latency (O(log N) steps) but can create bandwidth bottlenecks at the root. It is more suitable when latency dominates over bandwidth, for example with small tensors or very large worker counts.

Recursive halving-doubling combines the strengths of both approaches by using a halving phase for reduce-scatter and a doubling phase for all-gather, achieving both bandwidth optimality and O(log N) latency for power-of-two worker counts.

### Hierarchical all-reduce

In multi-node clusters, [NCCL](/wiki/nccl) and other communication libraries often use a hierarchical approach: intra-node communication runs over NVLink or NVSwitch (high bandwidth, low latency), while inter-node communication runs over InfiniBand or RoCE. The reduce is performed first within each node and then across nodes, minimizing expensive cross-network traffic.

### NCCL

NVIDIA's Collective Communications Library ([NCCL](/wiki/nccl)) provides highly optimized implementations of all-reduce and other collectives for multi-GPU and multi-node setups. NCCL automatically selects the best algorithm and communication topology based on the hardware configuration. Most modern deep learning frameworks, including PyTorch, use NCCL as the default communication backend for GPU-based distributed training.

| Algorithm | Bandwidth cost per worker | Latency (steps) | Best suited for |
|---|---|---|---|
| Ring all-reduce | 2(N-1)/N * tensor size | 2(N-1) | Large tensors, moderate worker count |
| Tree all-reduce | ~2 * tensor size | 2 log N | Small tensors, large worker count |
| Recursive halving-doubling | ~2 * tensor size | 2 log N | Power-of-two worker counts |
| Hierarchical | Varies (two-level) | Varies | Multi-node clusters with mixed interconnects |

## Framework implementations

Several software frameworks provide production-ready implementations of data parallelism. The table below summarizes the most prominent ones.

| Framework | Developer | Key Mechanism | Supported Backends | Notes |
|---|---|---|---|---|
| [PyTorch](/wiki/pytorch) DDP | Meta (Facebook) | All-reduce with gradient bucketing | NCCL, Gloo, MPI | Default choice for PyTorch users |
| [PyTorch](/wiki/pytorch) FSDP | Meta (Facebook) | Sharded parameters, all-gather + reduce-scatter | NCCL | Enables training models that do not fit in a single GPU |
| [Horovod](/wiki/horovod) | Uber (now LF AI & Data) | Ring all-reduce via MPI/NCCL | NCCL, MPI, Gloo | Framework-agnostic; supports TensorFlow, PyTorch, MXNet |
| [DeepSpeed](/wiki/deepspeed) ZeRO | Microsoft | Sharded optimizer states, gradients, and parameters | NCCL | Three progressive sharding stages |
| tf.distribute | Google | MirroredStrategy (all-reduce) and others | NCCL, RING | Native TensorFlow distribution |
| [JAX](/wiki/jax) pmap/pjit | Google | XLA-compiled SPMD parallelism | XLA collectives | Functional transformation-based API |

### PyTorch DistributedDataParallel (DDP)

PyTorch DDP is the standard API for data-parallel training in [PyTorch](/wiki/pytorch). It wraps a model and transparently handles gradient synchronization during backpropagation. The authors of the system report that "when configured appropriately, the PyTorch distributed data parallel module attains near-linear scalability using 256 GPUs." [6]

**How DDP works internally:**

1. During construction, DDP broadcasts the model state from rank 0 to all other ranks using `ProcessGroup::broadcast()`.
2. DDP registers autograd hooks on each parameter. When a gradient is computed during the backward pass, the hook is triggered.
3. **Gradient bucketing.** DDP groups small gradient tensors into larger buckets before performing all-reduce. Multiple gradient tensors are packed into a single flat buffer, an all-reduce is performed on that buffer, and the results are unpacked back into individual gradient tensors. This bucketing amortizes the per-operation overhead and is controlled by the `bucket_cap_mb` parameter (default 25 MB). Bucketed all-reduce begins as soon as all gradients in a bucket are ready. [6]
4. **Overlapping communication with computation.** Buckets are reduced in reverse parameter order (matching the order in which gradients become ready during backpropagation), enabling overlap between computation and communication. While the backward pass is still computing gradients for earlier layers, the already-completed gradients for later layers can be communicated, hiding a significant portion of the communication latency behind useful computation. [6]
5. **Process group abstraction.** DDP uses PyTorch's distributed process group, which can use NCCL (for GPUs), Gloo (for CPUs), or MPI as its backend. NCCL is the standard choice for NVIDIA GPU training.
6. **No parameter broadcast during forward.** Unlike the older `DataParallel` module (which broadcasts parameters from a single GPU on every forward pass), DDP keeps parameters synchronized through gradient all-reduce alone, avoiding a major bottleneck. [6]

DDP requires that all processes call all-reduce operations in exactly the same order. This is enforced by using the bucket index order rather than the order in which buckets become ready. Mismatched ordering can lead to incorrect results or deadlocks.

### TensorFlow distributed strategies

**tf.distribute.MirroredStrategy** is TensorFlow's equivalent for synchronous data-parallel training on a single machine with multiple GPUs. Each variable in the model is mirrored across all replicas, and gradients are aggregated using NCCL-based all-reduce by default.

**tf.distribute.MultiWorkerMirroredStrategy** extends this to multi-node training. It requires a `TF_CONFIG` environment variable that specifies the cluster topology (worker addresses and roles). The API is designed so that code written for single-machine `MirroredStrategy` can be extended to multiple nodes with minimal changes.

Both strategies use a `strategy.scope()` context manager to distribute variables and computation, and they integrate directly with `model.fit()` as well as custom training loops.

### Horovod

Horovod, developed at Uber and published by Sergeev and Del Balso (2018), is a framework-agnostic distributed training library that supports [PyTorch](/wiki/pytorch), [TensorFlow](/wiki/tensorflow), [Keras](/wiki/keras), and Apache MXNet. It uses ring all-reduce via NCCL and was designed to require minimal code changes. Typically, adapting a single-GPU training script to use Horovod requires adding only a few lines of code. Its API centers on wrapping the optimizer to inject all-reduce calls during backpropagation. [7]

Early versions relied on Baidu's open-source ring-allreduce implementation, but later releases switched to [NCCL](/wiki/nccl), which provides a highly optimized implementation. Horovod introduced **Tensor Fusion**, an algorithm that batches small gradient tensors together before performing the all-reduce, reducing the number of communication operations and improving throughput. [7]

Horovod uses MPI (Message Passing Interface) for process management, making it familiar to the high-performance computing community. The `horovodrun` launcher simplifies starting distributed jobs across multiple nodes. [Benchmarks](/wiki/benchmarks) in the original paper demonstrated near-linear scaling on up to 512 GPUs for training [Inception-v3](/wiki/inception) and [ResNet-101](/wiki/resnet) on [ImageNet](/wiki/imagenet). [7] Horovod was widely used before DDP matured but remains popular in environments that need multi-framework support.

## Scaling efficiency and the linear scaling rule

Ideally, training on N GPUs should be N times faster than training on a single GPU. In practice, communication overhead and other factors reduce this ratio. The scaling efficiency is defined as:

> Scaling efficiency = (Time on 1 GPU) / (N * Time on N GPUs)

A scaling efficiency of 1.0 (or 100%) represents perfect linear scaling. Real-world systems typically achieve 85-95% efficiency on fast interconnects for large models.

### What is the linear scaling rule?

Goyal et al. (2017) established a practical guideline for large-batch training in their paper "Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour." [2] The **linear scaling rule** states:

> When the mini-batch size is multiplied by k, multiply the learning rate by k.

The intuition is that with k times more data per step, the gradient estimate is k times more accurate (lower variance), and a proportionally larger step can be taken without increasing noise. All other hyperparameters (weight decay, momentum coefficient, etc.) are kept unchanged. Using this rule, the authors trained [ResNet-50](/wiki/resnet) on [ImageNet](/wiki/imagenet) with a batch size of 8,192 across 256 GPUs in one hour, matching the accuracy of small-batch baselines. They reported approximately 90% scaling efficiency when moving from 8 to 256 GPUs. [2]

### Learning rate warmup

The linear scaling rule breaks down in the early stages of training, when the network parameters are changing rapidly and the loss landscape is highly non-linear. Applying a very large learning rate from the start can cause training to diverge.

Goyal et al. also introduced a **gradual warmup** scheme: the learning rate starts small and is linearly increased to the target value over the first few epochs (typically 5 epochs in their experiments). Warmup prevents the large learning rate from causing divergence early in training, when the model parameters are far from any reasonable optimum and the loss landscape is poorly conditioned. During warmup, the network moves to a region of the loss landscape where the linear scaling assumption holds more reliably. [2]

Gradual warmup has since become a standard practice in large-scale training and is used in the pre-training of models such as [BERT](/wiki/bert), [GPT](/wiki/gpt), and [Vision Transformers](/wiki/vision_transformer).

## Large-batch training challenges

While data parallelism enables training with very large effective batch sizes, this introduces its own set of difficulties. The linear scaling rule has practical limits. Beyond a certain batch size (which varies by task and model), increasing the batch size no longer reduces training time proportionally and may degrade final accuracy. This phenomenon is sometimes called the **critical batch size**.

### Generalization degradation

Multiple studies have observed that models trained with very large batch sizes can suffer a "generalization gap," achieving the same training loss as small-batch models but worse test accuracy. The leading explanation is that large-batch optimization tends to converge to sharp minima in the loss landscape, where small perturbations to the parameters cause large increases in loss. In contrast, the inherent noise of small-batch [gradient descent](/wiki/gradient_descent) acts as implicit regularization that favors flatter minima, which generalize better. Techniques like learning rate warmup, LARS, LAMB, and carefully tuned learning rate schedules help mitigate this effect.

### Adaptive learning rate methods

To push batch sizes even further, researchers have developed specialized optimizers:

- **LARS (Layer-wise Adaptive Rate Scaling):** Introduced by You et al. (2017), LARS scales the learning rate for each layer based on the ratio of the layer's weight norm to the gradient norm. This prevents layers with small weights from receiving disproportionately large updates. [4]
- **[LAMB](/wiki/lamb_optimizer) (Layer-wise Adaptive Moments optimizer for Batch training):** You et al. (2019/2020) extended LARS to work with Adam-style adaptive learning rates. The authors state that "we propose the LAMB optimizer, which helps us to scale the batch size to 65536 without losing accuracy," reducing BERT pre-training from three days to 76 minutes. [5]

LARS and [LAMB](/wiki/lamb_optimizer) address the observation that different layers can have very different gradient-to-weight ratios, and a single global learning rate (even when linearly scaled) may be too large for some layers and too small for others. By normalizing the update for each layer independently, these optimizers maintain stable training at batch sizes of 32K or more.

| Optimizer / Technique | Authors | Key Idea | Notable Result / Typical Batch Size |
|---|---|---|---|
| Linear scaling rule | Goyal et al., 2017 | Scale learning rate proportionally to batch size | Up to ~8,192 |
| Learning rate warmup | Goyal et al., 2017 | Prevent divergence at training start | All large-batch training |
| LARS | You et al., 2017 | Per-layer learning rate scaling based on weight and gradient norms | ResNet-50 with batch size 32,768 |
| [LAMB](/wiki/lamb_optimizer) | You et al., 2020 | Adam with layer-wise adaptive learning rates | BERT pre-training in 76 minutes (batch 65,536) |

## Gradient accumulation

[Gradient accumulation](/wiki/gradient_accumulation) is a technique that simulates a large effective batch size on hardware that cannot fit the full batch in memory. Instead of processing the entire batch at once, the batch is divided into smaller **micro-batches**. The model performs a forward and backward pass on each micro-batch and accumulates (sums or averages) the gradients. The optimizer step is applied only after all micro-batches have been processed. If the accumulation factor is A and the micro-batch size is B, the effective batch size is A * B.

For example, if the target effective batch size is 1,024 but only a batch of 256 fits in GPU memory, the training loop processes four micro-batches of 256 and accumulates their gradients before performing a single weight update. The resulting parameter update is mathematically identical to processing the full batch of 1,024 at once.

Gradient accumulation is sometimes called "poor man's data parallelism" because it achieves the same effective batch size increase as adding more GPUs, but at the cost of proportionally longer training time (since the micro-batches are processed sequentially on the same device). It can also be combined with multi-GPU data parallelism. For example, with 8 GPUs, a micro-batch size of 4, and an accumulation factor of 4, the effective global batch size is 8 * 4 * 4 = 128. This allows practitioners to reach batch sizes that would otherwise require more GPUs than are available.

**Key considerations:**

- Gradient synchronization (all-reduce) should only occur on the final accumulation step, not on every micro-batch. Frameworks like PyTorch provide a `no_sync()` context manager for DDP that skips synchronization during intermediate accumulation steps, reducing communication overhead. [6]
- Gradient accumulation interacts poorly with standard [Batch Normalization](/wiki/batch_normalization), because the normalization statistics are computed per micro-batch rather than over the full effective batch. Using synchronized batch normalization or replacing batch normalization with [Layer Normalization](/wiki/layer_normalization) (which normalizes per-sample) can mitigate this. This is one reason why layer normalization has become dominant in Transformer architectures.
- Gradient accumulation trades memory for time: it reduces memory requirements per step but increases the number of forward-backward passes per optimizer update.

## ZeRO: Zero Redundancy Optimizer

Standard data parallelism requires every device to store a full copy of the model parameters, gradients, and optimizer states. For a model with P parameters trained in mixed precision with Adam, the memory footprint per device is approximately 16P bytes (2P for fp16 parameters, 2P for fp16 gradients, 4P for fp32 master weights, 4P for first moments, 4P for second moments). This redundancy limits the maximum model size that data parallelism can handle. [3]

ZeRO (Zero Redundancy Optimizer), proposed by Rajbhandari et al. (2020) at Microsoft Research and implemented in the [DeepSpeed](/wiki/deepspeed) library, eliminates this redundancy by partitioning the model states across data-parallel workers rather than replicating them. [3] ZeRO has three progressive stages:

### ZeRO Stage 1: optimizer state partitioning

Each worker stores only 1/N of the optimizer states (for [Adam](/wiki/adam_optimizer), these include the 32-bit master weights, first moment estimates, and second moment estimates). After gradients are reduced, each worker updates only its partition of the parameters and then broadcasts the updated parameters. This reduces optimizer state memory by a factor of N, yielding up to 4x total memory reduction in mixed-precision training with no additional communication overhead. [3]

### ZeRO Stage 2: gradient partitioning

In addition to optimizer states, gradients are also partitioned. Each worker retains only the gradient shard that corresponds to its optimizer state partition. Gradients for other partitions are reduced and discarded immediately after the reduce-scatter operation. This further reduces memory consumption, up to 8x compared to baseline DDP. [3]

### ZeRO Stage 3: parameter partitioning

The 16-bit model parameters themselves are partitioned. Each worker stores only 1/N of the parameters. Before the forward and backward passes, parameters are gathered on demand using all-gather operations and freed immediately after use. This achieves memory reduction proportional to the degree of data parallelism, enabling training of models that are far larger than what would fit on a single device, at the cost of 1.5x communication volume relative to DDP. [3]

| ZeRO stage | What is partitioned | Memory per device (approx.) | Memory savings | Communication overhead |
|---|---|---|---|---|
| Stage 0 (baseline DDP) | Nothing (full replication) | 16P bytes | 1x | 1x (all-reduce) |
| Stage 1 (P_os) | Optimizer states | ~6P bytes | Up to 4x | 1x |
| Stage 2 (P_os+g) | Optimizer states + gradients | ~4P bytes | Up to 8x | 1x |
| Stage 3 (P_os+g+p) | Optimizer states + gradients + parameters | ~16P/N bytes | Linear with worker count | 1.5x (additional all-gather) |

ZeRO has been instrumental in enabling the training of very large models. The original paper reported that ZeRO can train models with 100 billion parameters at three to five times the throughput of the prior best system, and showed that all three stages combined can scale to a trillion-parameter model on 1,024 NVIDIA GPUs. [3] ZeRO also supports offloading optimizer states and parameters to CPU memory or NVMe storage (ZeRO-Offload and ZeRO-Infinity), enabling training of trillion-parameter models on limited GPU hardware at the cost of slower throughput.

## Fully Sharded Data Parallel (FSDP)

PyTorch FSDP is PyTorch's native implementation of ZeRO Stage 3 concepts. While standard DDP requires each GPU to hold a complete copy of the model parameters, gradients, and optimizer states, FSDP shards all three across data-parallel workers, dramatically reducing per-GPU memory consumption. Zhao et al. (2023) report that FSDP "is able to achieve comparable performance to Distributed Data Parallel while providing support for significantly larger models with near-linear scalability in terms of TFLOPS." [8]

**How FSDP works:**

1. Model parameters are divided (sharded) across all participating ranks. Each rank stores only its assigned shard.
2. Before each layer's forward pass, FSDP performs an **all-gather** to reconstruct the full parameters for that layer from all ranks.
3. After the forward computation for that layer completes, the non-local parameter shards are freed from memory.
4. During the backward pass, parameters are gathered again for gradient computation. After gradients are computed, a **reduce-scatter** operation distributes the reduced gradient shards back to their owning ranks.
5. Each rank runs the optimizer only on its local shard of parameters using its local shard of gradients, producing locally sharded optimizer states. [8]

FSDP wraps model layers in a nested hierarchy, so only one layer at a time needs its full parameters materialized. This means peak GPU memory is determined by the size of the largest single FSDP unit plus the sharded states of all other units. [8]

FSDP offers several sharding strategies:

| Sharding Strategy | What Is Sharded | Communication Overhead vs. DDP |
|---|---|---|
| FULL_SHARD | Optimizer states, gradients, and parameters | ~1.5x |
| SHARD_GRAD_OP (ZeRO Stage 2) | Optimizer states and gradients | ~1.0x |
| NO_SHARD | Nothing (equivalent to DDP) | 1.0x |
| HYBRID_SHARD | Full shard within a node, replicate across nodes | Between 1.0x and 1.5x |

The trade-off is clear: full sharding offers the lowest memory footprint per GPU but incurs the most communication. HYBRID_SHARD is often a practical middle ground for multi-node training, as intra-node communication over NVLink is fast while inter-node communication over InfiniBand is slower.

FSDP supports implicit prefetching (automatically issuing all-gather for the next layer while the current layer is computing) and optional CPU offloading. PyTorch FSDP2, introduced in more recent PyTorch releases, simplifies the API and improves composability with other parallelism strategies and with PyTorch features like activation checkpointing and [mixed precision training](/wiki/mixed_precision_training).

## Mixed precision training

[Mixed precision training](/wiki/mixed_precision_training), introduced by Micikevicius et al. (2018) in their ICLR 2018 paper, is a technique that combines half-precision (FP16) and single-precision (FP32) floating-point arithmetic to accelerate training while maintaining model accuracy. [12] It is frequently used alongside data parallelism to further improve throughput and reduce memory consumption.

**Core components of mixed precision training:**

1. **FP16 forward and backward passes.** Weights, activations, and gradients are stored and computed in 16-bit floating point, which halves memory usage and enables the use of Tensor Cores on NVIDIA GPUs for faster matrix operations.
2. **FP32 master weights.** A single-precision copy of the weights is maintained for the optimizer update. After each step, the updated FP32 weights are cast back to FP16 for the next iteration. This prevents the accumulation of small gradient updates that would be lost in FP16 due to limited precision. [12]
3. **Loss scaling.** Because FP16 has a much smaller dynamic range than FP32, small gradient values may underflow to zero. Loss scaling multiplies the loss by a large factor before the backward pass, shifting gradients into the representable FP16 range. The gradients are then unscaled before the optimizer step. Modern implementations use **dynamic loss scaling**, which automatically adjusts the scale factor during training. [12]

**Benefits for data-parallel training:**

- Reduced gradient size (FP16 gradients are half the size of FP32) means less data to transfer during all-reduce, cutting communication time.
- Lower memory consumption per GPU allows for larger micro-batch sizes, which improves GPU utilization and reduces the number of communication rounds needed to achieve a given global batch size.
- Tensor Core acceleration on NVIDIA Ampere, Hopper, and later architectures can deliver 2x to 8x higher throughput for matrix multiplication compared to FP32.

PyTorch provides `torch.cuda.amp` (automatic mixed precision) with `GradScaler` and `autocast` context managers that integrate seamlessly with DDP and FSDP. DeepSpeed and Horovod also provide built-in support for mixed precision training.

More recently, BF16 ([bfloat16](/wiki/bfloat16)) has gained popularity as an alternative to FP16. BF16 uses the same number of exponent bits as FP32, giving it a much wider dynamic range, which eliminates the need for loss scaling in most cases. Google's [TPU](/wiki/tpu) hardware natively supports BF16, and NVIDIA's Ampere and later GPU architectures also include BF16 Tensor Core support.

## Data parallelism vs. model parallelism vs. pipeline parallelism

Data parallelism is one of three primary parallelism strategies used in distributed deep learning. The choice depends on model size, available hardware, and communication bandwidth.

| Aspect | Data Parallelism | [Model Parallelism](/wiki/model_parallelism) (tensor) | [Pipeline Parallelism](/wiki/pipeline_parallelism) |
|---|---|---|---|
| What is replicated | Model (full copy on each worker) | Data (same batch on all workers) | Data (micro-batches flow through stages) |
| What is split | Data (each worker gets a shard) | Model (layers or tensors split across workers) | Model (consecutive layers assigned to stages) |
| Communication pattern | Gradient all-reduce (once per step) | Point-to-point activation/gradient transfers (every layer boundary) | Point-to-point between pipeline stages |
| Communication volume | Moderate | High (per-layer) | Lower than tensor parallelism |
| Memory per worker | Full model + optimizer states | Fraction of model + associated optimizer states | Fraction of model + micro-batch activations |
| When to use | Model fits in one GPU; dataset is large | Model does not fit in one GPU; very large layers (e.g., attention heads) | Model does not fit in one GPU; very deep models; want to overlap computation |
| Scaling bottleneck | Communication overhead at high worker counts (unless combined with ZeRO/FSDP) | Activation transfer latency; load imbalance; high communication overhead | Pipeline bubble (idle time); micro-batch scheduling |
| Implementation complexity | Low | High (requires manual partitioning or automated tools) | Medium to High (micro-batch scheduling, pipeline flushes) |
| Effective batch size | Increases with workers | Unchanged | Unchanged (but micro-batches increase throughput) |

In practice, large-scale training systems combine multiple strategies. This is often called **3D parallelism**: data parallelism across groups of nodes, pipeline parallelism across nodes within a group, and tensor (model) parallelism across GPUs within a node. For example, the Megatron-LM framework from NVIDIA uses all three strategies simultaneously to train models with hundreds of billions of parameters. [11] State-of-the-art training systems for very large models (such as [GPT-4](/wiki/gpt-4), [PaLM](/wiki/palm), and [LLaMA](/wiki/llama)) use this hybrid approach, matching each parallelism dimension to the appropriate level of the hardware topology.

## Multi-node training infrastructure

Scaling data parallelism beyond a single machine introduces network communication as a critical bottleneck. The hardware and software stack for multi-node training includes several key components.

### NCCL (NVIDIA Collective Communications Library)

[NCCL](/wiki/nccl) (pronounced "nickel") is NVIDIA's library for collective communication primitives (all-reduce, all-gather, reduce-scatter, broadcast) optimized for NVIDIA GPUs. NCCL automatically detects the topology of the system, including PCIe, NVLink, NVSwitch, InfiniBand, and RoCE connections, and selects the optimal communication algorithm. It is the default communication backend for both PyTorch DDP and TensorFlow's distributed strategies when running on NVIDIA hardware.

### NVLink and NVSwitch

NVLink is NVIDIA's high-bandwidth, low-latency point-to-point interconnect for GPUs within the same node. NVLink 4.0 (used in H100 GPUs) provides 900 GB/s of bidirectional bandwidth per GPU, delivered over 18 links at 50 GB/s each, which is roughly 7x the 128 GB/s of a PCIe Gen5 x16 slot. [13] NVSwitch enables all-to-all NVLink connectivity among all GPUs in a node, eliminating the topology constraints of point-to-point links. These interconnects make intra-node all-reduce extremely fast.

### InfiniBand and RoCE

For inter-node communication, InfiniBand (IB) and RDMA over Converged Ethernet (RoCE) provide high-bandwidth, low-latency networking with remote direct memory access (RDMA). RDMA enables data to move directly between GPU memory on different nodes without involving the CPU, minimizing latency. NVIDIA's ConnectX network adapters and Quantum InfiniBand switches are commonly used in large AI training clusters. HDR InfiniBand provides 200 Gb/s per port, and NDR provides 400 Gb/s.

| Interconnect | Scope | Bandwidth (bidirectional) | Typical use |
|---|---|---|---|
| NVLink 4.0 | Intra-node (GPU to GPU) | 900 GB/s per GPU | All-reduce within a node |
| NVSwitch | Intra-node (all-to-all) | Full bisection bandwidth | All-to-all within a node |
| InfiniBand NDR | Inter-node | 400 Gb/s per port | Gradient sync across nodes |
| RoCE | Inter-node | Up to 400 Gb/s | Alternative to InfiniBand |
| PCIe Gen5 | Intra-node (CPU-GPU) | 128 GB/s per x16 slot | CPU offloading, data loading |

## Communication overhead analysis

Communication overhead is the primary bottleneck in data-parallel training. Every iteration requires synchronizing gradients across all workers, and the time spent in communication directly reduces the fraction of time spent on useful computation. The communication cost of data-parallel training is determined by three factors:

1. **Message size.** The total gradient size equals the number of model parameters times the bytes per parameter (typically 4 bytes for fp32, 2 bytes for fp16). A 7-billion-parameter model in fp16 produces 14 GB of gradients per step.
2. **Network bandwidth.** Ring all-reduce transmits approximately 2x the gradient size per worker. At 400 Gb/s (50 GB/s) InfiniBand, a 14 GB all-reduce takes roughly 0.56 seconds, assuming full bandwidth utilization.
3. **Computation time.** Communication overhead matters only relative to the time spent on forward and backward passes. If a training step takes 10 seconds of computation, a 0.56 second all-reduce adds only 5.6% overhead. If the step takes 1 second, the same all-reduce adds 56% overhead.

This analysis reveals why data parallelism scales better for large models (more computation per step) and on fast interconnects (less time per all-reduce). The ratio of computation time to communication time (the **computation-to-communication ratio**) determines scaling efficiency. Models with a high parameter-to-FLOP ratio (such as large [transformer](/wiki/transformer) models) tend to be more communication-bound.

### Factors affecting communication cost

| Factor | Impact |
|---|---|
| Model size | Larger models produce more gradient data to synchronize |
| Number of workers | Ring all-reduce latency grows linearly with worker count |
| Interconnect bandwidth | Higher bandwidth (NVLink, InfiniBand) reduces transfer time |
| Network topology | Intra-node communication (NVLink) is much faster than inter-node (Ethernet) |
| Gradient compression | Techniques like quantization and sparsification reduce the volume of data transferred |

### Techniques to reduce communication overhead

Several approaches have been developed to mitigate communication costs:

- **Gradient bucketing.** Grouping small gradient tensors into larger buffers before communicating them amortizes per-message latency. PyTorch DDP uses this by default. [6]
- **Computation-communication overlap.** Starting the all-reduce for completed gradient buckets while the backward pass is still computing gradients for earlier layers. This is the default behavior in PyTorch DDP and Horovod. [6]
- **Gradient compression.** Techniques such as gradient quantization (e.g., 1-bit SGD, TernGrad), gradient sparsification (e.g., Top-K, Random-K), and error feedback reduce the volume of data transmitted at the cost of some approximation.
- **Local SGD (periodic averaging).** Instead of synchronizing every iteration, workers run several local SGD steps and average parameters periodically. This dramatically reduces communication frequency at the cost of introducing temporary model divergence between workers.
- **Tensor Fusion.** Horovod's approach of batching multiple small all-reduce operations into fewer large ones to reduce launch overhead. [7]
- **Mixed precision.** Using fp16 or bf16 halves the gradient size and thus the communication volume.

## Challenges and trade-offs

While data parallelism can significantly accelerate the training process in machine learning, it also presents several challenges and trade-offs.

### Communication overhead

As the number of computational resources increases, the communication overhead between resources may also increase, potentially negating the benefits of data parallelism. This issue is particularly pronounced in synchronous data parallelism, where resources must frequently synchronize their updates.

### Load balancing

Uneven distribution of data or computational resources can lead to load balancing issues, where some resources may be idle while others are still processing their data subsets. This can reduce the overall efficiency of the data parallelism approach and may require careful partitioning and resource allocation strategies. Dynamic batching, heterogeneous-aware schedulers, and straggler mitigation techniques can help address load imbalance.

### Consistency vs. speed

Synchronous and asynchronous data parallelism offer different trade-offs between consistency and speed. Synchronous approaches provide better consistency but may be slower, while asynchronous approaches can be faster but may lead to less accurate models due to gradient staleness.

### Batch size generalization gap

Training with very large batch sizes can sometimes lead to models that generalize less well to unseen data, even when training loss is comparable to small-batch baselines. This has been attributed to large-batch training converging to sharp minima rather than flat minima.

## Best practices

- **Start with DDP or FSDP.** For most PyTorch workloads, DDP is sufficient if the model fits on a single GPU. Switch to FSDP when model memory exceeds single-GPU capacity.
- **Use the linear scaling rule with warmup.** When increasing the number of GPUs, scale the learning rate proportionally and use 5-10 epochs of linear warmup. [2]
- **Profile communication overhead.** Use tools like PyTorch Profiler, NVIDIA Nsight Systems, or `NCCL_DEBUG=INFO` to identify communication bottlenecks.
- **Tune gradient bucket size.** The default 25 MB bucket size in DDP works well for most models, but very large or very small models may benefit from adjustment.
- **Combine with gradient accumulation.** If memory is tight or the desired effective batch size requires it, use gradient accumulation to multiply the effective batch size beyond what hardware alone provides.
- **Consider mixed precision.** Training in fp16 or bf16 halves the gradient size and thus the communication volume, while also speeding up computation on modern GPUs with Tensor Cores.

## Historical context

Data parallelism has been a core idea in parallel computing since the 1980s, with roots in SIMD (Single Instruction, Multiple Data) architectures and early data-parallel languages like CM Fortran for the Connection Machine. In the machine learning context, data-parallel SGD became practical with the rise of GPU computing in the early 2010s.

Key milestones include:

- **2011:** Recht et al. introduced Hogwild!, a lock-free approach to parallelizing stochastic gradient descent. [10]
- **2012:** Dean et al. at Google described DistBelief, one of the first large-scale distributed training systems, using both data and model parallelism with asynchronous SGD and parameter servers. [1]
- **2014:** The introduction of the AllReduce-based approach for neural network training began displacing parameter-server architectures.
- **2017:** Goyal et al. at Facebook AI Research demonstrated training ImageNet in one hour using synchronous data parallelism with 256 GPUs, establishing the linear scaling rule and warmup as standard practice. [2]
- **2017:** Baidu published their ring all-reduce implementation, which was later incorporated into Horovod and became the dominant communication pattern. [9]
- **2018:** Micikevicius et al. published the Mixed Precision Training paper at ICLR, and Sergeev and Del Balso released Horovod. [12][7]
- **2019:** Microsoft introduced ZeRO as part of the DeepSpeed library, breaking the memory barrier of standard data parallelism. [3]
- **2020:** Rajbhandari et al. published the ZeRO paper, and PyTorch introduced DistributedDataParallel with gradient bucketing and communication overlap. [3][6]
- **2022:** PyTorch released FSDP, bringing ZeRO-style sharding natively into the framework. [8]
- **2023-2024:** FSDP2 and improved integration with other parallelism modes (tensor parallelism, pipeline parallelism) in frameworks like Megatron-LM and DeepSpeed made 3D parallelism accessible to a wider audience.

## Frequently asked questions

### What is the difference between data parallelism and model parallelism?

Data parallelism replicates the entire model on every device and splits the training data across devices, then averages gradients with an all-reduce after each step. [Model parallelism](/wiki/model_parallelism) instead splits the model itself (its layers or individual tensors) across devices while every device sees the same data. Data parallelism is used when the model fits in a single GPU and the dataset is large; model parallelism is used when the model is too large to fit on one GPU. Large-scale systems combine both, along with [pipeline parallelism](/wiki/pipeline_parallelism), in a configuration known as 3D parallelism. [11]

### Is data parallelism the same as DDP, ZeRO, or FSDP?

Data parallelism is the general strategy; DDP, ZeRO, and FSDP are specific implementations of it. PyTorch DDP is classic data parallelism that replicates all model states on every worker. DeepSpeed ZeRO and PyTorch FSDP are sharded data parallelism: they keep the data-parallel execution model but partition the optimizer states, gradients, and (in ZeRO Stage 3 and FSDP FULL_SHARD) the parameters themselves across workers to remove the memory redundancy of plain DDP. [3][8]

### When should I use FSDP instead of DDP?

Use DDP when the model, its gradients, and its optimizer states all fit comfortably in a single GPU's memory, because DDP has the lowest communication overhead (a single all-reduce per step). Switch to FSDP (or DeepSpeed ZeRO) when per-GPU memory is the binding constraint, since FSDP shards the model states and can train models far larger than one GPU could hold, at the cost of roughly 1.5x the communication of DDP under FULL_SHARD. [8]

## References

1. Dean, J., Corrado, G., Monga, R., Chen, K., [Devin](/wiki/devin), M., Mao, M., Ranzato, M., Senior, A., Tucker, P., Yang, K., Le, Q. V., & Ng, A. Y. (2012). "Large Scale Distributed Deep Networks." *Advances in Neural Information Processing Systems ([NeurIPS](/wiki/neurips) 2012)*.
2. Goyal, P., Dollar, P., Girshick, R., Noordhuis, P., Wesolowski, L., Kyrola, A., Tulloch, A., Jia, Y., & He, K. (2017). "Accurate, Large Minibatch SGD: Training ImageNet in 1 Hour." [arXiv:1706.02677](https://arxiv.org/abs/1706.02677).
3. Rajbhandari, S., Rasley, J., Ruwase, O., & He, Y. (2020). "ZeRO: Memory Optimizations Toward Training Trillion Parameter Models." *Proceedings of the International Conference for High Performance Computing, Networking, Storage and Analysis (SC '20)*. [arXiv:1910.02054](https://arxiv.org/abs/1910.02054).
4. You, Y., Gitman, I., & Ginsburg, B. (2017). "Large Batch Training of Convolutional Networks" / "Scaling SGD [Batch Size](/wiki/batch_size) to 32K for ImageNet Training." *arXiv:1708.03888 / Technical Report, UC Berkeley*.
5. You, Y., Li, J., Reddi, S., Hseu, J., Kumar, S., Bhojanapalli, S., Song, X., Demmel, J., Keutzer, K., & Hsieh, C.-J. (2020). "Large Batch Optimization for Deep Learning: Training BERT in 76 Minutes." *International Conference on Learning Representations (ICLR 2020)*. [arXiv:1904.00962](https://arxiv.org/abs/1904.00962).
6. Li, S., Zhao, Y., Varma, R., Salpekar, O., Noordhuis, P., Li, T., Paszke, A., Smith, J., Vaughan, B., Damania, P., & Chintala, S. (2020). "PyTorch Distributed: Experiences on Accelerating Data Parallel Training." *Proceedings of the VLDB Endowment*, 13(12). [arXiv:2006.15704](https://arxiv.org/abs/2006.15704).
7. Sergeev, A. & Del Balso, M. (2018). "Horovod: fast and easy distributed deep learning in TensorFlow." [arXiv:1802.05799](https://arxiv.org/abs/1802.05799).
8. Zhao, Y., Gu, A., Varma, R., Luo, L., Huang, C.-C., Xu, M., Wright, L., Shojanazeri, H., Ott, M., Shleifer, S., Desmaison, A., Balioglu, C., Damania, P., Nguyen, B., Chauhan, G., Hao, Y., Mathews, A., & Li, S. (2023). "PyTorch FSDP: Experiences on Scaling Fully Sharded Data Parallel." *Proceedings of the VLDB Endowment*, 16(12). [arXiv:2304.11277](https://arxiv.org/abs/2304.11277).
9. Gibiansky, A. (2017). "Bringing HPC Techniques to Deep Learning." Baidu Research Blog.
10. Recht, B., Re, C., Wright, S., & Niu, F. (2011). "Hogwild!: A Lock-Free Approach to Parallelizing Stochastic Gradient Descent." *Advances in Neural Information Processing Systems (NeurIPS)*.
11. Narayanan, D., Shoeybi, M., Casper, J., et al. (2021). "Efficient Large-Scale Language Model Training on GPU Clusters Using Megatron-LM." *Proceedings of the International Conference for High Performance Computing, Networking, Storage and Analysis (SC '21)*.
12. Micikevicius, P., Narang, S., Alben, J., Diamos, G., Elsen, E., Garcia, D., Ginsburg, B., Houston, M., Kuchaiev, O., Venkatesh, G., & Wu, H. (2018). "Mixed [Precision](/wiki/precision) Training." *International Conference on Learning Representations (ICLR 2018)*. [arXiv:1710.03740](https://arxiv.org/abs/1710.03740).
13. NVIDIA (2022). "NVIDIA H100 Tensor Core GPU Architecture" (Hopper whitepaper) and "NVIDIA H100 Tensor Core GPU Datasheet." NVIDIA Corporation.

