Usage Guide#

This usage guide introduces DeepSpeed and guides you through how to train a PyTorch model with the DeepSpeed engine. To implement DeepSpeedTrial, you need to overwrite specific functions corresponding to common training aspects. It is helpful to work from a skeleton trial to keep track of what is required, as the following example template shows:

from typing import Any, Dict, Iterator, Optional,  Union
from attrdict import AttrDict

import torch
import deepspeed

import determined.pytorch import DataLoader, TorchData
from determined.pytorch.deepspeed import DeepSpeedTrial, DeepSpeedTrialContext

class MyTrial(DeepSpeedTrial):
    def __init__(self, context: DeepSpeedTrialContext) -> None:
        self.context = context
        self.args = AttrDict(self.context.get_hparams())

    def build_training_data_loader(self) -> DataLoader:
        return DataLoader()

    def build_validation_data_loader(self) -> DataLoader:
        return DataLoader()

    def train_batch(
        self,
        dataloader_iter: Optional[Iterator[TorchData]],
        epoch_idx: int,
        batch_idx: int,
    ) -> Union[torch.Tensor, Dict[str, Any]]:
        return {}

    def evaluate_batch(
        self, dataloader_iter: Optional[Iterator[TorchData]], batch_idx: int
    ) -> Dict[str, Any]:
        return {}

The DeepSpeed API organizes training routines into common steps like creating the data loaders and training and evaluating the model. The provided template shows the function signatures, including the expected return types, for these methods.

Because DeepSpeed is built on top of PyTorch, there are many similarities between the API for PyTorchTrial and DeepSpeedTrial. The following steps show you how to implement each of the DeepSpeedTrial methods beginning with training objects initialization.

Step 1- Configure and Initialize Training Objects#

DeepSpeed training initialization consists of two steps:

  1. Initialize the distributed backend.

  2. Create the DeepSpeed model engine.

Refer to the DeepSpeed Getting Started guide for more information.

Outside of Determined, this is typically done in the following way:

deepspeed.init_distributed(dist_backend=args.backend)
net = ...
model_engine, optimizer, lr_scheduler, _ = deepspeed.initialize(args=args, net=net, ...)

DeepSpeedTrial automatically initializes the distributed training backend so all you need to do is initialize the model engine and other training objects in the DeepSpeedTrial __init__() method.

Configuration#

DeepSpeed behavior during training is configured by passing arguments when initializing the model engine. This can be done in two ways:

  • Using a configuration file specified as an argument with a field named deepspeed_config.

  • Using a dictionary, which is passed in directly when initializing a model engine.

Both approaches can be used in combination with the Determined experiment configuration. See the DeepSpeed documentation for more information on what can be specified in the configuration.

If you want to use a DeepSpeed configuration file, the hyperparameters section can be used as arguments to pass to deepspeed.initialize. For example, if the DeepSpeed configuration file is named ds_config.json, the hyperparameter section of the Determined experiment configuration is:

hyperparameters:
  deepspeed_config: ds_config.json
  ...

If you want to overwrite some values in an existing DeepSpeed configuration file, use overwrite_deepspeed_config() and an experiment configuration similar to:

hyperparameters:
  deepspeed_config: ds_config.json
  overwrite_deepspeed_args:
      train_batch_size: 16
      optimizer:
        params:
          lr: 0.005
  ...

If you want to use a dictionary directly, specify a DeepSpeed configuration dictionary in the hyperparameters section:

hyperparameters:
  optimizer:
    type: Adam
    params:
      betas:
        - 0.8
        - 0.999
      eps: 1.0e-08
      lr: 0.001
      weight_decay: 3.0e-07
  train_batch_size: 16
  zero_optimization:
    stage: 0
    allgather_bucket_size: 50000000
    allgather_partitions: true
    contiguous_gradients: true
    cpu_offload: false
    overlap_comm: true
    reduce_bucket_size: 50000000
    reduce_scatter: true

Initialization#

After configuration, you can initialize the model engine in the DeepSpeedTrial. The following example corresponds to the experiment configuration above, with a field in the hyperparameters section named overwrite_deepspeed_args.

