Skip to content
This repository has been archived by the owner on Jun 16, 2023. It is now read-only.

Support jstorm error reporter and cluster error manager plugin. #444

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.alibaba.jstorm.client;

import backtype.storm.Config;
import backtype.storm.ConfigValidation;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.config.DefaultConfigUpdateHandler;
import com.alibaba.jstorm.utils.JStormUtils;
Expand Down Expand Up @@ -1310,4 +1311,21 @@ public static Boolean getTopologyAccurateMetric(Map conf) {
/**
 * Reads the topology histogram size setting from the given configuration.
 *
 * <p>The raw value stored under {@code TOPOLOGY_HISTOGRAM_SIZE} is handed to
 * {@code JStormUtils.parseInt} together with {@code 256} as its second argument
 * (presumably the default used when the value is missing -- confirm against JStormUtils).
 *
 * @param conf configuration map to read from
 * @return the value produced by {@code JStormUtils.parseInt}
 */
public static Integer getTopologyHistogramSize(Map conf) {
    final Object configuredSize = conf.get(TOPOLOGY_HISTOGRAM_SIZE);
    return JStormUtils.parseInt(configuredSize, 256);
}

/** Configuration key holding the class name of the worker error reporter plugin. */
public static final String WORKER_ERROR_REPORT_PLUGIN = "worker.error.reporter.plugin.class";

/**
 * Returns the class name of the worker error reporter plugin.
 *
 * <p>When the key is absent, or present but mapped to {@code null}, the built-in
 * {@code com.alibaba.jstorm.daemon.worker.WorkerReportError} is returned, so this method
 * never hands a {@code null} class name to the caller.
 *
 * @param conf configuration map to read from; must not be {@code null}
 * @return the configured class name, or the built-in default
 * @throws ClassCastException if the configured value is not a {@code String}
 */
public static String getWorkerErrorReportPluginClass(Map conf) {
    // One lookup instead of containsKey() + get(): containsKey() is also true for a key
    // mapped to null, which previously leaked a null class name.
    Object configured = conf.get(WORKER_ERROR_REPORT_PLUGIN);
    if (configured == null) {
        return "com.alibaba.jstorm.daemon.worker.WorkerReportError";
    }
    return (String) configured;
}

/** Configuration key holding the class name of the cluster error manager plugin. */
public static final String CLUSTER_ERROR_MANAGER_PLUGIN = "cluster.error.manager.plugin";

/**
 * Returns the class name of the cluster error manager plugin.
 *
 * <p>When the key is absent, or present but mapped to {@code null}, the built-in
 * {@code com.alibaba.jstorm.cluster.StormZkErrorManager} is returned, so this method
 * never hands a {@code null} class name to the caller.
 *
 * @param conf configuration map to read from; must not be {@code null}
 * @return the configured class name, or the built-in default
 * @throws ClassCastException if the configured value is not a {@code String}
 */
public static final String getClusterErrorManagerPlugin(Map conf) {
    // One lookup instead of containsKey() + get(): containsKey() is also true for a key
    // mapped to null, which previously leaked a null class name.
    Object configured = conf.get(CLUSTER_ERROR_MANAGER_PLUGIN);
    if (configured == null) {
        return "com.alibaba.jstorm.cluster.StormZkErrorManager";
    }
    return (String) configured;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,12 @@ public static String blacklist_path(String key) {

/**
 * Creates a StormZkClusterState from a configuration map.
 *
 * NOTE(review): the two consecutive return lines appear to be the removed and added sides
 * of the diff hunk. The second one passes the same map both as the cluster-state spec and
 * as the conf argument of the two-argument constructor.
 */
@SuppressWarnings("rawtypes")
public static StormClusterState mk_storm_cluster_state(Map cluster_state_spec) throws Exception {
return new StormZkClusterState(cluster_state_spec);
return new StormZkClusterState(cluster_state_spec, cluster_state_spec);
}

/**
 * Creates a StormZkClusterState on top of an existing ClusterState.
 *
 * NOTE(review): this hunk shows two versions back to back. The first signature/return pair
 * takes only the ClusterState; the second adds a required Map conf parameter that is forwarded
 * to the two-argument StormZkClusterState constructor. Callers of the single-argument form
 * would need to be updated -- confirm.
 */
public static StormClusterState mk_storm_cluster_state(ClusterState cluster_state_spec) throws Exception {
return new StormZkClusterState(cluster_state_spec);
public static StormClusterState mk_storm_cluster_state(ClusterState cluster_state_spec,
Map conf) throws Exception {
return new StormZkClusterState(cluster_state_spec, conf);
}

public static Map<Integer, TaskInfo> get_all_taskInfo(StormClusterState zkCluster, String topologyId) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.alibaba.jstorm.cluster;

import com.alibaba.jstorm.task.error.TaskError;

import java.util.List;
import java.util.Map;

/**
 * Contract for a pluggable store of task errors, keyed by topology id and task id.
 *
 * <p>Implementations are configured through {@link #init(Map, ClusterState)} before any other
 * method is used (presumably once, right after instantiation -- confirm against the caller).
 *
 * <p>Created by yunfan on 2017/2/28.
 */
public interface IStormErrorManager {

    /**
     * Hands the implementation its configuration and the cluster state it may use.
     *
     * @param conf configuration map
     * @param obj  cluster state handle
     */
    void init(Map conf, ClusterState obj);

    /** Lists the entries under which task errors are stored (presumably topology ids -- confirm). */
    List<String> task_error_storms() throws Exception;

    /** Lists the task error entries recorded for the given topology. */
    List<String> task_error_ids(String topologyId) throws Exception;

    /** Reports a task error described by a {@link Throwable}. */
    void report_task_error(String topology_id, int task_id, Throwable error) throws Exception;

    /** Reports a task error described by a message only. */
    void report_task_error(String topology_id, int task_id, String error) throws Exception;

    /** Reports a task error with an explicit error level and error code. */
    void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code) throws Exception;

    /** Reports a task error with an explicit error level, error code and duration. */
    void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code, int duration) throws Exception;

    /** Reports a task error with an explicit error level, error code, duration and tag. */
    void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code, int duration, String tag) throws Exception;

    /** Returns the last-error-time information kept for the given topology. */
    Map<Integer, String> topo_lastErr_time(String topologyId) throws Exception;

    /** Removes the last-error-time information kept for the given topology. */
    void remove_lastErr_time(String topologyId) throws Exception;

    /** Returns the errors recorded for the given task. */
    List<TaskError> task_errors(String topology_id, int task_id) throws Exception;

    /** Removes the errors recorded for the given task. */
    void remove_task_error(String topologyId, int taskId) throws Exception;

    /** Lists the time entries of the errors recorded for the given task. */
    List<String> task_error_time(String topologyId, int taskId) throws Exception;

    /** Returns the error recorded for the given task at the given timestamp. */
    TaskError task_error_info(String topologyId, int taskId, long timeStamp) throws Exception;

    /** Removes all task errors recorded for the given topology. */
    void teardown_task_errors(String topology_id) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.jstorm.cache.JStormCache;
import com.alibaba.jstorm.callback.ClusterStateCallback;
import com.alibaba.jstorm.callback.RunnableCallback;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.daemon.supervisor.SupervisorInfo;
import com.alibaba.jstorm.schedule.Assignment;
import com.alibaba.jstorm.schedule.AssignmentBak;
Expand All @@ -46,6 +47,8 @@ public class StormZkClusterState implements StormClusterState {

private ClusterState cluster_state;

private IStormErrorManager stormErrorManager;

private ConcurrentHashMap<String, RunnableCallback> assignment_info_callback;
private AtomicReference<RunnableCallback> supervisors_callback;
private AtomicReference<RunnableCallback> assignments_callback;
Expand All @@ -57,17 +60,20 @@ public class StormZkClusterState implements StormClusterState {

private boolean solo;

public StormZkClusterState(Object cluster_state_spec) throws Exception {
public StormZkClusterState(Object cluster_state_spec, Map conf) throws Exception {

if (cluster_state_spec instanceof ClusterState) {
solo = false;
cluster_state = (ClusterState) cluster_state_spec;
} else {

solo = true;
cluster_state = new DistributedClusterState((Map) cluster_state_spec);
}

String errorPluginClass = ConfigExtension.getClusterErrorManagerPlugin(conf);
stormErrorManager = (IStormErrorManager)Utils.newInstance(errorPluginClass);
stormErrorManager.init(conf, cluster_state);

assignment_info_callback = new ConcurrentHashMap<String, RunnableCallback>();
supervisors_callback = new AtomicReference<RunnableCallback>(null);
assignments_callback = new AtomicReference<RunnableCallback>(null);
Expand Down Expand Up @@ -364,74 +370,32 @@ public void teardown_heartbeats(String topologyId) {

/**
 * Reports a Throwable as a task error, using JStormUtils.getErrorInfo(error) as the message
 * together with ErrorConstants.FATAL and ErrorConstants.CODE_USER.
 *
 * NOTE(review): the two consecutive call lines appear to be the removed (in-class overload)
 * and added (stormErrorManager delegation) sides of the diff hunk.
 */
@Override
public void report_task_error(String topologyId, int taskId, Throwable error) throws Exception {
report_task_error(topologyId, taskId, JStormUtils.getErrorInfo(error),
stormErrorManager.report_task_error(topologyId, taskId, JStormUtils.getErrorInfo(error),
ErrorConstants.FATAL, ErrorConstants.CODE_USER);
}

/**
 * Reports a task error given only a message, using ErrorConstants.FATAL and
 * ErrorConstants.CODE_USER as level and code.
 *
 * NOTE(review): the two consecutive call lines appear to be the removed (in-class overload)
 * and added (stormErrorManager delegation) sides of the diff hunk.
 */
@Override
public void report_task_error(String topology_id, int task_id, String error) throws Exception {
// we use this interface only in user level error
report_task_error(topology_id, task_id, error, ErrorConstants.FATAL, ErrorConstants.CODE_USER);
stormErrorManager.report_task_error(topology_id, task_id, error, ErrorConstants.FATAL, ErrorConstants.CODE_USER);
}

/**
 * Reports a task error with an explicit level and code, using
 * ErrorConstants.DURATION_SECS_DEFAULT as the duration.
 *
 * NOTE(review): the two consecutive call lines appear to be the removed (in-class overload)
 * and added (stormErrorManager delegation) sides of the diff hunk.
 */
@Override
public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code)
throws Exception {
report_task_error(topology_id, task_id, error, error_level, error_code, ErrorConstants.DURATION_SECS_DEFAULT);
stormErrorManager.report_task_error(topology_id, task_id, error, error_level, error_code, ErrorConstants.DURATION_SECS_DEFAULT);
}

/**
 * Reports a task error with an explicit level, code and duration, passing null as the tag.
 *
 * NOTE(review): the two consecutive call lines appear to be the removed (in-class overload)
 * and added (stormErrorManager delegation) sides of the diff hunk.
 */
@Override
public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code,
int duration_secs) throws Exception {
report_task_error(topology_id, task_id, error, error_level, error_code, duration_secs, null);
stormErrorManager.report_task_error(topology_id, task_id, error, error_level, error_code, duration_secs, null);
}

/**
 * Reports a task error with level, code, duration and an optional tag.
 *
 * NOTE(review): this hunk appears to show both sides of the diff. Everything from
 * "boolean found" through "setLastErrInfo(...)" looks like the removed in-class
 * implementation; the final stormErrorManager call looks like its replacement.
 *
 * The in-class implementation shown here stores a new TaskError under
 * taskerror_path/currentTimeSecs. An existing entry whose message equals the new message, or
 * starts with the tag when a tag is given, is deleted and replaced. Otherwise the oldest
 * entries are deleted until fewer than three remain before the new one is written.
 */
@Override
public void report_task_error(String topology_id, int task_id, String error, String error_level, int error_code,
int duration_secs, String tag) throws Exception {
boolean found = false;
String path = Cluster.taskerror_path(topology_id, task_id);
cluster_state.mkdirs(path);

List<Integer> children = new ArrayList<Integer>();

int timeSecs = TimeUtils.current_time_secs();
String timestampPath = path + Cluster.ZK_SEPERATOR + timeSecs;
TaskError taskError = new TaskError(error, error_level, error_code, timeSecs, duration_secs);

for (String str : cluster_state.get_children(path, false)) {
String errorPath = path + Cluster.ZK_SEPERATOR + str;
Object obj = getObject(errorPath, false);
// an entry without a readable object is removed and ignored
if (obj == null){
deleteObject(errorPath);
continue;
}

TaskError errorInfo = (TaskError) obj;

// replace the old one if needed
if (errorInfo.getError().equals(error)
|| (tag != null && errorInfo.getError().startsWith(tag))) {
cluster_state.delete_node(errorPath);
setObject(timestampPath, taskError);
removeLastErrInfoDuration(topology_id, taskError.getDurationSecs());
found = true;
break;
}

// child names are parsed as integers so they can be sorted below
children.add(Integer.parseInt(str));
}

if (!found) {
Collections.sort(children);

// drop the smallest (oldest) entries until fewer than three remain
while (children.size() >= 3) {
deleteObject(path + Cluster.ZK_SEPERATOR + children.remove(0));
}

setObject(timestampPath, taskError);
}
setLastErrInfo(topology_id, duration_secs, timeSecs);
stormErrorManager.report_task_error(topology_id, task_id, error, error_level, error_code, duration_secs, tag);
}

private static final String TASK_IS_DEAD = "is dead on"; // Full string is
Expand Down Expand Up @@ -477,40 +441,32 @@ private void setLastErrInfo(String topologyId, int duration, int timeStamp) thro

/**
 * Removes the errors recorded for one task.
 *
 * NOTE(review): the first two body lines (delete_node on Cluster.taskerror_path) appear to be
 * the removed side of the diff hunk; the stormErrorManager call appears to be the added side.
 */
@Override
public void remove_task_error(String topologyId, int taskId) throws Exception {
String path = Cluster.taskerror_path(topologyId, taskId);
cluster_state.delete_node(path);
stormErrorManager.remove_task_error(topologyId, taskId);
}

/**
 * Returns the last-error-time map of a topology.
 *
 * NOTE(review): the getObject lookup on Cluster.lasterror_path appears to be the removed side
 * of the diff hunk; the stormErrorManager call appears to be the added side.
 */
@Override
public Map<Integer, String> topo_lastErr_time(String topologyId) throws Exception {
String path = Cluster.lasterror_path(topologyId);

return (Map<Integer, String>) getObject(path, false);
return stormErrorManager.topo_lastErr_time(topologyId);
}

/**
 * Removes the last-error-time information of a topology.
 *
 * NOTE(review): the deleteObject call on Cluster.lasterror_path appears to be the removed side
 * of the diff hunk; the stormErrorManager call appears to be the added side.
 */
@Override
public void remove_lastErr_time(String topologyId) throws Exception {
String path = Cluster.lasterror_path(topologyId);
deleteObject(path);
stormErrorManager.remove_lastErr_time(topologyId);
}

/**
 * Lists the entries that have task errors recorded.
 *
 * NOTE(review): the get_children call on Cluster.TASKERRORS_SUBTREE appears to be the removed
 * side of the diff hunk; the stormErrorManager call appears to be the added side.
 */
@Override
public List<String> task_error_storms() throws Exception {
return cluster_state.get_children(Cluster.TASKERRORS_SUBTREE, false);
return stormErrorManager.task_error_storms();
}

/**
 * Lists the task error entries of a topology.
 *
 * NOTE(review): the get_children call on Cluster.taskerror_storm_root appears to be the
 * removed side of the diff hunk; the stormErrorManager call appears to be the added side.
 */
@Override
public List<String> task_error_ids(String topologyId) throws Exception {
return cluster_state.get_children(Cluster.taskerror_storm_root(topologyId), false);
return stormErrorManager.task_error_ids(topologyId);
}

/**
 * Lists the time entries of the errors recorded for one task.
 *
 * NOTE(review): the lines working on Cluster.taskerror_path appear to be the removed side of
 * the diff hunk; they return an empty list when the task's error node does not exist. The
 * stormErrorManager call appears to be the added side -- whether a plugin keeps the
 * empty-list behaviour cannot be seen here.
 */
@Override
public List<String> task_error_time(String topologyId, int taskId) throws Exception {
String path = Cluster.taskerror_path(topologyId, taskId);
if (cluster_state.node_existed(path, false) == false) {
return new ArrayList<String>();
}
return cluster_state.get_children(path, false);
return stormErrorManager.task_error_time(topologyId, taskId);
}

@Override
Expand All @@ -529,56 +485,17 @@ public void remove_task(String topologyId, Set<Integer> taskIds) throws Exceptio

/**
 * Returns the error recorded for one task at the given timestamp.
 *
 * NOTE(review): the getObject lookup on taskerror_path + "/" + timeStamp appears to be the
 * removed side of the diff hunk; the stormErrorManager call appears to be the added side.
 */
@Override
public TaskError task_error_info(String topologyId, int taskId, long timeStamp) throws Exception {
String path = Cluster.taskerror_path(topologyId, taskId);
path = path + "/" + timeStamp;
return (TaskError) getObject(path, false);
return stormErrorManager.task_error_info(topologyId, taskId, timeStamp);
}

/**
 * Returns the errors recorded for one task.
 *
 * NOTE(review): everything up to the first "return errors;" after the sort appears to be the
 * removed in-class implementation; the stormErrorManager call appears to be the added side.
 *
 * The in-class implementation shown here returns an empty list when the task's error node does
 * not exist, skips children whose object is null, and sorts the result by getTimSecs() in
 * ascending order.
 */
@Override
public List<TaskError> task_errors(String topologyId, int taskId) throws Exception {
List<TaskError> errors = new ArrayList<TaskError>();
String path = Cluster.taskerror_path(topologyId, taskId);
if (cluster_state.node_existed(path, false) == false) {
return errors;
}

List<String> children = cluster_state.get_children(path, false);


for (String str : children) {
Object obj = getObject(path + Cluster.ZK_SEPERATOR + str, false);
if (obj != null) {
TaskError error = (TaskError) obj;
errors.add(error);
}
}

// order by timestamp, smallest first
Collections.sort(errors, new Comparator<TaskError>() {

@Override
public int compare(TaskError o1, TaskError o2) {
if (o1.getTimSecs() > o2.getTimSecs()) {
return 1;
}
if (o1.getTimSecs() < o2.getTimSecs()) {
return -1;
}
return 0;
}
});

return errors;

return stormErrorManager.task_errors(topologyId, taskId);
}

/**
 * Removes all task errors recorded for a topology.
 *
 * NOTE(review): this hunk shows two versions back to back. The first signature and its
 * try/catch body call deleteObject on Cluster.taskerror_storm_root(topologyId) and log any
 * exception through LOG.error instead of throwing. The second signature declares
 * "throws Exception" and delegates to stormErrorManager, so failures would now reach the
 * caller -- confirm that callers handle this.
 */
@Override
public void teardown_task_errors(String topologyId) {
try {
String taskerrPath = Cluster.taskerror_storm_root(topologyId);
deleteObject(taskerrPath);
} catch (Exception e) {
LOG.error("Could not teardown errors for " + topologyId, e);
}
public void teardown_task_errors(String topologyId) throws Exception {
stormErrorManager.teardown_task_errors(topologyId);
}

@Override
Expand Down
Loading