det.keras API Reference#

User Guide

Keras API

determined.keras.DeterminedCallback#

class determined.keras.DeterminedCallback(core_context: Context, checkpoint: Optional[str], continue_id: Union[int, str], *, train_metrics_report_period: Union[int, str] = 'epoch', checkpoint_epochs: int = 1)#

DeterminedCallback adds Determined tracking, checkpointing, pausing, and restoring to a Keras model.fit() call. Just include it as one of your callbacks.

DeterminedCallback must not be used with a BackupAndRestore callback or a ModelCheckpoint callback, which have conflicting behaviors.

When using DeterminedCallback:
  • The initial_epoch parameter to model.fit() will be overridden. Rely on the checkpoint and continue_id parameters to DeterminedCallback instead.

  • Checkpoints are saved and uploaded to Determined’s checkpoint storage every epoch by default, but can be saved less frequently based on the checkpoint_epochs parameter. Checkpoints are always saved when training finishes or is preempted.

  • Training will check for preemption every epoch. This means, for instance, if you click the “pause” button in the UI, training will continue until the next epoch boundary.

  • The normal verbose=1 TQDM progress bars are replaced with a more log-friendly output.

  • By default, checkpoints are saved with model.save_weights() and restored with model.load_weights(). This is configurable by subclassing DeterminedCallback and implementing custom save_model and load_model methods.

  • By default, weights are saved to the path model_checkpoint inside the checkpoint directory, which you can pass to model.load_weights() to load a trained model from a downloaded checkpoint after training is complete.
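The checkpoint timing rules above can be modeled in a few lines of plain Python. This is only an illustrative sketch: `should_checkpoint` is a hypothetical helper, not part of Determined, and it assumes that "every N epochs" is counted from the first epoch of training.

```python
def should_checkpoint(
    epoch: int, checkpoint_epochs: int, last_epoch: bool, preempted: bool
) -> bool:
    """Hypothetical model of when a checkpoint would be saved.

    `epoch` is the zero-based index of the epoch that just finished.
    """
    # Checkpoints are always saved when training finishes or is preempted.
    if last_epoch or preempted:
        return True
    # A value of 0 means "only save at those times".
    if checkpoint_epochs <= 0:
        return False
    # Assumption: every N epochs, counting from the first epoch.
    return (epoch + 1) % checkpoint_epochs == 0


# Five epochs with checkpoint_epochs=2 and no preemption.
saved = [
    e
    for e in range(5)
    if should_checkpoint(e, 2, last_epoch=(e == 4), preempted=False)
]
```

Under these assumptions, `saved` holds the zero-based epochs after which a checkpoint is written, including the final epoch.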

Parameters:
  • core_context – the result of a det.core.init() call

  • checkpoint – Either None, or a checkpoint UUID to start from. When you are training on-cluster, this is likely the output of det.get_cluster_info().latest_checkpoint.

  • continue_id – A unique identifier that is saved with the checkpoint. When you are training on-cluster, this is likely the output of det.get_cluster_info().trial.trial_id. When loading an existing checkpoint, if the provided continue_id matches what was in the checkpoint, training will continue from the epoch where it left off (a pause-and-unpause scenario). If the provided continue_id does not match the checkpoint, the model weights will be loaded but training will begin from epoch=0 (a warm-start scenario).

  • train_metrics_report_period – Either the string "epoch" or a number of batches to wait between reporting training metrics to the Determined master. Default: "epoch".

  • checkpoint_epochs – Save every N epochs. Checkpoints are always saved when training is preempted, or at the end of training. A value of 0 means to only save at those times. Default: 1.
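Putting the parameters above together, an on-cluster training script might look roughly like the sketch below. The `train` function, the toy model, the random data, and the choice of `checkpoint_epochs=2` are all illustrative assumptions; only `det.core.init()`, `det.get_cluster_info()`, and the `DeterminedCallback` arguments come from this reference. Imports are deferred into the function so the definition itself does not require TensorFlow or Determined.

```python
def train() -> None:
    """Hypothetical on-cluster training entrypoint (not called here)."""
    import numpy as np
    import tensorflow as tf

    import determined as det
    import determined.keras

    info = det.get_cluster_info()
    assert info is not None, "this sketch assumes it runs on a Determined cluster"

    # Toy model and data, purely for illustration.
    model = tf.keras.Sequential([tf.keras.layers.Dense(1)])
    model.compile(optimizer="sgd", loss="mse")
    x = np.random.rand(64, 4).astype("float32")
    y = np.random.rand(64, 1).astype("float32")

    with det.core.init() as core_context:
        det_cb = det.keras.DeterminedCallback(
            core_context,
            checkpoint=info.latest_checkpoint,  # None on a fresh start
            continue_id=info.trial.trial_id,    # same ID after pause/unpause
            checkpoint_epochs=2,                # arbitrary example value
        )
        # initial_epoch is not passed: the callback overrides it.
        model.fit(x, y, epochs=4, callbacks=[det_cb])
```

A script would call `train()` from its entrypoint; it is left uncalled here because it only makes sense inside a Determined task.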

__init__(core_context: Context, checkpoint: Optional[str], continue_id: Union[int, str], *, train_metrics_report_period: Union[int, str] = 'epoch', checkpoint_epochs: int = 1) None#

save_model(model: Model, path: str, distributed: DistributedContext) None#

Users can override this method in a subclass if they need to customize how they save their model.

This method is responsible for meeting the requirements of checkpointing according to the needs of the active Strategy.

See the TensorFlow docs for more details.

Parameters:
  • model – the model to save

  • path – the destination to save to

  • distributed – the value of core_context.distributed, which can be used for detecting the current process’s rank, or inter-worker coordination, as needed.

load_model(model: Model, path: str, distributed: DistributedContext) None#

Users can override this method in a subclass if they need to customize how they load their model.

Parameters:
  • model – the model to load

  • path – the path to load from

  • distributed – the value of core_context.distributed, which can be used for detecting the current process’s rank, or inter-worker coordination, as needed.
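A subclass that overrides save_model and load_model might look like the following sketch. The class name `LoggingDeterminedCallback` and the factory function are hypothetical, and the overrides only add a log line before delegating to the default behavior; a real customization would replace the `super()` calls with whatever the active Strategy requires. The import is deferred into the factory so the snippet can be defined without Determined installed.

```python
def make_logging_callback_class():
    """Build a hypothetical DeterminedCallback subclass (import deferred)."""
    import determined.keras as det_keras

    class LoggingDeterminedCallback(det_keras.DeterminedCallback):
        def save_model(self, model, path, distributed):
            # distributed is core_context.distributed; get_rank() identifies
            # the current process.
            print(f"rank {distributed.get_rank()}: saving model to {path}")
            super().save_model(model, path, distributed)

        def load_model(self, model, path, distributed):
            print(f"rank {distributed.get_rank()}: loading model from {path}")
            super().load_model(model, path, distributed)

    return LoggingDeterminedCallback
```

The returned class takes the same constructor arguments as DeterminedCallback and is passed to model.fit() in the same way.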

determined.keras.TensorBoard#

class determined.keras.TensorBoard(*args, **kwargs)#

This is a thin wrapper over the TensorBoard callback that ships with tf.keras. For more information, see the TensorBoard Guide or the upstream docs for tf.keras.callbacks.TensorBoard.

Note that if a log_dir argument is passed to the constructor, it will be ignored if the core_context is configured for tensorboard (which is the default when on-cluster).

Deprecated APIs#

The following APIs have been deprecated as of Determined 0.38.0 and will be removed in a future version. Please migrate your TFKerasTrial-based training to use the new DeterminedCallback instead.

determined.keras.TFKerasTrial#

class determined.keras.TFKerasTrial(context: TFKerasTrialContext)#

To implement a new tf.keras trial, subclass this class and implement the abstract methods described below (build_model(), build_training_data_loader(), and build_validation_data_loader()). In most cases you should provide a custom __init__() method as well.

By default, experiments use TensorFlow 2.x. To configure your trial to use legacy TensorFlow 1.x, specify a TensorFlow 1.x image in the environment.image field of the experiment configuration (e.g., determinedai/environments:cuda-10.2-pytorch-1.7-tf-1.15-gpu-0.21.2).

Trials default to using eager execution with TensorFlow 2.x but not with TensorFlow 1.x. To override the default behavior, call the appropriate function at the top of your code. For example, to disable eager execution while using TensorFlow 2.x, call tf.compat.v1.disable_eager_execution() after your import statements. If you are using TensorFlow 1.x in eager mode, add experimental_run_tf_function=False to your model.compile() call.

Warning

TFKerasTrial has been deprecated in Determined 0.38.0 and will be removed in a future version. Please use the new DeterminedCallback for training.

__init__(context: TFKerasTrialContext) None#

Initializes a trial using the provided context.

This method should typically be overridden by trial definitions: at minimum, it is important to store context as an instance variable so that it can be accessed by other methods of the trial class. This can also be a convenient place to initialize other state that is shared between methods.

abstract build_model() Model#

Returns the deep learning architecture associated with a trial. The architecture might depend on the current values of the model’s hyperparameters, which can be accessed via context.get_hparam(). This function returns a tf.keras.Model object.

After constructing the tf.keras.Model object, users must do two things before returning it:

  1. Wrap the model using context.wrap_model().

  2. Compile the model using model.compile().

abstract build_training_data_loader() Union[Sequence, DatasetV2, SequenceAdapter, tuple]#

Defines the data loader to use during training.

Should return one of the following:

1) A tuple (x_train, y_train), where x_train is a NumPy array (or array-like), a list of arrays (in case the model has multiple inputs), or a dict mapping input names to the corresponding array, if the model has named inputs. y_train should be a NumPy array.

2) A tuple (x_train, y_train, sample_weights) of NumPy arrays.

3) A tf.data.Dataset returning a tuple of either (inputs, targets) or (inputs, targets, sample_weights).

4) A keras.utils.Sequence returning a tuple of either (inputs, targets) or (inputs, targets, sample_weights).

When using tf.data.Dataset, you must wrap the dataset using determined.keras.TFKerasTrialContext.wrap_dataset(). This wrapper is used to shard the dataset for distributed training. For optimal performance, users should wrap a dataset immediately after creating it.

Warning

If you are using tf.data.Dataset, Determined’s support for automatically checkpointing the dataset does not currently work correctly. This means that resuming workloads will start from the beginning of the dataset if using tf.data.Dataset.

abstract build_validation_data_loader() Union[Sequence, DatasetV2, SequenceAdapter, tuple]#

Defines the data loader to use during validation.

Should return one of the following:

1) A tuple (x_val, y_val), where x_val is a NumPy array (or array-like), a list of arrays (in case the model has multiple inputs), or a dict mapping input names to the corresponding array, if the model has named inputs. y_val should be a NumPy array.

2) A tuple (x_val, y_val, sample_weights) of NumPy arrays.

3) A tf.data.Dataset returning a tuple of either (inputs, targets) or (inputs, targets, sample_weights).

4) A keras.utils.Sequence returning a tuple of either (inputs, targets) or (inputs, targets, sample_weights).

When using tf.data.Dataset, you must wrap the dataset using determined.keras.TFKerasTrialContext.wrap_dataset(). This wrapper is used to shard the dataset for distributed training. For optimal performance, users should wrap a dataset immediately after creating it.

session_config() ConfigProto#

Specifies the tf.ConfigProto to be used by the TensorFlow session. By default, tf.ConfigProto(allow_soft_placement=True) is used.

keras_callbacks() List[Callback]#

Specifies a list of determined.keras.callbacks.Callback objects to be used during training.

determined.keras.TFKerasTrialContext#

class determined.keras.TFKerasTrialContext(*arg: Any, **kwarg: Any)#

Base context class that contains runtime information for any Determined workflow that uses the tf.keras API.

TFKerasTrialContext always has a DistributedContext accessible via context.distributed for information related to distributed training.

TFKerasTrialContext always has a TFKerasExperimentalContext accessible via context.experimental for information related to experimental features.

get_global_batch_size() int#

Return the global batch size.

get_per_slot_batch_size() int#

Return the per-slot batch size. When a model is trained with a single GPU, this is equal to the global batch size. When multi-GPU training is used, this is equal to the global batch size divided by the number of GPUs used to train the model.
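The relationship between the two batch sizes is simple arithmetic, sketched below. `per_slot_batch_size` is a hypothetical helper, not a Determined function, and it assumes the global batch size divides evenly across slots.

```python
def per_slot_batch_size(global_batch_size: int, num_slots: int) -> int:
    """Hypothetical helper: split the global batch evenly across slots."""
    # With a single GPU (or no GPU), the per-slot size equals the global size.
    return global_batch_size // max(num_slots, 1)


single_gpu = per_slot_batch_size(64, 1)
four_gpus = per_slot_batch_size(64, 4)
```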

configure_fit(verbose: Optional[bool] = None, class_weight: Any = <determined.keras._tf_keras_context._ArgNotProvided object>, workers: Optional[int] = None, use_multiprocessing: Optional[bool] = None, max_queue_size: Optional[bool] = None, shuffle: Optional[bool] = None, validation_steps: Any = <determined.keras._tf_keras_context._ArgNotProvided object>) None#

Configure parameters of model.fit(). See the Keras documentation for the meaning of each parameter.

Note that the output of verbose=True will be visually different in Determined than with Keras, for better rendering in trial logs.

Note that if configure_fit() is called multiple times, any keyword arguments which are not provided in the second call will not overwrite any settings configured by the first call.
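The "not provided means not overwritten" behavior can be modeled with a sentinel default, as in the sketch below. `FitSettings` and `_NOT_PROVIDED` are hypothetical stand-ins written for this illustration; they are not Determined's implementation.

```python
from typing import Any, Dict

_NOT_PROVIDED = object()  # stand-in sentinel meaning "argument was omitted"


class FitSettings:
    """Hypothetical model of how repeated configure_fit() calls accumulate."""

    def __init__(self) -> None:
        self.values: Dict[str, Any] = {}

    def configure_fit(
        self,
        verbose: Any = _NOT_PROVIDED,
        workers: Any = _NOT_PROVIDED,
        use_multiprocessing: Any = _NOT_PROVIDED,
    ) -> None:
        provided = {
            "verbose": verbose,
            "workers": workers,
            "use_multiprocessing": use_multiprocessing,
        }
        for name, value in provided.items():
            # Only arguments that were actually passed replace earlier settings.
            if value is not _NOT_PROVIDED:
                self.values[name] = value


settings = FitSettings()
settings.configure_fit(verbose=False, workers=5)
settings.configure_fit(use_multiprocessing=True)  # keeps verbose and workers
```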

Usage Example

class MyTFKerasTrial(det.keras.TFKerasTrial):
    def __init__(self, context):
        ...
        self.context.configure_fit(verbose=False, workers=5)

        # It is safe to call configure_fit() multiple times.
        self.context.configure_fit(use_multiprocessing=True)

wrap_dataset(dataset: Any, shard_dataset: bool = True) Any#

This should be used to wrap tf.data.Dataset objects immediately after they have been created. Users should use the output of this wrapper as the new instance of their dataset. If users create multiple datasets (e.g., one for training and one for validation), users should wrap each dataset independently.

Parameters:
  • dataset – tf.data.Dataset

  • shard_dataset – When performing multi-slot (distributed) training, this controls whether the dataset is sharded so that each training process (one per slot) sees unique data. If set to False, users must manually configure each process to use unique data.
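To illustrate what "each training process sees unique data" means, the sketch below shards a list of records round-robin by rank, in the same spirit as tf.data.Dataset.shard(num_shards, index). This reference does not specify how wrap_dataset shards internally, so treat the scheme and the `shard` helper as assumptions made for illustration.

```python
from typing import List, Sequence


def shard(records: Sequence[int], rank: int, size: int) -> List[int]:
    """Hypothetical round-robin shard: worker `rank` of `size` workers."""
    return list(records[rank::size])


records = list(range(10))
shards = [shard(records, rank, 4) for rank in range(4)]
```

Every record lands in exactly one shard, so no two workers train on the same example within a pass over the data.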

wrap_optimizer(optimizer: Optimizer) Optimizer#

This should be used to wrap tf.keras.optimizers.Optimizer objects. Users should use the output of this wrapper as the new instance of their optimizer. If users create multiple optimizers, users should wrap each optimizer independently.

Parameters:
  • optimizer – tf.keras.optimizers.Optimizer

wrap_model(model: Any) Any#

This should be used to wrap tf.keras.Model objects immediately after they have been created but before they have been compiled. This function takes a tf.keras.Model and returns a wrapped version of the model; the return value should be used in place of the original model.

Parameters:
  • model – tf.keras.Model

classmethod from_config(config: Dict[str, Any]) TrialContext#

Create a context object suitable for debugging outside of Determined.

An example for a subclass of DeepSpeedTrial:

config = { ... }
context = det.pytorch.deepspeed.DeepSpeedTrialContext.from_config(config)
my_trial = MyDeepSpeedTrial(context)

train_ds = my_trial.build_training_data_loader()
for epoch_idx in range(3):
    for batch_idx, batch in enumerate(train_ds):
        metrics = my_trial.train_batch(batch, epoch_idx, batch_idx)
        ...

An example for a subclass of TFKerasTrial:

config = { ... }
context = det.keras.TFKerasTrialContext.from_config(config)
my_trial = tf_keras_one_var_model.OneVarTrial(context)

model = my_trial.build_model()
model.fit(my_trial.build_training_data_loader())
eval_metrics = model.evaluate(my_trial.build_validation_data_loader())

Parameters:
  • config – An experiment config file, in dictionary form.

get_data_config() Dict[str, Any]#

Return the data configuration.

get_experiment_config() Dict[str, Any]#

Return the experiment configuration.

get_experiment_id() int#

Return the experiment ID of the current trial.

get_hparam(name: str) Any#

Return the current value of the hyperparameter with the given name.

get_hparams() Dict[str, Any]#

Return a dictionary of hyperparameter names to values.

get_stop_requested() bool#

Return whether a trial stoppage has been requested.

get_tensorboard_path() Path#

Get the path where files for consumption by TensorBoard should be written.

get_trial_id() int#

Return the trial ID of the current trial.

set_stop_requested(stop_requested: bool) None#

Set a flag to request a trial stoppage. When this flag is set to True, we finish the step, checkpoint, then exit.

determined.keras.TFKerasTrialContext.distributed#

class determined.core._distributed.DistributedContext(*, rank: int, size: int, local_rank: int, local_size: int, cross_rank: int, cross_size: int, chief_ip: Optional[str] = None, pub_port: int = 12360, pull_port: int = 12376, port_offset: int = 0, force_tcp: bool = False)

DistributedContext provides useful methods for effective distributed training.

A DistributedContext has the following required args:
  • rank: the index of this worker in the entire job

  • size: the number of workers in the entire job

  • local_rank: the index of this worker on this machine

  • local_size: the number of workers on this machine

  • cross_rank: the index of this machine in the entire job

  • cross_size: the number of machines in the entire job

Additionally, any time that cross_size > 1, you must also provide:
  • chief_ip: the IP address to reach the chief worker (where rank==0)
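The six rank and size values are related to one another. The sketch below enumerates them for a small job, assuming every machine runs the same number of workers and that ranks are assigned machine by machine; neither assumption is guaranteed by this reference, and `rank_layout` is a hypothetical helper.

```python
from typing import Dict, List


def rank_layout(cross_size: int, local_size: int) -> List[Dict[str, int]]:
    """Hypothetical enumeration of per-worker rank info for a uniform job."""
    workers = []
    for cross_rank in range(cross_size):        # index of the machine
        for local_rank in range(local_size):    # index of the worker on it
            workers.append(
                {
                    "rank": cross_rank * local_size + local_rank,
                    "size": cross_size * local_size,
                    "local_rank": local_rank,
                    "local_size": local_size,
                    "cross_rank": cross_rank,
                    "cross_size": cross_size,
                }
            )
    return workers


# Two machines with four workers each: eight workers in total.
workers = rank_layout(cross_size=2, local_size=4)
```

In this layout the worker with rank 5 is the second worker (local_rank 1) on the second machine (cross_rank 1), and the chief is the worker with rank 0.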

Note

DistributedContext has .allgather(), .gather(), and .broadcast() methods, which are easy to use and which can be useful for coordinating work across workers, but it is not a replacement for the allgather/gather/broadcast operations in your particular distributed training framework.

classmethod from_horovod(hvd: Any, chief_ip: Optional[str] = None) DistributedContext

Create a DistributedContext using the provided hvd module to determine rank information.

Example:

import horovod.torch as hvd
hvd.init()
distributed = DistributedContext.from_horovod(hvd)

The IP address for the chief worker is required whenever hvd.cross_size() > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.

classmethod from_deepspeed(chief_ip: Optional[str] = None) DistributedContext

Create a DistributedContext using the standard deepspeed environment variables to determine rank information.

The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.

classmethod from_torch_distributed(chief_ip: Optional[str] = None) DistributedContext

Create a DistributedContext using the standard torch distributed environment variables to determine rank information.

The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided via the chief_ip argument or the DET_CHIEF_IP environment variable.

classmethod from_tf_config() Tuple[DistributedContext, tensorflow.distribute.Strategy]

Create a DistributedContext and a tf.distribute.Strategy based on the TF_CONFIG environment variable.

Note that the determined.launch.tensorflow launcher will automatically create a TF_CONFIG environment variable for on-cluster training, so you may not need to configure it yourself.

Presently, the only supported configurations are:
  • MultiWorkerMirroredStrategy, when there are multiple nodes participating in training.

  • MirroredStrategy, when there is one node but multiple GPUs for training.

  • The default strategy otherwise.
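The selection rules above can be sketched against a TF_CONFIG value in TensorFlow's standard JSON format (a "cluster" mapping plus a "task" entry). The helper below is hypothetical and only mirrors the three documented cases; how from_tf_config() actually counts nodes and GPUs is not described here, and the addresses are made up.

```python
import json


def describe_strategy(tf_config_json: str, num_local_gpus: int) -> str:
    """Hypothetical mapping from TF_CONFIG contents to a strategy name."""
    config = json.loads(tf_config_json)
    workers = config.get("cluster", {}).get("worker", [])
    if len(workers) > 1:
        # Multiple nodes participate in training.
        return "tf.distribute.MultiWorkerMirroredStrategy"
    if num_local_gpus > 1:
        # One node, several GPUs.
        return "tf.distribute.MirroredStrategy"
    return "default strategy"


# Example TF_CONFIG for the first of two workers (addresses are invented).
tf_config = json.dumps(
    {
        "cluster": {"worker": ["10.0.0.1:12345", "10.0.0.2:12345"]},
        "task": {"type": "worker", "index": 0},
    }
)
choice = describe_strategy(tf_config, num_local_gpus=4)
```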

get_rank() int

Return the rank of the process in the trial. The rank of a process is a unique ID within the trial. That is, no two processes in the same trial are assigned the same rank.

get_local_rank() int

Return the rank of the process on the agent. The local rank of a process is a unique ID within a given agent and trial; that is, no two processes in the same trial that are executing on the same agent are assigned the same rank.

get_size() int

Return the number of slots this trial is running on.

get_num_agents() int

Return the number of agents this trial is running on.

gather(stuff: Any) Optional[List]

Gather stuff to the chief. The chief returns a list of all stuff, and workers return None.

gather() is not a replacement for the gather functionality of your distributed training framework.

gather_local(stuff: Any) Optional[List]

Gather stuff to the local chief. The local chief returns a list of all stuff, and local workers return None.

gather_local() is not a replacement for the gather functionality of your distributed training framework.

allgather(stuff: Any) List

Gather stuff to the chief and broadcast all of it back to the workers.

allgather() is not a replacement for the allgather functionality of your distributed training framework.

allgather_local(stuff: Any) List

Gather stuff to the local chief and broadcast all of it back to the local workers.

allgather_local() is not a replacement for the allgather functionality of your distributed training framework.

broadcast(stuff: Any) Any

Every worker gets the stuff sent by the chief.

broadcast() is not a replacement for the broadcast functionality of your distributed training framework.

broadcast_local(stuff: Optional[Any] = None) Any

Every worker gets the stuff sent by the local chief.

broadcast_local() is not a replacement for the broadcast functionality of your distributed training framework.
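The return values of the gather, allgather, and broadcast methods above can be summarized with a single-process simulation. The functions below are hypothetical stand-ins that involve no networking: each takes the list of values contributed by every worker (indexed by rank, with the chief at rank 0) and returns what each worker would receive.

```python
from typing import Any, List, Optional


def simulate_gather(values: List[Any]) -> List[Optional[List[Any]]]:
    """The chief receives everything; other workers receive None."""
    return [list(values) if rank == 0 else None for rank in range(len(values))]


def simulate_allgather(values: List[Any]) -> List[List[Any]]:
    """Every worker receives the full list."""
    return [list(values) for _ in values]


def simulate_broadcast(values: List[Any]) -> List[Any]:
    """Every worker receives the chief's value."""
    return [values[0] for _ in values]


values = ["a", "b", "c"]  # one value per worker, indexed by rank
gathered = simulate_gather(values)
allgathered = simulate_allgather(values)
broadcasted = simulate_broadcast(values)
```

The `_local` variants behave the same way, with the local chief and the workers on one machine in place of the chief and all workers.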

determined.keras.TFKerasExperimentalContext#

class determined.keras.TFKerasExperimentalContext#

Bases: object

Context class that contains experimental runtime information and features for any Determined workflow that uses the tf.keras API.

TFKerasExperimentalContext extends TFKerasTrialContext under the context.experimental namespace.

determined.keras.callbacks#

class determined.keras.callbacks.Callback#

A Determined subclass of the tf.keras.callbacks.Callback interface which supports additional new callbacks.

Warning

The following behaviors differ between normal Keras operation and Keras operation within Determined:

  • Keras calls on_epoch_end at the end of the training dataset, but Determined calls it based on the records_per_epoch setting in the experiment config.

  • Keras calls on_epoch_end with training and validation logs, but Determined does not schedule training or validation around epochs in general, so Determined cannot guarantee that those values are available for on_epoch_end calls. As a result, on_epoch_end will be called with an empty dictionary for its logs.

  • Keras does not support stateful callbacks, but Determined does. The Determined versions of stateful callbacks, such as EarlyStopping and ReduceLROnPlateau, are based around on_test_end rather than on_epoch_end, and the frequency of on_test_end can be influenced by setting min_validation_period in the experiment configuration.

Warning

det.keras.callbacks.Callback has been deprecated in Determined 0.38.0 and will be removed in a future version. This Callback class is designed to work with TFKerasTrial, which is also deprecated. Please use the new DeterminedCallback for training, and use normal Keras callbacks with it.

get_state() Any#

get_state should return a pickleable object that represents the state of this callback.

When training is continued from a checkpoint, the value returned from get_state() will be passed back to the Callback object via load_state().

load_state(state: Any) None#

load_state should accept the exact pickleable object returned by get_state to restore the internal state of a stateful Callback as it was when get_state was called.
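The get_state()/load_state() contract amounts to a pickle round trip. The sketch below uses a plain class, `BestLossTracker`, as a hypothetical stand-in; a real stateful callback would subclass determined.keras.callbacks.Callback, and Determined, not user code, would perform the pickling around a checkpoint.

```python
import pickle
from typing import Any, Dict


class BestLossTracker:
    """Hypothetical stand-in for a stateful callback."""

    def __init__(self) -> None:
        self.best_loss = float("inf")
        self.evaluations = 0

    def observe(self, loss: float) -> None:
        self.evaluations += 1
        self.best_loss = min(self.best_loss, loss)

    def get_state(self) -> Dict[str, Any]:
        # Must return something pickleable.
        return {"best_loss": self.best_loss, "evaluations": self.evaluations}

    def load_state(self, state: Dict[str, Any]) -> None:
        self.best_loss = state["best_loss"]
        self.evaluations = state["evaluations"]


# Before a pause: accumulate state, then serialize it.
before = BestLossTracker()
before.observe(0.9)
before.observe(0.4)
blob = pickle.dumps(before.get_state())

# After resuming: a fresh object is restored from the saved state.
after = BestLossTracker()
after.load_state(pickle.loads(blob))
```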

on_checkpoint_end(checkpoint_dir: str) None#

on_checkpoint_end is called after a checkpoint is finished, and allows users to save arbitrary files alongside the checkpoint.

Parameters:
  • checkpoint_dir – The path to the checkpoint_dir where new files may be added.

on_train_workload_begin(total_batches_trained: int, batches_requested: Optional[int], logs: Dict) None#

on_train_workload_begin is called before a chunk of model training. The number of batches in the workload may vary, but will not exceed the scheduling_unit setting for the experiment.

Parameters:
  • total_batches_trained – The number of batches trained at the start of the workload.

  • batches_requested – The number of batches expected to train during the workload.

  • logs – a dictionary (presently always an empty dictionary)

on_train_workload_end(total_batches_trained: int, logs: Dict) None#

on_train_workload_end is called after a chunk of model training.

Parameters:
  • total_batches_trained – The number of batches trained at the end of the workload.

  • logs – a dictionary of training metrics aggregated during this workload.

class determined.keras.callbacks.EarlyStopping(*arg: Any, **kwarg: Any)#

EarlyStopping behaves exactly like the tf.keras.callbacks.EarlyStopping except that it checks after every on_test_end() rather than every on_epoch_end() and it can save and restore its state after pauses in training.

Therefore, part of configuring the Determined implementation of EarlyStopping is to configure min_validation_period for the experiment appropriately (likely it should be configured to validate every epoch).

In Determined, on_test_end may be called slightly more often than min_validation_period during some types of hyperparameter searches, but it is unlikely for that to occur often enough to have a meaningful impact on this callback’s operation.

Warning

EarlyStopping has been deprecated in Determined 0.38.0 and will be removed in a future version. Determined’s EarlyStopping is a customization of keras’ EarlyStopping callback that is specific to TFKerasTrial, which is also deprecated. Please use the new DeterminedCallback for training, and use keras’ EarlyStopping with it.

class determined.keras.callbacks.ReduceLROnPlateau(*arg: Any, **kwarg: Any)#

ReduceLROnPlateau behaves exactly like the tf.keras.callbacks.ReduceLROnPlateau except that it checks after every on_test_end() rather than every on_epoch_end() and it can save and restore its state after pauses in training.

Therefore, part of configuring the Determined implementation of ReduceLROnPlateau is to configure min_validation_period for the experiment appropriately (likely it should be configured to validate every epoch).

In Determined, on_test_end may be called slightly more often than min_validation_period during some types of hyperparameter searches, but it is unlikely for that to occur often enough to have a meaningful impact on this callback’s operation.

Warning

ReduceLROnPlateau has been deprecated in Determined 0.38.0 and will be removed in a future version. Determined’s ReduceLROnPlateau is a customization of keras’ ReduceLROnPlateau callback that is specific to TFKerasTrial, which is also deprecated. Please use the new DeterminedCallback for training, and use keras’ ReduceLROnPlateau with it.

class determined.keras.callbacks.TensorBoard(*args, **kwargs)#

This is a thin wrapper over the TensorBoard callback that ships with tf.keras. For more information, see the TensorBoard Guide or the upstream docs for tf.keras.callbacks.TensorBoard.

Note that if a log_dir argument is passed to the constructor, it will be ignored.

Warning

det.keras.callbacks.TensorBoard has been deprecated in Determined 0.38.0 and will be removed in a future version. This version of keras’ TensorBoard callback is designed to work with TFKerasTrial, which is also deprecated. Please use the new DeterminedCallback for training, and use the new det.keras.TensorBoard with it.

determined.keras.load_model_from_checkpoint_path#

determined.keras.load_model_from_checkpoint_path(path: str, tags: Optional[List[str]] = None)#

Loads a checkpoint written by a TFKerasTrial.

You should have already downloaded the checkpoint files, likely with Checkpoint.download().

The return type is a TensorFlow AutoTrackable object.

Parameters:
  • path (string) – Top level directory to load the checkpoint from.

  • tags (list of strings, optional) – Specifies which tags are loaded from the TensorFlow SavedModel. See documentation for tf.compat.v1.saved_model.load_v2.

Warning

load_model_from_checkpoint_path has been deprecated in Determined 0.38.0 and will be removed in a future version. This function is designed to work with TFKerasTrial, which is also deprecated. Please use the new DeterminedCallback for training instead, which allows you to use model.load_weights() to restore from checkpoints.