class MyTrial(DeepSpeedTrial):
    def __init__(self, context: DeepSpeedTrialContext) -> None:
        self.context = context
        self.args = AttrDict(self.context.get_hparams())

        model = Net(self.args)
        ds_config = overwrite_deepspeed_config(
            self.args.deepspeed_config, self.args.get("overwrite_deepspeed_args", {})
        )
        parameters = filter(lambda p: p.requires_grad, model.parameters())
        model_engine, __, __, __ = deepspeed.initialize(
            model=model, model_parameters=parameters, config=ds_config
        )
        self.model_engine = self.context.wrap_model_engine(model_engine)

After the model engine is initialized, you need to register it with Determined by calling wrap_model_engine(). Differing from PyTorchTrial, you do not need to register the optimizer or learning rate scheduler with Determined because both are attributes of the model engine.

If you want to use pipeline parallelism with a given model, pass layers of the model for partitioning to the DeepSpeed PipelineModule before creating the pipeline model engine:

net = ...
net = deepspeed.PipelineModule(
    layers=get_layers(net),
    loss_fn=torch.nn.CrossEntropyLoss(),
    num_stages=args.pipeline_parallel_size,
    ...,
)

Step 2 - Load Data#

The next step is to build the data loader used for training and validation. The same process is used to download the data for PyTorchTrial. Building the data loaders is also similar, except for the batch size specification for the returned data loaders, which differs because the DeepSpeed model engines automatically handle gradient aggregation.

Automatic gradient aggregation in DeepSpeed is specified in configuration fields:

  • train_batch_size

  • train_micro_batch_size

  • gradient_accumulation_steps

which are related as follows:

train_batch_size = train_micro_batch_size * gradient_accumulation_steps * data_parallel_size,

where data_parallel_size is the number of model replicas across all GPUs used during training. Therefore, a single train batch consists of multiple micro batches, specified by the gradient_accumulation_steps argument. Given a model parallelization scheme, you can specify two fields and the third can be inferred.

The DeepSpeed model engines assume the model is processing micro batches and automatically handle stepping the optimizer and learning rate scheduler every gradient_accumulation_steps micro batches. This means that the build_training_data_loader should return batches of size train_micro_batch_size_per_gpu. In most cases, build_validation_data_loader also returns batches of size train_micro_batch_size_per_gpu.

If you want exact epoch boundaries to be respected, the number of micro batches in the training data loader should be divisible by gradient_accumulation_steps.

If you are using pipeline parallelism, the validation data loader needs to have at least gradient_accumulation_steps worth of batches.

Step 3 - Training and Evaluation#

This step covers the training and evaluation routine for the standard data parallel model engine and the pipeline parallel engine available in DeepSpeed.

After you create the DeepSpeed model engine and data loaders, define the training and evaluation routines for the DeepSpeedTrial. Differing from PyTorchTrial, train_batch() and evaluate_batch() take an iterator over the corresponding data loader built from build_training_data_loader() and build_validation_dataloader() instead of a batch.

Data Parallel Training#

For data parallel training, only, the training and evaluation routines are:

def train_batch(
    self,
    dataloader_iter: Optional[Iterator[TorchData]],
    epoch_idx: int,
    batch_idx: int,
) -> Union[torch.Tensor, Dict[str, Any]]:
    inputs = self.context.to_device(next(dataloader_iter))
    loss = self.model_engine(inputs)
    self.model_engine.backward(loss)
    self.model_engine.step()
    return {"loss": loss}


def evaluate_batch(
    self, dataloader_iter: Optional[Iterator[TorchData]], batch_idx: int
) -> Dict[str, Any]:
    inputs = self.context.to_device(next(dataloader_iter))
    loss = self.model_engine(inputs)
    return {"loss": loss}

You need to manually get a batch from the iterator and move it to the GPU using the provided to_device() helper function, which knows the GPU assigned to a given distributed training process.

Pipeline Parallel Training#

When using pipeline parallelism, the forward and backward steps during training are combined into a single function call because DeepSpeed automatically interleaves multiple micro batches for processing in a single training step. In this case, there is no need to manually get a batch from the dataloader_iter iterator because the pipeline model engine requests it as needed while interleaving micro batches:

def train_batch(
    self,
    dataloader_iter: Optional[Iterator[TorchData]],
    epoch_idx: int,
    batch_idx: int,
) -> Union[torch.Tensor, Dict[str, Any]]:
    loss = self.model_engine.train_batch()
    return {"loss": loss}


