Notes/Tests #1

ryantanaka opened this issue Mar 30, 2019 · 0 comments

Notes

Sources:

YARN

Job Submission

(screenshot: diagram of the YARN job submission flow, steps 1-3 below)

Process of Submitting Job Through YARN

  1. client contacts the resource manager and asks it to run an application master process (step 1); a client-side sketch of this request using the YarnClient API follows this list
  2. resource manager finds a node manager that can launch the application master in a container (steps 2a and 2b)
  3. the application master may request more containers from the resource manager so that those containers can be used to run distributed computations (such as Map and Reduce tasks in the case of a MapReduce job) (step 3)
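
A rough client-side sketch of step 1 above, using the YarnClient API (assuming Hadoop 2.x or later); the application name, AM command, and container size are placeholders, not values from these notes:

```java
import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class SubmitAppSketch {
  public static void main(String[] args) throws Exception {
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new YarnConfiguration());
    yarnClient.start();

    // Ask the resource manager for a new application (step 1).
    YarnClientApplication app = yarnClient.createApplication();
    ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
    appContext.setApplicationName("demo-app"); // placeholder name

    // Describe the container that will run the application master.
    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
    amContainer.setCommands(Collections.singletonList("<command that starts the AM>")); // placeholder
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(Resource.newInstance(1024, 1)); // 1 GB, 1 vcore for the AM container

    // Hand the request to the RM, which finds a node manager to launch the AM (steps 2a and 2b).
    ApplicationId appId = yarnClient.submitApplication(appContext);
    System.out.println("Submitted application " + appId);
  }
}
```

For MapReduce jobs all of this is handled by the Job/JobSubmitter classes described below.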

Scheduling: 3 schedulers available in YARN

  • FIFO: simple FIFO scheduling; use this one since we won't be simulating a shared cluster (see the config sketch after this list)
  • Capacity Scheduler: fixed-size queues, such as one for small jobs and one for large jobs; comes at the cost of lower overall cluster utilization
  • Fair Scheduler: essentially giving each job its "fair share"
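
The FIFO choice above maps to the yarn.resourcemanager.scheduler.class property, which normally lives in yarn-site.xml on the resource manager; it is shown through the Java Configuration API here only to keep the examples in one language (the surrounding class is just illustrative):

```java
import org.apache.hadoop.conf.Configuration;

public class FifoSchedulerSketch {
  public static void main(String[] args) {
    // Pin YARN to the simple FIFO scheduler (normally set in yarn-site.xml, not in client code).
    Configuration conf = new Configuration();
    conf.set("yarn.resourcemanager.scheduler.class",
        "org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler");
    System.out.println(conf.get("yarn.resourcemanager.scheduler.class"));
  }
}
```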

MapReduce

Job Submission & Initialization

 68   public static void main(String[] args) throws Exception {
 69     Configuration conf = new Configuration();
 70     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
 71     if (otherArgs.length < 2) {
 72       System.err.println("Usage: wordcount <in> [<in>...] <out>");
 73       System.exit(2);
 74     }
 75     Job job = Job.getInstance(conf, "word count");
 76     job.setJarByClass(WordCount.class);
 77     job.setMapperClass(TokenizerMapper.class);
 78     job.setCombinerClass(IntSumReducer.class);
 79     job.setReducerClass(IntSumReducer.class);
 80     job.setOutputKeyClass(Text.class);
 81     job.setOutputValueClass(IntWritable.class);
 82     for (int i = 0; i < otherArgs.length - 1; ++i) {
 83       FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
 84     }
 85     FileOutputFormat.setOutputPath(job,
 86       new Path(otherArgs[otherArgs.length - 1]));
 87     System.exit(job.waitForCompletion(true) ? 0 : 1);
 88   }

(screenshot: diagram of the MapReduce job submission and initialization flow)

Components

  • client: client node that submits the MapReduce job
  • YARN resource manager: coordinates the allocation of compute resources on the cluster
  • Yarn node managers: launch and monitor compute containers on machines in the cluster
  • MapReduce application master: coordinates tasks running the MR job; the application master and MR tasks run in containers that are scheduled by the resource manager and managed by the node managers

Process of Submitting MapReduce Job Through YARN

  1. Client submits the MapReduce job by calling job.waitForCompletion() on line 87, which submits the job if it hasn't already been submitted. A JobSubmitter instance is created and submitJobInternal() is called.
  2. JobSubmitter does the following:
    • asks the RM for an application ID
    • makes sure the output directory is valid
    • computes the input splits
    • copies the resources needed to run the job to HDFS (job JAR file, config file, input splits)
    • finally submits the job by calling submitApplication()
  3. the RM receives the call to submitApplication() and hands the request off to the YARN scheduler (part of the RM), which allocates a container for the MR application master
  4. the RM then launches the MR application master's process in that container, under the node manager's management (steps 5a and 5b)
  5. the MR application master retrieves the input splits from HDFS so it knows how many map tasks are needed
  6. the MR application master requests containers for all map and reduce tasks from the RM (requests for reduce tasks are not made until 5% of map tasks have completed)
    • depending on the values set for mapreduce.job.ubertask.*, and if mapreduce.job.ubertask.enable is set to true, the whole job may just be run in the same JVM as the MR application master (see the config sketch after this list)
  7. the MR application master contacts the node manager on the node where a container has been allocated
  8. that node manager starts an instance of YarnChild in a separate JVM
  9. the instance of YarnChild localizes resources it needs to run the task (JAR files, and any files from a distributed cache)
  10. the YarnChild then can run the map or reduce task
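
A sketch of the uber task settings from step 6, set through the client-side Configuration API; the limits shown are the usual defaults, which are worth double-checking against the Hadoop version in use:

```java
import org.apache.hadoop.conf.Configuration;

public class UberTaskConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Allow small jobs to run entirely inside the MR application master's JVM.
    conf.setBoolean("mapreduce.job.ubertask.enable", true);
    // A job only qualifies as "uber" if it is small enough; these are the usual default limits.
    conf.setInt("mapreduce.job.ubertask.maxmaps", 9);     // at most 9 map tasks
    conf.setInt("mapreduce.job.ubertask.maxreduces", 1);  // at most 1 reduce task
    // mapreduce.job.ubertask.maxbytes defaults to the HDFS block size if left unset.
    System.out.println(conf.getBoolean("mapreduce.job.ubertask.enable", false));
  }
}
```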

Map and Reduce Tasks

(screenshot: diagram of map and reduce task data flow)

Map Tasks

  1. output from a map task is written to a circular memory buffer which is 100 MB by default (set with mapreduce.task.io.sort.mb)
    • when the buffer reaches 80% (or whatever is set by mapreduce.map.sort.spill.percent) a background thread will spill contents to disk
    • if the buffer fills up, the map will block until the spill is complete
    • each time the spill threshold is reached, a new spill file is created
  2. when the mapping is complete, all spill files (for this map task) are merged into a single partitioned and sorted output file (mapreduce.task.io.sort.factor controls the max number of streams to merge at once, and by default this is 10)
  3. the map task will notify the MR application master with a heartbeat specifying that the output has been produced
  4. the sorted, partitioned output file can then be served by the node manager that ran that map task using HTTP
    • the number of threads that can serve the partitions is twice the number of processors on the machine by default (mapreduce.shuffle.max.threads property); a config sketch for these map-side properties follows this list
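
A config sketch for the map-side properties mentioned above; the 100 MB, 0.80, and 10 values are the defaults stated in these notes, and 0 for mapreduce.shuffle.max.threads is the conventional "twice the number of processors" default:

```java
import org.apache.hadoop.conf.Configuration;

public class MapSideTuningSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt("mapreduce.task.io.sort.mb", 100);            // circular memory buffer size in MB
    conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f); // buffer fill level that triggers a spill
    conf.setInt("mapreduce.task.io.sort.factor", 10);         // max number of spill streams merged at once
    conf.setInt("mapreduce.shuffle.max.threads", 0);          // 0 = use twice the number of processors
    System.out.println(conf.get("mapreduce.task.io.sort.mb"));
  }
}
```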

Reduce Tasks

  1. if a reduce task has been started and a map task with a partition that the reduce task needs has finished, the reduce task uses a small number of copier threads (5 by default, set with mapreduce.reduce.shuffle.parallelcopies) to copy the partitions
    • these threads constantly communicate with the MR application master via heartbeat, asking for hosts with map outputs to copy (this is how the copier threads learn which hosts hold which map outputs)
  2. files retrieved by the copy threads are copied to the reduce task's JVM memory if they are small enough ( mapreduce.reduce.shuffle.input.buffer.percent specifies the proportion of the heap to use for inputs)
    • when the in-memory buffer reaches a threshold size (mapreduce.reduce.shuffle.merge.percent) or reaches a threshold number of map outputs (mapreduce.reduce.merge.inmem.threshold) it is merged and spilled to disk
  3. copies accumulated on disk are merged into a larger sorted file by a background thread
  4. the final round of merges feeds directly into the reduce rather than being written to disk first (a config sketch for these reduce-side properties follows this list)
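
The matching reduce-side properties from this list; only the value of mapreduce.reduce.shuffle.parallelcopies (5) comes from these notes, the other values shown are the usual Hadoop defaults and should be verified against the version in use:

```java
import org.apache.hadoop.conf.Configuration;

public class ReduceSideTuningSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt("mapreduce.reduce.shuffle.parallelcopies", 5);             // copier threads per reduce task
    conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f); // heap fraction used to hold map outputs
    conf.setFloat("mapreduce.reduce.shuffle.merge.percent", 0.66f);        // buffer fill level that triggers a merge/spill
    conf.setInt("mapreduce.reduce.merge.inmem.threshold", 1000);           // map-output count that triggers a merge/spill
    System.out.println(conf.get("mapreduce.reduce.shuffle.parallelcopies"));
  }
}
```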

(screenshot: additional map/reduce task diagram)
