diff --git a/README.md b/README.md
index 23e01437..403ac1f3 100644
--- a/README.md
+++ b/README.md
@@ -78,20 +78,9 @@ otter之前开源的一个子项目,开源链接地址:http://github.com/alibaba/yugong
-
-
-问题反馈
-
-注意:canal&otter QQ讨论群已经建立,群号:161559791 ,欢迎加入进行技术讨论。
-1. qq交流群: 161559791
-2. 邮件交流: jianghang115@gmail.com
-3. 新浪微博: agapple0002
-4. 报告issue:issues
-
-【招聘】阿里巴巴中间件团队招聘JAVA高级工程师
-岗位主要为技术型内容(非业务部门),阿里中间件整个体系对于未来想在技术上有所沉淀的同学还是非常有帮助的
-工作地点:杭州、北京均可. ps. 阿里待遇向来都是不错的,有意者可以QQ、微博私聊.
-具体招聘内容:https://job.alibaba.com/zhaopin/position_detail.htm?positionId=32666
+这个是基于阿里的otter修改的,因otter的不少包太老,特别是guava ,老的不行,本版本修改了google包的依赖
+添加了clickhouse的支持,postsql在完善中
+这个版本在公司里已经平稳运行一年,请放心使用
diff --git a/manager/biz/pom.xml b/manager/biz/pom.xml
index 86378fc4..eef14d15 100644
--- a/manager/biz/pom.xml
+++ b/manager/biz/pom.xml
@@ -62,6 +62,12 @@
ojdbc14
+
+ ru.yandex.clickhouse
+ clickhouse-jdbc
+ 0.1.24
+
+
org.springframework
diff --git a/manager/biz/src/main/java/com/alibaba/otter/manager/biz/common/DataSourceCreator.java b/manager/biz/src/main/java/com/alibaba/otter/manager/biz/common/DataSourceCreator.java
index 2ff9b3a5..e4d4c338 100644
--- a/manager/biz/src/main/java/com/alibaba/otter/manager/biz/common/DataSourceCreator.java
+++ b/manager/biz/src/main/java/com/alibaba/otter/manager/biz/common/DataSourceCreator.java
@@ -184,6 +184,10 @@ private DataSource createDataSource(String url, String userName, String password
}
}
// dbcpDs.setValidationQuery("select 1");
+ } else if (dataMediaType.isClickHouse()) {
+
+ dbcpDs.setValidationQuery("select 1");
+
} else {
logger.error("ERROR ## Unknow database type");
}
diff --git a/manager/biz/src/main/java/com/alibaba/otter/manager/biz/common/arbitrate/ArbitrateConfigImpl.java b/manager/biz/src/main/java/com/alibaba/otter/manager/biz/common/arbitrate/ArbitrateConfigImpl.java
index 34a72cd6..f6cb2592 100644
--- a/manager/biz/src/main/java/com/alibaba/otter/manager/biz/common/arbitrate/ArbitrateConfigImpl.java
+++ b/manager/biz/src/main/java/com/alibaba/otter/manager/biz/common/arbitrate/ArbitrateConfigImpl.java
@@ -19,6 +19,9 @@
import java.util.List;
import java.util.Map;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.springframework.beans.factory.InitializingBean;
import com.alibaba.otter.manager.biz.config.channel.ChannelService;
@@ -45,7 +48,7 @@ public class ArbitrateConfigImpl implements ArbitrateConfig, InitializingBean {
private static final Long DEFAULT_PERIOD = 60 * 1000L;
private Long timeout = DEFAULT_PERIOD;
private RefreshMemoryMirror channelCache;
- private Map channelMapping;
+ private LoadingCache channelMapping;
private ChannelService channelService;
private NodeService nodeService;
private RefreshMemoryMirror nodeCache;
@@ -68,12 +71,12 @@ public Channel findChannel(Long channelId) {
}
public Channel findChannelByPipelineId(Long pipelineId) {
- Long channelId = channelMapping.get(pipelineId);
+ Long channelId = channelMapping.getUnchecked(pipelineId); //edit by liyc
return channelCache.get(channelId);
}
public Pipeline findOppositePipeline(Long pipelineId) {
- Long channelId = channelMapping.get(pipelineId);
+ Long channelId = channelMapping.getUnchecked(pipelineId); //edit by liyc
Channel channel = channelCache.get(channelId);
List pipelines = channel.getPipelines();
for (Pipeline pipeline : pipelines) {
@@ -86,7 +89,7 @@ public Pipeline findOppositePipeline(Long pipelineId) {
}
public Pipeline findPipeline(Long pipelineId) {
- Long channelId = channelMapping.get(pipelineId);
+ Long channelId = channelMapping.getUnchecked(pipelineId); //edit by liyc
Channel channel = channelCache.get(channelId);
List pipelines = channel.getPipelines();
for (Pipeline pipeline : pipelines) {
@@ -100,6 +103,7 @@ public Pipeline findPipeline(Long pipelineId) {
public void afterPropertiesSet() throws Exception {
// 获取一下nid变量
+ /* delete by liyc
channelMapping = new MapMaker().makeComputingMap(new Function() {
public Long apply(Long pipelineId) {
@@ -115,7 +119,22 @@ public Long apply(Long pipelineId) {
}
});
+ */
+ channelMapping = CacheBuilder.newBuilder().build(new CacheLoader() {
+ public Long load(Long pipelineId) {
+ // 处理下pipline -> channel映射关系不存在的情况
+ Channel channel = channelService.findByPipelineId(pipelineId);
+ if (channel == null) {
+ throw new ConfigException("No Such Channel by pipelineId[" + pipelineId + "]");
+ }
+
+ updateMapping(channel, pipelineId);// 排除下自己
+ channelCache.put(channel.getId(), channel);// 更新下channelCache
+ return channel.getId();
+
+ }
+ });
channelCache = new RefreshMemoryMirror(timeout, new ComputeFunction() {
public Channel apply(Long key, Channel oldValue) {
diff --git a/manager/biz/src/main/java/com/alibaba/otter/manager/biz/config/datamedia/impl/DataMediaServiceImpl.java b/manager/biz/src/main/java/com/alibaba/otter/manager/biz/config/datamedia/impl/DataMediaServiceImpl.java
index cbb88ab2..fcc71061 100644
--- a/manager/biz/src/main/java/com/alibaba/otter/manager/biz/config/datamedia/impl/DataMediaServiceImpl.java
+++ b/manager/biz/src/main/java/com/alibaba/otter/manager/biz/config/datamedia/impl/DataMediaServiceImpl.java
@@ -310,7 +310,9 @@ private DataMedia doToModel(DataMediaDO dataMediaDo) {
DataMedia dataMedia = null;
try {
DataMediaSource dataMediaSource = dataMediaSourceService.findById(dataMediaDo.getDataMediaSourceId());
- if (dataMediaSource.getType().isMysql() || dataMediaSource.getType().isOracle()) {
+ if (dataMediaSource.getType().isMysql()
+ || dataMediaSource.getType().isOracle()
+ || dataMediaSource.getType().isClickHouse()) {
dataMedia = JsonUtils.unmarshalFromString(dataMediaDo.getProperties(), DbDataMedia.class);
dataMedia.setSource(dataMediaSource);
} else if (dataMediaSource.getType().isNapoli() || dataMediaSource.getType().isMq()) {
diff --git a/manager/biz/src/main/java/com/alibaba/otter/manager/biz/config/datamediasource/impl/DataMediaSourceServiceImpl.java b/manager/biz/src/main/java/com/alibaba/otter/manager/biz/config/datamediasource/impl/DataMediaSourceServiceImpl.java
index e76a3c66..b46acddb 100644
--- a/manager/biz/src/main/java/com/alibaba/otter/manager/biz/config/datamediasource/impl/DataMediaSourceServiceImpl.java
+++ b/manager/biz/src/main/java/com/alibaba/otter/manager/biz/config/datamediasource/impl/DataMediaSourceServiceImpl.java
@@ -218,7 +218,9 @@ private DataMediaSource doToModel(DataMediaSourceDO dataMediaSourceDo) {
DataMediaSource dataMediaSource = new DbMediaSource();
try {
- if (dataMediaSourceDo.getType().isMysql() || dataMediaSourceDo.getType().isOracle()) {
+ if (dataMediaSourceDo.getType().isMysql()
+ || dataMediaSourceDo.getType().isOracle()
+ || dataMediaSourceDo.getType().isClickHouse()) {
dataMediaSource = JsonUtils.unmarshalFromString(dataMediaSourceDo.getProperties(), DbMediaSource.class);
} else if (dataMediaSourceDo.getType().isNapoli() || dataMediaSourceDo.getType().isMq()) {
dataMediaSource = JsonUtils.unmarshalFromString(dataMediaSourceDo.getProperties(), MqMediaSource.class);
diff --git a/manager/biz/src/main/java/com/alibaba/otter/manager/biz/monitor/impl/DefaultAlarmController.java b/manager/biz/src/main/java/com/alibaba/otter/manager/biz/monitor/impl/DefaultAlarmController.java
index 547bfc2e..01cdde5d 100644
--- a/manager/biz/src/main/java/com/alibaba/otter/manager/biz/monitor/impl/DefaultAlarmController.java
+++ b/manager/biz/src/main/java/com/alibaba/otter/manager/biz/monitor/impl/DefaultAlarmController.java
@@ -24,6 +24,8 @@
import com.alibaba.otter.manager.biz.monitor.AlarmRecovery;
import com.alibaba.otter.shared.common.model.config.alarm.AlarmRule;
import com.alibaba.otter.shared.common.model.config.alarm.MonitorName;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import com.google.common.collect.MapMaker;
/**
@@ -34,7 +36,10 @@ public class DefaultAlarmController implements AlarmController {
// seconds
private Long DEFAULT_THRESHOLD = 1800L;
- private Map pool = new MapMaker().expireAfterWrite(1, TimeUnit.HOURS).makeMap();
+// private Map pool = new MapMaker().expireAfterWrite(1, TimeUnit.HOURS).makeMap();
+ //add by liyc
+ private Cache pool = CacheBuilder.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).build();
+
private AlarmRecovery restartAlarmRecovery;
@Override
@@ -48,7 +53,7 @@ public AlarmMessage control(AlarmRule rule, String message, AlarmMessage data) {
Long threshold = rule.getIntervalTime() == null ? DEFAULT_THRESHOLD : rule.getIntervalTime();
PoolKey key = new PoolKey(rule, message, data);
- PoolValue value = pool.get(key);
+ PoolValue value = pool.getIfPresent(key); //edit by liyc
boolean needAlarm = true;
Long now = System.currentTimeMillis();
diff --git a/manager/biz/src/main/java/com/alibaba/otter/manager/biz/remote/impl/NodeMBeanServiceImpl.java b/manager/biz/src/main/java/com/alibaba/otter/manager/biz/remote/impl/NodeMBeanServiceImpl.java
index a57d15f9..c43d187d 100644
--- a/manager/biz/src/main/java/com/alibaba/otter/manager/biz/remote/impl/NodeMBeanServiceImpl.java
+++ b/manager/biz/src/main/java/com/alibaba/otter/manager/biz/remote/impl/NodeMBeanServiceImpl.java
@@ -32,8 +32,11 @@
import com.alibaba.otter.manager.biz.remote.NodeRemoteService;
import com.alibaba.otter.shared.common.model.config.node.Node;
import com.google.common.base.Function;
-import com.google.common.collect.GenericMapMaker;
-import com.google.common.collect.MapEvictionListener;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+//import com.google.common.collect.GenericMapMaker;
+//import com.google.common.collect.MapEvictionListener;
import com.google.common.collect.MapMaker;
/**
@@ -47,7 +50,7 @@ public class NodeMBeanServiceImpl implements NodeRemoteService {
private static final String SERVICE_URL = "service:jmx:rmi://{0}/jndi/rmi://{0}:{1}/mbean";
private ObjectName objectName;
private NodeService nodeService;
- private Map mbeanServers;
+ private LoadingCache mbeanServers;
public NodeMBeanServiceImpl(){
try {
@@ -55,7 +58,7 @@ public NodeMBeanServiceImpl(){
} catch (Exception e) {
throw new ManagerException(e);
}
-
+ /*
GenericMapMaker mapMaker = null;
mapMaker = new MapMaker().expireAfterAccess(5, TimeUnit.MINUTES)
.softValues()
@@ -94,6 +97,36 @@ public MBeanServerConnection apply(Long nid) {
}
});
+ */
+ mbeanServers= CacheBuilder.newBuilder()
+ .expireAfterAccess(5, TimeUnit.MINUTES)
+ .build(new CacheLoader() {
+ public MBeanServerConnection load(Long nid) {
+ Node node = nodeService.findById(nid);
+ String ip = node.getIp();
+ if (node.getParameters().getUseExternalIp()) {
+ ip = node.getParameters().getExternalIp();
+ }
+
+ int port = node.getPort().intValue() + 1;
+ Integer mbeanPort = node.getParameters().getMbeanPort();
+ if (mbeanPort != null && mbeanPort != 0) {// 做个兼容处理,<=4.2.2版本没有mbeanPort设置
+ port = mbeanPort;
+ }
+
+ try {
+ JMXServiceURL serviceURL = new JMXServiceURL(MessageFormat.format(SERVICE_URL,
+ ip,
+ String.valueOf(port)));
+ JMXConnector cntor = JMXConnectorFactory.connect(serviceURL, null);
+ MBeanServerConnection mbsc = cntor.getMBeanServerConnection();
+ return mbsc;
+ } catch (Exception e) {
+ throw new ManagerException(e);
+ }
+ }
+
+ });
}
public String getHeapMemoryUsage(Long nid) {
@@ -127,7 +160,8 @@ public void setProfile(Long nid, boolean profile) {
new Object[] { profile },
new String[] { "java.lang.Boolean" });
} catch (Exception e) {
- mbeanServers.remove(nid);
+ //mbeanServers.remove(nid); edit by liyc
+ mbeanServers.invalidate(nid);
throw new ManagerException(e);
}
}
@@ -139,7 +173,8 @@ public void setThreadPoolSize(Long nid, int size) {
new Object[] { size },
new String[] { "java.lang.Integer" });
} catch (Exception e) {
- mbeanServers.remove(nid);
+ //mbeanServers.remove(nid);edit by liyc
+ mbeanServers.invalidate(nid);
throw new ManagerException(e);
}
}
@@ -200,7 +235,8 @@ private Object getAttribute(Long nid, String attribute) {
try {
return mbeanServers.get(nid).getAttribute(objectName, attribute);
} catch (Exception e) {
- mbeanServers.remove(nid);
+ //mbeanServers.remove(nid);edit by liyc
+ mbeanServers.invalidate(nid);
throw new ManagerException(e);
}
}
@@ -212,7 +248,8 @@ private Object invoke(Long nid, Long pipelineId, String method) {
new Object[] { pipelineId },
new String[] { "java.lang.Long" });
} catch (Exception e) {
- mbeanServers.remove(nid);
+ //mbeanServers.remove(nid);edit by liyc
+ mbeanServers.invalidate(nid);
throw new ManagerException(e);
}
}
diff --git a/manager/biz/src/main/java/com/alibaba/otter/manager/biz/remote/impl/StatsRemoteServiceImpl.java b/manager/biz/src/main/java/com/alibaba/otter/manager/biz/remote/impl/StatsRemoteServiceImpl.java
index 81d7ce9f..4a17fece 100644
--- a/manager/biz/src/main/java/com/alibaba/otter/manager/biz/remote/impl/StatsRemoteServiceImpl.java
+++ b/manager/biz/src/main/java/com/alibaba/otter/manager/biz/remote/impl/StatsRemoteServiceImpl.java
@@ -24,6 +24,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
@@ -61,8 +64,8 @@ public class StatsRemoteServiceImpl implements StatsRemoteService {
private ThroughputStatService throughputStatService;
private Long statUnit = 60 * 1000L; //统计周期,默认60秒
private ScheduledThreadPoolExecutor scheduler;
- private Map delayStats;
- private Map> throughputStats;
+ private LoadingCache delayStats;
+ private LoadingCache> throughputStats;
public StatsRemoteServiceImpl(){
// 注册一下事件处理
@@ -70,6 +73,7 @@ public StatsRemoteServiceImpl(){
CommunicationRegistry.regist(StatisticsEventType.tableStat, this);
CommunicationRegistry.regist(StatisticsEventType.throughputStat, this);
+ /* delete by liyc
delayStats = new MapMaker().makeComputingMap(new Function() {
public AvgStat apply(Long pipelineId) {
@@ -82,7 +86,20 @@ public Map apply(Long pipelineId) {
return new HashMap();
}
});
+ */
+ delayStats = CacheBuilder.newBuilder().build(new CacheLoader() {
+ public AvgStat load(Long pipelineId) {
+ return new AvgStat();
+ }
+ });
+
+ throughputStats = CacheBuilder.newBuilder().build(new CacheLoader>() {
+
+ public Map load(Long pipelineId) {
+ return new HashMap();
+ }
+ });
scheduler = new ScheduledThreadPoolExecutor(DEFAULT_POOL, new NamedThreadFactory("Otter-Statistics-Server"),
new ThreadPoolExecutor.CallerRunsPolicy());
if (statUnit > 0) {
@@ -126,7 +143,7 @@ public void onDelayCount(DelayCountEvent event) {
delayStatService.createDelayStat(stat);
} else {
synchronized (delayStats) {
- delayStats.get(count.getPipelineId()).merge(stat);
+ delayStats.getUnchecked(count.getPipelineId()).merge(stat); //edit by liyc
}
}
}
@@ -141,7 +158,7 @@ public void onThroughputStat(ThroughputStatEvent event) {
} else {
synchronized (throughputStats) {
for (ThroughputStat stat : event.getStats()) {
- Map data = throughputStats.get(stat.getPipelineId());
+ Map data = throughputStats.getUnchecked(stat.getPipelineId()); //edit by liyc
ThroughputStat old = data.get(stat.getType());
if (old != null) {
//执行合并
@@ -173,7 +190,7 @@ public void onTableStat(TableStatEvent event) {
private void flushDelayStat() {
synchronized (delayStats) {
// 需要做同步,避免delay数据丢失
- for (Map.Entry stat : delayStats.entrySet()) {
+ for (Map.Entry stat : delayStats.asMap().entrySet()) { //edit by liyc
if (stat.getValue().count.get() > 0) {
DelayStat delay = new DelayStat();
delay.setPipelineId(stat.getKey());
@@ -182,19 +199,19 @@ private void flushDelayStat() {
delayStatService.createDelayStat(delay);
}
}
- delayStats.clear();
+ delayStats.cleanUp(); //edit by liyc
}
}
private void flushThroughputStat() {
synchronized (throughputStats) {
- Collection
+
org.springframework
diff --git a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/ColumnPairAction.java b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/ColumnPairAction.java
index 0dd0a01c..4f985375 100644
--- a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/ColumnPairAction.java
+++ b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/ColumnPairAction.java
@@ -98,7 +98,9 @@ public void doSave(@Param("dataMediaPairId") Long dataMediaPairId, @Param("submi
columnPair.setDataMediaPairId(dataMediaPairId);
columnPairs.add(columnPair);
}
- } else if (targetMedia.getSource().getType().isMysql() || targetMedia.getSource().getType().isOracle()) {
+ } else if (targetMedia.getSource().getType().isMysql()
+ || targetMedia.getSource().getType().isOracle()
+ || targetMedia.getSource().getType().isClickHouse()) {
for (ColumnPair columnPair : columnPairsInDb) {
int i = 0;
for (String sourceColumnName : sourceColumnNames) {
diff --git a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/DataMediaAction.java b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/DataMediaAction.java
index 760b373b..0a8d9273 100644
--- a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/DataMediaAction.java
+++ b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/DataMediaAction.java
@@ -60,7 +60,9 @@ public void doAdd(@FormGroup("dataMediaInfo") Group dataMediaInfo,
DataMedia dataMedia = new DataMedia();
dataMediaInfo.setProperties(dataMedia);
DataMediaSource dataMediaSource = dataMediaSourceService.findById(dataMediaInfo.getField("sourceId").getLongValue());
- if (dataMediaSource.getType().isMysql() || dataMediaSource.getType().isOracle()) {
+ if (dataMediaSource.getType().isMysql()
+ || dataMediaSource.getType().isOracle()
+ || dataMediaSource.getType().isClickHouse()) {
dataMedia.setSource((DbMediaSource) dataMediaSource);
} else if (dataMediaSource.getType().isNapoli() || dataMediaSource.getType().isMq()) {
dataMedia.setSource((MqMediaSource) dataMediaSource);
@@ -95,7 +97,9 @@ public void doEdit(@FormGroup("dataMediaInfo") Group dataMediaInfo, @Param("page
DataMedia dataMedia = new DataMedia();
dataMediaInfo.setProperties(dataMedia);
DataMediaSource dataMediaSource = dataMediaSourceService.findById(dataMediaInfo.getField("sourceId").getLongValue());
- if (dataMediaSource.getType().isMysql() || dataMediaSource.getType().isOracle()) {
+ if (dataMediaSource.getType().isMysql()
+ || dataMediaSource.getType().isOracle()
+ || dataMediaSource.getType().isClickHouse()) {
dataMedia.setSource((DbMediaSource) dataMediaSource);
} else if (dataMediaSource.getType().isNapoli() || dataMediaSource.getType().isMq()) {
dataMedia.setSource((MqMediaSource) dataMediaSource);
diff --git a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/DataMediaSourceAction.java b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/DataMediaSourceAction.java
index f0dc00c8..2467a75a 100644
--- a/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/DataMediaSourceAction.java
+++ b/manager/web/src/main/java/com/alibaba/otter/manager/web/home/module/action/DataMediaSourceAction.java
@@ -54,11 +54,15 @@ public void doAdd(@FormGroup("dataMediaSourceInfo") Group dataMediaSourceInfo,
DataMediaSource dataMediaSource = new DataMediaSource();
dataMediaSourceInfo.setProperties(dataMediaSource);
- if (dataMediaSource.getType().isMysql() || dataMediaSource.getType().isOracle()) {
+ if (dataMediaSource.getType().isMysql()
+ || dataMediaSource.getType().isOracle()
+ || dataMediaSource.getType().isClickHouse()) {
DbMediaSource dbMediaSource = new DbMediaSource();
dataMediaSourceInfo.setProperties(dbMediaSource);
if (dataMediaSource.getType().isMysql()) {
dbMediaSource.setDriver("com.mysql.jdbc.Driver");
+ } else if (dataMediaSource.getType().isClickHouse()) {
+ dbMediaSource.setDriver("ru.yandex.clickhouse.ClickHouseDriver");
} else if (dataMediaSource.getType().isOracle()) {
dbMediaSource.setDriver("oracle.jdbc.driver.OracleDriver");
}
@@ -105,6 +109,8 @@ public void doEdit(@FormGroup("dataMediaSourceInfo") Group dataMediaSourceInfo,
if (dbMediaSource.getType().isMysql()) {
dbMediaSource.setDriver("com.mysql.jdbc.Driver");
+ } else if (dbMediaSource.getType().isClickHouse()) {
+ dbMediaSource.setDriver("ru.yandex.clickhouse.ClickHouseDriver");
} else if (dbMediaSource.getType().isOracle()) {
dbMediaSource.setDriver("oracle.jdbc.driver.OracleDriver");
}
diff --git a/node/common/src/main/java/com/alibaba/otter/node/common/config/impl/ConfigClientServiceImpl.java b/node/common/src/main/java/com/alibaba/otter/node/common/config/impl/ConfigClientServiceImpl.java
index a140b7f3..0108c626 100644
--- a/node/common/src/main/java/com/alibaba/otter/node/common/config/impl/ConfigClientServiceImpl.java
+++ b/node/common/src/main/java/com/alibaba/otter/node/common/config/impl/ConfigClientServiceImpl.java
@@ -19,6 +19,9 @@
import java.util.List;
import java.util.Map;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +57,7 @@ public class ConfigClientServiceImpl implements InternalConfigClientService, Arb
private Long nid;
private NodeCommmunicationClient nodeCommmunicationClient;
private RefreshMemoryMirror channelCache;
- private Map channelMapping; // 将pipelineId映射为channelId
+ private LoadingCache channelMapping; // 将pipelineId映射为channelId
private RefreshMemoryMirror nodeCache;
public ConfigClientServiceImpl(){
@@ -77,12 +80,12 @@ public Channel findChannel(Long channelId) {
}
public Channel findChannelByPipelineId(Long pipelineId) {
- Long channelId = channelMapping.get(pipelineId);
+ Long channelId = channelMapping.getUnchecked(pipelineId);//edit by liyc
return channelCache.get(channelId);
}
public Pipeline findOppositePipeline(Long pipelineId) {
- Long channelId = channelMapping.get(pipelineId);
+ Long channelId = channelMapping.getUnchecked(pipelineId); //edit by liyc
Channel channel = channelCache.get(channelId);
List pipelines = channel.getPipelines();
for (Pipeline pipeline : pipelines) {
@@ -95,7 +98,7 @@ public Pipeline findOppositePipeline(Long pipelineId) {
}
public Pipeline findPipeline(Long pipelineId) {
- Long channelId = channelMapping.get(pipelineId);
+ Long channelId = channelMapping.getUnchecked(pipelineId); //edit by liyc
Channel channel = channelCache.get(channelId);
List pipelines = channel.getPipelines();
for (Pipeline pipeline : pipelines) {
@@ -119,7 +122,7 @@ public void afterPropertiesSet() throws Exception {
}
this.nid = Long.valueOf(nid);
-
+ /* delete by liyc
channelMapping = new MapMaker().makeComputingMap(new Function() {
public Long apply(Long pipelineId) {
@@ -141,7 +144,28 @@ public Long apply(Long pipelineId) {
throw new ConfigException("No Such Channel by pipelineId[" + pipelineId + "]");
}
});
+ */
+ channelMapping = CacheBuilder.newBuilder().build(new CacheLoader() {
+ public Long load(Long pipelineId) {
+ // 处理下pipline -> channel映射关系不存在的情况
+ FindChannelEvent event = new FindChannelEvent();
+ event.setPipelineId(pipelineId);
+ try {
+ Object obj = nodeCommmunicationClient.callManager(event);
+ if (obj != null && obj instanceof Channel) {
+ Channel channel = (Channel) obj;
+ updateMapping(channel, pipelineId);// 排除下自己
+ channelCache.put(channel.getId(), channel);// 更新下channelCache
+ return channel.getId();
+ }
+ } catch (Exception e) {
+ logger.error("call_manager_error", event.toString(), e);
+ }
+
+ throw new ConfigException("No Such Channel by pipelineId[" + pipelineId + "]");
+ }
+ });
nodeCache = new RefreshMemoryMirror(timeout, new ComputeFunction() {
public Node apply(Long key, Node oldValue) {
diff --git a/node/deployer/src/main/bin/startup.sh b/node/deployer/src/main/bin/startup.sh
index 1cc6b4cd..bed6e6af 100644
--- a/node/deployer/src/main/bin/startup.sh
+++ b/node/deployer/src/main/bin/startup.sh
@@ -95,9 +95,9 @@ esac
str=`file $JAVA_HOME/bin/java | grep 64-bit`
if [ -n "$str" ]; then
- JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
+ JAVA_OPTS="-server -Xms2048m -Xmx4096m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
else
- JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
+ JAVA_OPTS="-server -Xms1024m -Xmx4096 -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
fi
JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
diff --git a/node/etl/pom.xml b/node/etl/pom.xml
index f30f3d3e..5617a5ed 100644
--- a/node/etl/pom.xml
+++ b/node/etl/pom.xml
@@ -55,6 +55,13 @@
com.oracle
ojdbc14
+
+
+ ru.yandex.clickhouse
+ clickhouse-jdbc
+ 0.1.24
+
+
org.apache.commons
commons-compress
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java
index 35434068..22a0d4fb 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/OtterController.java
@@ -25,6 +25,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
+import com.google.common.cache.*;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,12 +68,35 @@ public class OtterController implements NodeTaskListener, OtterControllerMBean {
private static final Logger logger = LoggerFactory.getLogger(OtterController.class);
// 第一层为pipelineId,第二层为S.E.T.L模块
+ /* delete by liyc
private Map> controllers = new MapMaker().makeComputingMap(new Function>() {
public Map apply(Long pipelineId) {
return new MapMaker().makeMap();
}
});
+ */
+ private LoadingCache> controllers = CacheBuilder
+ .newBuilder()
+ .removalListener(new RemovalListener>() {
+ public void onRemoval(RemovalNotification> removal) {
+ if (removal != null) {
+ logger.info("INFO ## shutdown this pipeline sync ,the pipelineId = {} and tasks = {}",
+ removal.getKey(),
+ removal.getValue().keySet());
+ stopPipeline(removal.getKey(), removal.getValue());
+ } else {
+ logger.info("INFO ## this pipeline id = {} is not start sync", removal.getKey());
+ }
+ }
+ })
+ .build(new CacheLoader>() {
+
+ public Map load(Long pipelineId) {
+ return new MapMaker().makeMap();
+ }
+ });
+
private ConfigClientService configClientService;
private ArbitrateManageService arbitrateManageService;
private NodeTaskService nodeTaskService;
@@ -91,7 +115,7 @@ public void start() throws Throwable {
}
public void stop() throws Throwable {
- for (Map tasks : controllers.values()) {
+ for (Map tasks : controllers.asMap().values()) {
for (GlobalTask task : tasks.values()) {
try {
task.shutdown();
@@ -138,6 +162,7 @@ public boolean process(List nodeTasks) {
boolean shutdown = nodeTask.isShutdown();
Long pipelineId = nodeTask.getPipeline().getId();
if (shutdown) {
+ /* delete by liyc
Map tasks = controllers.remove(pipelineId);
if (tasks != null) {
logger.info("INFO ## shutdown this pipeline sync ,the pipelineId = {} and tasks = {}", pipelineId,
@@ -146,6 +171,8 @@ public boolean process(List nodeTasks) {
} else {
logger.info("INFO ## this pipeline id = {} is not start sync", pipelineId);
}
+ */
+ controllers.invalidate(pipelineId);
} else {
startPipeline(nodeTask);
}
@@ -159,7 +186,7 @@ public boolean process(List nodeTasks) {
public void startPipeline(NodeTask nodeTask) {
Long pipelineId = nodeTask.getPipeline().getId();
releasePipeline(pipelineId);
- Map tasks = controllers.get(pipelineId);
+ Map tasks = controllers.getUnchecked(pipelineId); //edit by liyc
// 处理具体的任务命令
List stage = nodeTask.getStage();
List event = nodeTask.getEvent();
@@ -285,12 +312,12 @@ public String getNodeVersionInfo() {
}
public int getRunningPipelineCount() {
- return controllers.size();
+ return controllers.asMap().size(); //edit by liyc
}
public List getRunningPipelines() {
- return new ArrayList(controllers.keySet());
- }
+ return new ArrayList(controllers.asMap().keySet());
+ } //edit by liyc
public int getThreadActiveSize() {
if (executorService instanceof ThreadPoolExecutor) {
@@ -323,19 +350,19 @@ public void setProfile(boolean profile) {
}
public boolean isSelectRunning(Long pipelineId) {
- return controllers.get(pipelineId).containsKey(StageType.SELECT);
+ return controllers.getUnchecked(pipelineId).containsKey(StageType.SELECT); //edit by liyc
}
public boolean isExtractRunning(Long pipelineId) {
- return controllers.get(pipelineId).containsKey(StageType.EXTRACT);
+ return controllers.getUnchecked(pipelineId).containsKey(StageType.EXTRACT);//edit by liyc
}
public boolean isTransformRunning(Long pipelineId) {
- return controllers.get(pipelineId).containsKey(StageType.TRANSFORM);
+ return controllers.getUnchecked(pipelineId).containsKey(StageType.TRANSFORM); //edit by liyc
}
public boolean isLoadRunning(Long pipelineId) {
- return controllers.get(pipelineId).containsKey(StageType.LOAD);
+ return controllers.getUnchecked(pipelineId).containsKey(StageType.LOAD); //edit by liyc
}
public String selectStageAggregation(Long pipelineId) {
@@ -371,7 +398,7 @@ public String loadPendingProcess(Long pipelineId) {
}
private String pendingProcess(Long pipelineId, StageType stage) {
- GlobalTask task = controllers.get(pipelineId).get(stage);
+ GlobalTask task = controllers.getUnchecked(pipelineId).get(stage); //edit by liyc
if (task != null) {
return "stage:" + stage + " , pending:[" + StringUtils.join(task.getPendingProcess(), ',') + "]";
} else {
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/datasource/impl/DBDataSourceService.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/datasource/impl/DBDataSourceService.java
index 1686a0cd..dbb518c0 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/datasource/impl/DBDataSourceService.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/datasource/impl/DBDataSourceService.java
@@ -20,9 +20,11 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import javax.sql.DataSource;
+import com.google.common.cache.*;
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -37,10 +39,9 @@
import com.alibaba.otter.shared.common.model.config.data.DataMediaType;
import com.alibaba.otter.shared.common.model.config.data.db.DbMediaSource;
import com.google.common.base.Function;
-import com.google.common.collect.GenericMapMaker;
-import com.google.common.collect.MapEvictionListener;
+//import com.google.common.collect.GenericMapMaker;
+//import com.google.common.collect.MapEvictionListener;
import com.google.common.collect.MapMaker;
-
/**
* Comment of DataSourceServiceImpl
*
@@ -76,9 +77,10 @@ public class DBDataSourceService implements DataSourceService, DisposableBean {
* key = pipelineId
* value = key(dataMediaSourceId)-value(DataSource)
*/
- private Map> dataSources;
+ private LoadingCache> dataSources;
public DBDataSourceService(){
+ /* delete by liyc
// 设置soft策略
GenericMapMaker mapMaker = new MapMaker().softValues();
mapMaker = ((MapMaker) mapMaker).evictionListener(new MapEvictionListener>() {
@@ -129,15 +131,73 @@ public DataSource apply(DbMediaSource dbMediaSource) {
});
}
});
+ */
+ // 设置soft策略
+ RemovalListener> removalListener =
+ new RemovalListener>(){
+ public void onRemoval(RemovalNotification> tmp) {
+
+ if (tmp.getValue() == null) {
+ return;
+ }
+
+ for (DataSource dataSource : tmp.getValue().asMap().values()) {
+ // for filter to destroy custom datasource
+ if (letHandlerDestroyIfSupport(tmp.getKey(), dataSource)) {
+ continue;
+ }
+ BasicDataSource basicDataSource = (BasicDataSource) dataSource;
+ try {
+ basicDataSource.close();
+ } catch (SQLException e) {
+ logger.error("ERROR ## close the datasource has an error", e);
+ }
+ }
+ tmp.getValue().cleanUp();
+ }
+ };
+
+ // 构建第一层map
+ dataSources = CacheBuilder.newBuilder().removalListener(removalListener)
+ .build(new CacheLoader>() {
+ public LoadingCache load(final Long pipelineId) {
+ // 构建第二层map
+ return CacheBuilder.newBuilder().build(new CacheLoader() {
+ public DataSource load(DbMediaSource dbMediaSource) {
+ // 扩展功能,可以自定义一些自己实现的 dataSource
+ DataSource customDataSource = preCreate(pipelineId, dbMediaSource);
+ if (customDataSource != null) {
+ return customDataSource;
+ }
+
+ return createDataSource(dbMediaSource.getUrl(),
+ dbMediaSource.getUsername(),
+ dbMediaSource.getPassword(),
+ dbMediaSource.getDriver(),
+ dbMediaSource.getType(),
+ dbMediaSource.getEncode());
+ }
+
+ });
+ }
+ });
}
- public DataSource getDataSource(long pipelineId, DataMediaSource dataMediaSource) {
+
+ public DataSource getDataSource(long pipelineId, DataMediaSource dataMediaSource) {
Assert.notNull(dataMediaSource);
- return dataSources.get(pipelineId).get(dataMediaSource);
+ try {
+ return dataSources.get(pipelineId).get((DbMediaSource)dataMediaSource); //edit by liyc
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ }
+ return null;
}
public void destroy(Long pipelineId) {
+ dataSources.invalidate(pipelineId);
+ /*
Map sources = dataSources.remove(pipelineId);
if (sources != null) {
for (DataSource source : sources.values()) {
@@ -158,6 +218,7 @@ public void destroy(Long pipelineId) {
sources.clear();
}
+ */
}
private boolean letHandlerDestroyIfSupport(Long pipelineId, DataSource dataSource) {
@@ -178,7 +239,7 @@ private boolean letHandlerDestroyIfSupport(Long pipelineId, DataSource dataSourc
}
public void destroy() throws Exception {
- for (Long pipelineId : dataSources.keySet()) {
+ for (Long pipelineId : dataSources.asMap().keySet()) {
destroy(pipelineId);
}
}
@@ -227,6 +288,10 @@ private DataSource createDataSource(String url, String userName, String password
}
}
dbcpDs.setValidationQuery("select 1");
+ } else if (dataMediaType.isClickHouse()) {
+
+ dbcpDs.setValidationQuery("select 1");
+
} else {
logger.error("ERROR ## Unknow database type");
}
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/AbstractDbDialect.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/AbstractDbDialect.java
index 27f61b77..7788a005 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/AbstractDbDialect.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/AbstractDbDialect.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
+import com.google.common.cache.*;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.NestableRuntimeException;
import org.apache.ddlutils.model.Table;
@@ -41,8 +42,8 @@
import com.alibaba.otter.shared.common.utils.meta.DdlUtils;
import com.alibaba.otter.shared.common.utils.meta.DdlUtilsFilter;
import com.google.common.base.Function;
-import com.google.common.collect.GenericMapMaker;
-import com.google.common.collect.MapEvictionListener;
+//import com.google.common.collect.GenericMapMaker;
+//import com.google.common.collect.MapEvictionListener;
import com.google.common.collect.MapMaker;
/**
@@ -60,7 +61,7 @@ public abstract class AbstractDbDialect implements DbDialect {
protected JdbcTemplate jdbcTemplate;
protected TransactionTemplate transactionTemplate;
protected LobHandler lobHandler;
- protected Map, Table> tables;
+ protected LoadingCache, Table> tables;
public AbstractDbDialect(final JdbcTemplate jdbcTemplate, LobHandler lobHandler){
this.jdbcTemplate = jdbcTemplate;
@@ -105,10 +106,11 @@ public AbstractDbDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler, Strin
public Table findTable(String schema, String table, boolean useCache) {
List key = Arrays.asList(schema, table);
if (useCache == false) {
- tables.remove(key);
+ //tables.remove(key); delete by liyc
+ tables.invalidate(key);
}
- return tables.get(key);
+ return tables.getUnchecked(key); //edit by liyc
}
public Table findTable(String schema, String table) {
@@ -117,10 +119,12 @@ public Table findTable(String schema, String table) {
public void reloadTable(String schema, String table) {
if (StringUtils.isNotEmpty(table)) {
- tables.remove(Arrays.asList(schema, table));
+ //tables.remove(Arrays.asList(schema, table)); delete by liyc
+ tables.invalidate(Arrays.asList(schema, table));
} else {
// 如果没有存在表名,则直接清空所有的table,重新加载
- tables.clear();
+ //tables.clear(); delete by liyc
+ tables.cleanUp();
}
}
@@ -171,7 +175,7 @@ public void destory() {
// ================================ helper method ==========================
private void initTables(final JdbcTemplate jdbcTemplate) {
- // soft引用设置,避免内存爆了
+ /* soft引用设置,避免内存爆了
GenericMapMaker mapMaker = null;
mapMaker = new MapMaker().softValues().evictionListener(new MapEvictionListener, Table>() {
@@ -201,6 +205,35 @@ public Table apply(List names) {
}
}
});
+ */
+ RemovalListener,Table> removalListener = new RemovalListener,Table>() {
+ public void onRemoval(RemovalNotification,Table> removal) {
+ logger.warn("Eviction For Table:" + removal.getValue());
+ }
+ };
+ this.tables = CacheBuilder.newBuilder()
+ .maximumSize(100000)
+ .removalListener(removalListener)
+ .build(new CacheLoader, Table>() {
+ public Table load(List names) {
+ Assert.isTrue(names.size() == 2);
+ try {
+ beforeFindTable(jdbcTemplate, names.get(0), names.get(0), names.get(1));
+ DdlUtilsFilter filter = getDdlUtilsFilter(jdbcTemplate, names.get(0), names.get(0), names.get(1));
+ Table table = DdlUtils.findTable(jdbcTemplate, names.get(0), names.get(0), names.get(1), filter);
+ afterFindTable(table, jdbcTemplate, names.get(0), names.get(0), names.get(1));
+ if (table == null) {
+ throw new NestableRuntimeException("no found table [" + names.get(0) + "." + names.get(1)
+ + "] , pls check");
+ } else {
+ return table;
+ }
+ } catch (Exception e) {
+ throw new NestableRuntimeException("find table [" + names.get(0) + "." + names.get(1) + "] error",
+ e);
+ }
+ }
+ });
}
protected DdlUtilsFilter getDdlUtilsFilter(JdbcTemplate jdbcTemplate, String catalogName, String schemaName,
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/DbDialectFactory.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/DbDialectFactory.java
index 57d6daa8..8847e25a 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/DbDialectFactory.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/DbDialectFactory.java
@@ -25,6 +25,7 @@
import javax.sql.DataSource;
+import com.google.common.cache.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
@@ -34,9 +35,9 @@
import com.alibaba.otter.node.etl.common.datasource.DataSourceService;
import com.alibaba.otter.shared.common.model.config.data.db.DbMediaSource;
-import com.google.common.base.Function;
-import com.google.common.collect.GenericMapMaker;
-import com.google.common.collect.MapEvictionListener;
+//import com.google.common.base.Function;
+//import com.google.common.collect.GenericMapMaker;
+//import com.google.common.collect.MapEvictionListener;
import com.google.common.collect.MapMaker;
/**
@@ -50,9 +51,10 @@ public class DbDialectFactory implements DisposableBean {
private DbDialectGenerator dbDialectGenerator;
// 第一层pipelineId , 第二层DbMediaSource id
- private Map> dialects;
-
+ private LoadingCache> dialects;
+ private RemovalListener> removalListener;
public DbDialectFactory(){
+ /*
// 构建第一层map
GenericMapMaker mapMaker = null;
mapMaker = new MapMaker().softValues()
@@ -110,24 +112,80 @@ public Object doInConnection(Connection c) throws SQLException, DataAccessExcept
});
}
});
+ */
+ removalListener = new RemovalListener>() {
+ public void onRemoval(RemovalNotification> removal) {
+ LoadingCache tmp = removal.getValue();
+ if (tmp != null) {
+ for (DbDialect dbDialect : tmp.asMap().values()) {
+ dbDialect.destory();
+ }
+ }
+ }
+ };
+
+ dialects = CacheBuilder.newBuilder().removalListener(removalListener).build(new CacheLoader>() {
+
+ public LoadingCache load(final Long pipelineId) {
+ // 构建第二层map
+ return CacheBuilder.newBuilder().build(new CacheLoader() {
+
+ public DbDialect load(final DbMediaSource source) {
+ DataSource dataSource = dataSourceService.getDataSource(pipelineId, source);
+ final JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
+ return (DbDialect) jdbcTemplate.execute(new ConnectionCallback() {
+
+ public Object doInConnection(Connection c) throws SQLException, DataAccessException {
+ DatabaseMetaData meta = c.getMetaData();
+ String databaseName = meta.getDatabaseProductName();
+ String databaseVersion = meta.getDatabaseProductVersion();
+ int databaseMajorVersion = meta.getDatabaseMajorVersion();
+ int databaseMinorVersion = meta.getDatabaseMinorVersion();
+ DbDialect dialect = dbDialectGenerator.generate(jdbcTemplate,
+ databaseName,
+ databaseVersion,
+ databaseMajorVersion,
+ databaseMinorVersion,
+ source.getType());
+ if (dialect == null) {
+ throw new UnsupportedOperationException("no dialect for" + databaseName);
+ }
+ if (logger.isInfoEnabled()) {
+ logger.info(String.format("--- DATABASE: %s, SCHEMA: %s ---",
+ databaseName,
+ (dialect.getDefaultSchema() == null) ? dialect.getDefaultCatalog() : dialect.getDefaultSchema()));
+ }
+
+ return dialect;
+ }
+ });
+
+ }
+ });
+ }
+ });
}
public DbDialect getDbDialect(Long pipelineId, DbMediaSource source) {
- return dialects.get(pipelineId).get(source);
+ return dialects.getUnchecked(pipelineId).getUnchecked(source);
}
public void destory(Long pipelineId) {
+
+ dialects.invalidate(pipelineId);
+ /* delete by liyc
Map dialect = dialects.remove(pipelineId);
if (dialect != null) {
for (DbDialect dbDialect : dialect.values()) {
dbDialect.destory();
- }
+ }
}
+ */
}
public void destroy() throws Exception {
- Set pipelineIds = new HashSet(dialects.keySet());
+ Set pipelineIds = new HashSet(dialects.asMap().keySet());
for (Long pipelineId : pipelineIds) {
destory(pipelineId);
}
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/DbDialectGenerator.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/DbDialectGenerator.java
index 524fe6c9..c697da09 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/DbDialectGenerator.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/DbDialectGenerator.java
@@ -16,6 +16,7 @@
package com.alibaba.otter.node.etl.common.db.dialect;
+import com.alibaba.otter.node.etl.common.db.dialect.clickhouse.ClickHouseDialect;
import org.apache.commons.lang.StringUtils;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.support.lob.LobHandler;
@@ -32,6 +33,7 @@ public class DbDialectGenerator {
protected static final String ORACLE = "oracle";
protected static final String MYSQL = "mysql";
+ protected static final String CLICKHOUSE = "ClickHouse";
protected static final String TDDL_GROUP = "TGroupDatabase";
protected static final String TDDL_CLIENT = "TDDL";
@@ -41,7 +43,7 @@ public class DbDialectGenerator {
protected DbDialect generate(JdbcTemplate jdbcTemplate, String databaseName, String databaseNameVersion,
int databaseMajorVersion, int databaseMinorVersion, DataMediaType dataMediaType) {
DbDialect dialect = null;
-
+ //TODO add by liyc 如果要添加库,要在这里添加
if (StringUtils.startsWithIgnoreCase(databaseName, ORACLE)) { // for
// oracle
dialect = new OracleDialect(jdbcTemplate,
@@ -57,6 +59,13 @@ protected DbDialect generate(JdbcTemplate jdbcTemplate, String databaseName, Str
databaseNameVersion,
databaseMajorVersion,
databaseMinorVersion);
+ } else if (StringUtils.startsWithIgnoreCase(databaseName,CLICKHOUSE )) { // for
+ // clickhouse
+ dialect = new ClickHouseDialect(jdbcTemplate,
+ oracleLobHandler,
+ databaseName,
+ databaseMajorVersion,
+ databaseMinorVersion);
} else if (StringUtils.startsWithIgnoreCase(databaseName, TDDL_GROUP)) { // for
// tddl
// group
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/clickhouse/ClickHouseDialect.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/clickhouse/ClickHouseDialect.java
new file mode 100644
index 00000000..9f0a363f
--- /dev/null
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/clickhouse/ClickHouseDialect.java
@@ -0,0 +1,52 @@
+package com.alibaba.otter.node.etl.common.db.dialect.clickhouse;
+
+import com.alibaba.otter.node.etl.common.db.dialect.AbstractDbDialect;
+import com.alibaba.otter.node.etl.common.db.dialect.oracle.OracleSqlTemplate;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.support.lob.LobHandler;
+
+/**
+ * Created by liyc on 2017/5/9 0009.
+ * 只支持insert,其他的忽略
+ */
+public class ClickHouseDialect extends AbstractDbDialect {
+ public ClickHouseDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler){
+ super(jdbcTemplate, lobHandler);
+ sqlTemplate = new ClickHouseSqlTemplate();
+ }
+
+ public ClickHouseDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler, String name, int majorVersion,
+ int minorVersion){
+ super(jdbcTemplate, lobHandler, name, majorVersion, minorVersion);
+ sqlTemplate = new ClickHouseSqlTemplate();
+ }
+
+ public boolean isCharSpacePadded() {
+ return true;
+ }
+
+ public boolean isCharSpaceTrimmed() {
+ return false;
+ }
+
+ public boolean isEmptyStringNulled() {
+ return true;
+ }
+
+ public boolean storesUpperCaseNamesInCatalog() {
+ return false;
+ }
+
+ public boolean isSupportMergeSql() {
+ return false;
+ }
+
+ public String getDefaultCatalog() {
+ return "default";
+ }
+
+ public String getDefaultSchema() {
+ return (String) jdbcTemplate.queryForObject("SELECT currentDatabase()", String.class);
+ }
+
+}
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/clickhouse/ClickHouseSqlTemplate.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/clickhouse/ClickHouseSqlTemplate.java
new file mode 100644
index 00000000..61439245
--- /dev/null
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/clickhouse/ClickHouseSqlTemplate.java
@@ -0,0 +1,28 @@
+package com.alibaba.otter.node.etl.common.db.dialect.clickhouse;
+
+import com.alibaba.otter.node.etl.common.db.dialect.AbstractSqlTemplate;
+
+/**
+ * Created by liyc on 2017/5/9 0009.
+ */
+public class ClickHouseSqlTemplate extends AbstractSqlTemplate {
+
+ private static final String ESCAPE = "`";
+
+ public String getMergeSql(String schemaName, String tableName, String[] pkNames, String[] columnNames,
+ String[] viewColumnNames, boolean includePks) {
+ return getInsertSql(schemaName,tableName,pkNames,columnNames);
+ }
+
+ public String getDeleteSql(String schemaName, String tableName, String[] pkNames) {
+ return null;
+ }
+
+ public String getUpdateSql(String schemaName, String tableName, String[] pkNames, String[] columnNames) {
+ return null;
+ }
+
+ protected String appendEscape(String columnName) {
+ return ESCAPE + columnName + ESCAPE;
+ }
+}
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/mysql/MysqlDialect.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/mysql/MysqlDialect.java
index 70eaa8b6..d4ec8fce 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/mysql/MysqlDialect.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/mysql/MysqlDialect.java
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.Map;
+import com.google.common.cache.*;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.NestableRuntimeException;
import org.apache.ddlutils.model.Table;
@@ -30,8 +31,8 @@
import com.alibaba.otter.node.etl.common.db.dialect.AbstractDbDialect;
import com.alibaba.otter.shared.common.utils.meta.DdlUtils;
import com.google.common.base.Function;
-import com.google.common.collect.GenericMapMaker;
-import com.google.common.collect.MapEvictionListener;
+//import com.google.common.collect.GenericMapMaker;
+//import com.google.common.collect.MapEvictionListener;
import com.google.common.collect.MapMaker;
/**
@@ -43,7 +44,7 @@
public class MysqlDialect extends AbstractDbDialect {
private boolean isDRDS = false;
- private Map, String> shardColumns;
+ private LoadingCache, String> shardColumns;
public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler){
super(jdbcTemplate, lobHandler);
@@ -62,7 +63,7 @@ public MysqlDialect(JdbcTemplate jdbcTemplate, LobHandler lobHandler, String nam
}
private void initShardColumns() {
- // soft引用设置,避免内存爆了
+ /* soft引用设置,避免内存爆了 delete by liyc
GenericMapMaker mapMaker = null;
mapMaker = new MapMaker().softValues().evictionListener(new MapEvictionListener, Table>() {
@@ -88,6 +89,31 @@ public String apply(List names) {
}
}
});
+ */
+ RemovalListener,String> removalListener = new RemovalListener,String>() {
+ public void onRemoval(RemovalNotification,String> removal) {
+ logger.warn("Eviction For Table:" + removal.getValue());
+ }
+ };
+ this.shardColumns = CacheBuilder.newBuilder()
+ .maximumSize(100000)
+ .removalListener(removalListener)
+ .build(new CacheLoader, String>() {
+ public String load(List names) {
+ Assert.isTrue(names.size() == 2);
+ try {
+ String result = DdlUtils.getShardKeyByDRDS(jdbcTemplate, names.get(0), names.get(0), names.get(1));
+ if (StringUtils.isEmpty(result)) {
+ return "";
+ } else {
+ return result;
+ }
+ } catch (Exception e) {
+ throw new NestableRuntimeException("find table [" + names.get(0) + "." + names.get(1) + "] error",
+ e);
+ }
+ }
+ });
}
public boolean isCharSpacePadded() {
@@ -116,7 +142,7 @@ public boolean isDRDS() {
public String getShardColumns(String schema, String table) {
if (isDRDS()) {
- return shardColumns.get(Arrays.asList(schema, table));
+ return shardColumns.getUnchecked(Arrays.asList(schema, table)); //edit by liyc
} else {
return null;
}
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/postgresql/PostgresqlDialect.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/postgresql/PostgresqlDialect.java
new file mode 100644
index 00000000..10f319fe
--- /dev/null
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/postgresql/PostgresqlDialect.java
@@ -0,0 +1,7 @@
+package com.alibaba.otter.node.etl.common.db.dialect.postgresql;
+
+/**
+ * Created by Administrator on 2017/5/9 0009.
+ */
+public class PostgresqlDialect {
+}
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/postgresql/PostgresqlTemplate.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/postgresql/PostgresqlTemplate.java
new file mode 100644
index 00000000..520626f6
--- /dev/null
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/dialect/postgresql/PostgresqlTemplate.java
@@ -0,0 +1,7 @@
+package com.alibaba.otter.node.etl.common.db.dialect.postgresql;
+
+/**
+ * Created by Administrator on 2017/5/9 0009.
+ */
+public class PostgresqlTemplate {
+}
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/utils/SqlUtils.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/utils/SqlUtils.java
index c436a2f7..4ec36b81 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/utils/SqlUtils.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/db/utils/SqlUtils.java
@@ -35,7 +35,7 @@
*/
public class SqlUtils {
- public static final String REQUIRED_FIELD_NULL_SUBSTITUTE = " ";
+ public static final String REQUIRED_FIELD_NULL_SUBSTITUTE = "";
public static final String SQLDATE_FORMAT = "yyyy-MM-dd";
public static final String TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss";
private static final Map> sqlTypeToJavaTypeMap = new HashMap>();
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/jmx/StageAggregationCollector.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/jmx/StageAggregationCollector.java
index edaaa025..6c80fe3d 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/jmx/StageAggregationCollector.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/jmx/StageAggregationCollector.java
@@ -22,6 +22,9 @@
import com.alibaba.otter.node.etl.common.jmx.StageAggregation.AggregationItem;
import com.alibaba.otter.shared.common.model.config.enums.StageType;
import com.google.common.base.Function;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.MapMaker;
/**
@@ -32,20 +35,35 @@
*/
public class StageAggregationCollector {
- private Map> collector;
+ private LoadingCache> collector;
private AtomicBoolean profiling = new AtomicBoolean(true);
+
public StageAggregationCollector(){
this(1024);
}
+ /* delete by liyc
+ public StageAggregationCollector(final int bufferSize){
+ collector = new MapMaker().makeComputingMap(new Function>() {
+
+ public Map apply(Long input) {
+ return new MapMaker().makeComputingMap(new Function() {
+ public StageAggregation apply(StageType input) {
+ return new StageAggregation(bufferSize);
+ }
+ });
+ }
+ });
+ }
+ */
public StageAggregationCollector(final int bufferSize){
- collector = new MapMaker().makeComputingMap(new Function>() {
+ collector = CacheBuilder.newBuilder().build(new CacheLoader>() {
- public Map apply(Long input) {
- return new MapMaker().makeComputingMap(new Function() {
+ public LoadingCache load(Long input) {
+ return CacheBuilder.newBuilder().build(new CacheLoader() {
- public StageAggregation apply(StageType input) {
+ public StageAggregation load(StageType input) {
return new StageAggregation(bufferSize);
}
});
@@ -54,11 +72,11 @@ public StageAggregation apply(StageType input) {
}
public void push(Long pipelineId, StageType stage, AggregationItem aggregationItem) {
- collector.get(pipelineId).get(stage).push(aggregationItem);
+ collector.getUnchecked(pipelineId).getUnchecked(stage).push(aggregationItem); //edit by liyc
}
public String histogram(Long pipelineId, StageType stage) {
- return collector.get(pipelineId).get(stage).histogram();
+ return collector.getUnchecked(pipelineId).getUnchecked(stage).histogram();
}
public boolean isProfiling() {
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/memory/AbstractMemoryPipe.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/memory/AbstractMemoryPipe.java
index 42c4759d..a98fa8d0 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/memory/AbstractMemoryPipe.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/memory/AbstractMemoryPipe.java
@@ -19,6 +19,9 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+//import com.google.common.cache.CacheLoader;
import org.springframework.beans.factory.InitializingBean;
import com.alibaba.otter.node.etl.common.pipe.Pipe;
@@ -35,11 +38,15 @@ public abstract class AbstractMemoryPipe implement
protected Long timeout = 60 * 1000L; // 对应的超时时间,1分钟
- protected Map cache;
+ protected Cache cache;
public void afterPropertiesSet() throws Exception {
+
+ /* delete by liyc
// 一定要设置过期时间,因为针对rollback操作,不会有后续的节点来获取数据,需要自动过期删除掉
cache = new MapMaker().expireAfterWrite(timeout, TimeUnit.MILLISECONDS).softValues().makeMap();
+ */
+ cache = CacheBuilder.newBuilder().expireAfterWrite(timeout, TimeUnit.MILLISECONDS).softValues().build();
}
// ============== setter / getter ===============
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/memory/RowDataMemoryPipe.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/memory/RowDataMemoryPipe.java
index aeae56fe..9e3206d6 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/memory/RowDataMemoryPipe.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/memory/RowDataMemoryPipe.java
@@ -25,6 +25,7 @@
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.ExecutionException;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
@@ -65,7 +66,10 @@ public MemoryPipeKey put(DbBatch data) {
}
public DbBatch get(MemoryPipeKey key) {
- return cache.remove(key);
+ //return cache.remove(key); edit by liyc
+ DbBatch x=cache.getIfPresent(key);
+ cache.invalidate(key);
+ return x;
}
// 处理对应的附件
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/rpc/AbstractRpcPipe.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/rpc/AbstractRpcPipe.java
index 4667a17e..ab494bee 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/rpc/AbstractRpcPipe.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/rpc/AbstractRpcPipe.java
@@ -19,6 +19,8 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import org.springframework.beans.factory.InitializingBean;
import com.alibaba.otter.node.etl.common.pipe.Pipe;
@@ -42,10 +44,15 @@ public abstract class AbstractRpcPipe implements Pipe
protected Long timeout = 60 * 1000L; // 对应的超时时间,1分钟
- protected Map cache;
+ protected Cache cache;
public void afterPropertiesSet() throws Exception {
+ /* delete by liyc
cache = new MapMaker().expireAfterWrite(timeout, TimeUnit.MILLISECONDS).softValues().makeMap();
+ */
+ cache = CacheBuilder.newBuilder()
+ .expireAfterWrite(timeout, TimeUnit.MILLISECONDS)
+ .softValues().build();
}
// rpc get操作事件
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/rpc/RowDataRpcPipe.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/rpc/RowDataRpcPipe.java
index b5397adb..d14f11fe 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/rpc/RowDataRpcPipe.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/common/pipe/impl/rpc/RowDataRpcPipe.java
@@ -24,6 +24,8 @@
import com.alibaba.otter.shared.communication.core.model.EventType;
import com.alibaba.otter.shared.etl.model.DbBatch;
+import java.util.concurrent.ExecutionException;
+
/**
* 基于rpc调用实现rowData的数据传递
*
@@ -63,7 +65,8 @@ public DbBatch get(RpcPipeKey key) throws PipeException {
@SuppressWarnings("unused")
// 处理rpc调用事件
private DbBatch onGet(RpcEvent event) {
- return cache.remove(event.getKey()); // 不建议使用remove,rpc调用容易有retry请求,导致第二次拿到的数据为null
+ return cache.getIfPresent(event.getKey());
+ //return cache.remove(event.getKey()); // 不建议使用remove,rpc调用容易有retry请求,导致第二次拿到的数据为null
}
private Long getNid() {
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/LoadStatsTracker.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/LoadStatsTracker.java
index 0d3586e7..e565bf12 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/LoadStatsTracker.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/LoadStatsTracker.java
@@ -22,6 +22,9 @@
import com.alibaba.otter.shared.etl.model.Identity;
import com.google.common.base.Function;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.MapMaker;
/**
@@ -32,46 +35,66 @@
*/
public class LoadStatsTracker {
- private Map throughputs;
+ private LoadingCache throughputs;
public LoadStatsTracker(){
+ /* add by liyc
throughputs = new MapMaker().makeComputingMap(new Function() {
public LoadThroughput apply(Identity identity) {
return new LoadThroughput(identity);
}
});
+ */
+ throughputs = CacheBuilder.newBuilder().build(new CacheLoader() {
+
+ public LoadThroughput load(Identity identity) {
+ return new LoadThroughput(identity);
+ }
+ });
+
}
public LoadThroughput getStat(Identity identity) {
- return throughputs.get(identity);
+ return throughputs.getUnchecked(identity);
}
public void removeStat(Identity identity) {
- throughputs.remove(identity);
+ //add by liyc
+ throughputs.invalidate(identity);
+ //throughputs.remove(identity);
}
public static class LoadThroughput {
private Identity identity;
private Long startTime;
- private Map counters;
+ private LoadingCache counters;
public LoadThroughput(Identity identity){
+ /* delete by liyc
counters = new MapMaker().makeComputingMap(new Function() {
public LoadCounter apply(Long pairId) {
return new LoadCounter(pairId);
}
});
+ */
+ counters = CacheBuilder.newBuilder().build(new CacheLoader() {
+
+ public LoadCounter load(Long pairId) {
+ return new LoadCounter(pairId);
+ }
+ });
}
public LoadCounter getStat(Long pairId) {
- return counters.get(pairId);
+ //return counters.get(pairId); delete by liyc
+ return counters.getUnchecked(pairId);
}
public Collection getStats() {
- return counters.values();
+ return counters.asMap().values();// edit by liyc
}
public Identity getIdentity() {
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/db/DataBatchLoader.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/db/DataBatchLoader.java
index c0f216ac..77402949 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/db/DataBatchLoader.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/db/DataBatchLoader.java
@@ -26,6 +26,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -209,6 +212,7 @@ public DbLoadContext call() throws Exception {
*/
private List split(RowBatch rowBatch) {
final Identity identity = rowBatch.getIdentity();
+ /*
Map result = new MapMaker().makeComputingMap(new Function() {
public RowBatch apply(DataMediaSource input) {
@@ -217,15 +221,24 @@ public RowBatch apply(DataMediaSource input) {
return rowBatch;
}
});
+ */
+ LoadingCache result = CacheBuilder.newBuilder().build(new CacheLoader() {
+
+ public RowBatch load(DataMediaSource input) {
+ RowBatch rowBatch = new RowBatch();
+ rowBatch.setIdentity(identity);
+ return rowBatch;
+ }
+ });
for (EventData eventData : rowBatch.getDatas()) {
// 获取介质信息
DataMedia media = ConfigHelper.findDataMedia(configClientService.findPipeline(identity.getPipelineId()),
eventData.getTableId());
- result.get(media.getSource()).merge(eventData); // 归类
+ result.getUnchecked(media.getSource()).merge(eventData); // 归类 edit by liyc
}
- return new ArrayList(result.values());
+ return new ArrayList(result.asMap().values());
}
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/db/DbLoadAction.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/db/DbLoadAction.java
index ad686921..9e7be892 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/db/DbLoadAction.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/db/DbLoadAction.java
@@ -32,6 +32,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import com.alibaba.otter.node.etl.common.db.dialect.clickhouse.ClickHouseDialect;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.ddlutils.model.Column;
@@ -106,14 +107,17 @@ public class DbLoadAction implements InitializingBean, DisposableBean {
/**
* 返回结果为已处理成功的记录
+ * add by liyc 这里做特殊处理,es不用启动多线程处理,clickhouse 需要验证线程池的稳定性
*/
+ //todo es client本来就是一个笨重的多线程程序
+
public DbLoadContext load(RowBatch rowBatch, WeightController controller) {
Assert.notNull(rowBatch);
Identity identity = rowBatch.getIdentity();
DbLoadContext context = buildContext(identity);
-
try {
List datas = rowBatch.getDatas();
+ System.out.println("********************begin datas size " + datas.size()+"****************");
context.setPrepareDatas(datas);
// 执行重复录入数据过滤
datas = context.getPrepareDatas();
@@ -129,6 +133,7 @@ public DbLoadContext load(RowBatch rowBatch, WeightController controller) {
interceptor.prepare(context);
// 执行重复录入数据过滤
datas = context.getPrepareDatas();
+
// 处理下ddl语句,ddl/dml语句不可能是在同一个batch中,由canal进行控制
// 主要考虑ddl的幂等性问题,尽可能一个ddl一个batch,失败或者回滚都只针对这条sql
if (isDdlDatas(datas)) {
@@ -151,9 +156,12 @@ public DbLoadContext load(RowBatch rowBatch, WeightController controller) {
List items = buckets.getItems(weight);
logger.debug("##start load for weight:" + weight);
// 预处理下数据
-
// 进行一次数据合并,合并相同pk的多次I/U/D操作
- items = DbLoadMerger.merge(items);
+ DbDialect dbDialect = dbDialectFactory.getDbDialect(context.getIdentity().getPipelineId(),
+ (DbMediaSource)context.getDataMediaSource());
+ if (!(dbDialect instanceof ClickHouseDialect)){
+ items = DbLoadMerger.merge(items);
+ }
// 按I/U/D进行归并处理
DbLoadData loadData = new DbLoadData();
doBefore(items, context, loadData);
@@ -161,6 +169,7 @@ public DbLoadContext load(RowBatch rowBatch, WeightController controller) {
doLoad(context, loadData);
controller.single(weight.intValue());
logger.debug("##end load for weight:" + weight);
+ logger.debug("##end load for weight:" + weight +" record: "+items.size() + " preeced id :" + identity.getProcessId());
}
}
interceptor.commit(context);
@@ -171,8 +180,9 @@ public DbLoadContext load(RowBatch rowBatch, WeightController controller) {
interceptor.error(context);
throw new LoadException(e);
}
-
+ logger.info("end datas context failed size " + context.getProcessedDatas().size()+" Success size " + context.getProcessedDatas().size());
return context;// 返回处理成功的记录
+
}
private DbLoadContext buildContext(Identity identity) {
@@ -398,7 +408,6 @@ private void doTwoPhase(DbLoadContext context, List> totalRows,
if (CollectionUtils.isEmpty(rows)) {
continue; // 过滤空记录
}
-
results.add(executor.submit(new DbLoadWorker(context, rows, canBatch)));
}
@@ -412,9 +421,9 @@ private void doTwoPhase(DbLoadContext context, List> totalRows,
interceptor.after(context, data);// 通知加载完成
}
} catch (Exception e) {
+ e.printStackTrace();
ex = e;
}
-
if (ex != null) {
logger.warn("##load phase one failed!", ex);
partFailed = true;
@@ -561,9 +570,8 @@ private Exception doCall() {
index = end;// 移动到下一批次
} else {
splitDatas.add(datas.get(index));
- index = index + 1;// 移动到下一条
+ index++;// 移动到下一条
}
-
int retryCount = 0;
while (true) {
try {
@@ -573,72 +581,78 @@ private Exception doCall() {
} else {
failedDatas.addAll(splitDatas); // 先添加为出错记录,可能获取lob,datasource会出错
}
-
- final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator();
- if (useBatch && canBatch) {
- // 处理batch
- final String sql = splitDatas.get(0).getSql();
- int[] affects = new int[splitDatas.size()];
- affects = (int[]) dbDialect.getTransactionTemplate().execute(new TransactionCallback() {
-
- public Object doInTransaction(TransactionStatus status) {
- // 初始化一下内容
- try {
- failedDatas.clear(); // 先清理
- processedDatas.clear();
- interceptor.transactionBegin(context, splitDatas, dbDialect);
- JdbcTemplate template = dbDialect.getJdbcTemplate();
- int[] affects = template.batchUpdate(sql, new BatchPreparedStatementSetter() {
-
- public void setValues(PreparedStatement ps, int idx) throws SQLException {
- doPreparedStatement(ps, dbDialect, lobCreator, splitDatas.get(idx));
- }
-
- public int getBatchSize() {
- return splitDatas.size();
- }
- });
- interceptor.transactionEnd(context, splitDatas, dbDialect);
- return affects;
- } finally {
- lobCreator.close();
- }
- }
-
- });
-
+ if ( dbDialect instanceof ClickHouseDialect && ( splitDatas.get(0).getEventType().isDelete() ||
+ splitDatas.get(0).getEventType().isUpdate())) {
// 更新统计信息
- for (int i = 0; i < splitDatas.size(); i++) {
- processStat(splitDatas.get(i), affects[i], true);
+ for (int v = 0; v < splitDatas.size(); v++) {
+ processStat(splitDatas.get(v), 1, true);
}
} else {
- final EventData data = splitDatas.get(0);// 直接取第一条
- int affect = 0;
- affect = (Integer) dbDialect.getTransactionTemplate().execute(new TransactionCallback() {
-
- public Object doInTransaction(TransactionStatus status) {
- try {
- failedDatas.clear(); // 先清理
- processedDatas.clear();
- interceptor.transactionBegin(context, Arrays.asList(data), dbDialect);
- JdbcTemplate template = dbDialect.getJdbcTemplate();
- int affect = template.update(data.getSql(), new PreparedStatementSetter() {
-
- public void setValues(PreparedStatement ps) throws SQLException {
- doPreparedStatement(ps, dbDialect, lobCreator, data);
- }
- });
- interceptor.transactionEnd(context, Arrays.asList(data), dbDialect);
- return affect;
- } finally {
- lobCreator.close();
+ //clickhouse 不支持
+ final LobCreator lobCreator = dbDialect.getLobHandler().getLobCreator();
+ if (useBatch && canBatch) {
+ // 处理batch
+ final String sql = splitDatas.get(0).getSql();
+ int[] affects = new int[splitDatas.size()];
+ //clickhouse 不支持 update,delete ,判断sql是否为null
+ affects = (int[]) dbDialect.getTransactionTemplate().execute(new TransactionCallback() {
+ public Object doInTransaction(TransactionStatus status) {
+ // 初始化一下内容
+ try {
+ failedDatas.clear(); // 先清理
+ processedDatas.clear();
+ interceptor.transactionBegin(context, splitDatas, dbDialect);
+ JdbcTemplate template = dbDialect.getJdbcTemplate();
+ int[] affects = template.batchUpdate(sql, new BatchPreparedStatementSetter() {
+
+ public void setValues(PreparedStatement ps, int idx) throws SQLException {
+ doPreparedStatement(ps, dbDialect, lobCreator, splitDatas.get(idx));
+ }
+ public int getBatchSize() {
+ return splitDatas.size();
+ }
+
+ });
+ interceptor.transactionEnd(context, splitDatas, dbDialect);
+ return affects;
+ } finally {
+ lobCreator.close();
+ }
}
+
+ });
+ // 更新统计信息
+ for (int i = 0; i < splitDatas.size(); i++) {
+ processStat(splitDatas.get(i), affects[i], true);
}
- });
- // 更新统计信息
- processStat(data, affect, false);
+ } else {
+ final EventData data = splitDatas.get(0);// 直接取第一条
+ int affect = 0;
+ affect = (Integer) dbDialect.getTransactionTemplate().execute(new TransactionCallback() {
+
+ public Object doInTransaction(TransactionStatus status) {
+ try {
+ failedDatas.clear(); // 先清理
+ processedDatas.clear();
+ interceptor.transactionBegin(context, Arrays.asList(data), dbDialect);
+ JdbcTemplate template = dbDialect.getJdbcTemplate();
+ int affect = template.update(data.getSql(), new PreparedStatementSetter() {
+
+ public void setValues(PreparedStatement ps) throws SQLException {
+ doPreparedStatement(ps, dbDialect, lobCreator, data);
+ }
+ });
+ interceptor.transactionEnd(context, Arrays.asList(data), dbDialect);
+ return affect;
+ } finally {
+ lobCreator.close();
+ }
+ }
+ });
+ // 更新统计信息
+ processStat(data, affect, false);
+ }
}
-
error = null;
exeResult = ExecuteResult.SUCCESS;
} catch (DeadlockLoserDataAccessException ex) {
@@ -656,10 +670,12 @@ public void setValues(PreparedStatement ps) throws SQLException {
// }
exeResult = ExecuteResult.ERROR;
} catch (RuntimeException ex) {
+ ex.printStackTrace();
error = new LoadException(ExceptionUtils.getFullStackTrace(ex),
DbLoadDumper.dumpEventDatas(splitDatas));
exeResult = ExecuteResult.ERROR;
} catch (Throwable ex) {
+ ex.printStackTrace();
error = new LoadException(ExceptionUtils.getFullStackTrace(ex),
DbLoadDumper.dumpEventDatas(splitDatas));
exeResult = ExecuteResult.ERROR;
@@ -769,6 +785,28 @@ private void doPreparedStatement(PreparedStatement ps, DbDialect dbDialect, LobC
// 解决mysql的0000-00-00 00:00:00问题,直接依赖mysql
// driver进行处理,如果转化为Timestamp会出错
param = column.getColumnValue();
+ } else if (dbDialect instanceof ClickHouseDialect && (column.isNull() || column.getColumnValue() == null)) {
+ // 解决 clickhouse 数字类型出现空置的情况
+ if (SqlUtils.isNumeric(sqlType)) {
+ param = 0;
+ } else if (SqlUtils.isTextType(sqlType)) {
+ param="";
+ } else if (sqlType == Types.TIMESTAMP) {
+ param = SqlUtils.stringToSqlValue("0000-00-00 00:00:00",
+ sqlType,
+ isRequired,
+ dbDialect.isEmptyStringNulled());
+ } else if (sqlType == Types.DATE) {
+ param = SqlUtils.stringToSqlValue("0000-00-00",
+ sqlType,
+ isRequired,
+ dbDialect.isEmptyStringNulled());
+ } else {
+ param = SqlUtils.stringToSqlValue(column.getColumnValue(),
+ sqlType,
+ isRequired,
+ dbDialect.isEmptyStringNulled());
+ }
} else {
param = SqlUtils.stringToSqlValue(column.getColumnValue(),
sqlType,
@@ -789,7 +827,7 @@ private void doPreparedStatement(PreparedStatement ps, DbDialect dbDialect, LobC
case Types.TIMESTAMP:
case Types.DATE:
// 只处理mysql的时间类型,oracle的进行转化处理
- if (dbDialect instanceof MysqlDialect) {
+ if (dbDialect instanceof MysqlDialect ) {
// 解决mysql的0000-00-00 00:00:00问题,直接依赖mysql
// driver进行处理,如果转化为Timestamp会出错
ps.setObject(paramIndex, param);
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/db/interceptor/sql/SqlBuilderLoadInterceptor.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/db/interceptor/sql/SqlBuilderLoadInterceptor.java
index dc470e10..567589c0 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/db/interceptor/sql/SqlBuilderLoadInterceptor.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/load/loader/db/interceptor/sql/SqlBuilderLoadInterceptor.java
@@ -48,7 +48,7 @@ public boolean before(DbLoadContext context, EventData currentData) {
SqlTemplate sqlTemplate = dbDialect.getSqlTemplate();
EventType type = currentData.getEventType();
String sql = null;
-
+ //todo liyc 如果 有其他的数据库,则要判断生成的sql语句问题
String schemaName = (currentData.isWithoutSchema() ? null : currentData.getSchemaName());
// 注意insert/update语句对应的字段数序都是将主键排在后面
if (type.isInsert()) {
diff --git a/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java b/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java
index 23ec4404..6e940a90 100644
--- a/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java
+++ b/node/etl/src/main/java/com/alibaba/otter/node/etl/select/selector/MessageParser.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
+import com.alibaba.otter.node.etl.common.db.dialect.clickhouse.ClickHouseDialect;
import org.apache.commons.lang.StringUtils;
import org.apache.ddlutils.model.Table;
import org.slf4j.Logger;
@@ -359,6 +360,9 @@ private List internParse(Pipeline pipeline, Entry entry) {
schemaName,
tableName,
notExistReturnNull);
+ if ( dataMedia == null){
+ logger.error("存在异常的数据: 管道数据"+pipeline.getPairs() + "对应同步的表:" + schemaName +"."+ tableName);
+ }
// 如果EventType是CREATE/ALTER,需要reload
// DataMediaInfo;并且把CREATE/ALTER类型的事件丢弃掉.
if (dataMedia != null && (eventType.isCreate() || eventType.isAlter() || eventType.isRename())) {
@@ -771,6 +775,10 @@ public boolean isOracle() {
return (dbDialect != null && dbDialect instanceof OracleDialect);
}
+ public boolean isClickHouse() {
+ return (dbDialect != null && dbDialect instanceof ClickHouseDialect);
+ }
+
public boolean isMysql() {
return (dbDialect != null && dbDialect instanceof MysqlDialect);
}
diff --git a/node/etl/src/test/java/com/alibaba/otter/node/etl/common/datasource/AbstractDbDialectTest.java b/node/etl/src/test/java/com/alibaba/otter/node/etl/common/datasource/AbstractDbDialectTest.java
index b04469fa..6dd52e53 100644
--- a/node/etl/src/test/java/com/alibaba/otter/node/etl/common/datasource/AbstractDbDialectTest.java
+++ b/node/etl/src/test/java/com/alibaba/otter/node/etl/common/datasource/AbstractDbDialectTest.java
@@ -69,6 +69,8 @@ private DataSource createDataSource(String url, String userName, String password
if (dataMediaType.isOracle()) {
dbcpDs.addConnectionProperty("restrictGetTables", "true");
dbcpDs.setValidationQuery("select 1 from dual");
+ } else if (dataMediaType.isClickHouse()) {
+ dbcpDs.setValidationQuery("select 1");
} else if (dataMediaType.isMysql()) {
// open the batch mode for mysql since 5.1.8
dbcpDs.addConnectionProperty("useServerPrepStmts", "true");
diff --git a/node/extend/src/main/java/com/alibaba/otter/node/extend/processor/Ow_Processor.java b/node/extend/src/main/java/com/alibaba/otter/node/extend/processor/Ow_Processor.java
new file mode 100644
index 00000000..ff65adab
--- /dev/null
+++ b/node/extend/src/main/java/com/alibaba/otter/node/extend/processor/Ow_Processor.java
@@ -0,0 +1,58 @@
+package com.alibaba.otter.node.extend.processor;
+
+import com.alibaba.otter.shared.etl.model.EventColumn;
+import com.alibaba.otter.shared.etl.model.EventData;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+
+public class Ow_Processor extends AbstractEventProcessor {
+ public boolean process(EventData eventData) {
+ // 基本步骤:
+
+ // 构造新的主键,老的主键uid插入普通值中
+ EventColumn id = new EventColumn();
+ id.setColumnValue(eventData.getSchemaName());
+ id.setColumnType(Types.BIGINT);
+ id.setColumnName("uid");
+
+ EventColumn room = new EventColumn();
+ room.setColumnValue(eventData.getSchemaName());
+ room.setColumnType(Types.BIGINT);
+ room.setColumnName("toproomid");
+
+ for (EventColumn column : eventData.getKeys()) {
+ if (column.getColumnName().toLowerCase().equals("uid")){
+ id.setColumnValue(column.getColumnValue());
+ }
+ if (column.getColumnName().toLowerCase().equals("toproomid")){
+ room.setColumnValue(column.getColumnValue());
+ }
+ }
+ List keys = new ArrayList();
+ keys.add(room);
+ eventData.setKeys(keys);
+
+ eventData.getColumns().add(id);
+ boolean v_flag =false;
+ for (EventColumn column : eventData.getColumns()) {
+ if (column.getColumnName().toLowerCase().equals("rank")){
+ if (column.getColumnValue().equals("80")){
+ v_flag= true;
+ }
+ else{
+ return false;
+ }
+ }
+ if (column.getColumnName().toLowerCase().equals("status")){
+ if (column.getColumnValue().equals("0") && v_flag){
+ return true;
+ }
+ else{
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+}
diff --git a/node/extend/src/main/java/com/alibaba/otter/node/extend/processor/TradOrder_Processor.java b/node/extend/src/main/java/com/alibaba/otter/node/extend/processor/TradOrder_Processor.java
new file mode 100644
index 00000000..98b11587
--- /dev/null
+++ b/node/extend/src/main/java/com/alibaba/otter/node/extend/processor/TradOrder_Processor.java
@@ -0,0 +1,26 @@
+package com.alibaba.otter.node.extend.processor;
+
+import com.alibaba.otter.shared.etl.model.EventColumn;
+import com.alibaba.otter.shared.etl.model.EventData;
+
+public class TradOrder_Processor extends AbstractEventProcessor {
+ public boolean process(EventData eventData) {
+ Boolean ck_flag=false;
+ for (EventColumn column : eventData.getKeys()) {
+ if (column.getColumnName().toLowerCase().toLowerCase().equals("outer_pay_status")){
+ if (column.getColumnValue().equals("1")) {
+ ck_flag = true;
+ }
+ }
+ if (column.getColumnName().toLowerCase().toLowerCase().equals("pay_user_id")){
+ if ( !column.isNull() && column.getColumnValue().length()!= 0 ){
+ ck_flag=ck_flag&&true;
+ } else
+ {
+ ck_flag=false;
+ }
+ }
+ }
+ return ck_flag;
+ }
+}
diff --git a/pom.xml b/pom.xml
index 210557d0..f12ecb95 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,9 +1,9 @@
4.0.0
- org.sonatype.oss
- oss-parent
- 7
+ org.sonatype.oss
+ oss-parent
+ 7
com.alibaba.otter
otter
@@ -46,9 +46,9 @@
- git@github.com:alibaba/otter.git
- scm:git:git@github.com:alibaba/otter.git
- scm:git:git@github.com:alibaba/otter.git
+ git@github.com:alibaba/otter.git
+ scm:git:git@github.com:alibaba/otter.git
+ scm:git:git@github.com:alibaba/otter.git
@@ -109,14 +109,14 @@
- true
- true
-
- 1.6
- 1.6
+ true
+ true
+
+ 1.6
+ 1.6
UTF-8
UTF-8
- 3.1.2.RELEASE
+ 3.1.4.RELEASE
8.1.7.v20120910
1.0.24
@@ -130,17 +130,17 @@
-
+
commons-dbcp
commons-dbcp
1.4
-
+
commons-lang
commons-lang
2.6
-
+
commons-pool
commons-pool
1.5.4
@@ -170,43 +170,48 @@
fastjson
1.2.28
+
+
+
+
+
com.google.guava
guava
- r08
+ 18.0
com.google.protobuf
protobuf-java
2.6.1
-
- com.alibaba
- druid
- 1.0.25
-
+
+ com.alibaba
+ druid
+ 1.0.25
+
org.apache.zookeeper
zookeeper
3.4.5
-
+
org.slf4j
slf4j-log4j12
-
-
+
+
org.slf4j
slf4j-api
-
-
+
+
log4j
log4j
-
-
+
+
jline
jline
-
+
@@ -220,7 +225,14 @@
mysql-connector-java
5.1.35
-
+
+
+ ru.yandex.clickhouse
+ clickhouse-jdbc
+ 0.1.24
+
+
+
com.oracle
ojdbc14
@@ -228,79 +240,80 @@
system
${user.dir}/lib/ojdbc14-10.2.0.3.0.jar
+
- org.springframework
- spring-core
- ${spring-version}
-
-
- org.springframework
- spring-beans
- ${spring-version}
-
-
- org.springframework
- spring-aop
- ${spring-version}
-
-
- org.springframework
- spring-context
- ${spring-version}
-
-
- org.springframework
- spring-context-support
- ${spring-version}
-
-
- org.springframework
- spring-tx
- ${spring-version}
-
-
- org.springframework
- spring-jdbc
- ${spring-version}
-
-
- org.springframework
- spring-test
- ${spring-version}
-
-
+ org.springframework
+ spring-core
+ ${spring-version}
+
+
+ org.springframework
+ spring-beans
+ ${spring-version}
+
+
+ org.springframework
+ spring-aop
+ ${spring-version}
+
+
+ org.springframework
+ spring-context
+ ${spring-version}
+
+
+ org.springframework
+ spring-context-support
+ ${spring-version}
+
+
+ org.springframework
+ spring-tx
+ ${spring-version}
+
+
+ org.springframework
+ spring-jdbc
+ ${spring-version}
+
+
+ org.springframework
+ spring-test
+ ${spring-version}
+
+
cglib
cglib-nodep
2.2
-
- ch.qos.logback
- logback-core
- 1.1.3
-
-
- ch.qos.logback
- logback-classic
- 1.1.3
-
-
- org.slf4j
- jcl-over-slf4j
- 1.7.12
-
-
- org.slf4j
- slf4j-api
- 1.7.12
-
-
- javax.servlet
- javax.servlet-api
- 3.0.1
- provided
-
+
+ ch.qos.logback
+ logback-core
+ 1.1.3
+
+
+ ch.qos.logback
+ logback-classic
+ 1.1.3
+
+
+ org.slf4j
+ jcl-over-slf4j
+ 1.7.12
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.12
+
+
+ javax.servlet
+ javax.servlet-api
+ 3.0.1
+ provided
+
org.jtester
@@ -308,7 +321,7 @@
1.1.8
test
-
+
junit
junit
4.5
@@ -341,19 +354,19 @@
- org.apache.maven.plugins
- maven-eclipse-plugin
- 2.5.1
-
-
-
- .settings/org.eclipse.core.resources.prefs
-
- =${file_encoding}${line.separator}]]>
-
-
-
-
+ org.apache.maven.plugins
+ maven-eclipse-plugin
+ 2.5.1
+
+
+
+ .settings/org.eclipse.core.resources.prefs
+
+ =${file_encoding}${line.separator}]]>
+
+
+
+
org.apache.maven.plugins
@@ -379,29 +392,29 @@
src/main/java
- src/test/java
-
-
- src/main/resources
-
- **/*
-
-
- **/.svn/
-
-
-
-
-
- src/test/resources
-
- **/*
-
-
- **/.svn/
-
-
-
+ src/test/java
+
+
+ src/main/resources
+
+ **/*
+
+
+ **/.svn/
+
+
+
+
+
+ src/test/resources
+
+ **/*
+
+
+ **/.svn/
+
+
+
diff --git a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/setl/ArbitrateFactory.java b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/setl/ArbitrateFactory.java
index d817863f..39d017df 100644
--- a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/setl/ArbitrateFactory.java
+++ b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/setl/ArbitrateFactory.java
@@ -20,6 +20,7 @@
import java.util.Collection;
import java.util.Map;
+import com.google.common.cache.*;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
@@ -39,6 +40,7 @@ public class ArbitrateFactory implements ApplicationContextAware {
private static ApplicationContext context = null;
// 两层的Map接口,第一层为pipelineId,第二层为具体的资源类型class
+ /* delete by liyc
private static Map> cache = new MapMaker().makeComputingMap(new Function>() {
public Map apply(final Long pipelineId) {
@@ -50,7 +52,45 @@ public Object apply(Class instanceClass) {
});
}
});
-
+ */
+ private static LoadingCache> cache = CacheBuilder
+ .newBuilder()
+ .removalListener( new RemovalListener>() {
+ public void onRemoval(RemovalNotification> removal) {
+ if (removal.getValue() != null) {
+ Collection collection = removal.getValue().asMap().values();
+ for (Object obj : collection) {
+ if (obj instanceof ArbitrateLifeCycle) {
+ ArbitrateLifeCycle lifeCycle = (ArbitrateLifeCycle) obj;
+ lifeCycle.destory();// 调用销毁方法
+ }
+ }
+ }
+ }
+ })
+ .build(new CacheLoader>() {
+
+ public LoadingCache load(final Long pipelineId) {
+ return CacheBuilder
+ .newBuilder()
+ .removalListener( new RemovalListener() {
+ public void onRemoval(RemovalNotification removal) {
+ if (removal.getValue() != null) {
+ if (removal.getValue() instanceof ArbitrateLifeCycle) {
+ ArbitrateLifeCycle lifeCycle = (ArbitrateLifeCycle) removal.getValue();
+ lifeCycle.destory();// 调用销毁方法
+ }
+ }
+ }
+ })
+ .build(new CacheLoader() {
+
+ public Object load(Class instanceClass) {
+ return newInstance(pipelineId, instanceClass);
+ }
+ });
+ }
+ });
private static Object newInstance(Long pipelineId, Class instanceClass) {
Object obj = newInstance(instanceClass, pipelineId);// 通过反射调用构造函数进行初始化
autowire(obj);
@@ -87,7 +127,7 @@ public static T getInstance(Long pipelineId, Clas
// }
// return (T) obj;
- return (T) cache.get(pipelineId).get(instanceClass);
+ return (T) cache.getUnchecked(pipelineId).getUnchecked(instanceClass); //edit by liyc
}
public static void autowire(Object obj) {
@@ -98,7 +138,7 @@ public static void autowire(Object obj) {
}
public static void destory() {
- for (Long pipelineId : cache.keySet()) {
+ for (Long pipelineId : cache.asMap().keySet()) { //edit by liyc
destory(pipelineId);
}
}
@@ -109,6 +149,7 @@ public static void destory() {
* @param pipelineId
*/
public static void destory(Long pipelineId) {
+ /*
Map resources = cache.remove(pipelineId);
if (resources != null) {
Collection collection = resources.values();
@@ -119,6 +160,8 @@ public static void destory(Long pipelineId) {
}
}
}
+ */
+ cache.invalidate(pipelineId);
}
/**
@@ -127,14 +170,17 @@ public static void destory(Long pipelineId) {
* @param pipelineId
*/
public static void destory(Long pipelineId, Class instanceClass) {
+ /*
Map resources = cache.get(pipelineId);
+
if (resources != null) {
Object obj = resources.remove(instanceClass);
if (obj instanceof ArbitrateLifeCycle) {
ArbitrateLifeCycle lifeCycle = (ArbitrateLifeCycle) obj;
lifeCycle.destory();// 调用销毁方法
}
- }
+ }*/
+ cache.getUnchecked(pipelineId).invalidate(instanceClass);
}
// ==================== helper method =======================
diff --git a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/setl/memory/MemoryStageController.java b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/setl/memory/MemoryStageController.java
index 13aa3605..a4a3c527 100644
--- a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/setl/memory/MemoryStageController.java
+++ b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/setl/memory/MemoryStageController.java
@@ -24,6 +24,9 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.springframework.util.CollectionUtils;
import com.alibaba.otter.shared.arbitrate.impl.config.ArbitrateConfigUtils;
@@ -44,14 +47,14 @@
public class MemoryStageController extends ArbitrateLifeCycle {
private AtomicLong atomicMaxProcessId = new AtomicLong(0);
- private Map replys;
+ private LoadingCache replys;
private Map progress;
private BlockingQueue termins;
private StageProgress nullProgress = new StageProgress();
public MemoryStageController(Long pipelineId){
super(pipelineId);
-
+ /* add by liyc
replys = new MapMaker().makeComputingMap(new Function() {
public ReplyProcessQueue apply(StageType input) {
@@ -62,17 +65,28 @@ public ReplyProcessQueue apply(StageType input) {
return new ReplyProcessQueue(size);
}
});
+ */
+ replys = CacheBuilder.newBuilder().build(new CacheLoader() {
+
+ public ReplyProcessQueue load(StageType input) {
+ int size = ArbitrateConfigUtils.getParallelism(getPipelineId()) * 10;
+ if (size < 100) {
+ size = 100;
+ }
+ return new ReplyProcessQueue(size);
+ }
+ });
progress = new MapMaker().makeMap();
termins = new LinkedBlockingQueue(20);
}
public Long waitForProcess(StageType stage) throws InterruptedException {
- if (stage.isSelect() && !replys.containsKey(stage)) {
+ if (stage.isSelect() && !replys.asMap().containsKey(stage)) {
initSelect();
}
- Long processId = replys.get(stage).take();
+ Long processId = replys.getUnchecked(stage).take();
if (stage.isSelect()) {// select一旦分出processId,就需要在progress中记录一笔,用于判断谁是最小的一个processId
progress.put(processId, nullProgress);
}
@@ -85,7 +99,8 @@ public EtlEventData getLastData(Long processId) {
}
public synchronized void destory() {
- replys.clear();
+ //replys.clear(); edit by liyc
+ replys.invalidateAll();
progress.clear();
}
@@ -133,14 +148,14 @@ public synchronized boolean single(StageType stage, EtlEventData etlEventData) {
case SELECT:
if (progress.containsKey(etlEventData.getProcessId())) {// 可能发生了rollback,对应的progress已经被废弃
progress.put(etlEventData.getProcessId(), new StageProgress(stage, etlEventData));
- replys.get(StageType.EXTRACT).offer(etlEventData.getProcessId());
+ replys.getUnchecked(StageType.EXTRACT).offer(etlEventData.getProcessId()); //edit by liyc
result = true;
}
break;
case EXTRACT:
if (progress.containsKey(etlEventData.getProcessId())) {
progress.put(etlEventData.getProcessId(), new StageProgress(stage, etlEventData));
- replys.get(StageType.TRANSFORM).offer(etlEventData.getProcessId());
+ replys.getUnchecked(StageType.TRANSFORM).offer(etlEventData.getProcessId()); //edit by liyc
result = true;
}
break;
@@ -158,7 +173,7 @@ public synchronized boolean single(StageType stage, EtlEventData etlEventData) {
computeNextLoad();
// 一个process完成了,自动添加下一个process
if (removed != null) {
- replys.get(StageType.SELECT).offer(atomicMaxProcessId.incrementAndGet());
+ replys.getUnchecked(StageType.SELECT).offer(atomicMaxProcessId.incrementAndGet()); //edit by liyc
result = true;
}
break;
@@ -193,7 +208,7 @@ private synchronized void initSelect() {
// 第一次/出现ROLLBACK/RESTART事件,删除了所有调度信号后,重新初始化一下select
// stage的数据,初始大小为并行度大小
// 后续的select的reply队列变化,由load single时直接添加
- ReplyProcessQueue queue = replys.get(StageType.SELECT);
+ ReplyProcessQueue queue = replys.getUnchecked(StageType.SELECT); //edit by liyc
int parallelism = ArbitrateConfigUtils.getParallelism(getPipelineId());
while (parallelism-- > 0 && queue.size() <= parallelism) {
queue.offer(atomicMaxProcessId.incrementAndGet());
@@ -206,7 +221,7 @@ private synchronized void initSelect() {
private void computeNextLoad() {
Long processId = getMinTransformedProcessId();
if (processId != null) {
- replys.get(StageType.LOAD).offer(processId);
+ replys.getUnchecked(StageType.LOAD).offer(processId); //edit by liyc
}
}
diff --git a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/setl/rpc/RpcStageController.java b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/setl/rpc/RpcStageController.java
index f7d51ba9..18990ff7 100644
--- a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/setl/rpc/RpcStageController.java
+++ b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/setl/rpc/RpcStageController.java
@@ -19,6 +19,9 @@
import java.util.List;
import java.util.Map;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.apache.commons.lang.ClassUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,14 +49,14 @@
public class RpcStageController extends ArbitrateLifeCycle implements ProcessListener {
private static final Logger logger = LoggerFactory.getLogger(RpcStageController.class);
- private Map replys;
+ private LoadingCache replys;
private Map progress;
private ProcessMonitor processMonitor;
private volatile Long lastestLoadedProcessId = -1L; // 最近一次同步成功的processId
public RpcStageController(Long pipelineId){
super(pipelineId);
-
+ /* delete by liyc
replys = new MapMaker().makeComputingMap(new Function() {
public ReplyProcessQueue apply(StageType input) {
@@ -64,7 +67,17 @@ public ReplyProcessQueue apply(StageType input) {
return new ReplyProcessQueue(size);
}
});
+ */
+ replys = CacheBuilder.newBuilder().build(new CacheLoader() {
+ public ReplyProcessQueue load(StageType input) {
+ int size = ArbitrateConfigUtils.getParallelism(getPipelineId()) * 10;
+ if (size < 100) {
+ size = 100;
+ }
+ return new ReplyProcessQueue(size);
+ }
+ });
progress = new MapMaker().makeMap();
// 注册一下监听事件变化
processMonitor = ArbitrateFactory.getInstance(pipelineId, ProcessMonitor.class);
@@ -77,7 +90,7 @@ public Long waitForProcess(StageType stage) throws InterruptedException {
throw new ArbitrateException("not support");
}
- return replys.get(stage).take();
+ return replys.getUnchecked(stage).take();//edit by liyc
}
/**
@@ -89,7 +102,8 @@ public EtlEventData getLastData(Long processId) {
public void destory() {
processMonitor.removeListener(this);
- replys.clear();
+ //replys.clear(); edit by liyc
+ replys.invalidateAll();
progress.clear();
}
@@ -98,11 +112,11 @@ public synchronized boolean single(StageType stage, EtlEventData etlEventData) {
switch (stage) {
case SELECT:
progress.put(etlEventData.getProcessId(), new StageProgress(StageType.SELECT, etlEventData));
- replys.get(StageType.EXTRACT).offer(etlEventData.getProcessId());
+ replys.getUnchecked(StageType.EXTRACT).offer(etlEventData.getProcessId()); //edit by liyc
break;
case EXTRACT:
progress.put(etlEventData.getProcessId(), new StageProgress(StageType.EXTRACT, etlEventData));
- replys.get(StageType.TRANSFORM).offer(etlEventData.getProcessId());
+ replys.getUnchecked(StageType.TRANSFORM).offer(etlEventData.getProcessId()); //edit by liyc
break;
case TRANSFORM:
progress.put(etlEventData.getProcessId(), new StageProgress(StageType.TRANSFORM, etlEventData));
@@ -135,7 +149,7 @@ private void computeNextLoad() {
// 针对上一个id为本地load成功的,直接忽略,触发下一个id
Long processId = getMinTransformedProcessId(lastestLoadedProcessId);
if (processId != null) {
- replys.get(StageType.LOAD).offer(processId);
+ replys.getUnchecked(StageType.LOAD).offer(processId); //edit by liyc
}
}
@@ -188,7 +202,7 @@ private Long getMinTransformedProcessId(Long loadedProcessId) {
public void processChanged(List processIds) {
compareProgress(processIds);
- for (ReplyProcessQueue replyProcessIds : replys.values()) {
+ for (ReplyProcessQueue replyProcessIds : replys.asMap().values()) { //edit by liyc
compareReply(processIds, replyProcessIds);
}
diff --git a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/zookeeper/ZooKeeperClient.java b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/zookeeper/ZooKeeperClient.java
index 12edf080..f9b1b319 100644
--- a/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/zookeeper/ZooKeeperClient.java
+++ b/shared/arbitrate/src/main/java/com/alibaba/otter/shared/arbitrate/impl/zookeeper/ZooKeeperClient.java
@@ -20,6 +20,9 @@
import java.util.List;
import java.util.Map;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.I0Itec.zkclient.IZkStateListener;
import org.apache.commons.lang.StringUtils;
import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -39,12 +42,22 @@ public class ZooKeeperClient {
private static String cluster;
private static int sessionTimeout = 10 * 1000;
+ /*
private static Map clients = new MapMaker().makeComputingMap(new Function() {
public ZkClientx apply(Long pipelineId) {
return createClient();
}
});
+ */
+
+ private static LoadingCache clients = CacheBuilder.newBuilder().build(new CacheLoader() {
+
+ public ZkClientx load(Long pipelineId) {
+ return createClient();
+ }
+ });
+
private static Long defaultId = 0L;
/**
@@ -58,11 +71,11 @@ public static ZkClientx getInstance() {
* 根据pipelineId获取对应的zookeeper客户端,每个pipelineId可以独立一个zookeeper链接,保证性能
*/
public static ZkClientx getInstance(Long pipelineId) {
- return clients.get(pipelineId);
- }
+ return clients.getUnchecked(pipelineId);
+ } //edit by liyc
public static void destory() {
- for (ZkClientx zkClient : clients.values()) {
+ for (ZkClientx zkClient : clients.asMap().values()) { //edit by liyc
zkClient.close();
}
}
diff --git a/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/ConfigHelper.java b/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/ConfigHelper.java
index 13486228..0c785161 100644
--- a/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/ConfigHelper.java
+++ b/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/ConfigHelper.java
@@ -16,11 +16,13 @@
package com.alibaba.otter.shared.common.model.config;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import com.google.common.cache.CacheBuilder;
+
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.apache.commons.lang.StringUtils;
import org.apache.oro.text.regex.MalformedPatternException;
import org.apache.oro.text.regex.MatchResult;
@@ -49,6 +51,21 @@
public class ConfigHelper {
public static final String MODE_PATTERN = "(.*)(\\[(\\d+)\\-(\\d+)\\])(.*)"; // 匹配offer[1-128]
+ // add by liyc google guava 19不支持这种写法,已经废弃
+ public static LoadingCache patterns = CacheBuilder.newBuilder().softValues()
+ .build(new CacheLoader() {
+ public Pattern load(String input) {
+ PatternCompiler pc = new Perl5Compiler();
+ try {
+ return pc.compile(input,
+ Perl5Compiler.CASE_INSENSITIVE_MASK
+ | Perl5Compiler.READ_ONLY_MASK);
+ } catch (MalformedPatternException e) {
+ throw new ConfigException(e);
+ }
+ }
+ });
+ /*
private static Map patterns = new MapMaker().makeComputingMap(new Function() {
public Pattern apply(String input) {
@@ -62,7 +79,7 @@ public Pattern apply(String input) {
}
}
});
-
+ */
/**
* 根据DataMedia id得到对应的DataMedia
*/
@@ -166,7 +183,7 @@ public static DataMediaPair findDataMediaPair(Pipeline pipeline, Long pairId) {
*/
public static ModeValue parseMode(String value) {
PatternMatcher matcher = new Perl5Matcher();
- if (matcher.matches(value, patterns.get(MODE_PATTERN))) {
+ if (matcher.matches(value, patterns.getUnchecked(MODE_PATTERN))) {
MatchResult matchResult = matcher.getMatch();
String prefix = matchResult.group(1);
String startStr = matchResult.group(3);
@@ -193,6 +210,7 @@ public static ModeValue parseMode(String value) {
} else {
return new ModeValue(Mode.SINGLE, Arrays.asList(value));
}
+
}
public static String makeSQLPattern(String rawValue) {
@@ -296,7 +314,7 @@ private static boolean isWildCard(String value) {
private static boolean isWildCardMatch(String matchPattern, String value) {
PatternMatcher matcher = new Perl5Matcher();
- return matcher.matches(value, patterns.get(matchPattern));
+ return matcher.matches(value, patterns.getUnchecked(matchPattern));
}
public static int indexIgnoreCase(List datas, String value) {
diff --git a/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/data/DataMediaType.java b/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/data/DataMediaType.java
index ad778546..1942f976 100644
--- a/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/data/DataMediaType.java
+++ b/shared/common/src/main/java/com/alibaba/otter/shared/common/model/config/data/DataMediaType.java
@@ -35,12 +35,18 @@ public enum DataMediaType {
/** napoli */
NAPOLI,
/** diamond push for us */
- DIAMOND_PUSH;
+ DIAMOND_PUSH,
+ /** CLICKHOUSE */
+ CLICKHOUSE;
public boolean isMysql() {
return this.equals(DataMediaType.MYSQL);
}
+ public boolean isClickHouse() {
+ return this.equals(DataMediaType.CLICKHOUSE);
+ }
+
public boolean isOracle() {
return this.equals(DataMediaType.ORACLE);
}
diff --git a/shared/common/src/main/java/com/alibaba/otter/shared/common/utils/RegexUtils.java b/shared/common/src/main/java/com/alibaba/otter/shared/common/utils/RegexUtils.java
index 84c5377c..ebd48afb 100644
--- a/shared/common/src/main/java/com/alibaba/otter/shared/common/utils/RegexUtils.java
+++ b/shared/common/src/main/java/com/alibaba/otter/shared/common/utils/RegexUtils.java
@@ -18,6 +18,9 @@
import java.util.Map;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.apache.commons.lang.StringUtils;
import org.apache.oro.text.regex.MalformedPatternException;
import org.apache.oro.text.regex.Pattern;
@@ -35,6 +38,7 @@
*/
public class RegexUtils {
+ /* delete by lyc
private static Map patterns = null;
static {
@@ -50,14 +54,29 @@ public Pattern apply(String pattern) {
}
});
}
+ */
+ private static LoadingCache patterns = null;
+ static {
+ patterns = CacheBuilder.newBuilder().build(new CacheLoader() {
+
+ public Pattern load(String pattern) {
+ try {
+ PatternCompiler pc = new Perl5Compiler();
+ return pc.compile(pattern, Perl5Compiler.CASE_INSENSITIVE_MASK | Perl5Compiler.READ_ONLY_MASK);
+ } catch (MalformedPatternException e) {
+ throw new RuntimeException("Regex failed!", e);
+ }
+ }
+ });
+ }
public static String findFirst(String originalStr, String regex) {
if (StringUtils.isBlank(originalStr) || StringUtils.isBlank(regex)) {
return StringUtils.EMPTY;
}
PatternMatcher matcher = new Perl5Matcher();
- if (matcher.contains(originalStr, patterns.get(regex))) {
+ if (matcher.contains(originalStr, patterns.getUnchecked(regex))) {
return StringUtils.trimToEmpty(matcher.getMatch().group(0));
}
return StringUtils.EMPTY;
diff --git a/shared/common/src/main/java/com/alibaba/otter/shared/common/utils/zookeeper/ZkClientx.java b/shared/common/src/main/java/com/alibaba/otter/shared/common/utils/zookeeper/ZkClientx.java
index 158c9bd6..1eeb7b8a 100644
--- a/shared/common/src/main/java/com/alibaba/otter/shared/common/utils/zookeeper/ZkClientx.java
+++ b/shared/common/src/main/java/com/alibaba/otter/shared/common/utils/zookeeper/ZkClientx.java
@@ -27,6 +27,9 @@
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.ExceptionUtil;
import org.I0Itec.zkclient.IZkChildListener;
@@ -67,15 +70,22 @@
public class ZkClientx implements Watcher {
// 对于zkclient进行一次缓存,避免一个jvm内部使用多个zk connection
+ /* delete by liyc
private static Map clients = new MapMaker().makeComputingMap(new Function() {
public ZkClientx apply(String servers) {
return new ZkClientx(servers);
}
});
+ */
+ private static LoadingCache clients = CacheBuilder.newBuilder().build(new CacheLoader() {
+ public ZkClientx load(String servers) {
+ return new ZkClientx(servers);
+ }
+ });
public static ZkClientx getZkClient(String servers) {
- return clients.get(servers);
+ return clients.getUnchecked(servers);
}
public ZkClientx(String serverstring){
diff --git a/shared/communication/src/main/java/com/alibaba/otter/shared/communication/core/impl/dubbo/DubboCommunicationConnectionFactory.java b/shared/communication/src/main/java/com/alibaba/otter/shared/communication/core/impl/dubbo/DubboCommunicationConnectionFactory.java
index 594af539..e82f4b8b 100644
--- a/shared/communication/src/main/java/com/alibaba/otter/shared/communication/core/impl/dubbo/DubboCommunicationConnectionFactory.java
+++ b/shared/communication/src/main/java/com/alibaba/otter/shared/communication/core/impl/dubbo/DubboCommunicationConnectionFactory.java
@@ -28,6 +28,9 @@
import com.alibaba.otter.shared.communication.core.impl.connection.CommunicationConnectionFactory;
import com.alibaba.otter.shared.communication.core.model.CommunicationParam;
import com.google.common.base.Function;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.MapMaker;
/**
@@ -43,6 +46,7 @@ public class DubboCommunicationConnectionFactory implements CommunicationConnect
private DubboProtocol protocol = DubboProtocol.getDubboProtocol();
private ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");
+ /*
private Map connections = null;
public DubboCommunicationConnectionFactory(){
@@ -53,7 +57,16 @@ public CommunicationEndpoint apply(String serviceUrl) {
}
});
}
+ */
+ private LoadingCache connections = null;
+ public DubboCommunicationConnectionFactory(){
+ connections = CacheBuilder.newBuilder().build(new CacheLoader() {
+ public CommunicationEndpoint load(String serviceUrl) {
+ return proxyFactory.getProxy(protocol.refer(CommunicationEndpoint.class, URL.valueOf(serviceUrl)));
+ }
+ });
+ }
public CommunicationConnection createConnection(CommunicationParam params) {
if (params == null) {
throw new IllegalArgumentException("param is null!");
@@ -61,7 +74,7 @@ public CommunicationConnection createConnection(CommunicationParam params) {
// 构造对应的url
String serviceUrl = MessageFormat.format(DUBBO_SERVICE_URL, params.getIp(), String.valueOf(params.getPort()));
- CommunicationEndpoint endpoint = connections.get(serviceUrl);
+ CommunicationEndpoint endpoint = connections.getUnchecked(serviceUrl); //edit by liyc
return new DubboCommunicationConnection(params, endpoint);
}
diff --git a/shared/push/src/main/java/com/alibaba/otter/common/push/SubscribeManagerFactory.java b/shared/push/src/main/java/com/alibaba/otter/common/push/SubscribeManagerFactory.java
index b7645032..7974116c 100644
--- a/shared/push/src/main/java/com/alibaba/otter/common/push/SubscribeManagerFactory.java
+++ b/shared/push/src/main/java/com/alibaba/otter/common/push/SubscribeManagerFactory.java
@@ -18,6 +18,9 @@
import java.util.Map;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext;
@@ -35,6 +38,7 @@ public class SubscribeManagerFactory implements ApplicationContextAware {
private static ApplicationContext context = null;
+ /* delete by liyc
private static final Map innerContainer = new MapMaker().makeComputingMap(new Function() {
@Override
@@ -42,6 +46,14 @@ public SubscribeManager apply(SubscribeType input) {
return createSubsrcibeManager(input);
}
});
+ */
+ private static final LoadingCache innerContainer =
+ CacheBuilder.newBuilder().build(new CacheLoader() {
+ @Override
+ public SubscribeManager load(SubscribeType input) {
+ return createSubsrcibeManager(input);
+ }
+ });
private static SubscribeManager createSubsrcibeManager(SubscribeType type) {
SubscribeManager manager = null;
@@ -79,7 +91,7 @@ public static SubscribeManager getSubscribeManager(SubscribeType type) {
if (type == null) {
return null;
}
- SubscribeManager manager = (SubscribeManager) innerContainer.get(type);
+ SubscribeManager manager = (SubscribeManager) innerContainer.getUnchecked(type); //edit by liyc
return manager;
}
diff --git a/shared/push/src/main/java/com/alibaba/otter/common/push/datasource/media/MediaPushDataSource.java b/shared/push/src/main/java/com/alibaba/otter/common/push/datasource/media/MediaPushDataSource.java
index cda3168f..95729d18 100644
--- a/shared/push/src/main/java/com/alibaba/otter/common/push/datasource/media/MediaPushDataSource.java
+++ b/shared/push/src/main/java/com/alibaba/otter/common/push/datasource/media/MediaPushDataSource.java
@@ -164,6 +164,8 @@ protected DataSource doCreateDataSource(String url) {
dbcpDs.addConnectionProperty("characterEncoding", encoding);
}
dbcpDs.setValidationQuery("select 1");
+ } else if (dataMediaType.isClickHouse()){
+ dbcpDs.setValidationQuery("select 1");
} else {
logger.error("ERROR ## Unknow database type");
}
diff --git a/shared/push/src/main/java/com/alibaba/otter/common/push/datasource/media/MediaPushDataSourceHandler.java b/shared/push/src/main/java/com/alibaba/otter/common/push/datasource/media/MediaPushDataSourceHandler.java
index bbba1e5d..88944ff4 100644
--- a/shared/push/src/main/java/com/alibaba/otter/common/push/datasource/media/MediaPushDataSourceHandler.java
+++ b/shared/push/src/main/java/com/alibaba/otter/common/push/datasource/media/MediaPushDataSourceHandler.java
@@ -23,6 +23,7 @@
import javax.sql.DataSource;
+import com.google.common.cache.*;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,8 +32,8 @@
import com.alibaba.otter.shared.common.model.config.data.DataMediaType;
import com.alibaba.otter.shared.common.model.config.data.db.DbMediaSource;
import com.google.common.base.Function;
-import com.google.common.collect.GenericMapMaker;
-import com.google.common.collect.MapEvictionListener;
+//import com.google.common.collect.GenericMapMaker;
+//import com.google.common.collect.MapEvictionListener;
import com.google.common.collect.MapMaker;
/**
@@ -53,10 +54,10 @@ public class MediaPushDataSourceHandler implements DataSourceHanlder {
* key = pipelineId
* value = key(dataMediaSourceId)-value(DataSource)
*/
- private Map> dataSources;
+ private LoadingCache> dataSources;
public MediaPushDataSourceHandler(){
- // 设置soft策略
+ /*设置soft策略 delete by liyc
GenericMapMaker mapMaker = new MapMaker().softValues();
mapMaker = ((MapMaker) mapMaker).evictionListener(new MapEvictionListener>() {
@@ -92,6 +93,43 @@ public DataSource apply(DbMediaSource dbMediaSource) {
});
}
});
+ */
+ RemovalListener> removalListener =
+ new RemovalListener>() {
+ public void onRemoval(RemovalNotification> removal) {
+ if (removal.getValue() == null) {
+ return;
+ }
+
+ for (DataSource dataSource : removal.getValue().asMap().values()) {
+ try {
+ MediaPushDataSource mediaPushDataSource = (MediaPushDataSource) dataSource;
+ mediaPushDataSource.destory();
+ } catch (SQLException e) {
+ log.error("ERROR ## close the datasource has an error", e);
+ }
+ }
+ }
+ };
+
+ // 构建第一层map
+ dataSources = CacheBuilder.newBuilder()
+ .removalListener(removalListener).
+ build(new CacheLoader>() {
+ public LoadingCache load(Long pipelineId) {
+ // 构建第二层map
+ return CacheBuilder.newBuilder().build(new CacheLoader() {
+
+ public DataSource load(DbMediaSource dbMediaSource) {
+ return createDataSource(dbMediaSource.getUrl(), dbMediaSource.getUsername(),
+ dbMediaSource.getPassword(), dbMediaSource.getDriver(),
+ dbMediaSource.getType(), dbMediaSource.getEncode());
+ }
+
+ });
+ }
+ });
+
}
public boolean support(DbMediaSource dbMediaSource) {
@@ -106,7 +144,7 @@ public boolean support(DataSource dataSource) {
}
public DataSource create(Long pipelineId, DbMediaSource dbMediaSource) {
- return dataSources.get(pipelineId).get(dbMediaSource);
+ return dataSources.getUnchecked(pipelineId).getUnchecked(dbMediaSource); //edit by liyc
}
protected DataSource createDataSource(String url, String userName, String password, String driverClassName,
@@ -131,7 +169,9 @@ protected DataSource createDataSource(String url, String userName, String passwo
@Override
public boolean destory(Long pipelineId) {
+ /* delete by liyc
Map sources = dataSources.remove(pipelineId);
+
if (sources != null) {
for (DataSource dataSource : sources.values()) {
try {
@@ -144,7 +184,8 @@ public boolean destory(Long pipelineId) {
sources.clear();
}
-
+ */
+ dataSources.invalidate(pipelineId);
return true;
}