def evaluate_batch(
    self, dataloader_iter: Optional[Iterator[TorchData]], batch_idx: int
) -> Dict[str, Any]:
    loss = self.model_engine.eval_batch()
    return {"loss": loss}

Fully Sharded Data Parallelism (FSDP)#

To use FSDP, use the PyTorch FSDP package as usual along with the Core API. To see an example of how this works, visit the FSDP + Core API for LLM Training example.

Note

PytorchTrial API does not support FSDP.

For more info about the PyTorch FSDP package, visit PyTorch tutorials > Getting Started with Fully Sharded Data Parallel (FSDP).

Profiling#

Deepspeed experiments can be profiled using PyTorch Profiler and results will automatically be uploaded to TensorBoard (accessible via the Determined UI). To configure profiling, call set_profiler() on the DeepSpeedTrialContext class in the trial’s __init__ method.

set_profiler() is a thin wrapper around PyTorch profiler, torch-tb-profiler. It overrides the on_trace_ready parameter to the Determined TensorBoard path, while all other arguments are passed directly into torch.profiler.profile. Stepping the profiler will be handled automatically during the training loop.

See the PyTorch profiler plugin for details.

The snippet below will profile GPU and CPU usage, skipping batch 1, warming up on batch 2, and profiling batches 3 and 4.

class MyTrial(DeepSpeedTrial):
    def __init__(self, context: DeepSpeedTrialContext) -> None:
        self.context = context
        ...
        self.context.set_profiler(
             activities=[
                 torch.profiler.ProfilerActivity.CPU,
                 torch.profiler.ProfilerActivity.CUDA,
             ],
             schedule=torch.profiler.schedule(
                 wait=1,
                 warmup=1,
                 active=2
             ),
         )

Note

Though configuring a profiling schedule torch.profiler.schedule is optional, profiling every batch may cause a large amount of data to be uploaded to TensorBoard. This may result in long rendering times for TensorBoard and memory issues. For long-running experiments, it is recommended to configure a profiling schedule.

DeepSpeed Trainer#

With the DeepSpeed Trainer API, you can implement and iterate on model training code locally before running on cluster. When you are satisfied with your model code, you configure and submit the code on cluster.

The DeepSpeed Trainer API lets you do the following:

  • Work locally, iterating on your model code.

  • Debug models in your favorite debug environment (e.g., directly on your machine, IDE, or Jupyter notebook).

  • Run training scripts without needing to use an experiment configuration file.

  • Load previously saved checkpoints directly into your model.

Initializing the Trainer#

After defining the PyTorch Trial, initialize the trial and the trainer. init() returns a DeepSpeedTrialContext for instantiating DeepSpeedTrial. Initialize Trainer with the trial and context.

from determined.pytorch import deepspeed as det_ds

def main():
    with det_ds.init() as train_context:
        trial = MyTrial(train_context)
        trainer = det_ds.Trainer(trial, train_context)

if __name__ == "__main__":
    # Configure logging
    logging.basicConfig(level=logging.INFO, format=det.LOG_FORMAT)
    main()

Training is configured with a call to fit() with training loop arguments, such as checkpointing periods, validation periods, and checkpointing policy.

from determined import pytorch
from determined.pytorch import deepspeed as det_ds

def main():
    with det_ds.init() as train_context:
        trial = MyTrial(train_context)
        trainer = det_ds.Trainer(trial, train_context)
+       trainer.fit(
+           max_length=pytorch.Epoch(10),
+           checkpoint_period=pytorch.Batch(100),
+           validation_period=pytorch.Batch(100),
+           checkpoint_policy="all"
+       )


if __name__ == "__main__":
    # Configure logging
    logging.basicConfig(level=logging.INFO, format=det.LOG_FORMAT)
    main()

Run Your Training Script Locally#

Run training scripts locally without submitting to a cluster or defining an experiment configuration file.

from determined import pytorch
from determined.pytorch import deepspeed as det_ds

def main():
    with det_ds.init() as train_context:
        trial = MyTrial(train_context)
        trainer = det_ds.Trainer(trial, train_context)
        trainer.fit(
            max_length=pytorch.Epoch(10),
            checkpoint_period=pytorch.Batch(100),
            validation_period=pytorch.Batch(100),
            checkpoint_policy="all",
        )


