Advanced Usage#

Training Multiple Model Engines#

If the model engines use the same ModelParallelUnit, you can train multiple model engines in a single DeepSpeedTrial by calling wrap_model_engine() on additional model engines you want to use, and by modifying train_batch() and evaluate_batch() accordingly.

Sample accounting is done with respect to the train_batch_size of the first model engine passed to wrap_model_engine().
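
For example, a minimal sketch (the second engine, the loss function, and names like engine_a and engine_b are illustrative; both engines are assumed to share the same ModelParallelUnit) wraps each engine in __init__() and steps both on the same batch in train_batch():

import torch

def __init__(self, context: DeepSpeedTrialContext) -> None:
    self.context = context
    # engine_a and engine_b are assumed to be built with deepspeed.initialize()
    # earlier in __init__().
    self.engine_a = self.context.wrap_model_engine(engine_a)
    self.engine_b = self.context.wrap_model_engine(engine_b)

def train_batch(self, dataloader_iter, epoch_idx, batch_idx):
    inputs, labels = self.context.to_device(next(dataloader_iter))
    metrics = {}
    for name, engine in [("a", self.engine_a), ("b", self.engine_b)]:
        loss = torch.nn.functional.cross_entropy(engine(inputs), labels)
        engine.backward(loss)
        engine.step()
        metrics[f"loss_{name}"] = loss.item()
    return metrics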

For more advanced cases where model engines have different model parallel topologies, contact support on the Determined community Slack.

Custom Reducers#

Determined supports arbitrary training and validation metrics reduction, including during distributed training, by letting you define custom reducers. Custom reducers can be a function or an implementation of the determined.pytorch.MetricReducer interface. See determined.pytorch.PyTorchTrialContext.wrap_reducer() for more information.
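
For example, a plain function can serve as a reducer. A minimal sketch (the metric name and the per-batch value are illustrative):

# In __init__(): wrap a function that averages every value recorded across
# batches and GPU slots.
self.mean_error = self.context.wrap_reducer(
    lambda values: sum(values) / len(values),
    name="mean_error",
)

# In train_batch() or evaluate_batch(): record one value per batch.
self.mean_error.update(batch_error)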

Manual Distributed Backend Initialization#

By default, DeepSpeedTrial initializes the distributed backend by calling deepspeed.init_distributed before a trial is created. This initializes the torch.distributed backend to use the NVIDIA Collective Communications Library (NCCL). If you want to customize the distributed backend initialization, set the DET_MANUAL_INIT_DISTRIBUTED environment variable in your experiment configuration:

environment:
  environment_variables:
    - DET_MANUAL_INIT_DISTRIBUTED=1
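
With this variable set, the trial is responsible for initializing the backend itself before building any model engines, typically at the top of __init__(). A minimal sketch (the gloo backend is only an illustration of a non-default choice):

import deepspeed

def __init__(self, context: DeepSpeedTrialContext) -> None:
    # Initialize torch.distributed manually with a custom backend.
    deepspeed.init_distributed(dist_backend="gloo")
    self.context = context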

Manual Gradient Aggregation#

DeepSpeedTrial automatically ensures that a total of train_batch_size samples are processed in each training iteration. Assuming train_batch() calls the model engine's forward, backward, and optimizer step methods once, DeepSpeedTrial calls train_batch():

  • gradient_accumulation_steps times when not using pipeline parallelism

  • once when using pipeline parallelism

to reach model_engine.train_batch_size() for the first wrapped model engine.

To disable this behavior, call disable_auto_grad_accumulation() in the __init__() method of DeepSpeedTrial. In this case, make sure the first model engine processes train_batch_size samples in each call to train_batch().
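
A minimal sketch of what this might look like (the loss and the build_engine helper are illustrative): __init__() opts out of automatic accumulation, and train_batch() loops over the engine's gradient_accumulation_steps() itself so that each call processes train_batch_size samples:

import torch

def __init__(self, context: DeepSpeedTrialContext) -> None:
    self.context = context
    # Take over sample accounting from DeepSpeedTrial.
    self.context.disable_auto_grad_accumulation()
    self.model_engine = self.context.wrap_model_engine(build_engine())  # build_engine is illustrative

def train_batch(self, dataloader_iter, epoch_idx, batch_idx):
    # Run gradient accumulation manually so one call covers train_batch_size samples.
    for _ in range(self.model_engine.gradient_accumulation_steps()):
        inputs, labels = self.context.to_device(next(dataloader_iter))
        loss = torch.nn.functional.cross_entropy(self.model_engine(inputs), labels)
        self.model_engine.backward(loss)
        self.model_engine.step()
    return {"loss": loss.item()}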

Custom Data Loaders#

By default, build_training_data_loader() and build_validation_data_loader() are expected to return a determined.pytorch.DataLoader, which is a thin wrapper around torch.utils.data.DataLoader that supports reproducibility and data sharding for distributed training.

Override this requirement and return a torch.utils.data.DataLoader by calling disable_dataset_reproducibility_checks(). Review customizing a reproducible dataset for recommended best practices when using a custom data loader.

A common use case for a custom data loader is when you create the data loader while building the model engine, as shown in the following example:

import deepspeed
import torch

from determined.pytorch.deepspeed import DeepSpeedTrial, DeepSpeedTrialContext


class MyTrial(DeepSpeedTrial):
    def __init__(self, context: DeepSpeedTrialContext) -> None:
        self.context = context
        # A plain torch DataLoader is returned below, so skip Determined's
        # dataset reproducibility checks.
        self.context.disable_dataset_reproducibility_checks()
        self.args = AttrDict(self.context.get_hparams())  # AttrDict and Net are defined elsewhere

        training_data = ...
        model = Net(self.args)
        parameters = filter(lambda p: p.requires_grad, model.parameters())

        # deepspeed.initialize returns (engine, optimizer, training_dataloader, lr_scheduler).
        model_engine, __, self.train_dataloader, __ = deepspeed.initialize(
            args=self.args,
            model=model,
            model_parameters=parameters,
            training_data=training_data,
        )
        self.model_engine = self.context.wrap_model_engine(model_engine)

    def build_training_data_loader(self) -> torch.utils.data.DataLoader:
        return self.train_dataloader

Custom Model Parallelism#

DeepSpeedTrial relies on a ModelParallelUnit to provide the data parallel world size and to determine whether a GPU slot should build the data loaders and report metrics. For data parallel training with DeepSpeed, the data parallel world size equals the number of GPU slots, and all GPU slots build the data loaders and report metrics. If the model engine passed to wrap_model_engine() is a PipelineEngine, the ModelParallelUnit is built from the MPU associated with that engine. To support custom model parallelism, pass a custom ModelParallelUnit to set_mpu(), as shown in the following example:

context.set_mpu(
    ModelParallelUnit(
        data_parallel_rank=[fill in],
        data_parallel_world_size=[fill in],
        should_report_metrics=[fill in],
        should_build_dataloader=[fill in]
    )
)
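
For example, with a Megatron-style mpu module (assumed here; substitute your own topology helpers), only the first rank in each model parallel group might build the data loaders and report metrics:

context.set_mpu(
    ModelParallelUnit(
        data_parallel_rank=mpu.get_data_parallel_rank(),
        data_parallel_world_size=mpu.get_data_parallel_world_size(),
        # Report metrics and load data once per model parallel group.
        should_report_metrics=mpu.get_model_parallel_rank() == 0,
        should_build_dataloader=mpu.get_model_parallel_rank() == 0,
    )
)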