-
Notifications
You must be signed in to change notification settings - Fork 858
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
unified architecture v0.1 #418
base: master
Are you sure you want to change the base?
Conversation
try { | ||
AutoTuningMetricsController.init(); | ||
/* AutoTuningMetricsController.init(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Remove the commented code
@@ -18,6 +18,8 @@ | |||
|
|||
import com.fasterxml.jackson.databind.ObjectMapper; | |||
import com.linkedin.drelephant.ElephantContext; | |||
import com.linkedin.drelephant.tuning.obt.AutoTuningOptimizeManager; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these 2 imports required? There isnt any other code added in the file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes , they are required . AutoTuningOptimizeManager is being refactored .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. I did not see associated code change hence gave the comment. The file has been moved to a different package hence the change is required.
import org.apache.log4j.Logger; | ||
|
||
|
||
public abstract class AbstractAlgorithmManager implements Manager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Algorithm=>Tuning/TuningType? We are using the term tuning type to differentiate between HBT/OBT. OBT will then have further algorithms. Keep the code consistent with that terminology?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agreed .
*/ | ||
|
||
package com.linkedin.drelephant.tuning; | ||
package com.linkedin.drelephant.tuning.obt; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
File header should not be removed
import org.apache.commons.io.FileUtils; | ||
import org.apache.log4j.Logger; | ||
import play.libs.Json; | ||
import org.apache.hadoop.conf.Configuration; | ||
|
||
public class AlgorithmManagerOBT extends AbstractAlgorithmManager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The abstraction with regards to the algorithm isn't perfect here. We have code related to PSO in this class. Basically this class is meant for OBT and PSO code is algorithm specific. Code related to PSO/IPSO should go in a separate class and we should have further abstraction for specific optimization algorithm implementation. This could be the driver class which in turn invokes algorithm methods which implementations implement.
public void createAlgorithmManagersPipeline() { | ||
List<Manager> algorithmManagers = new ArrayList<Manager>(); | ||
algorithmManagers.add(new AlgorithmManagerHBT(new com.linkedin.drelephant.tuning.engine.MRExecutionEngine())); | ||
algorithmManagers.add(new AlgorithmManagerHBT(new com.linkedin.drelephant.tuning.engine.SparkExecutionEngine())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The class is already imported. Do not need to give full package name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes , changed
*/ | ||
protected Boolean updateDataBase(List<TuningJobDefinition> tuningJobDefinitions) { | ||
for (TuningJobDefinition tuningJobDefinition : tuningJobDefinitions) { | ||
tuningJobDefinition.update(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if an exception is thrown during update? If it's taken up in next run, returning true from this method and checking it in caller is inconsequential
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes , agreed. Actually I return true for the sake of completeness in most of the methods. I am adding exception handling now in all those cases.
} | ||
} | ||
AutoTuningMetricsController.setBaselineComputeWaitJobs(baselineComputeWaitJobs); | ||
return true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true is always returned. Any need for this method to return boolean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again if exception is there , it should return false . Changed the code
TUNING_SCRIPT_PATH = PSO_DIR_PATH + "/pso_param_generation.py"; | ||
this._executionEngine=executionEngine; | ||
logger.info("Tuning script path: " + TUNING_SCRIPT_PATH); | ||
logger.info("Python path: " + PYTHON_PATH); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any need of logging python path?
Expr.eq(TuningJobExecutionParamSet.TABLE.jobExecution + '.' + JobExecution.TABLE.executionState, | ||
JobExecution.ExecutionState.CANCELLED)) | ||
.isNull(TuningJobExecutionParamSet.TABLE.jobExecution + '.' + JobExecution.TABLE.resourceUsage) | ||
.eq(TuningJobDefinition.TABLE.tuningAlgorithm, TuningAlgorithm.OptimizationAlgo.PSO.name()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is optimization algorithm specific code in OBT class. We should abstract out optimization algorithm into a separate class. Maybe one class which does it all based on the algorithm. Updating of DB can be in the OBT class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
didn't understand .Lets discuss
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, talking specifically about TuningAlgorithm.OptimizationAlgo.PSO.name() being present in OBT class. Algorithm class should be abstracted out and PSO/IPSO implementations for it. Basically consider any reference to algorithm as PSO/IPSO in my comments. I will refer to HBT/OBT as tuning type
|
||
|
||
public abstract class AbstractAlgorithmManager implements Manager { | ||
protected final String JSON_CURRENT_POPULATION_KEY = "current_population"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As the generation of param for each job is different, we can have a thread pool here, in the baseline manager, and in fitness compute manager as well to parallelize execution as much as possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes we are planning to have ThreadPool for each manager . That would be in nxt version
public BaselineManagerOBT() { | ||
NUM_JOBS_FOR_BASELINE_DEFAULT = 30; | ||
Configuration configuration = ElephantContext.instance().getAutoTuningConf(); | ||
_numJobsForBaseline = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we expecting this configuration to be different for OBT and HBT? If yes, this config should be named differently. If no, it should be read in base class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done . Only possible thing that can be different is NUM_JOBS_FOR_BASELINE_DEFAULT.
public abstract class AbstractAlgorithmManager implements Manager { | ||
protected final String JSON_CURRENT_POPULATION_KEY = "current_population"; | ||
private final Logger logger = Logger.getLogger(getClass()); | ||
protected abstract List<JobTuningInfo> detectJobsForParameterGeneration(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move the method declaration below variable declaration.
} | ||
|
||
|
||
protected abstract JobTuningInfo generateParamSet(JobTuningInfo jobTuningInfo); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As these abstract methods will have to be implemented, write a javadoc in detail explaining what each abstract method is supposed to do. Good for maintainability.
Applies for each abstract method in all the abstract classes added as part of this PR.
public void createJobStatusManagersPipeline() { | ||
List<Manager> jobStatusManagers = new ArrayList<Manager>(); | ||
jobStatusManagers.add(new AzkabanJobStatusManager()); | ||
//jobStatusManagers.add(new JobStatusManagerOBT()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Remove this line
|
||
|
||
public class Flow { | ||
Map<String, List<Manager>> pipelines = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any specific reason for adding a map here? We are just starting a thread for each type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope , my bad . Changed this to List of List
public void createAlgorithmManagersPipeline() { | ||
List<Manager> algorithmManagers = new ArrayList<Manager>(); | ||
algorithmManagers.add(new AlgorithmManagerHBT(new com.linkedin.drelephant.tuning.engine.MRExecutionEngine())); | ||
algorithmManagers.add(new AlgorithmManagerHBT(new com.linkedin.drelephant.tuning.engine.SparkExecutionEngine())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about having the execution engine i.e. Spark/MR being invoked inside AlgorithmManagerOBT/HBT? This can be tuning implementation specific. Having abstraction like this would ensure that we may not have to implement all execution types for a particular tuning type and we can probably not show it on the UI as well for a particular job type.
@@ -1014,6 +1022,7 @@ private void populateTestData() { | |||
private void populateAutoTuningTestData1() { | |||
try { | |||
initAutoTuningDB1(); | |||
//initDB(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Remove this line
@@ -163,8 +164,11 @@ public void run() { | |||
jobSuggestedParamSet.paramSetState = ParamSetStatus.EXECUTED; | |||
jobSuggestedParamSet.update(); | |||
|
|||
FitnessComputeUtil fitnessComputeUtil = new FitnessComputeUtil(); | |||
fitnessComputeUtil.updateFitness(); | |||
/*FitnessComputeUtil fitnessComputeUtil = new FitnessComputeUtil(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Remove this code and elsewhere i.e. where commented code is present
Expr.eq(TuningJobExecutionParamSet.TABLE.jobExecution + '.' + JobExecution.TABLE.executionState, | ||
JobExecution.ExecutionState.CANCELLED)) | ||
.isNull(TuningJobExecutionParamSet.TABLE.jobExecution + '.' + JobExecution.TABLE.resourceUsage) | ||
.eq(TuningJobDefinition.TABLE.tuningAlgorithm, TuningAlgorithm.OptimizationAlgo.PSO.name()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, talking specifically about TuningAlgorithm.OptimizationAlgo.PSO.name() being present in OBT class. Algorithm class should be abstracted out and PSO/IPSO implementations for it. Basically consider any reference to algorithm as PSO/IPSO in my comments. I will refer to HBT/OBT as tuning type
/* | ||
Use to execute the logic of all the managers . | ||
*/ | ||
Boolean execute(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unnecessary autoboxing. Do we need it? Same comment elsewhere as well
import java.util.List; | ||
|
||
|
||
public class AlgorithmManagerHBT extends AbstractAlgorithmManager{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Space before the brace. Space before and after "=". Checkstyle would catch it. We can scan through the code and fix them
import org.apache.log4j.Logger; | ||
|
||
|
||
public abstract class AbstractJobStatusManager implements Manager { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering if we can have two schedulers supported in future? For instance, we use AppWorx as well. Should we have scheduler type in TuningJobExecutionParamSet then and this class then should call the scheduler implementation depending on scheduler type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes ,we have planned for it . This may be in the next version. Currently autotuning is for Azkaban.
Map<String, List<Manager>> pipelines = null; | ||
|
||
public Flow() { | ||
pipelines = new HashMap<String, List<Manager>>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can think of Flow as 4 steps, running in parallel.
- Baseline
- Job status from Scheduler.
- Fitness Manager
- Parameter generation
Instead of having everything in the class Flow, which is primarily a driver class, should we have one interface for Tuning type and have implementations for it. This implementation can then give us further implementations for that specific tuning type for baseline, fitness compute and parameter generation.
This interface for tuning type would then be point of reference whenever we add a new tuning type.
Similarly we can do for scheduler in JobStatus manager.
For instance,
interface TuningType {
Baseline getBaselineImpl();
FitnessCompute getFitnessComputeImpl();
ParamGeneration getParamGenerationImpl();
}
Also as discussed and you said you have planned for it as well, we can add further thread pool at each step. Just capturing it as a comment
@@ -40,21 +43,22 @@ public void run() { | |||
JobSuggestedParamSet jobSuggestedParamSet = | |||
JobSuggestedParamSet.find.where().eq("fitness_job_execution_id", 1541).findUnique(); | |||
JobExecution jobExecution = JobExecution.find.byId(1541L); | |||
AutoTuningOptimizeManager optimizeManager = checkIPSOManager(tuningAlgorithm); | |||
com.linkedin.drelephant.tuning.obt.AutoTuningOptimizeManager optimizeManager = checkIPSOManager(tuningAlgorithm); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Full package name is not required. Is at other places too. Fix it wherever required.
Utils.getNonNegativeLong(configuration, IGNORE_EXECUTION_WAIT_INTERVAL, 2 * 60 * AutoTuner.ONE_MIN); | ||
|
||
// #executions after which tuning will stop even if parameters don't converge | ||
maxTuningExecutions = Utils.getNonNegativeInt(configuration, MAX_TUNING_EXECUTIONS, 39); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion: The default values can be made constants
@@ -18,6 +18,8 @@ | |||
|
|||
import com.fasterxml.jackson.databind.ObjectMapper; | |||
import com.linkedin.drelephant.ElephantContext; | |||
import com.linkedin.drelephant.tuning.obt.AutoTuningOptimizeManager; | |||
import com.linkedin.drelephant.tuning.obt.OptimizationAlgoFactory; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we have to change getCurrentRunParameters method? Now the tuning type input would come from UI. Also TuningJobDefinition is set from this method
@@ -18,6 +18,8 @@ | |||
|
|||
import com.fasterxml.jackson.databind.ObjectMapper; | |||
import com.linkedin.drelephant.ElephantContext; | |||
import com.linkedin.drelephant.tuning.obt.AutoTuningOptimizeManager; | |||
import com.linkedin.drelephant.tuning.obt.OptimizationAlgoFactory; | |||
import com.linkedin.drelephant.util.Utils; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do we set penalty now?
protected Boolean calculateBaseLine(List<TuningJobDefinition> tuningJobDefinitions) { | ||
for (TuningJobDefinition tuningJobDefinition : tuningJobDefinitions) { | ||
try { | ||
logger.info("Computing and updating baseline metric values for job: " + tuningJobDefinition.job.jobName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tuning_job_definition has a column named tuningAlgorithm which is meant for PSO/IPSO. While with only two tuning types i.e. HBT/OBT as of now, we can probably use this table for HBT when tuningAlgorithm is NULL. But we should ideally have a column for tuning type to make the code extensible for a new tuning type in future (say ML based).Same goes for other tables as well
* @return List of jobs whose baseline needs to be added | ||
*/ | ||
|
||
protected abstract List<TuningJobDefinition> detectJobsForBaseLineComputation(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a javadoc here
|
||
public abstract class AbstractJobStatusManager implements Manager { | ||
private final Logger logger = Logger.getLogger(getClass()); | ||
protected abstract Boolean analyzeCompletedJobsExecution(List<TuningJobExecutionParamSet> inProgressExecutionParamSet); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a javadoc
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pralabhkumar , it would be great if you can provide a link to the design doc or mention what is the goal of this unified architecture in the description.
protected final String BASELINE_EXECUTION_COUNT = "baseline.execution.count"; | ||
protected Integer NUM_JOBS_FOR_BASELINE_DEFAULT = 30; | ||
protected String baseLineCalculationSQL = | ||
"SELECT AVG(resource_used) AS resource_used, AVG(execution_time) AS execution_time FROM " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could use the java ebeans rather than hard coding the sql statements here. You may refer to other parts of Dr. Elephant for reference.
Review to move over to #430 |
This is pull request for the unified architecture