Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

升级相关版本依赖 #116

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions autoload-cache-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,9 @@
<groupId>com.googlecode.combinatoricslib</groupId>
<artifactId>combinatoricslib</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,46 +1,54 @@
package com.jarvis.cache.lock;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPool;
import redis.clients.jedis.JedisSharding;
import redis.clients.jedis.params.SetParams;

/**
*
*/
public class ShardedJedisLock extends AbstractRedisLock {

private ShardedJedisPool shardedJedisPool;
private JedisSharding jedisSharding;

public ShardedJedisLock(ShardedJedisPool shardedJedisPool) {
this.shardedJedisPool = shardedJedisPool;
public ShardedJedisLock(JedisSharding jedisSharding) {
this.jedisSharding = jedisSharding;
}

private void returnResource(ShardedJedis shardedJedis) {
shardedJedis.close();
private void returnResource(JedisSharding jedisSharding) {
jedisSharding.close();
}

@Override
protected boolean setnx(String key, String val, int expire) {
ShardedJedis shardedJedis = null;
/*ShardedJedis shardedJedis = null;
try {
shardedJedis = shardedJedisPool.getResource();
Jedis jedis = shardedJedis.getShard(key);
return OK.equalsIgnoreCase(jedis.set(key, val, SetParams.setParams().nx().ex(expire)));
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以删除这些代码

} finally {
returnResource(shardedJedis);
}*/
try {
return OK.equalsIgnoreCase(jedisSharding.set(key, val, SetParams.setParams().nx().ex(expire)));
} finally {
returnResource(jedisSharding);
}
}

@Override
protected void del(String key) {
ShardedJedis shardedJedis = null;
/*ShardedJedis shardedJedis = null;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以删除这些代码

try {
shardedJedis = shardedJedisPool.getResource();
Jedis jedis = shardedJedis.getShard(key);
jedis.del(key);
} finally {
returnResource(shardedJedis);
}*/
try {
jedisSharding.del(key);
} finally {
returnResource(jedisSharding);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.jarvis.cache.lock;

import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.params.SetParams;

/**
*
*/
public class JedisClusterLock extends AbstractRedisLock {

private JedisCluster jedisCluster;

public JedisClusterLock(JedisCluster jedisCluster) {
this.jedisCluster = jedisCluster;
}

@Override
protected boolean setnx(String key, String val, int expire) {
return OK.equalsIgnoreCase(jedisCluster.set(key, val, SetParams.setParams().nx().ex(expire)));
}

@Override
protected void del(String key) {
this.jedisCluster.del(key);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package com.jarvis.cache.lock;

import redis.clients.jedis.JedisSharding;
import redis.clients.jedis.params.SetParams;

/**
*
*/
public class ShardedJedisLock extends AbstractRedisLock {

private JedisSharding jedisSharding;

public ShardedJedisLock(JedisSharding jedisSharding) {
this.jedisSharding = jedisSharding;
}

private void returnResource(JedisSharding jedisSharding) {
jedisSharding.close();
}

@Override
protected boolean setnx(String key, String val, int expire) {
/*ShardedJedis shardedJedis = null;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

删除这些代码。

try {
shardedJedis = shardedJedisPool.getResource();
Jedis jedis = shardedJedis.getShard(key);
return OK.equalsIgnoreCase(jedis.set(key, val, SetParams.setParams().nx().ex(expire)));
} finally {
returnResource(shardedJedis);
}*/
try {
return OK.equalsIgnoreCase(jedisSharding.set(key, val, SetParams.setParams().nx().ex(expire)));
} finally {
returnResource(jedisSharding);
}
}

@Override
protected void del(String key) {
/*ShardedJedis shardedJedis = null;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以删除这些代码

try {
shardedJedis = shardedJedisPool.getResource();
Jedis jedis = shardedJedis.getShard(key);
jedis.del(key);
} finally {
returnResource(shardedJedis);
}*/
try {
jedisSharding.del(key);
} finally {
returnResource(jedisSharding);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,7 @@

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Client;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisClusterInfoCache;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.PipelineBase;
import redis.clients.jedis.Response;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisRedirectionException;
import redis.clients.jedis.util.JedisClusterCRC16;
import redis.clients.jedis.util.SafeEncoder;
Expand All @@ -30,20 +25,21 @@
*/
@Getter
@Slf4j
public class JedisClusterPipeline extends PipelineBase implements Closeable {
public class JedisClusterPipeline extends Pipeline implements Closeable {

private final JedisClusterInfoCache clusterInfoCache;

/**
* 根据顺序存储每个命令对应的Client
*/
private final Queue<Client> clients;
private final Queue<Connection> clients;
/**
* 用于缓存连接
*/
private final Map<JedisPool, Jedis> jedisMap;
private final Map<ConnectionPool, Connection> jedisMap;

public JedisClusterPipeline(JedisClusterInfoCache clusterInfoCache) {
super((Connection) null);
this.clusterInfoCache = clusterInfoCache;
this.clients = new LinkedList<>();
this.jedisMap = new HashMap<>(3);
Expand All @@ -52,7 +48,8 @@ public JedisClusterPipeline(JedisClusterInfoCache clusterInfoCache) {
/**
* 同步读取所有数据. 与syncAndReturnAll()相比,sync()只是没有对数据做反序列化
*/
protected void sync() {
@Override
public void sync() {
innerSync(null);
}

Expand All @@ -61,7 +58,8 @@ protected void sync() {
*
* @return 按照命令的顺序返回所有的数据
*/
protected List<Object> syncAndReturnAll() {
@Override
public List<Object> syncAndReturnAll() {
List<Object> responseList = new ArrayList<>(clients.size());
innerSync(responseList);
return responseList;
Expand All @@ -71,8 +69,8 @@ private void innerSync(List<Object> formatted) {
try {
Response<?> response;
Object data;
for (Client client : clients) {
response = generateResponse(client.getOne());
for (Connection connection : clients) {
response = generateResponse(connection.getOne());
if (null != formatted) {
data = response.get();
formatted.add(data);
Expand All @@ -89,44 +87,40 @@ private void innerSync(List<Object> formatted) {
public void close() {
clean();
clients.clear();
for (Jedis jedis : jedisMap.values()) {
flushCachedData(jedis);
jedis.close();
for (Connection connection : jedisMap.values()) {
flushCachedData(connection);
connection.close();
}
jedisMap.clear();
}

private void flushCachedData(Jedis jedis) {
private void flushCachedData(Connection connection) {
try {
//FIXME 这个count怎么取值? 执行命令的个数??
jedis.getClient().getMany(jedisMap.size());
connection.getMany(jedisMap.size());
//jedis.getClient().getAll();
} catch (RuntimeException ex) {
// 其中一个client出问题,后面出问题的几率较大
}
}

@Override
protected Client getClient(String key) {
protected Connection getClient(String key) {
byte[] bKey = SafeEncoder.encode(key);
return getClient(bKey);
}

@Override
protected Client getClient(byte[] key) {
Client client = getClient(JedisClusterCRC16.getSlot(key));
clients.add(client);
return client;
protected Connection getClient(byte[] key) {
Connection connection = getClient(JedisClusterCRC16.getSlot(key));
clients.add(connection);
return connection;
}

private Client getClient(int slot) {
JedisPool pool = clusterInfoCache.getSlotPool(slot);
private Connection getClient(int slot) {
ConnectionPool pool = clusterInfoCache.getSlotPool(slot);
// 根据pool从缓存中获取Jedis
Jedis jedis = jedisMap.get(pool);
if (null == jedis) {
jedis = pool.getResource();
jedisMap.put(pool, jedis);
Connection connection = jedisMap.get(pool);
if (null == connection) {
jedisMap.put(pool, pool.getResource());
}
return jedis.getClient();
return connection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
import com.jarvis.cache.to.CacheKeyTO;
import com.jarvis.cache.to.CacheWrapper;
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.PipelineBase;
import redis.clients.jedis.Pipeline;

import java.util.Collection;
import java.util.Set;

@Slf4j
public class JedisUtil {

public static void executeMSet(PipelineBase pipeline, AbstractRedisCacheManager manager, Collection<MSetParam> params) throws Exception {
public static void executeMSet(Pipeline pipeline, AbstractRedisCacheManager manager, Collection<MSetParam> params) throws Exception {
CacheKeyTO cacheKeyTO;
String cacheKey;
String hfield;
Expand Down Expand Up @@ -49,7 +49,7 @@ public static void executeMSet(PipelineBase pipeline, AbstractRedisCacheManager
}
}

public static void executeMGet(PipelineBase pipeline, Set<CacheKeyTO> keys) {
public static void executeMGet(Pipeline pipeline, Set<CacheKeyTO> keys) {
String hfield;
String cacheKey;
byte[] key;
Expand All @@ -68,7 +68,7 @@ public static void executeMGet(PipelineBase pipeline, Set<CacheKeyTO> keys) {
}
}

public static void executeDelete(PipelineBase pipeline, Set<CacheKeyTO> keys) {
public static void executeDelete(Pipeline pipeline, Set<CacheKeyTO> keys) {
String hfield;
String cacheKey;
byte[] key;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.jarvis.cache.redis;

import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.BinaryJedisCluster;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisClusterConnectionHandler;
import redis.clients.jedis.JedisClusterInfoCache;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.exceptions.JedisMovedDataException;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.providers.ConnectionProvider;

import java.lang.reflect.Field;
import java.util.List;
Expand All @@ -22,19 +21,19 @@ public abstract class RetryableJedisClusterPipeline {
private static final Field FIELD_CACHE;

static {
FIELD_CONNECTION_HANDLER = getField(BinaryJedisCluster.class, "connectionHandler");
FIELD_CACHE = getField(JedisClusterConnectionHandler.class, "cache");
FIELD_CONNECTION_HANDLER = getField(JedisCluster.class, "provider");
FIELD_CACHE = getField(ConnectionProvider.class, "cache");
}

private final JedisSlotBasedConnectionHandler connectionHandler;
private final ClusterConnectionProvider connectionProvider;

private final JedisClusterInfoCache clusterInfoCache;

private int maxAttempts = 1;

public RetryableJedisClusterPipeline(JedisCluster jedisCluster) {
connectionHandler = getValue(jedisCluster, FIELD_CONNECTION_HANDLER);
clusterInfoCache = getValue(connectionHandler, FIELD_CACHE);
connectionProvider = getValue(jedisCluster, FIELD_CONNECTION_HANDLER);
clusterInfoCache = getValue(connectionProvider, FIELD_CACHE);
}

public abstract void execute(JedisClusterPipeline pipeline) throws Exception;
Expand All @@ -51,7 +50,7 @@ public void sync() throws Exception {
} catch (JedisMovedDataException jre) {
// if MOVED redirection occurred, rebuilds cluster's slot cache,
// recommended by Redis cluster specification
connectionHandler.renewSlotCache();
connectionProvider.renewSlotCache();
if (maxAttempts > 0) {
maxAttempts--;
sync();
Expand All @@ -77,7 +76,7 @@ public List<Object> syncAndReturnAll() throws Exception {
} catch (JedisMovedDataException jre) {
// if MOVED redirection occurred, rebuilds cluster's slot cache,
// recommended by Redis cluster specification
connectionHandler.renewSlotCache();
connectionProvider.renewSlotCache();
if (maxAttempts > 0) {
maxAttempts--;
return syncAndReturnAll();
Expand Down
Loading