det.pytorch.deepspeed API Reference#

User Guide

DeepSpeed API

determined.pytorch.deepspeed.DeepSpeedTrial#

class determined.pytorch.deepspeed.DeepSpeedTrial(context: DeepSpeedTrialContext)#

DeepSpeed trials are created by subclassing this abstract class.

We can do the following things in this trial class:

  • Define the DeepSpeed model engine which includes the model, optimizer, and lr_scheduler.

    In the __init__() method, initialize models and, optionally, optimizers and LR schedulers and pass them to deepspeed.initialize to build the model engine. Then pass the created model engine to wrap_model_engine provided by DeepSpeedTrialContext. We support multiple DeepSpeed model engines if they only use data parallelism or if they use the same model parallel unit.

  • Run forward and backward passes.

    In train_batch(), use the methods provided by the DeepSpeed model engine to perform the backward pass and optimizer step. These methods will differ depending on whether you are using pipeline parallelism or not.

trial_controller_class#

alias of DeepSpeedTrialController

trial_context_class#

alias of DeepSpeedTrialContext

abstract __init__(context: DeepSpeedTrialContext) None#

Initializes a trial using the provided context. The general steps are:

  1. Initialize the model(s) and, optionally, the optimizer and lr_scheduler. The latter two can also be configured using the DeepSpeed config.

  2. Build the DeepSpeed model engine by calling deepspeed.initialize with the model (optionally optimizer and lr scheduler) and a DeepSpeed config. Wrap it with context.wrap_model_engine.

  3. If you want, use a custom model parallel unit by calling context.set_mpu.

  4. If you want, disable automatic gradient accumulation by calling context.disable_auto_grad_accumulation.

  5. If you want, use a custom data loader by calling context.disable_dataset_reproducibility_checks.

Here is a code example.

self.context = context
self.args = AttrDict(self.context.get_hparams())

# Build deepspeed model engine.
model = ... # build model
model_engine, optimizer, lr_scheduler, _ = deepspeed.initialize(
    args=self.args,
    model=model,
)

self.model_engine = self.context.wrap_model_engine(model_engine)
abstract train_batch(dataloader_iter: Optional[Iterator[Union[Dict[str, Tensor], Sequence[Tensor], Tensor]]], epoch_idx: int, batch_idx: int) Union[Tensor, Dict[str, Any]]#

Train one full batch (i.e. train on train_batch_size samples, perhaps consisting of multiple micro-batches).

If training without pipeline parallelism, users should implement this function by doing the following things:

  1. Get a batch from the dataloader_iter and pass it to the GPU.

  2. Compute the loss in the forward pass.

  3. Perform the backward pass.

  4. Perform an optimizer step.

  5. Return training metrics in a dictionary.

Here is a code example.

# Assume one model_engine wrapped in ``__init__``.

batch = self.context.to_device(next(dataloader_iter))
loss = self.model_engine(batch)
self.model_engine.backward(loss)
self.model_engine.step()
return {"loss": loss}

If using gradient accumulation over multiple micro-batches, Determined will automatically call train_batch multiple times according to gradient_accumulation_steps in the DeepSpeed config.

With pipeline parallelism there is no need to manually get a batch from the dataloader_iter and the forward, backward, optimizer steps are combined in the model engine’s train_batch method.

# Assume one model_engine wrapped in ``__init__``.

loss = self.model_engine.train_batch(dataloader_iter)
return {"loss": loss}
Parameters:
  • dataloader_iter (Iterator[torch.utils.data.DataLoader], optional) – iterator over the train DataLoader.

  • epoch_idx (integer) – index of the current epoch among all the batches processed per device (slot) since the start of training.

  • batch_idx (integer) – index of the current batch among all the epochs processed per device (slot) since the start of training.

Returns:

training metrics to return.

Return type:

torch.Tensor or Dict[str, Any]

abstract build_training_data_loader() Optional[DataLoader]#

Defines the data loader to use during training.

Must return an instance of determined.pytorch.DataLoader unless context.disable_dataset_reproducibility_checks is called.

If using data parallel training, the batch size should be per GPU batch size. If using gradient aggregation, the data loader should return batches with train_micro_batch_size_per_gpu samples each.

abstract build_validation_data_loader() Optional[DataLoader]#

Defines the data loader to use during validation.

Must return an instance of determined.pytorch.DataLoader unless context.disable_dataset_reproducibility_checks is called.

If using data parallel training, the batch size should be per GPU batch size. If using gradient aggregation, the data loader should return batches with a desired micro batch size (most of the time this is the same as train_micro_batch_size_per_gpu).

build_callbacks() Dict[str, PyTorchCallback]#

