-
Notifications
You must be signed in to change notification settings - Fork 5
/
03_multi_worker_with_estimator.py
123 lines (96 loc) · 3.57 KB
/
03_multi_worker_with_estimator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import mnist_dataset
import tensorflow as tf
import os, json
def input_fn(mode, num_epochs, batch_size, input_context=None):
    """Return the MNIST ``tf.data`` pipeline for ``mode``.

    Args:
      mode: A ``tf.estimator.ModeKeys`` value. ``TRAIN`` reads the
        ``'train'`` split returned by ``mnist_dataset.load()``; every other
        mode reads the ``'test'`` split.
      num_epochs: Number of passes over the data, forwarded to
        ``Dataset.repeat``.
      batch_size: Batch size for ``Dataset.batch``. It also sizes the
        shuffle buffer (``2 * batch_size + 1`` elements).
      input_context: Optional ``tf.distribute.InputContext``. When truthy,
        only shard ``input_pipeline_id`` out of ``num_input_pipelines`` is
        kept.

    Returns:
      The dataset after optional sharding, then shuffle, batch and repeat.
    """
    all_splits = mnist_dataset.load()
    is_training = mode == tf.estimator.ModeKeys.TRAIN
    dataset = all_splits['train' if is_training else 'test']

    if input_context:
        # Keep only this pipeline's shard so that workers read disjoint data.
        dataset = dataset.shard(input_context.num_input_pipelines,
                                input_context.input_pipeline_id)

    dataset = dataset.shuffle(buffer_size=2 * batch_size + 1)
    dataset = dataset.batch(batch_size)
    return dataset.repeat(num_epochs)
def model_fn(features, labels, mode, params):
    """Estimator ``model_fn`` for a small convolutional MNIST classifier.

    Args:
      features: Batch of input images. The first layer declares
        ``input_shape=(28, 28, 1)``; presumably features are
        ``(batch, 28, 28, 1)`` -- TODO confirm against ``mnist_dataset``.
      labels: Integer class ids matching ``features``; unused in PREDICT.
      mode: A ``tf.estimator.ModeKeys`` value.
      params: Dict containing ``'learning_rate'`` for the Adam optimizer.
        A ``'batch_size'`` entry is accepted but not required, because the
        loss is averaged over the batch that is actually fed.

    Returns:
      A ``tf.estimator.EstimatorSpec``:
        * PREDICT: predictions only.
        * EVAL: loss, an ``'accuracy'`` metric and predictions.
        * TRAIN: loss, an Adam ``train_op`` and predictions.
    """
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    logits = model(features)
    predicted_class = tf.argmax(input=logits, axis=1, output_type=tf.int32)
    predictions = {
        'logits': logits,
        'classes': predicted_class,
        'probabilities': tf.compat.v1.math.softmax(logits),
    }

    if mode == tf.estimator.ModeKeys.PREDICT:
        # EstimatorSpec requires `mode` and has no `labels` parameter.
        # Passing labels= without mode raised a TypeError in PREDICT mode.
        return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

    per_example_loss = tf.keras.losses.SparseCategoricalCrossentropy(
        from_logits=True,
        reduction=tf.compat.v1.losses.Reduction.NONE)(labels, logits)
    # Average over the batch actually received. Dividing the summed loss by
    # a fixed configured batch size mis-scales the loss whenever a batch of
    # a different size is fed (e.g. evaluation batches of another size).
    loss = tf.reduce_mean(per_example_loss)

    if mode == tf.estimator.ModeKeys.EVAL:
        # Estimator only consumes eval_metric_ops in EVAL mode, so the
        # metric ops are built only here.
        eval_metric_ops = {
            'accuracy': tf.compat.v1.metrics.accuracy(labels, predicted_class),
        }
        return tf.estimator.EstimatorSpec(
            mode=mode,
            loss=loss,
            eval_metric_ops=eval_metric_ops,
            predictions=predictions,
        )

    optimizer = tf.compat.v1.train.AdamOptimizer(
        learning_rate=params['learning_rate'])
    train_op = optimizer.minimize(
        loss, tf.compat.v1.train.get_or_create_global_step())
    return tf.estimator.EstimatorSpec(
        mode=mode,
        loss=loss,
        train_op=train_op,
        predictions=predictions,
    )
if __name__ == "__main__":
    # Enable INFO logging first so that startup messages are visible too.
    tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO)

    # Fallback cluster description, used only when TF_CONFIG is not already
    # set. Each machine must run with its own task index, so an externally
    # provided TF_CONFIG takes precedence, e.g.
    #   TF_CONFIG='{"cluster": {...}, "task": {"type": "worker", "index": 1}}'
    default_tfconfig = {
        'cluster': {
            'worker': ["192.168.1.10:2222", "192.168.1.11:2222"],
        },
        'task': {'type': 'worker', 'index': 0},
    }
    env_tfconfig = os.environ.get('TF_CONFIG')
    tfconfig = json.loads(env_tfconfig) if env_tfconfig else default_tfconfig
    # The strategy reads TF_CONFIG from the environment, so it must be set
    # before the strategy is constructed.
    os.environ['TF_CONFIG'] = json.dumps(tfconfig)
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
        communication=tf.distribute.experimental.CollectiveCommunication.RING)

    # One input pipeline per chief/worker. A chief (if any) is numbered
    # before the workers; other task types fall back to pipeline 0.
    cluster = tfconfig.get('cluster', {})
    task = tfconfig.get('task', {})
    num_chiefs = len(cluster.get('chief', []))
    num_pipelines = max(num_chiefs + len(cluster.get('worker', [])), 1)
    if task.get('type') == 'worker':
        pipeline_id = num_chiefs + task.get('index', 0)
    else:
        pipeline_id = 0
    input_context = tf.distribute.InputContext(num_pipelines, pipeline_id)

    OUTDIR = 'tmp/multiworker'
    BATCH_SIZE = 100
    NUM_EPOCHS = 5
    config = tf.estimator.RunConfig(
        train_distribute=strategy,
        eval_distribute=strategy,
        log_step_count_steps=100,
        tf_random_seed=19830610,
        model_dir=OUTDIR,
    )
    hparams = {'learning_rate': 1e-4, 'batch_size': BATCH_SIZE}
    classifier = tf.estimator.Estimator(
        model_fn=model_fn, config=config, params=hparams)

    train_spec = tf.estimator.TrainSpec(
        input_fn=lambda: input_fn(
            mode=tf.estimator.ModeKeys.TRAIN,
            num_epochs=NUM_EPOCHS,
            batch_size=BATCH_SIZE,
            input_context=input_context,
        ),
    )
    eval_spec = tf.estimator.EvalSpec(
        input_fn=lambda: input_fn(
            mode=tf.estimator.ModeKeys.EVAL,
            num_epochs=1,
            batch_size=5000,
            input_context=input_context,
        ),
        throttle_secs=30,
        steps=None,
    )

    tf.estimator.train_and_evaluate(
        classifier,
        train_spec=train_spec,
        eval_spec=eval_spec,
    )