if __name__ == "__main__":
    # Configure logging
    logging.basicConfig(level=logging.INFO, format=det.LOG_FORMAT)
    main()

You can run this Python script directly (python3 train.py), or in a Jupyter notebook. This code will train for ten epochs, checkpointing and validating every 100 batches.

Local Distributed Training#

Local training can utilize multiple GPUs on a single node with a few modifications to the above code.

import deepspeed

def main():
+     # Initialize distributed backend before det_ds.init()
+     deepspeed.init_distributed()
+     # Initialize DistributedContext
      with det_ds.init(
+       distributed=core.DistributedContext.from_deepspeed()
      ) as train_context:
          trial = MyTrial(train_context)
          trainer = det_ds.Trainer(trial, train_context)
          trainer.fit(
              max_length=pytorch.Epoch(10),
              checkpoint_period=pytorch.Batch(100),
              validation_period=pytorch.Batch(100),
              checkpoint_policy="all"
          )

This code can be directly invoked with your distributed backend’s launcher: deepspeed --num_gpus=4 trainer.py --deepspeed --deepspeed_config ds_config.json

Test Mode#

Trainer accepts a test_mode parameter which, if true, trains and validates your training code for only one batch, checkpoints, then exits. This is helpful for debugging code or writing automated tests around your model code.

 trainer.fit(
              max_length=pytorch.Epoch(10),
              checkpoint_period=pytorch.Batch(100),
              validation_period=pytorch.Batch(100),
+             test_mode=True
          )

Prepare Your Training Code for Deploying to a Determined Cluster#

Once you are satisfied with the results of training the model locally, you submit the code to a cluster. This example allows for distributed training locally and on cluster without having to make code changes.

Example workflow of frequent iterations between local debugging and cluster deployment:

 def main():
+   info = det.get_cluster_info()
+   if info is None:
+       # Local: configure local distributed training.
+       deepspeed.init_distributed()
+       distributed_context = core.DistributedContext.from_deepspeed()
+       latest_checkpoint = None
+   else:
+       # On-cluster: Determined will automatically detect distributed context.
+       distributed_context = None
+       # On-cluster: configure the latest checkpoint for pause/resume training functionality.
+       latest_checkpoint = info.latest_checkpoint

+     with det_ds.init(
+       distributed=distributed_context
      ) as train_context:
          trial = DCGANTrial(train_context)
          trainer = det_ds.Trainer(trial, train_context)
          trainer.fit(
              max_length=pytorch.Epoch(11),
              checkpoint_period=pytorch.Batch(100),
              validation_period=pytorch.Batch(100),
+             latest_checkpoint=latest_checkpoint,
          )

To run Trainer API solely on-cluster, the code is simpler:

def main():
    with det_ds.init() as train_context:
        trial_inst = gan_model.DCGANTrial(train_context)
        trainer = det_ds.Trainer(trial_inst, train_context)
        trainer.fit(
            max_length=pytorch.Epoch(11),
            checkpoint_period=pytorch.Batch(100),
            validation_period=pytorch.Batch(100),
            latest_checkpoint=det.get_cluster_info().latest_checkpoint,
        )

Submit Your Trial for Training on Cluster#

To run your experiment on cluster, you’ll need to create an experiment configuration (YAML) file. Your experiment configuration file must contain searcher configuration and entrypoint.

name: dcgan_deepspeed_mnist
searcher:
  name: single
  metric: validation_loss
resources:
  slots_per_trial: 2
entrypoint: python3 -m determined.launch.deepspeed python3 train.py

Submit the trial to the cluster:

det e create det.yaml .

If your training code needs to read some values from the experiment configuration, you can set the data field and read from det.get_cluster_info().trial.user_data or set hyperparameters and read from det.get_cluster_info().trial.hparams.

Profiling#

When training on cluster, you can enable the system metrics profiler by adding a parameter to your fit() call:

 trainer.fit(
    ...,
+   profiling_enabled=True
 )

Known DeepSpeed Constraints#

Some DeepSpeed constraints are inherited concerning supported feature compatibility:

  • Pipeline parallelism can only be combined with Zero Redundancy Optimizer (ZeRO) stage 1.

  • Parameter offloading is only supported with ZeRO stage 3.

  • Optimizer offloading is only supported with ZeRO stage 2 and 3.