Defines a dictionary of string names to callbacks to be used during training and/or validation.

The string name will be used as the key to save and restore callback state for any callback that defines load_state_dict() and state_dict().

abstract evaluate_batch(dataloader_iter: Optional[Iterator[Union[Dict[str, Tensor], Sequence[Tensor], Tensor]]], batch_idx: int) Dict[str, Any]#

Calculate validation metrics for a batch and return them as a dictionary mapping metric names to metric values. Per-batch validation metrics are averaged to produce a single set of validation metrics for the entire validation set by default.

The metrics returned from this function must be JSON-serializable.

DeepSpeedTrial supports more flexible metrics computation via our custom reducer API, see MetricReducer for more details.

Parameters:

dataloader_iter (Iterator[torch.utils.data.DataLoader], optional) – iterator over the validation DataLoader.

save(context: DeepSpeedTrialContext, path: Path) None#

Save is called on every GPU to make sure all checkpoint shards are saved.

By default, we loop through the wrapped model engines and call DeepSpeed’s save:

for i, m in enumerate(context.models):
    m.save_checkpoint(path, tag=f"model{i}")

This method can be overwritten for more custom save behavior.

load(context: DeepSpeedTrialContext, load_path: Path) None#

By default, we loop through the wrapped model engines and call DeepSpeed’s load.

for i, m in enumerate(context.models):
    m.load_checkpoint(path, tag=f"model{i}")

This method can be overwritten for more custom load behavior.

determined.pytorch.deepspeed.DeepSpeedTrialContext#

class determined.pytorch.deepspeed.DeepSpeedTrialContext(*args: Any, **kwargs: Any)#

Bases: TrialContext, _PyTorchReducerContext

Contains runtime information for any Determined workflow that uses the DeepSpeedTrial API.

