diff --git a/cv/classification/resnet50/tensorflow/benchmark_cnn.py b/cv/classification/resnet50/tensorflow/benchmark_cnn.py index 7f6c1db4c5ee81194c43a463c5bfad1235b0a04f..816c4d50a9721c9e13040ba5570d377963744306 100644 --- a/cv/classification/resnet50/tensorflow/benchmark_cnn.py +++ b/cv/classification/resnet50/tensorflow/benchmark_cnn.py @@ -2396,6 +2396,7 @@ class BenchmarkCNN(object): done_fn = lambda: local_step >= end_local_step else: done_fn = global_step_watcher.done + print(f"total_local_step: {end_local_step}") if self.params.debugger is not None: if self.params.debugger == 'cli': log_fn('The CLI TensorFlow debugger will be used.') diff --git a/cv/classification/resnet50/tensorflow/convnet_builder.py b/cv/classification/resnet50/tensorflow/convnet_builder.py index 9903de9247e7401b2982bb061fb6f4bdce7be179..52dd02058344bd52729905df58d22751c95397e8 100644 --- a/cv/classification/resnet50/tensorflow/convnet_builder.py +++ b/cv/classification/resnet50/tensorflow/convnet_builder.py @@ -24,12 +24,12 @@ import contextlib import numpy as np import tensorflow.compat.v1 as tf - +tf.disable_v2_behavior() # pylint: disable=g-direct-tensorflow-import import mlperf from tensorflow.python.layers import convolutional as conv_layers from tensorflow.python.layers import core as core_layers -from tensorflow.python.layers import normalization as normalization_layers +from tensorflow.keras.layers import BatchNormalization as BatchNormalizationLayer from tensorflow.python.layers import pooling as pooling_layers from tensorflow.python.training import moving_averages @@ -173,6 +173,7 @@ class ConvNetBuilder(object): kernel_initializer = tf.variance_scaling_initializer() name = 'conv' + str(self.counts['conv']) self.counts['conv'] += 1 + # print(f"scope_name: {name}") with tf.variable_scope(name): strides = [1, d_height, d_width, 1] if self.data_format == 'NCHW': @@ -466,19 +467,15 @@ class ConvNetBuilder(object): center = True with tf.variable_scope(name) as scope: if self.use_tf_layers: - layer_obj = normalization_layers.BatchNormalization( + bn_layer = BatchNormalizationLayer( momentum=decay, scale=scale, + center=center, epsilon=epsilon, - fused=True, axis=_data_format_to_channel_axis[self.data_format], - # We pass this 'scope' argument for compatibility with checkpoints - # created with the contrib version of batch norm. tf_cnn_benchmarks - # used to use the contrib version. 
- _scope=scope, - center=center, - name=scope.name) - bn = layer_obj.apply(input_layer, training=self.phase_train) + # name=scope.name.replace('/', '_') + ) + bn = bn_layer(input_layer, training=self.phase_train) else: bn = self._batch_norm_without_layers(input_layer, decay, scale, epsilon) self.top_layer = bn diff --git a/cv/classification/resnet50/tensorflow/models/model.py b/cv/classification/resnet50/tensorflow/models/model.py index 3db13081917f9582704428c6c26956cbd652ae77..c45e8f620a9cb29f808644000d9b84d6013d72a2 100644 --- a/cv/classification/resnet50/tensorflow/models/model.py +++ b/cv/classification/resnet50/tensorflow/models/model.py @@ -306,6 +306,23 @@ class CNNModel(Model): logits = tf.cast(logits, tf.float32) if aux_logits is not None: aux_logits = tf.cast(aux_logits, tf.float32) + + # Move the normalization-related variables from GLOBAL_VARIABLES to LOCAL_VARIABLES. The TF1 normalization layer definitions are no longer supported in TF2, + # so the TF2 BatchNormalizationLayer interface has to be used instead; the existing automatic variable-moving context OverrideToLocalVariableIfNotPsVar cannot capture these variables, so they are moved manually here. + global_collection = tf.get_default_graph().get_collection_ref(tf.GraphKeys.GLOBAL_VARIABLES) + local_collection = tf.get_default_graph().get_collection_ref(tf.GraphKeys.LOCAL_VARIABLES) + for v in tf.global_variables(): + # print(f"global v.name: {v.name}") + if 'normalization' in v.name: + global_collection.remove(v) + local_collection.append(v) + + # for v in tf.local_variables(): + # print(f"local v.name: {v.name}") + + # for v in tf.trainable_variables(): + # print(f"trainable v.name: {v.name}") + return BuildNetworkResult( logits=logits, extra_info=None if aux_logits is None else aux_logits) diff --git a/cv/classification/resnet50/tensorflow/models/model_config.py b/cv/classification/resnet50/tensorflow/models/model_config.py index 1a31dc6233a71f7609668362a24360b74a6e2262..84f2f00f8223b5033be6f49af23ed157856c466f 100644 --- a/cv/classification/resnet50/tensorflow/models/model_config.py +++ b/cv/classification/resnet50/tensorflow/models/model_config.py @@ -32,7 +32,7 @@ from models import overfeat_model from models import resnet_model from models import trivial_model from models import vgg_model -from models.experimental import deepspeech +# from models.experimental import deepspeech from models.experimental import official_ncf_model @@ -112,8 +112,8 @@ def _get_model_map(dataset_name): return _model_name_to_cifar_model elif dataset_name in ('imagenet', 'synthetic', 'imagenette'): return _model_name_to_imagenet_model - elif dataset_name == 'librispeech': - return {'deepspeech2': deepspeech.DeepSpeech2Model} +# elif dataset_name == 'librispeech': + # return {'deepspeech2': deepspeech.DeepSpeech2Model} elif dataset_name == 'coco': return _model_name_to_object_detection_model else: diff --git a/cv/classification/resnet50/tensorflow/run_train_distributed_imagenette.sh b/cv/classification/resnet50/tensorflow/run_train_distributed_imagenette.sh index b4c1bfd1d99de90bb751eb07adfea086c82f0669..fcf39ec1b570a903ba52d83770d7b604841cacab 100644 --- a/cv/classification/resnet50/tensorflow/run_train_distributed_imagenette.sh +++ b/cv/classification/resnet50/tensorflow/run_train_distributed_imagenette.sh @@ -54,14 +54,14 @@ check_status() ################################################# # Prepare devices ################################################# -devices=$CUDA_VISIBLE_DEVICES -if [ -n "$devices" ]; then - devices=(${devices//,/ }) - num_devices=${#devices[@]} -else - devices=(0 1) - num_devices=2 -fi +# devices=$CUDA_VISIBLE_DEVICES +# if [ -n "$devices" ]; then +# devices=(${devices//,/ }) +# 
num_devices=${#devices[@]} +# else +devices=(0 1) +num_devices=2 +# fi echo "CUDA_VISIBLE_DEVICES: ${CUDA_VISIBLE_DEVICES}" echo "num_devices: ${num_devices}" @@ -122,25 +122,27 @@ do fi if [ "${i}" == "${last_device}" ]; then - CUDA_VISIBLE_DEVICES=${device} UMD_WAITAFTERLAUNCH=1 python3 -u tf_cnn_benchmarks.py\ + echo "device: ${device}" + UMD_WAITAFTERLAUNCH=1 python3 -u tf_cnn_benchmarks.py\ --data_name=imagenette --data_dir=${DATA_DIR}\ --data_format=NCHW \ --optimizer=${OPTIMIZER} --datasets_use_prefetch=False\ --local_parameter_device=gpu --num_gpus=${num_devices}\ --batch_size=${BATCH_SIZE} --model=${model} \ --variable_update=distributed_replicated \ - --job_name=${job_name} --ps_hosts=127.0.0.1:50000 --worker_hosts="${worker_hosts}"\ + --job_name=${job_name} --ps_hosts=127.0.0.1:40000 --worker_hosts="${worker_hosts}"\ --train_dir=${TRAIN_DIR} --task_index=${task_index} --print_training_accuracy=True "${new_args[@]}" 2>&1 | tee ${LOG_DIR}/${DATE}_${TRAIN_EPOCHS}_${BATCH_SIZE}_${OPTIMIZER}.log; [[ ${PIPESTATUS[0]} == 0 ]] || exit echo "Distributed training PID ($!) on device ${device} where job name = ${job_name}" else - CUDA_VISIBLE_DEVICES=${device} UMD_WAITAFTERLAUNCH=1 python3 -u tf_cnn_benchmarks.py\ + echo "device: ${device}" + UMD_WAITAFTERLAUNCH=1 python3 -u tf_cnn_benchmarks.py\ --data_name=imagenette --data_dir=${DATA_DIR}\ --data_format=NCHW \ --optimizer=${OPTIMIZER} --datasets_use_prefetch=False\ --local_parameter_device=gpu --num_gpus=${num_devices}\ --batch_size=${BATCH_SIZE} --model=${model}\ --variable_update=distributed_replicated\ - --job_name=${job_name} --ps_hosts=127.0.0.1:50000 --worker_hosts="${worker_hosts}"\ + --job_name=${job_name} --ps_hosts=127.0.0.1:40000 --worker_hosts="${worker_hosts}"\ --train_dir=${TRAIN_DIR} --task_index=${task_index} --print_training_accuracy=True "${new_args[@]}" & echo "Distributed training PID ($!) 
on device ${device} where job name = ${job_name} and task_index = ${task_index}" fi diff --git a/cv/classification/resnet50/tensorflow/tf_cnn_benchmarks.py b/cv/classification/resnet50/tensorflow/tf_cnn_benchmarks.py index 3014ed7a15a9776572be49a7f5cb5b794504914f..7166841e68cea9588d6b833e767279e1571751ad 100644 --- a/cv/classification/resnet50/tensorflow/tf_cnn_benchmarks.py +++ b/cv/classification/resnet50/tensorflow/tf_cnn_benchmarks.py @@ -57,6 +57,7 @@ def main(positional_arguments): % positional_arguments[1:]) params = benchmark_cnn.make_params_from_flags() + print(f"params: {params}") try: from dltest import show_training_arguments show_training_arguments(flags.FLAGS) diff --git a/cv/classification/resnet50/tensorflow/variable_mgr_util.py b/cv/classification/resnet50/tensorflow/variable_mgr_util.py index 94ce3e4b7c48d49797802f3dfadbaf0d4108d902..9264519b4c7adfa44e0ea4b25b1cad3ef4bee696 100644 --- a/cv/classification/resnet50/tensorflow/variable_mgr_util.py +++ b/cv/classification/resnet50/tensorflow/variable_mgr_util.py @@ -314,8 +314,8 @@ class StagedModelVariable(object): return self._value() -ops.register_tensor_conversion_function( - StagedModelVariable, StagedModelVariable._TensorConversionFunction) # pylint: disable=protected-access +# ops.register_tensor_conversion_function( +# StagedModelVariable, StagedModelVariable._TensorConversionFunction) # pylint: disable=protected-access class StagedVariableGetter(object): diff --git a/nlp/language_model/bert/tensorflow/base/init_tf.sh b/nlp/language_model/bert/tensorflow/base/init_tf.sh index 79e2ae63b18d1065fa778e7102b86ea7afc22025..ba68491a90a988ddfd8308eb1a886dad60b42589 100644 --- a/nlp/language_model/bert/tensorflow/base/init_tf.sh +++ b/nlp/language_model/bert/tensorflow/base/init_tf.sh @@ -17,4 +17,5 @@ pip3 uninstall -y protobuf pip3 install "protobuf<4.0.0" pip3 install git+https://github.com/mlperf/logging.git pip3 install git+https://github.com/NVIDIA/dllogger.git -pip3 install pandas==1.3.5 \ No newline at end of file +pip3 install pandas==1.3.5 +pip3 install numpy==1.26.4 \ No newline at end of file diff --git a/nlp/language_model/bert/tensorflow/base/modeling.py b/nlp/language_model/bert/tensorflow/base/modeling.py index 6bb2d9ad4eedfa23089a4f620b90928e1dd7b11a..948d508ba440d7dd127a7261c46ab8a4bd5c6fa0 100644 --- a/nlp/language_model/bert/tensorflow/base/modeling.py +++ b/nlp/language_model/bert/tensorflow/base/modeling.py @@ -244,12 +244,18 @@ class BertModel(object): # We "pool" the model by simply taking the hidden state corresponding # to the first token. 
We assume that this has been pre-trained first_token_tensor = tf.squeeze(self.sequence_output[:, 0:1, :], axis=1) - self.pooled_output = tf.compat.v1.layers.dense( - first_token_tensor, - config.hidden_size, - activation=tf.tanh, - kernel_initializer=create_initializer(config.initializer_range)) - + # self.pooled_output = tf.compat.v1.layers.dense( + # first_token_tensor, + # config.hidden_size, + # activation=tf.tanh, + # kernel_initializer=create_initializer(config.initializer_range)) + pooled_dense_layer = tf.keras.layers.Dense( + units=config.hidden_size, + activation=tf.tanh, + kernel_initializer=create_initializer(config.initializer_range) + ) + self.pooled_output = pooled_dense_layer(first_token_tensor) + def get_pooled_output(self): return self.pooled_output @@ -755,28 +761,50 @@ def attention_layer(from_tensor, to_tensor_2d = reshape_to_matrix(to_tensor) # `query_layer` = [B*F, N*H] - query_layer = tf.compat.v1.layers.dense( - from_tensor_2d, - num_attention_heads * size_per_head, - activation=query_act, - name="query", - kernel_initializer=create_initializer(initializer_range)) - +# query_layer = tf.compat.v1.layers.dense( +# from_tensor_2d, +# num_attention_heads * size_per_head, +# activation=query_act, +# name="query", +# kernel_initializer=create_initializer(initializer_range)) + + dense_layer_q = tf.keras.layers.Dense( + units=num_attention_heads * size_per_head, + activation=query_act, + kernel_initializer=create_initializer(initializer_range), + name="query" + ) + query_layer = dense_layer_q(from_tensor_2d) + # `key_layer` = [B*T, N*H] - key_layer = tf.compat.v1.layers.dense( - to_tensor_2d, - num_attention_heads * size_per_head, - activation=key_act, - name="key", - kernel_initializer=create_initializer(initializer_range)) +# key_layer = tf.compat.v1.layers.dense( +# to_tensor_2d, +# num_attention_heads * size_per_head, +# activation=key_act, +# name="key", +# kernel_initializer=create_initializer(initializer_range)) + dense_layer_k = tf.keras.layers.Dense( + units=num_attention_heads * size_per_head, + activation=key_act, + kernel_initializer=create_initializer(initializer_range), + name="key" + ) + key_layer = dense_layer_k(to_tensor_2d) # `value_layer` = [B*T, N*H] - value_layer = tf.compat.v1.layers.dense( - to_tensor_2d, - num_attention_heads * size_per_head, - activation=value_act, - name="value", - kernel_initializer=create_initializer(initializer_range)) +# value_layer = tf.compat.v1.layers.dense( +# to_tensor_2d, +# num_attention_heads * size_per_head, +# activation=value_act, +# name="value", +# kernel_initializer=create_initializer(initializer_range)) + dense_layer_v = tf.keras.layers.Dense( + units=num_attention_heads * size_per_head, + activation=value_act, + kernel_initializer=create_initializer(initializer_range), + name="value" + ) + value_layer = dense_layer_v(to_tensor_2d) # `query_layer` = [B, N, F, H] query_layer = transpose_for_scores(query_layer, batch_size, @@ -947,27 +975,45 @@ def transformer_model(input_tensor, # Run a linear projection of `hidden_size` then add a residual # with `layer_input`. 
with tf.compat.v1.variable_scope("output"): - attention_output = tf.compat.v1.layers.dense( - attention_output, - hidden_size, - kernel_initializer=create_initializer(initializer_range)) + # attention_output = tf.compat.v1.layers.dense( + # attention_output, + # hidden_size, + # kernel_initializer=create_initializer(initializer_range)) + attention_output_dense_layer = tf.keras.layers.Dense( + units=hidden_size, + kernel_initializer=create_initializer(initializer_range), + ) + attention_output = attention_output_dense_layer(attention_output) + attention_output = dropout(attention_output, hidden_dropout_prob) attention_output = layer_norm(attention_output + layer_input) # The activation is only applied to the "intermediate" hidden layer. with tf.compat.v1.variable_scope("intermediate"): - intermediate_output = tf.compat.v1.layers.dense( - attention_output, - intermediate_size, + # intermediate_output = tf.compat.v1.layers.dense( + # attention_output, + # intermediate_size, + # activation=intermediate_act_fn, + # kernel_initializer=create_initializer(initializer_range)) + intermediate_output_dense_layer = tf.keras.layers.Dense( + units=intermediate_size, activation=intermediate_act_fn, - kernel_initializer=create_initializer(initializer_range)) - + kernel_initializer=create_initializer(initializer_range), + ) + intermediate_output = intermediate_output_dense_layer(attention_output) + # Down-project back to `hidden_size` then add the residual. with tf.compat.v1.variable_scope("output"): - layer_output = tf.compat.v1.layers.dense( - intermediate_output, - hidden_size, - kernel_initializer=create_initializer(initializer_range)) + # layer_output = tf.compat.v1.layers.dense( + # intermediate_output, + # hidden_size, + # kernel_initializer=create_initializer(initializer_range)) + layer_output_dense_layer = tf.keras.layers.Dense( + units=hidden_size, + kernel_initializer=create_initializer(initializer_range), + ) + layer_output = layer_output_dense_layer(intermediate_output) + layer_output = dropout(layer_output, hidden_dropout_prob) layer_output = layer_norm(layer_output + attention_output) prev_output = layer_output diff --git a/nlp/language_model/bert/tensorflow/base/optimization.py b/nlp/language_model/bert/tensorflow/base/optimization.py index f7aa9f49103d2bd1380d17ad7610d7a2b6d8c408..c2816cf914ff354c9a304f84faa148592c1a2c6f 100644 --- a/nlp/language_model/bert/tensorflow/base/optimization.py +++ b/nlp/language_model/bert/tensorflow/base/optimization.py @@ -151,8 +151,8 @@ def create_optimizer(loss, init_lr, num_train_steps, num_warmup_steps, manual_fp return optimizer.apply_gradients(list(zip(accum_vars, tvars)), global_step=global_step) train_op = tf.cond(pred=update_step, true_fn=lambda: update(accum_vars), false_fn=lambda: tf.no_op()) - if hvd: - hvd.join() + # if hvd: + # hvd.join() else: grads_and_vars = optimizer.compute_gradients(loss, tvars, gate_gradients=tf.compat.v1.train.Optimizer.GATE_NONE) grads_and_vars = [(g, v) for g, v in grads_and_vars if g is not None] diff --git a/nlp/language_model/bert/tensorflow/base/run_multi_card_FPS.sh b/nlp/language_model/bert/tensorflow/base/run_multi_card_FPS.sh index df03dacae1e587ec2a0921ae56f1b6ec3fdfca3a..391ae2970e42665f4356334a2a70094c3da9e16a 100644 --- a/nlp/language_model/bert/tensorflow/base/run_multi_card_FPS.sh +++ b/nlp/language_model/bert/tensorflow/base/run_multi_card_FPS.sh @@ -30,7 +30,7 @@ horovodrun -np ${IX_NUM_CUDA_VISIBLE_DEVICES} ${HOROVOD_RUN_ARGS} python3 ./run_ --max_eval_steps=100 \ --max_predictions_per_seq=76 \ 
--max_seq_length=512 \ - --num_train_steps=2000 \ + --num_train_steps=1000 \ --num_accumulation_steps=4 \ --num_warmup_steps=0 \ --save_checkpoints_steps=20000 \ diff --git a/nlp/language_model/bert/tensorflow/base/run_pretraining.py b/nlp/language_model/bert/tensorflow/base/run_pretraining.py index e2078b962eba68eca012e76d2565a6127459ce49..4e6b55aaff4b55b4dbaeb5ad0e7e723a15eb0a45 100644 --- a/nlp/language_model/bert/tensorflow/base/run_pretraining.py +++ b/nlp/language_model/bert/tensorflow/base/run_pretraining.py @@ -1,21 +1,3 @@ -# coding=utf-8 -# Copyright (c) 2019 NVIDIA CORPORATION. All rights reserved. -# Copyright 2018 The Google AI Language Team Authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Run masked LM/next sentence masked_lm pre-training for BERT.""" - from __future__ import absolute_import from __future__ import division from __future__ import print_function @@ -23,45 +5,24 @@ from __future__ import print_function import os import sys import time -import modeling +import modeling import optimization import tensorflow as tf +import tensorflow.compat.v1 as tf_v1 import glob -from utils.utils import LogEvalRunHook -import utils.dllogger_class as dllogger_class -from dllogger import Verbosity -import math -import numbers -import numpy as np -from tensorflow.core.protobuf import rewriter_config_pb2 -from utils.tb_utils import ExamplesPerSecondEstimatorHook, write_hparams_v1 +# from utils.utils import LogEvalRunHook +# from dllogger import Verbosity +# import math +# import numbers +# import numpy as np +# from tensorflow.core.protobuf import rewriter_config_pb2 +# from utils.tb_utils import ExamplesPerSecondEstimatorHook, write_hparams_v1 curr_path = os.path.abspath(os.path.dirname(__file__)) - -flags = tf.compat.v1.flags - +flags = tf_v1.flags FLAGS = flags.FLAGS - -class TrainableVarsAllreducingHookPreOpt(tf.compat.v1.estimator.SessionRunHook): - def __init__(self, num_accumulation_steps=1): - super(TrainableVarsAllreducingHookPreOpt, self).__init__() - # Modify this collection in order to allreduce other set of variables - trainable_vars = tf.compat.v1.trainable_variables() - allreduced_trainable_var_ops = [ v.assign(hvd.allreduce(v)) for v in trainable_vars] - self.allreduce_trainable_vars_op = tf.group(*allreduced_trainable_var_ops) - self.num_accumulation_steps = num_accumulation_steps - self.current_iteration = 1 - - def before_run(self, run_context): - if self.current_iteration % self.num_accumulation_steps == 0: - return tf.compat.v1.train.SessionRunArgs(self.allreduce_trainable_vars_op) - - def after_run(self, run_context, run_values): - self.current_iteration += 1 - trainable_vars_allreduce_result = run_values.results - def init_flags(): ## Required parameters flags.DEFINE_string( @@ -171,465 +132,104 @@ def init_flags(): flags.DEFINE_integer("init_loss_scale", 2**15, "Initial value of loss scale if mixed precision training") -def get_mllog_mlloger(): - from mlperf_logging import mllog - - str_hvd_rank = str(hvd.rank()) if FLAGS.horovod else "0" - mllogger = 
mllog.get_mllogger() - filenames = os.path.normpath(FLAGS.output_dir) + "/result_rank_" + str_hvd_rank + ".txt" - mllog.config(filename=filenames) - workername = "worker" + str_hvd_rank - mllog.config( - default_namespace = workername, - default_stack_offset = 1, - default_clear_line = False, - root_dir = os.path.normpath( - os.path.join(os.path.dirname(os.path.realpath(__file__)), "..", ".."))) - - return mllogger, mllog - - -def past_stop_threshold(stop_threshold, eval_metric): - """Return a boolean representing whether a model should be stopped. - - Args: - stop_threshold: float, the threshold above which a model should stop - training. - eval_metric: float, the current value of the relevant metric to check. - - Returns: - True if training should stop, False otherwise. - - Raises: - ValueError: if either stop_threshold or eval_metric is not a number - """ - if stop_threshold is None: - return False - - if not isinstance(stop_threshold, numbers.Number): - raise ValueError("Threshold for checking stop conditions must be a number.") - if not isinstance(eval_metric, numbers.Number): - raise ValueError("Eval metric being checked against stop conditions " - "must be a number.") - - if eval_metric >= stop_threshold: - tf.compat.v1.logging.info( - "Stop threshold of {} was passed with metric value {}.".format( - stop_threshold, eval_metric)) - return True - - return False - -#_NUM_EXAMPLES_NAME = "num_examples" - -# report samples/sec, total loss and learning rate during training -class _LogSessionRunHook(tf.estimator.SessionRunHook): - def __init__(self, global_batch_size, num_accumulation_steps, dllogging, display_every=10, - save_ckpt_steps=1000, report_loss=True, hvd_rank=-1): - self.global_batch_size = global_batch_size - self.display_every = display_every - self.save_ckpt_steps = save_ckpt_steps - self.hvd_rank = hvd_rank - self.num_accumulation_steps = num_accumulation_steps - self.dllogging = dllogging - self.report_loss = report_loss - self.skip_iters = 0 - - def after_create_session(self, session, coord): - self.elapsed_secs = 0.0 #elapsed seconds between every print - self.count = 0 # number of global steps between every print - self.all_count = 0 #number of steps (including accumulation) between every print - self.loss = 0.0 # accumulation of loss in each step between every print - - self.total_time = 0.0 # total time taken to train (excluding warmup + ckpt saving steps) - self.step_time = 0.0 # time taken per step - self.init_global_step = session.run(tf.compat.v1.train.get_global_step()) # training starts at init_global_step - self.skipped = 0 - - def before_run(self, run_context): - if FLAGS.horovod and hvd.rank() != 0: - return - self.t0 = time.time() - if self.num_accumulation_steps <= 1: - if FLAGS.manual_fp16 or FLAGS.amp: - return tf.estimator.SessionRunArgs( - fetches=['step_update:0', 'total_loss:0', - 'learning_rate:0', 'nsp_loss:0', - 'mlm_loss:0', 'loss_scale:0']) - else: - return tf.estimator.SessionRunArgs( - fetches=['step_update:0', 'total_loss:0', - 'learning_rate:0', 'nsp_loss:0', - 'mlm_loss:0']) - else: - if FLAGS.manual_fp16 or FLAGS.amp: - return tf.estimator.SessionRunArgs( - fetches=['step_update:0', 'update_step:0', 'total_loss:0', - 'learning_rate:0', 'nsp_loss:0', - 'mlm_loss:0', 'loss_scale:0']) - else: - return tf.estimator.SessionRunArgs( - fetches=['step_update:0', 'update_step:0', 'total_loss:0', - 'learning_rate:0', 'nsp_loss:0', - 'mlm_loss:0']) - - def after_run(self, run_context, run_values): - if FLAGS.horovod and hvd.rank() != 0: - return - 
run_time = time.time() - self.t0 - - if self.num_accumulation_steps <=1: - if FLAGS.manual_fp16 or FLAGS.amp: - self.global_step, total_loss, lr, nsp_loss, mlm_loss, loss_scaler = run_values.results - else: - self.global_step, total_loss, lr, nsp_loss, mlm_loss = run_values. \ - results - update_step = True - else: - if FLAGS.manual_fp16 or FLAGS.amp: - self.global_step, update_step, total_loss, lr, nsp_loss, mlm_loss, loss_scaler = run_values.results - else: - self.global_step, update_step, total_loss, lr, nsp_loss, mlm_loss = run_values.\ - results - - self.elapsed_secs += run_time - self.step_time += run_time - - print_step = self.global_step + 1 # One-based index for printing. - self.loss += total_loss - self.all_count += 1 - if update_step: - - self.count += 1 - - # Removing first six steps after every checkpoint save from timing - if (self.global_step - self.init_global_step) % self.save_ckpt_steps < self.skip_iters: - print("Skipping time record for ", self.global_step, " due to checkpoint-saving/warmup overhead") - self.skipped += 1 - else: - self.total_time += self.step_time - - self.step_time = 0.0 #Reset Step Time - - if (print_step == 1 or print_step % self.display_every == 0): - dt = self.elapsed_secs / self.count - sent_per_sec = self.global_batch_size / dt - avg_loss_step = self.loss / self.all_count - if self.hvd_rank >= 0 and FLAGS.report_loss: - if FLAGS.manual_fp16 or FLAGS.amp: - self.dllogging.logger.log(step=(print_step), - data={"Rank": int(self.hvd_rank), "throughput_train": float(sent_per_sec), - "mlm_loss":float(mlm_loss), "nsp_loss":float(nsp_loss), - "total_loss":float(total_loss), "avg_loss_step":float(avg_loss_step), - "learning_rate": str(lr), "loss_scaler":int(loss_scaler)}, - verbosity=Verbosity.DEFAULT) - else: - self.dllogging.logger.log(step=int(print_step), - data={"Rank": int(self.hvd_rank), "throughput_train": float(sent_per_sec), - "mlm_loss":float(mlm_loss), "nsp_loss":float(nsp_loss), - "total_loss":float(total_loss), "avg_loss_step":float(avg_loss_step), - "learning_rate": str(lr)}, - verbosity=Verbosity.DEFAULT) - else: - if FLAGS.manual_fp16 or FLAGS.amp: - self.dllogging.logger.log(step=int(print_step), - data={"throughput_train": float(sent_per_sec), - "mlm_loss":float(mlm_loss), "nsp_loss":float(nsp_loss), - "total_loss":float(total_loss), "avg_loss_step":float(avg_loss_step), - "learning_rate": str(lr), "loss_scaler":int(loss_scaler) }, - verbosity=Verbosity.DEFAULT) - else: - self.dllogging.logger.log(step=int(print_step), - data={"throughput_train": float(sent_per_sec), - "mlm_loss":float(mlm_loss), "nsp_loss":float(nsp_loss), - "total_loss":float(total_loss), "avg_loss_step":float(avg_loss_step), - "learning_rate": str(lr)}, - verbosity=Verbosity.DEFAULT) - - self.elapsed_secs = 0.0 - self.count = 0 - self.loss = 0.0 - self.all_count = 0 - - - -train_op_name = None -class MLPerfHook(tf.estimator.SessionRunHook): - def __init__(self, global_batch_size, num_accumulation_steps, num_train_steps, samples_between_eval, - weight_decay_rate, beta_1, beta_2, epsilon, power, enable_device_warmup): - ''' - global_batch_size = train_batch_size * num_accumulation_steps * num_of_devices - num_train_steps = each step consumes global_batch_size samples - samples_between_eval = total samples in each block - ''' - mllogger, mllog = get_mllog_mlloger() - mllogger.event(key=mllog.constants.CACHE_CLEAR) - mllogger.start(key=mllog.constants.INIT_START) - mllogger.event(key=mllog.constants.GLOBAL_BATCH_SIZE, value=global_batch_size) - 
mllogger.event(key=mllog.constants.TRAIN_SAMPLES, value=global_batch_size * FLAGS.num_train_steps) - mllogger.event(key=mllog.constants.MAX_SEQUENCE_LENGTH, value=FLAGS.max_seq_length) - mllogger.event(key='max_predictions_per_seq', value=FLAGS.max_predictions_per_seq) - mllogger.event(key=mllog.constants.GRADIENT_ACCUMULATION_STEPS, value=FLAGS.num_accumulation_steps) - mllogger.event(key=mllog.constants.OPT_LR_TRAINING_STEPS, value=FLAGS.num_train_steps) - mllogger.event(key=mllog.constants.NUM_WARMUP_STEPS, value=FLAGS.num_warmup_steps) - mllogger.event(key=mllog.constants.OPT_LR_WARMUP_STEPS, value=FLAGS.num_warmup_steps) - mllogger.event(key=mllog.constants.START_WARMUP_STEP, value=0) - mllogger.event(key=mllog.constants.OPT_BASE_LR, value=FLAGS.learning_rate if not FLAGS.horovod else FLAGS.learning_rate * hvd.size()) - mllogger.event(key=mllog.constants.EVAL_SAMPLES, value=10000) - mllogger.event(key=mllog.constants.OPT_LAMB_BETA_1, value=beta_1) - mllogger.event(key=mllog.constants.OPT_LAMB_BETA_2, value=beta_2) - mllogger.event(key=mllog.constants.OPT_LAMB_LR_DECAY_POLY_POWER, value=power) - mllogger.event(key=mllog.constants.OPT_LAMB_WEIGHT_DECAY, value=weight_decay_rate) - mllogger.event(key="opt_epsilon", value=epsilon) - mllogger.start(key=mllog.constants.INIT_STOP) - - self.mllogger = mllogger - self.mllog = mllog - self.chpt_timestamp_dict={} - self.run_start_timestamp=None - self.checkpoint_timestamp_dict={} - self.block_stop_timestamp_dict={} - - num_steps_between_eval = math.ceil(samples_between_eval / global_batch_size) - n_loops = math.ceil(num_train_steps / num_steps_between_eval) - schedule = [num_steps_between_eval for _ in range(int(n_loops))] - schedule[-1] = num_train_steps - sum(schedule[:-1]) - self.num_accumulation_steps = num_accumulation_steps - self.num_steps_between_eval = num_steps_between_eval - self.schedule = schedule - self.cycle_index = 0 - self.count = 0 # global step counter - self.block_started = False - - self.enable_device_warmup = enable_device_warmup - - - def after_create_session(self, session, coord): - - if self.enable_device_warmup: - graph = session.graph - variables = list(filter(lambda op: op.type=='VarHandleOp', graph.get_operations())) - variable_names = [op.name for op in variables] - variable_readers = [name + '/Read/ReadVariableOp:0' for name in variable_names] - variable_assigners = [name + '/Assign' for name in variable_names] - variable_assigners_input1_name = [graph.get_operation_by_name(name + '/Assign').inputs[1].name for name in variable_names] - variable_name_to_assigner_input1_name = dict(zip(variable_names, variable_assigners_input1_name)) - # save state_dict - state_dict = dict(zip(variable_names, variable_readers)) - state_dict = session.run(fetches=state_dict) - # device warmup - fetches = [train_op_name, 'total_loss:0', 'global_step/add:0', 'Merge/MergeSummary:0', 'update_step:0', 'learning_rate:0', 'nsp_loss:0', 'mlm_loss:0', 'step_update:0'] - for _ in range(self.num_accumulation_steps): - result = session.run(fetches) - session.run('global_step/add:0') - session.run('global_step/add:0') - session.run('global_step/add:0') - #assert result[-1] == True - # restore data loader iterator - session.run(graph.get_operation_by_name('MakeIterator')) - # load state_dict - feed_dict = dict() - for key in variable_names: - feed_dict[variable_name_to_assigner_input1_name[key]] = state_dict[key] - session.run(fetches=variable_assigners, feed_dict=feed_dict) - - self.mllogger.start(key=self.mllog.constants.RUN_START) - 
self.run_start_timestamp=time.time() - - def before_run(self, run_context): - if self.block_started == False: - #self.checkpoint_timestamp_dict[self.cycle_index]=int(time.time()*1e3) - self.mllogger.start(key=self.mllog.constants.BLOCK_START, value=self.cycle_index + 1, metadata={self.mllog.constants.FIRST_EPOCH_NUM: int(self.cycle_index * self.num_steps_between_eval), self.mllog.constants.EPOCH_COUNT: int(self.num_steps_between_eval)}) - self.block_started = True - - if self.num_accumulation_steps <= 1: - return tf.estimator.SessionRunArgs(fetches=['step_update:0']) # global_step - else: - return tf.estimator.SessionRunArgs(fetches=['step_update:0', 'update_step:0']) # global_step, update_step - - def after_run(self, run_context, run_values): - if self.num_accumulation_steps <=1: - self.global_step = run_values.results - update_step = True - if update_step and self.global_step[0] > 0: - self.count = self.global_step[0] % self.schedule[0] - else: - self.global_step, update_step = run_values.results - if update_step and self.global_step > 0: - self.count = self.global_step % self.schedule[0] - #if update_step and self.global_step: - # self.count += 1 - - if self.global_step and (self.count==0):#(self.count > self.schedule[self.cycle_index]): - self.mllogger.end(key=self.mllog.constants.BLOCK_STOP, value=self.cycle_index + 1, metadata={self.mllog.constants.FIRST_EPOCH_NUM: int(self.cycle_index * self.num_steps_between_eval)}) - self.chpt_timestamp_dict[self.cycle_index + 1]=time.time() - self.checkpoint_timestamp_dict[self.cycle_index + 1]=int(time.time()*1e3) - self.block_stop_timestamp_dict[self.cycle_index + 1]=time.time() - self.cycle_index += 1 - self.count = 0 - self.block_started = False - - -def model_fn_builder(bert_config, init_checkpoint, learning_rate, - num_train_steps, num_warmup_steps, - use_one_hot_embeddings, weight_decay_rate, beta_1, beta_2, epsilon, power,hvd): - """Returns `model_fn` closure for TPUEstimator.""" - - def model_fn(features, labels, mode, params): # pylint: disable=unused-argument - """The `model_fn` for TPUEstimator.""" - - tf.compat.v1.logging.info("*** Features ***") - for name in sorted(features.keys()): - tf.compat.v1.logging.info(" name = %s, shape = %s" % (name, features[name].shape)) - - input_ids = features["input_ids"] - input_mask = features["input_mask"] - segment_ids = features["segment_ids"] - masked_lm_positions = features["masked_lm_positions"] - masked_lm_ids = features["masked_lm_ids"] - masked_lm_weights = features["masked_lm_weights"] - next_sentence_labels = features["next_sentence_labels"] - - is_training = (mode == tf.estimator.ModeKeys.TRAIN) +def _decode_record(record, name_to_features): + example = tf.io.parse_single_example(record, name_to_features) + for name in list(example.keys()): + t = example[name] + if t.dtype == tf.int64: + t = tf.cast(t, tf.int32) + example[name] = t + return example + +# ======================================== +# 1. 
Input Pipeline (unchanged) +# ======================================== +def input_fn_builder(input_files, + batch_size, + max_seq_length, + max_predictions_per_seq, + is_training, + num_cpu_threads=4): + name_to_features = { + "input_ids": tf.io.FixedLenFeature([max_seq_length], tf.int64), + "input_mask": tf.io.FixedLenFeature([max_seq_length], tf.int64), + "segment_ids": tf.io.FixedLenFeature([max_seq_length], tf.int64), + "masked_lm_positions": tf.io.FixedLenFeature([max_predictions_per_seq], tf.int64), + "masked_lm_ids": tf.io.FixedLenFeature([max_predictions_per_seq], tf.int64), + "masked_lm_weights": tf.io.FixedLenFeature([max_predictions_per_seq], tf.float32), + "next_sentence_labels": tf.io.FixedLenFeature([1], tf.int64), + } - model = modeling.BertModel( - config=bert_config, - is_training=is_training, - input_ids=input_ids, - input_mask=input_mask, - token_type_ids=segment_ids, - use_one_hot_embeddings=use_one_hot_embeddings, - compute_type=tf.float16 if FLAGS.manual_fp16 else tf.float32) - - (masked_lm_loss, - masked_lm_example_loss, masked_lm_log_probs) = get_masked_lm_output( - bert_config, model.get_sequence_output(), model.get_embedding_table(), - masked_lm_positions, masked_lm_ids, - masked_lm_weights) - - (next_sentence_loss, next_sentence_example_loss, - next_sentence_log_probs) = get_next_sentence_output( - bert_config, model.get_pooled_output(), next_sentence_labels) - - masked_lm_loss = tf.identity(masked_lm_loss, name="mlm_loss") - next_sentence_loss = tf.identity(next_sentence_loss, name="nsp_loss") - total_loss = masked_lm_loss + next_sentence_loss - total_loss = tf.identity(total_loss, name='total_loss') - - tvars = tf.compat.v1.trainable_variables() - - initialized_variable_names = {} - if init_checkpoint: - (assignment_map, initialized_variable_names - ) = modeling.get_assignment_map_from_checkpoint(tvars, init_checkpoint) - - tf.compat.v1.train.init_from_checkpoint(init_checkpoint, assignment_map) - - if FLAGS.verbose_logging: - tf.compat.v1.logging.info("**** Trainable Variables ****") - for var in tvars: - init_string = "" - if var.name in initialized_variable_names: - init_string = ", *INIT_FROM_CKPT*" - tf.compat.v1.logging.info(" %d :: name = %s, shape = %s%s", 0 if FLAGS.horovod else hvd.rank(), var.name, var.shape, - init_string) - - output_spec = None - if mode == tf.estimator.ModeKeys.TRAIN: - train_op = optimization.create_optimizer( - total_loss, learning_rate, num_train_steps, num_warmup_steps, - FLAGS.manual_fp16, FLAGS.amp, FLAGS.num_accumulation_steps, FLAGS.optimizer_type, FLAGS.allreduce_post_accumulation, FLAGS.init_loss_scale, weight_decay_rate, beta_1, beta_2, epsilon, power, hvd) - global train_op_name - train_op_name = train_op.name - - output_spec = tf.estimator.EstimatorSpec( - mode=mode, - loss=total_loss, - train_op=train_op) - elif mode == tf.estimator.ModeKeys.EVAL: - - def metric_fn(masked_lm_example_loss, masked_lm_log_probs, masked_lm_ids, - masked_lm_weights, next_sentence_example_loss, - next_sentence_log_probs, next_sentence_labels): - """Computes the loss and accuracy of the model.""" - masked_lm_log_probs = tf.reshape(masked_lm_log_probs, - [-1, masked_lm_log_probs.shape[-1]]) - masked_lm_predictions = tf.argmax( - input=masked_lm_log_probs, axis=-1, output_type=tf.int32) - masked_lm_example_loss = tf.reshape(masked_lm_example_loss, [-1]) - masked_lm_ids = tf.reshape(masked_lm_ids, [-1]) - masked_lm_weights = tf.reshape(masked_lm_weights, [-1]) - masked_lm_accuracy = tf.compat.v1.metrics.accuracy( - labels=masked_lm_ids, - 
predictions=masked_lm_predictions, - weights=masked_lm_weights) - masked_lm_mean_loss = tf.compat.v1.metrics.mean( - values=masked_lm_example_loss, weights=masked_lm_weights) - - next_sentence_log_probs = tf.reshape( - next_sentence_log_probs, [-1, next_sentence_log_probs.shape[-1]]) - next_sentence_predictions = tf.argmax( - input=next_sentence_log_probs, axis=-1, output_type=tf.int32) - next_sentence_labels = tf.reshape(next_sentence_labels, [-1]) - next_sentence_accuracy = tf.compat.v1.metrics.accuracy( - labels=next_sentence_labels, predictions=next_sentence_predictions) - next_sentence_mean_loss = tf.compat.v1.metrics.mean( - values=next_sentence_example_loss) - - return { - "masked_lm_accuracy": masked_lm_accuracy, - "masked_lm_loss": masked_lm_mean_loss, - "next_sentence_accuracy": next_sentence_accuracy, - "next_sentence_loss": next_sentence_mean_loss, - } - - eval_metric_ops = metric_fn( - masked_lm_example_loss, masked_lm_log_probs, masked_lm_ids, - masked_lm_weights, next_sentence_example_loss, - next_sentence_log_probs, next_sentence_labels - ) - output_spec = tf.estimator.EstimatorSpec( - mode=mode, - loss=total_loss, - eval_metric_ops=eval_metric_ops) + if is_training: + d = tf_v1.data.Dataset.from_tensor_slices(tf.constant(input_files)) + if FLAGS.horovod: + d = d.shard(hvd.size(), hvd.rank()) + d = d.repeat() + d = d.shuffle(buffer_size=len(input_files)) + cycle_length = min(num_cpu_threads, len(input_files)) + d = d.apply( + tf_v1.data.experimental.parallel_interleave( + tf_v1.data.TFRecordDataset, + sloppy=is_training, + cycle_length=cycle_length)) + d = d.shuffle(buffer_size=100) else: - raise ValueError("Only TRAIN and EVAL modes are supported: %s" % (mode)) + d = tf_v1.data.TFRecordDataset(input_files) + d = d.repeat() + + d = d.apply( + tf_v1.data.experimental.map_and_batch( + lambda record: _decode_record(record, name_to_features), + batch_size=batch_size, + num_parallel_batches=num_cpu_threads, + drop_remainder=True if is_training else False)) + return d - return output_spec - return model_fn +def gather_indexes(sequence_tensor, positions): + """Gathers the vectors at the specific positions over a minibatch.""" + sequence_shape = modeling.get_shape_list(sequence_tensor, expected_rank=3) + batch_size = sequence_shape[0] + seq_length = sequence_shape[1] + width = sequence_shape[2] + flat_offsets = tf.reshape( + tf.range(0, batch_size, dtype=tf.int32) * seq_length, [-1, 1]) + flat_positions = tf.reshape(positions + flat_offsets, [-1]) + flat_sequence_tensor = tf.reshape(sequence_tensor, + [batch_size * seq_length, width]) + output_tensor = tf.gather(flat_sequence_tensor, flat_positions) + return output_tensor def get_masked_lm_output(bert_config, input_tensor, output_weights, positions, label_ids, label_weights): """Get loss and log probs for the masked LM.""" input_tensor = gather_indexes(input_tensor, positions) - with tf.compat.v1.variable_scope("cls/predictions"): + with tf_v1.variable_scope("cls/predictions"): # We apply one more non-linear transformation before the output layer. # This matrix is not used after pre-training. 
- with tf.compat.v1.variable_scope("transform"): - input_tensor = tf.compat.v1.layers.dense( - input_tensor, - units=bert_config.hidden_size, - activation=modeling.get_activation(bert_config.hidden_act), - kernel_initializer=modeling.create_initializer( - bert_config.initializer_range)) + with tf_v1.variable_scope("transform"): + input_tensor_dense_layer = tf.keras.layers.Dense( + units=bert_config.hidden_size, + activation=modeling.get_activation(bert_config.hidden_act), + kernel_initializer=modeling.create_initializer(bert_config.initializer_range) + ) + input_tensor = input_tensor_dense_layer(input_tensor) + input_tensor = modeling.layer_norm(input_tensor) # The output weights are the same as the input embeddings, but there is # an output-only bias for each token. - output_bias = tf.compat.v1.get_variable( + output_bias = tf_v1.get_variable( "output_bias", shape=[bert_config.vocab_size], - initializer=tf.compat.v1.zeros_initializer()) + initializer=tf_v1.zeros_initializer()) logits = tf.matmul(tf.cast(input_tensor, tf.float32), output_weights, transpose_b=True) logits = tf.nn.bias_add(logits, output_bias) log_probs = tf.nn.log_softmax(logits - tf.reduce_max(input_tensor=logits, keepdims=True, axis=-1), axis=-1) - #log_probs = tf.nn.log_softmax(logits, axis=-1) - label_ids = tf.reshape(label_ids, [-1]) label_weights = tf.reshape(label_weights, [-1]) @@ -653,13 +253,13 @@ def get_next_sentence_output(bert_config, input_tensor, labels): # Simple binary classification. Note that 0 is "next sentence" and 1 is # "random sentence". This weight matrix is not used after pre-training. - with tf.compat.v1.variable_scope("cls/seq_relationship"): - output_weights = tf.compat.v1.get_variable( + with tf_v1.variable_scope("cls/seq_relationship"): + output_weights = tf_v1.get_variable( "output_weights", shape=[2, bert_config.hidden_size], initializer=modeling.create_initializer(bert_config.initializer_range)) - output_bias = tf.compat.v1.get_variable( - "output_bias", shape=[2], initializer=tf.compat.v1.zeros_initializer()) + output_bias = tf_v1.get_variable( + "output_bias", shape=[2], initializer=tf_v1.zeros_initializer()) logits = tf.matmul(tf.cast(input_tensor, tf.float32), output_weights, transpose_b=True) logits = tf.nn.bias_add(logits, output_bias) @@ -672,446 +272,171 @@ def get_next_sentence_output(bert_config, input_tensor, labels): loss = tf.reduce_mean(input_tensor=per_example_loss) return (loss, per_example_loss, log_probs) +# ======================================== +# 3. 
Main Training Script (No Estimator) +# ======================================== +def main(_): + tf_v1.logging.set_verbosity(tf_v1.logging.INFO) -def gather_indexes(sequence_tensor, positions): - """Gathers the vectors at the specific positions over a minibatch.""" - sequence_shape = modeling.get_shape_list(sequence_tensor, expected_rank=3) - batch_size = sequence_shape[0] - seq_length = sequence_shape[1] - width = sequence_shape[2] - - flat_offsets = tf.reshape( - tf.range(0, batch_size, dtype=tf.int32) * seq_length, [-1, 1]) - flat_positions = tf.reshape(positions + flat_offsets, [-1]) - flat_sequence_tensor = tf.reshape(sequence_tensor, - [batch_size * seq_length, width]) - output_tensor = tf.gather(flat_sequence_tensor, flat_positions) - return output_tensor - - -def input_fn_builder(input_files, - batch_size, - max_seq_length, - max_predictions_per_seq, - is_training, - num_cpu_threads=4): - """Creates an `input_fn` closure to be passed to Estimator.""" - - def input_fn(): - """The actual input function.""" - - name_to_features = { - "input_ids": - tf.io.FixedLenFeature([max_seq_length], tf.int64), - "input_mask": - tf.io.FixedLenFeature([max_seq_length], tf.int64), - "segment_ids": - tf.io.FixedLenFeature([max_seq_length], tf.int64), - "masked_lm_positions": - tf.io.FixedLenFeature([max_predictions_per_seq], tf.int64), - "masked_lm_ids": - tf.io.FixedLenFeature([max_predictions_per_seq], tf.int64), - "masked_lm_weights": - tf.io.FixedLenFeature([max_predictions_per_seq], tf.float32), - "next_sentence_labels": - tf.io.FixedLenFeature([1], tf.int64), - } - - # For training, we want a lot of parallel reading and shuffling. - # For eval, we want no shuffling and parallel reading doesn't matter. - if is_training: - d = tf.data.Dataset.from_tensor_slices(tf.constant(input_files)) - if FLAGS.horovod: #and FLAGS.is_dist_eval_enabled: - d = d.shard(hvd.size(), hvd.rank()) - d = d.repeat() - d = d.shuffle(buffer_size=len(input_files)) - - # `cycle_length` is the number of parallel files that get read. - cycle_length = min(num_cpu_threads, len(input_files)) - - # `sloppy` mode means that the interleaving is not exact. This adds - # even more randomness to the training pipeline. - d = d.apply( - tf.data.experimental.parallel_interleave( - tf.data.TFRecordDataset, - sloppy=is_training, - cycle_length=cycle_length)) - d = d.shuffle(buffer_size=100) - else: - d = tf.data.TFRecordDataset(input_files) - # Since we evaluate for a fixed number of steps we don't want to encounter - # out-of-range exceptions. - #if FLAGS.horovod: d = d.shard(hvd.size(), hvd.rank()) - d = d.repeat() - - # We must `drop_remainder` on training because the TPU requires fixed - # size dimensions. For eval, we assume we are evaluating on the CPU or GPU - # and we *don't* want to drop the remainder, otherwise we wont cover - # every sample. 
- d = d.apply( - tf.data.experimental.map_and_batch( - lambda record: _decode_record(record, name_to_features), - batch_size=batch_size, - num_parallel_batches=num_cpu_threads, - drop_remainder=True if is_training else False)) - return d - - return input_fn - + bert_config = modeling.BertConfig.from_json_file(FLAGS.bert_config_file) + tf.io.gfile.makedirs(FLAGS.output_dir) -def _decode_record(record, name_to_features): - """Decodes a record to a TensorFlow example.""" - example = tf.io.parse_single_example(serialized=record, features=name_to_features) + input_files = [] + for input_file_dir in FLAGS.input_files_dir.split(","): + input_files.extend(glob.glob(os.path.join(input_file_dir, "*"))) - # tf.Example only supports tf.int64, but the TPU only supports tf.int32. - # So cast all int64 to int32. - for name in list(example.keys()): - t = example[name] - if t.dtype == tf.int64: - t = tf.cast(t, dtype=tf.int32) - example[name] = t + if FLAGS.horovod and len(input_files) < hvd.size(): + input_files = [input_files[0]] * hvd.size() - return example + # ======================================== + # 4. Build Graph + # ======================================== + tf_v1.reset_default_graph() -def main(_): - os.environ["TF_XLA_FLAGS"] = "--tf_xla_enable_lazy_compilation=false" #causes memory fragmentation for bert leading to OOM - - tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO) - - dllogging = dllogger_class.dllogger_class(FLAGS.dllog_path) - - if not FLAGS.do_train and not FLAGS.do_eval: - raise ValueError("At least one of `do_train` or `do_eval` must be True.") - - # In multi-node scenario, on each of HLSes there must be a checkpoint directly in the output_dir (read by Phase 2). - # There may be only one worker with comm_local_rank() == 0 on each machine and this worker will put its checkpoints there. - # All other workers use sub-directories to keep checkpoints. - if FLAGS.horovod and hvd.rank() != 0: - FLAGS.output_dir = os.path.join(FLAGS.output_dir, f'worker_{hvd.rank()}') - - bert_config = modeling.BertConfig.from_json_file(FLAGS.bert_config_file) - - tf.io.gfile.makedirs(FLAGS.output_dir) - - input_files = [] - for input_file_dir in FLAGS.input_files_dir.split(","): - input_files.extend(tf.io.gfile.glob(os.path.join(input_file_dir, "*"))) - - if FLAGS.horovod and len(input_files) < hvd.size(): - tf.compat.v1.logging.warning("Input files count lower then expected. Using single file for OVERFIT test.") - input_files = [input_files[0] for i in range(hvd.size())] - if FLAGS.amp and FLAGS.manual_fp16: - raise ValueError("AMP and Manual Mixed Precision Training are both activated! 
Error") - - is_per_host = tf.compat.v1.estimator.tpu.InputPipelineConfig.PER_HOST_V2 - session_config = tf.compat.v1.ConfigProto() - - session_config.allow_soft_placement=True - session_config.log_device_placement = False - session_config.gpu_options.allow_growth = True - if FLAGS.horovod: - print("*************************: ", hvd.local_rank()) - if hvd.rank() == 0: - tf.compat.v1.logging.info("***** Configuaration *****") - for key in FLAGS.__flags.keys(): - tf.compat.v1.logging.info(' {}: {}'.format(key, getattr(FLAGS, key))) - tf.compat.v1.logging.info("**************************") - -# config.gpu_options.per_process_gpu_memory_fraction = 0.7 - if FLAGS.use_xla: - session_config.graph_options.optimizer_options.global_jit_level = tf.compat.v1.OptimizerOptions.ON_1 - session_config.graph_options.rewrite_options.memory_optimization = rewriter_config_pb2.RewriterConfig.NO_MEM_OPT - if FLAGS.amp: - tf.compat.v1.enable_resource_variables() - - run_config = tf.estimator.RunConfig( - tf_random_seed=17645, - session_config=session_config, - save_checkpoints_steps=FLAGS.save_checkpoints_steps if not FLAGS.horovod or hvd.rank() == 0 else None, - save_checkpoints_secs=None, - keep_checkpoint_max=5, - save_summary_steps=FLAGS.save_checkpoints_steps if not FLAGS.horovod or hvd.rank() == 0 else None, - log_step_count_steps=FLAGS.display_loss_steps) - - if FLAGS.optimizer_type == "lamb": - weight_decay_rate=0.01 - beta_1=0.9 - beta_2=0.999 - epsilon=1e-6 - power = 1 - - model_fn = model_fn_builder( - bert_config=bert_config, - init_checkpoint=FLAGS.init_checkpoint, - learning_rate=FLAGS.learning_rate if not FLAGS.horovod else FLAGS.learning_rate*hvd.size(), - num_train_steps=FLAGS.num_train_steps, - num_warmup_steps=0, #FLAGS.num_warmup_steps, - use_one_hot_embeddings=False, weight_decay_rate=weight_decay_rate, beta_1=beta_1, beta_2=beta_2, epsilon=epsilon, power=power, hvd=hvd if FLAGS.horovod else None) - - estimator = tf.estimator.Estimator( - model_fn=model_fn, - model_dir=FLAGS.output_dir if hvd.rank() == 0 else None, - config=run_config) - - batch_size_per_node = FLAGS.train_batch_size * FLAGS.num_accumulation_steps - global_batch_size = (hvd.size() if FLAGS.horovod else 1) * batch_size_per_node - write_hparams_v1(FLAGS.output_dir, { - 'batch_size': FLAGS.train_batch_size, - 'batch_size_per_pu': FLAGS.train_batch_size, - 'batch_size_per_node': batch_size_per_node, - 'global_batch_size': global_batch_size, - **{x: getattr(FLAGS, x) for x in FLAGS} - }) - - if FLAGS.do_train: - - training_hooks = [] - if FLAGS.horovod: - if True: #os.environ.get("FORCE_WEIGHT_SYNC", "False").lower() in ["true", "1"]: - # Use this hook to allreduce trainable variables before the optimizer run - training_hooks.append(TrainableVarsAllreducingHookPreOpt(FLAGS.num_accumulation_steps)) - - train_log_hook = _LogSessionRunHook( - global_batch_size, FLAGS.num_accumulation_steps, dllogging, - FLAGS.display_loss_steps, FLAGS.save_checkpoints_steps, FLAGS.report_loss) - training_hooks.append(train_log_hook) - - training_hooks.append(ExamplesPerSecondEstimatorHook( - batch_size=batch_size_per_node, output_dir=FLAGS.output_dir, - extra_metrics={'global_examples/sec': global_batch_size})) - mlperfhook = MLPerfHook(global_batch_size, FLAGS.num_accumulation_steps, FLAGS.num_train_steps, FLAGS.samples_between_eval, - weight_decay_rate, beta_1, beta_2, epsilon, power, FLAGS.enable_device_warmup) - training_hooks.append(mlperfhook) - - # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states from - # rank 0 to all 
other processes. This is necessary to ensure consistent - # initialization of all workers when training is started with random weights or - # restored from a checkpoint. - bcast_hook = hvd.BroadcastGlobalVariablesHook(0) - training_hooks.append(bcast_hook) - tf.compat.v1.logging.info("***** Running training *****") - tf.compat.v1.logging.info(" Batch size = %d", FLAGS.train_batch_size) - train_input_fn = input_fn_builder( + # Placeholders for features (not needed if using dataset iterator directly) + train_dataset = input_fn_builder( input_files=input_files, batch_size=FLAGS.train_batch_size, max_seq_length=FLAGS.max_seq_length, max_predictions_per_seq=FLAGS.max_predictions_per_seq, - is_training=True) - - #train_start_time = time.time() - #estimator.train(input_fn=train_input_fn, hooks=training_hooks, max_steps=FLAGS.num_train_steps) - #if FLAGS.horovod: - # hvd.join() - #train_time_elapsed = time.time() - train_start_time - - #do offline evaluation right after training for mlperf - #tf.compat.v1.logging.info("***** Running offline evaluation right after training for mlperf *****") - #converged = False - eval_start_time = time.time() - mlperf_chpt_timestamp_dict = mlperfhook.chpt_timestamp_dict - mlperf_run_start_timestamp = mlperfhook.run_start_timestamp - mlperf_checkpoint_timestamp_dict = mlperfhook.checkpoint_timestamp_dict - mlperf_mlloger = mlperfhook.mllogger - mlperf_mllog = mlperfhook.mllog - mlperf_block_stop_timestamp_dict = mlperfhook.block_stop_timestamp_dict - num_steps_between_eval = math.ceil(FLAGS.samples_between_eval / global_batch_size) - print("mlperf_run_start_timestamp={}".format(mlperf_run_start_timestamp)) - print("mlperf_checkpoint_timestamp_dict={}".format(mlperf_checkpoint_timestamp_dict)) - print("mlperf_block_stop_timestamp_dict={}".format(mlperf_block_stop_timestamp_dict)) - ''' - chpt_file_path = FLAGS.output_dir + "/checkpoint" - chpt_files = [] - with open(chpt_file_path, "r") as file: - for line in file: - tmp,chpt_step = line.split(":") - if tmp == 'all_model_checkpoint_paths': - step = int(chpt_step.strip().split("-")[1].strip('"')) - if step >0: - chpt_files.append(FLAGS.output_dir + '/'+ chpt_step.strip().strip('"')) - ''' - eval_files = [] - for eval_file_dir in FLAGS.eval_files_dir.split(","): - eval_files.extend(tf.io.gfile.glob(os.path.join(eval_file_dir, "*"))) - - eval_input_fn = input_fn_builder( - input_files=eval_files, - batch_size=FLAGS.eval_batch_size, - max_seq_length=FLAGS.max_seq_length, - max_predictions_per_seq=FLAGS.max_predictions_per_seq, - is_training=False) - - eval_hooks = [LogEvalRunHook(FLAGS.eval_batch_size)] - train_hooks_for_earlystop = tf.estimator.experimental.stop_if_higher_hook(estimator, metric_name="masked_lm_accuracy", threshold=FLAGS.stop_threshold) - training_hooks.append(train_hooks_for_earlystop) - estimator.train(input_fn=train_input_fn, hooks=training_hooks, max_steps=FLAGS.num_train_steps) - ''' - - if True: - tf.compat.v1.logging.info("***** Running offline NON-distributed evaluation for mlperf *****") - for ckpt_ind,chpt_path in enumerate(chpt_files): - print("checkpoint file path={}".format(chpt_path)) - eval_results = estimator.evaluate( - input_fn=eval_input_fn, steps=FLAGS.max_eval_steps, hooks=eval_hooks, checkpoint_path=chpt_path) - mlperf_mlloger.event(key=mlperf_mllog.constants.EVAL_ACCURACY,value=eval_results["masked_lm_accuracy"],time_ms=mlperf_checkpoint_timestamp_dict[ckpt_ind + 1],metadata={'epoch_num': (ckpt_ind + 1)*FLAGS.samples_between_eval,'epoch_count': ckpt_ind + 1}) - - print("per rank 
mlm accuracy={}".format(eval_results["masked_lm_accuracy"])) - if FLAGS.stop_threshold: - success = bool(eval_results["masked_lm_accuracy"] >= FLAGS.stop_threshold) + is_training=True + ) + iterator = tf_v1.data.make_one_shot_iterator(train_dataset) + features = iterator.get_next() - if FLAGS.horovod: - hvd.join() - past_treshold = tf.cast(past_stop_threshold( - FLAGS.stop_threshold, eval_results["masked_lm_accuracy"]), tf.float32) - global_past_treshold = tf.math.greater( - hvd.allreduce(past_treshold, op=hvd.Sum), tf.zeros(1, tf.float32)) - if global_past_treshold.numpy(): - converged = True - print("converged") - step = int(chpt_path.strip().split("-")[1].strip('"')) - print("step={}".format(step)) - converge_block_idx = int(step / num_steps_between_eval ) - print("converged at step:{}, block:{}".format(step, converge_block_idx)) - break - else: - if past_stop_threshold( - FLAGS.stop_threshold, eval_results["masked_lm_accuracy"]): - converged = True - print("converged") - step = int(chpt_path.strip().split("-")[1].strip('"')) - print("step={}".format(step)) - converge_block_idx = int(step / num_steps_between_eval ) - print("converged at step:{}, block:{}".format(step, converge_block_idx)) - break - eval_time_elapsed = time.time() - eval_start_time - print("Total offline non-distributed evaluation time={} seconds".format(eval_time_elapsed)) - if converged: - total_train_time_secs = (mlperf_block_stop_timestamp_dict[converge_block_idx] - mlperf_run_start_timestamp) - mlperf_run_stop_timestamp = mlperf_block_stop_timestamp_dict[converge_block_idx] + eval_time_elapsed - time_to_train_minutes = (total_train_time_secs + eval_time_elapsed) / 60 - mlperf_mlloger.end(key=mlperf_mllog.constants.RUN_STOP,value=eval_results["masked_lm_accuracy"],time_ms=mlperf_checkpoint_timestamp_dict[ckpt_ind + 1],metadata={'epoch_num': (ckpt_ind + 1)*FLAGS.samples_between_eval,'epoch_count': ckpt_ind + 1,'status': 'success'}) - print("Total time-to-train is {} minutes ( = pure training time {} minutes + pure evaluation time {} minutes), converged in {} blocks ".format(time_to_train_minutes, total_train_time_secs/60, eval_time_elapsed / 60, converge_block_idx)) - else: - mlperf_mlloger.end(key=mlperf_mllog.constants.RUN_STOP,value=eval_results["masked_lm_accuracy"],time_ms=mlperf_checkpoint_timestamp_dict[ckpt_ind + 1],metadata={'epoch_num': (ckpt_ind + 1)*FLAGS.samples_between_eval,'epoch_count': ckpt_ind + 1,'status': 'fail'}) - ''' - if FLAGS.do_eval: - if FLAGS.horovod: - if hvd.rank() is not 0: - return - converged = False - num_steps_between_eval = math.ceil(FLAGS.samples_between_eval / global_batch_size) - eval_start_time = time.time() - #Stand-alone offline evaluation of multiple checkpoints - chpt_file_path = FLAGS.output_dir + "/checkpoint" - chpt_files = [] - with open(chpt_file_path, "r") as file: - for line in file: - tmp,chpt_step = line.split(":") - if tmp == 'all_model_checkpoint_paths': - step = int(chpt_step.strip().split("-")[1].strip('"')) - if step > 0: - chpt_files.append(FLAGS.output_dir + '/'+ chpt_step.strip().strip('"')) - eval_files = [] - for eval_file_dir in FLAGS.eval_files_dir.split(","): - eval_files.extend(tf.io.gfile.glob(os.path.join(eval_file_dir, "*"))) - - eval_input_fn = input_fn_builder( - input_files=eval_files, - batch_size=FLAGS.eval_batch_size, - max_seq_length=FLAGS.max_seq_length, - max_predictions_per_seq=FLAGS.max_predictions_per_seq, - is_training=False) - - eval_hooks = [LogEvalRunHook(FLAGS.eval_batch_size)] - - if FLAGS.horovod and FLAGS.is_dist_eval_enabled: - 
tf.compat.v1.logging.info("***** Running standalone offline distributed evaluation for mlperf *****") - - #need to shard the dataset!!!! - eval_samples = 10000 / hvd.size() - max_eval_steps = math.ceil(FLAGS.max_eval_steps / hvd.size()) - for chpt_path in chpt_files: - print("checkpoint file path={}".format(chpt_path)) - eval_results = estimator.evaluate( - input_fn=eval_input_fn, steps=max_eval_steps, hooks=eval_hooks, checkpoint_path=chpt_path) - - if FLAGS.stop_threshold: - partial_eval_masked_lm_accuracy = eval_results["masked_lm_accuracy"] * eval_samples - print("per rank masked_lm_accuracy={}".format(eval_results["masked_lm_accuracy"])) - partial_eval_masked_lm_accuracy_FP32=tf.cast(partial_eval_masked_lm_accuracy, tf.float32) - total_eval_masked_lm_accuracy_FP32 = hvd.allreduce(partial_eval_masked_lm_accuracy_FP32, op=hvd.Sum) - total_eval_masked_lm_accuracy_FP32 /= 10000.0 - success = bool(total_eval_masked_lm_accuracy_FP32 >= FLAGS.stop_threshold) - print("average eval_masked_lm_accuracy_FP32={}".format(total_eval_masked_lm_accuracy_FP32)) - if success: - converged = True - step = int(chpt_path.strip().split("-")[1].strip('"')) - converge_block_idx = int(step / num_steps_between_eval ) - print("converged at step:{}, block:{}".format(step, converge_block_idx)) - break - eval_time_elapsed = time.time() - eval_start_time - print("Total stand-alone offline distributed evaluation time={} seconds".format(eval_time_elapsed)) + input_ids = features["input_ids"] + input_mask = features["input_mask"] + segment_ids = features["segment_ids"] + masked_lm_positions = features["masked_lm_positions"] + masked_lm_ids = features["masked_lm_ids"] + masked_lm_weights = features["masked_lm_weights"] + next_sentence_labels = features["next_sentence_labels"] + + # Build model + model = modeling.BertModel( + config=bert_config, + is_training=True, + input_ids=input_ids, + input_mask=input_mask, + token_type_ids=segment_ids, + use_one_hot_embeddings=False, + compute_type=tf.float16 if FLAGS.manual_fp16 else tf.float32 + ) + + # Loss + (masked_lm_loss, _, _) = get_masked_lm_output( + bert_config, model.get_sequence_output(), model.get_embedding_table(), + masked_lm_positions, masked_lm_ids, masked_lm_weights) + (next_sentence_loss, _, _) = get_next_sentence_output( + bert_config, model.get_pooled_output(), next_sentence_labels) + + total_loss = tf.identity(masked_lm_loss + next_sentence_loss, name='total_loss') + + # Optimizer + if FLAGS.optimizer_type == "lamb": + weight_decay_rate = 0.01 + beta_1, beta_2, epsilon, power = 0.9, 0.999, 1e-6, 1 else: - tf.compat.v1.logging.info("***** Running standalone offline NON-distributed evaluation for mlperf *****") - for chpt_path in chpt_files: - print("checkpoint file path={}".format(chpt_path)) - eval_results = estimator.evaluate( - input_fn=eval_input_fn, steps=FLAGS.max_eval_steps, hooks=eval_hooks, checkpoint_path=chpt_path) - print("per rank mlm accuracy={}".format(eval_results["masked_lm_accuracy"])) - if FLAGS.stop_threshold: - success = bool(eval_results["masked_lm_accuracy"] >= FLAGS.stop_threshold) - - if False: + weight_decay_rate = beta_1 = beta_2 = epsilon = power = None + + learning_rate = FLAGS.learning_rate * (hvd.size() if FLAGS.horovod else 1) + + # Create optimizer (returns train_op and loss_scale_var if AMP) + train_op = optimization.create_optimizer( + loss=total_loss, + init_lr=learning_rate, + num_train_steps=FLAGS.num_train_steps, + num_warmup_steps=0, + manual_fp16=FLAGS.manual_fp16, + use_fp16=FLAGS.amp, + 
+      num_accumulation_steps=FLAGS.num_accumulation_steps,
+      optimizer_type=FLAGS.optimizer_type,
+      allreduce_post_accumulation=FLAGS.allreduce_post_accumulation,
+      init_loss_scale=FLAGS.init_loss_scale,
+      weight_decay_rate=weight_decay_rate,
+      beta_1=beta_1,
+      beta_2=beta_2,
+      epsilon=epsilon,
+      power=power,
+      hvd=hvd if FLAGS.horovod else None
+  )
+  # Horovod: broadcast initial variables
+  bcast_op = hvd.broadcast_global_variables(0) if FLAGS.horovod else None
+
+
+  # Checkpoint and summary
+  global_step = tf_v1.train.get_or_create_global_step()
+  saver = tf_v1.train.Saver(max_to_keep=5)
+
+  # Session config
+  session_config = tf_v1.ConfigProto()
+  session_config.allow_soft_placement = True
+  session_config.log_device_placement = False
+  session_config.gpu_options.allow_growth = True
+  if FLAGS.horovod:
+    print(f"size:{hvd.size()}, local_rank:{hvd.local_rank()}")
+
+  # Training loop
+  with tf_v1.Session(config=session_config) as sess:
+    sess.run(tf_v1.global_variables_initializer())
+
+    # Horovod: broadcast initial weights
+    if FLAGS.horovod:
+      sess.run(bcast_op)
+
+    tf_v1.logging.info("***** Running training *****")
+    tf_v1.logging.info("  Batch size = %d", FLAGS.train_batch_size)
+
+    step = 0
+    total_time = 0
+    total_sentences = 0
+    acc_steps = 0
+    while step < FLAGS.num_train_steps:
+      try:
+        start_time = time.time()
+        _, step, total_loss_val, nsp_loss_val, mlm_loss_val = sess.run([train_op, global_step, total_loss, next_sentence_loss, masked_lm_loss])
+
+        total_time += time.time() - start_time
+        total_sentences += FLAGS.train_batch_size*(hvd.size() if FLAGS.horovod else 1)
+
+        if step % FLAGS.display_loss_steps == 0 and acc_steps % FLAGS.num_accumulation_steps == 0:
+          print(f"Step {step}, total_loss: {total_loss_val:.4f}, nsp_loss: {nsp_loss_val:.4f}, mlm_loss: {mlm_loss_val:.4f}, sentences_per_second: {total_sentences/total_time:.4f}")
+
+        acc_steps += 1
+      except tf.errors.OutOfRangeError:
+        break
+
+  if hvd:
           hvd.join()
