
Frequently Asked Questions



Why use only one core/task per executor?

This was a design choice: it is the most obvious, easiest-to-reason-about mapping of a Spark resource to a TF node. The most visible benefit is that each executor's logs contain the logs of only a single TF node, which is much easier to debug than interspersed logs. Additionally, most Spark/YARN configurations (e.g. memory) can be mapped almost directly to the TF node. Finally, in Hadoop/YARN environments (which were our primary focus), you can still have multiple executors assigned to the same physical host if there are enough available resources.
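As a minimal sketch, the one-task-per-executor mapping is typically requested through standard Spark properties like the following (the app name, executor count, and memory value here are placeholders; adjust them for your cluster):

```python
# Minimal sketch: request one core per executor so each executor runs exactly
# one task, and therefore hosts exactly one TF node.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("tfos_app")                  # hypothetical app name
        .set("spark.executor.instances", "4")    # e.g. 1 PS + 3 workers
        .set("spark.executor.cores", "1")        # one core per executor
        .set("spark.task.cpus", "1")             # one task consumes that one core
        .set("spark.executor.memory", "4g"))     # sized for the TF node
sc = SparkContext(conf=conf)
```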

Why doesn't algorithm X scale when I add more nodes?

Different algorithms are scalable in different ways, depending on whether they're limited by compute, memory, or I/O. In cases where an algorithm can run comfortably on a single node (completing in a few seconds or minutes), adding the overhead of launching nodes in a cluster, coordinating work between nodes, and communicating gradients/weights across the cluster will generally be slower. For example, the distributed MNIST example is provided to demonstrate the process of porting a simple, easy-to-understand, distributed TF application over to TFoS. It is not intended to run faster than a single-node TF instance, where you can actually load the entire training set in memory and finish 1000 steps in a few seconds.

Similarly, the other examples just demonstrate the conversion of different TF apps; they are not intended to demonstrate re-writing the code to be more scalable. Note that these examples were originally designed with different scalability models, as follows:

  • cifar10 - single-node, multi-gpu
  • mnist - multi-node, single-gpu-per-node
  • imagenet - multi-node, single-gpu-per-node
  • slim - single-node, multi-gpu OR multi-node, single-gpu-per-node

Finally, if an application takes a "painfully long" time to complete on a single node, then it is a good candidate for scaling. However, you will still need to understand which resource is the bottleneck and how to modify your code to scale for that resource in a distributed cluster.

What does the KeyError: 'input' error mean?

Because we assume that one executor hosts only one TF node, you will encounter this error if your cluster is not configured accordingly. Specifically, this is seen when an RDD feeding task lands on an executor that doesn't host a TF worker node, e.g. it lands on the PS node or even an idle executor.
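In other words, the number of Spark executors must exactly match the TFoS cluster size (workers + PS), so every feeding partition lands on an executor running a TF worker. A minimal sketch of that sizing rule, loosely following the MNIST example's invocation pattern (the exact TFCluster.run argument order and the placeholder map_fun below may differ across TFoS versions):

```python
from pyspark import SparkConf, SparkContext
from tensorflowonspark import TFCluster

num_ps = 1
num_workers = 3
cluster_size = num_ps + num_workers   # total TF nodes

conf = (SparkConf()
        .setAppName("mnist_spark")
        .set("spark.executor.instances", str(cluster_size))  # must equal cluster_size
        .set("spark.executor.cores", "1"))
sc = SparkContext(conf=conf)

def map_fun(args, ctx):
    # placeholder for your TF training function (e.g. mnist_dist.map_fun)
    pass

# If cluster_size does not match the number of executors, a feeding partition
# can land on an idle or PS executor and fail with KeyError: 'input'.
cluster = TFCluster.run(sc, map_fun, None, cluster_size, num_ps,
                        False, TFCluster.InputMode.SPARK)
# dataRDD = ...                       # your feed RDD
# cluster.train(dataRDD, 1)           # feed 1 epoch of data
cluster.shutdown()
```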

Why does the MNIST example hang?

There are two main causes for this symptom:

  1. In Hadoop/YARN environments, you are likely missing the path to libhdfs.so in your spark.executorEnv.LD_LIBRARY_PATH. TensorFlow 1.1+ requires this to be set, and unfortunately, TensorFlow just hangs at Session creation time (and during model/checkpoint creation) if it cannot find this library (see the sketch after this list).
  2. If you are using different settings than the original example, you may need to increase the amount of data being fed to the cluster with the --epochs argument, and adjust the --steps argument accordingly. This is typically seen in InputMode.SPARK, where most of the executors complete the feeding task but the remaining executors are still waiting for more data. The root cause is that RDD partitions are fed to each executor, and the TF nodes read --batch_size elements from the partition at a time. If the last few partitions do not align with a multiple of batch_size, the TF node will continue to wait for more data, which never arrives. The simplest solution is to adjust --steps so the training loop terminates before those last partitions are needed (see the sketch after this list).
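A minimal sketch of both fixes follows. The library paths below are only typical locations for libhdfs.so and libjvm.so and will differ by Hadoop/Java installation, and the steps arithmetic assumes (as in the MNIST example) that each training step consumes one --batch_size batch from the feed:

```python
# (1) Make libhdfs.so (and libjvm.so) visible to the executors.
#     Paths shown are typical locations; adjust for your installation.
from pyspark import SparkConf

conf = (SparkConf()
        .set("spark.executorEnv.LD_LIBRARY_PATH",
             "/usr/lib/hadoop/lib/native:/usr/java/default/jre/lib/amd64/server"))

# (2) Keep --steps consistent with the amount of data being fed.
#     The feed supplies roughly epochs * num_records examples in total,
#     and training consumes batch_size examples per step, so:
epochs, num_records, batch_size = 1, 60000, 100   # MNIST-like numbers
max_steps = (epochs * num_records) // batch_size  # 600 here
steps = min(1000, max_steps)                      # don't wait for data that never arrives
```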