With this class, users can do the following things:

  1. Wrap DeepSpeed model engines that contain the model, optimizer, lr_scheduler, etc. This will make sure Determined can automatically provide gradient aggregation, checkpointing and fault tolerance. In contrast to determined.pytorch.PyTorchTrial, the user does not need to wrap optimizer and lr_scheduler as that should all be instead passed to the DeepSpeed initialize function (see https://www.deepspeed.ai/getting-started/#writing-deepspeed-models) when building the model engine.

  2. Overwrite a deepspeed config file or dictionary with values from Determined’s experiment config to ensure consistency in batch size and support hyperparameter tuning.

  3. Set a custom model parallel configuration that should instantiate a determined.pytorch.deepspeed.ModelParallelUnit dataclass. We automatically set the mpu for data parallel and standard pipeline parallel training. This should only be needed if there is additional model parallelism outside DeepSpeed’s supported methods.

  4. Disable data reproducibility checks to allow custom data loaders.

  5. Disable automatic gradient aggregation for non-pipeline-parallel training.

current_train_batch() int#

Current global batch index

disable_auto_grad_accumulation() None#

Prevent the DeepSpeedTrialController from automatically calling train_batch multiple times to process enough micro batches to meet the per slot batch size. Thus, the user is responsible for manually training on enough micro batches in train_batch to meet the expected per slot batch size.

disable_dataset_reproducibility_checks() None#

disable_dataset_reproducibility_checks() allows you to return an arbitrary DataLoader from build_training_data_loader() or build_validation_data_loader().

Normally you would be required to return a det.pytorch.DataLoader instead, which would guarantee that an appropriate Sampler is used that ensures:

  • When shuffle=True, the shuffle is reproducible.

  • The dataset will start at the right location, even after pausing/continuing.

  • Proper sharding is used during distributed training.

However, there can be cases where either reproducibility of the dataset is not needed or where the nature of the dataset can cause the det.pytorch.DataLoader to be unsuitable.

In those cases, you can call disable_dataset_reproducibility_checks() and you will be free to return any torch.utils.data.DataLoader you like. Dataset reproducibility will still be possible, but it will be your responsibility. The Sampler classes in determined.pytorch.samplers can help in this regard.

classmethod from_config(config: Dict[str, Any]) TrialContext#

Create a context object suitable for debugging outside of Determined.

An example for a subclass of DeepSpeedTrial:

config = { ... }
context = det.pytorch.deepspeed.DeepSpeedTrialContext.from_config(config)
my_trial = MyDeepSpeedTrial(context)

train_ds = my_trial.build_training_data_loader()
for epoch_idx in range(3):
    for batch_idx, batch in enumerate(train_ds):
        metrics = my_trial.train_batch(batch, epoch_idx, batch_idx)
        ...

An example for a subclass of TFKerasTrial:

config = { ... }
context = det.keras.TFKerasTrialContext.from_config(config)
my_trial = tf_keras_one_var_model.OneVarTrial(context)

model = my_trial.build_model()
model.fit(my_trial.build_training_data_loader())
eval_metrics = model.evaluate(my_trial.build_validation_data_loader())
Parameters:

config – An experiment config file, in dictionary form.

get_data_config() Dict[str, Any]#

Return the data configuration.

get_enable_tensorboard_logging() bool#

Return whether automatic tensorboard logging is enabled

get_experiment_config() Dict[str, Any]#

Return the experiment configuration.

get_experiment_id() int#

Return the experiment ID of the current trial.

get_hparam(name: str) Any#

Return the current value of the hyperparameter with the given name.

get_hparams() Dict[str, Any]#

Return a dictionary of hyperparameter names to values.

get_stop_requested() bool#

Return whether a trial stoppage has been requested.

get_tensorboard_path() Path#

Get the path where files for consumption by TensorBoard should be written

get_tensorboard_writer() Any#

This function returns an instance of torch.utils.tensorboard.SummaryWriter

Trials users who wish to log to TensorBoard can use this writer object. We provide and manage a writer in order to save and upload TensorBoard files automatically on behalf of the user.

Usage example:

class MyModel(PyTorchTrial):
    def __init__(self, context):
        ...
        self.writer = context.get_tensorboard_writer()

    def train_batch(self, batch, epoch_idx, batch_idx):
        self.writer.add_scalar('my_metric', np.random.random(), batch_idx)
        self.writer.add_image('my_image', torch.ones((3,32,32)), batch_idx)
get_trial_id() int#

Return the trial ID of the current trial.

is_epoch_end() bool#

Returns true if the current batch is the last batch of the epoch.

Warning

Not accurate for variable size epochs.

is_epoch_start() bool#

Returns true if the current batch is the first batch of the epoch.

Warning

Not accurate for variable size epochs.

set_enable_tensorboard_logging(enable_tensorboard_logging: bool) None#

Set a flag to indicate whether automatic upload to tensorboard is enabled.

set_mpu(mpu: ModelParallelUnit) None#

Use a custom model parallel configuration.

The argument mpu should implement a determined.pytorch.deepspeed.ModelParallelUnit dataclass to provide information on data parallel topology and whether a rank should compute metrics/build data loaders.

This should only be needed if training with custom model parallelism.

In the case of multiple model parallel engines, we assume that the MPU and data loaders correspond to the first wrapped model engine.

set_profiler(*args: List[str], **kwargs: Any) None#

set_profiler() is a thin wrapper around PyTorch profiler, torch-tb-profiler. It overrides the on_trace_ready parameter to the determined tensorboard path, while all other arguments are passed directly into torch.profiler.profile. Stepping the profiler will be handled automatically during the training loop.

See the PyTorch profiler plugin for details.

Examples:

Profiling GPU and CPU activities, skipping batch 1, warming up on batch 2, and profiling batches 3 and 4.

self.context.set_profiler(
    activities=[
        torch.profiler.ProfilerActivity.CPU,
        torch.profiler.ProfilerActivity.CUDA,
    ],
    schedule=torch.profiler.schedule(
        wait=1,
        warmup=1,
        active=2
    ),
)
set_stop_requested(stop_requested: bool) None#

Set a flag to request a trial stoppage. When this flag is set to True, we finish the step, checkpoint, then exit.

to_device(data: Union[Dict[str, Union[ndarray, Tensor]], Sequence[Union[ndarray, Tensor]], ndarray, Tensor]) Union[Dict[str, Tensor], Sequence[Tensor], Tensor]#

Map data to the device allocated by the Determined cluster.

Since we pass an iterable over the data loader to train_batch and evaluate_batch for DeepSpeedTrial, the user is responsible for moving data to GPU if needed. This is basically a helper function to make that easier.

wrap_model_engine(model: deepspeed.DeepSpeedEngine) deepspeed.DeepSpeedEngine#

Register a DeepSpeed model engine.

In the background, we track the model engine for checkpointing, set batch size information, using the first wrapped model engine, and perform checks to properly handle pipeline parallelism if the model engine is a PipelineEngine.

wrap_reducer(reducer: Union[Callable, MetricReducer], name: Optional[str] = None, for_training: bool = True, for_validation: bool = True) MetricReducer#

Register a custom reducer that will calculate a metric properly, even with distributed training.

During distributed training and evaluation, many types of metrics must be calculated globally, rather than calculating the metric on each shard of the dataset and averaged or summed. For example, an accurate ROC AUC for dataset cannot be derived from the individual ROC AUC metrics calculated on by each worker.

Determined solves this problem by offering fully customizable metric reducers which are distributed-aware. These are registered by calling context.wrap_reducer() and are updated by the user during train_batch() or evaluate_batch().

Parameters:
  • reducer (Union[Callable, pytorch.MetricReducer]) – Either a reducer function or a pytorch.MetricReducer. See below for more details.

  • name – (Optional[str] = None): Either a string name to associate with the metric returned by the reducer, or None to indicate the metric will return a dict mapping string names to metric values. This allows for a single reducer to return many metrics, such as for a per-class mean IOU calculation. Note that if name is a string, the returned metric must NOT be a dict-type metric.

  • for_training – (bool = True): Indicate that the reducer should be used for training workloads.

  • for_validation – (bool = True): Indicate that the reducer should be used for validation workloads.

Return Value:
pytorch.MetricReducer:

If reducer was a function, the returned MetricReducer will have a single user-facing method like def update(value: Any) -> None that you should call during train_batch or evaluate_batch. Otherwise, the return value will just be the reducer that was passed in.

Reducer functions: the simple API

If the reducer parameter is a function, it must have the following properties:

  • It accepts a single parameter, which will be a flat list of all inputs the users pass when they call .update() on the object returned by wrap_reducer(). See the code example below for more details.

  • It returns either a single (non-dict) metric or a dictionary mapping names to metrics, as described above.

The primary motivation for passing a function as the reducer is simplicity. Metrics from all batches will be buffered in memory and passed over the network before they are reduced all at once. This introduces some overhead, but it is likely unnoticeable for scalar metrics or on validation datasets of small or medium size. This single function strategy can be useful for quick prototyping or for calculating metrics that are difficult or impossible to calculate incrementally.

For example, ROC AUC could be properly calculated by passing a small wrapper function calling sklearn.metrics.roc_auc_score:

# Custom reducer function.
def roc_auc_reducer(values):
    # values will be a flat list of all inputs to
    # .update(), which in this code example are
    # tuples of (y_true, y_score).  We reshape
    # that list into two separate lists:
    y_trues, y_scores = zip(*values)

    # Then we return a metric value:
    return sklearn.metrics.roc_auc_score(
        np.array(y_trues), np.array(y_scores)
    )

class MyPyTorchTrial(PyTorchTrial):
    def __init__(self, context):
        self.roc_auc = context.wrap_reducer(
            roc_auc_reducer, name="roc_auc"
        )
        ...

    def evaluate_batch(self, batch):
        ...
        # Function-based reducers are updated with .update().
        # The roc_auc_reducer function will get a list of all
        # inputs that we pass in here:
        self.roc_auc.update((y_true, y_score))

        # The "roc_auc" metric will be included in the final
        # metrics after the workload has completed; no need
        # to return it here.  If that is your only metric,
        # just return an empty dict.
        return {}

MetricReducer objects: the advanced API

The primary motivation for passing a det.pytorch.MetricReducer as the reducer is performance. det.pytorch.MetricReducer allows the user more control in how values are stored and exposes a per_slot_reduce() call which lets users minimize the cost of the network communication before the final cross_slot_reduce().

An additional reason for using the det.pytorch.MetricReducer is for flexibility of the update mechanism, which is completely user-defined when subclassing MetricReducer.

For the full details and a code example, see: MetricReducer.

determined.pytorch.deepspeed.overwrite_deepspeed_config#

determined.pytorch.deepspeed.overwrite_deepspeed_config(base_ds_config: Union[str, Dict], source_ds_dict: Dict[str, Any]) Dict[str, Any]#

Overwrite a base_ds_config with values from a source_ds_dict.

You can use source_ds_dict to overwrite leaf nodes of the base_ds_config. More precisely, we will iterate depth first into source_ds_dict and if a node corresponds to a leaf node of base_ds_config, we copy the node value over to base_ds_config.

Parameters:
  • base_ds_config (str or Dict) – either a path to a DeepSpeed config file or a dictionary.

  • source_ds_dict (Dict) – dictionary with fields that we want to copy to base_ds_config

Returns:

The resulting dictionary when base_ds_config is overwritten with source_ds_dict.

determined.pytorch.deepspeed.ModelParallelUnit#

class determined.pytorch.deepspeed.ModelParallelUnit(data_parallel_rank: int, data_parallel_world_size: int, should_report_metrics: bool, should_build_data_loader: bool)#

This class contains the functions we expect in order to accurately carry out parallel training. For custom model parallel training, you need to subclass and override the functions before passing it to the DeepSpeedTrialContext by calling context.wrap_mpu(mpu).

The following classes and methods overlap with PyTorchTrial (click to go to respective documentation):