-        past_treshold = tf.cast(past_stop_threshold(
-            FLAGS.stop_threshold, eval_results["masked_lm_accuracy"]), tf.float32)
-        global_past_treshold = tf.math.greater(
-            hvd.allreduce(past_treshold, op=hvd.Sum), tf.zeros(1, tf.float32))
-        if global_past_treshold.numpy():
-          converged = True
-          step = int(chpt_path.strip().split("-")[1].strip('"'))
-          converge_block_idx = int(step / num_steps_between_eval )
-          print("converged at step:{}, block:{}".format(step, converge_block_idx))
-          break
-        else:
-          if past_stop_threshold(
-              FLAGS.stop_threshold, eval_results["masked_lm_accuracy"]):
-            converged = True
-            step = int(chpt_path.strip().split("-")[1].strip('"'))
-            converge_block_idx = int(step / num_steps_between_eval )
-            print("converged at step:{}, block:{}".format(step, converge_block_idx))
-            break
-      eval_time_elapsed = time.time() - eval_start_time
-      print("Total stand-alone offline non-distributed evaluation time={} seconds".format(eval_time_elapsed))
-
 if __name__ == "__main__":
-  start_time = time.time()
-  init_flags()
-  try:
-    from dltest import show_training_arguments
-    show_training_arguments(FLAGS)
-  except:
-    pass
-  print("*****************************************")
-  print("Arguments passed to this program: run_pretraining.")
-  tf.compat.v1.enable_eager_execution()
-  tf.compat.v1.enable_resource_variables()
-  if FLAGS.horovod:
-    import horovod.tensorflow as hvd
-    hvd.init()
-    os.environ['CUDA_VISIBLE_DEVICES'] = str(hvd.local_rank())
-  #load_habana_module()
-  flags.mark_flag_as_required("input_files_dir")
-  if FLAGS.do_eval:
-    flags.mark_flag_as_required("eval_files_dir")
flags.mark_flag_as_required("bert_config_file") - flags.mark_flag_as_required("output_dir") - if FLAGS.use_xla and FLAGS.manual_fp16: - print('WARNING! Combining --use_xla with --manual_fp16 may prevent convergence.') - print(' This warning message will be removed when the underlying') - print(' issues have been fixed and you are running a TF version') - print(' that has that fix.') - tf.compat.v1.app.run() - end_time = time.time() - e2e_time = end_time - start_time - print("e2e_time:",e2e_time) + start_time = time.time() + init_flags() + try: + from dltest import show_training_arguments + show_training_arguments(FLAGS) + except: + pass + print("*****************************************") + print("Arguments passed to this program: run_pretraining.") + tf_v1.disable_v2_behavior() + tf_v1.disable_eager_execution() + tf_v1.enable_resource_variables() + if FLAGS.horovod: + import horovod.tensorflow as hvd + hvd.init() + os.environ['CUDA_VISIBLE_DEVICES'] = str(hvd.local_rank()) + #load_habana_module() + flags.mark_flag_as_required("input_files_dir") + if FLAGS.do_eval: + flags.mark_flag_as_required("eval_files_dir") + flags.mark_flag_as_required("bert_config_file") + flags.mark_flag_as_required("output_dir") + if FLAGS.use_xla and FLAGS.manual_fp16: + print('WARNING! Combining --use_xla with --manual_fp16 may prevent convergence.') + print(' This warning message will be removed when the underlying') + print(' issues have been fixed and you are running a TF version') + print(' that has that fix.') + tf_v1.app.run() + end_time = time.time() + e2e_time = end_time - start_time + print("e2e_time:",e2e_time)