Skip to content

Commit

Permalink
IGNITE-23037 Remove GridDhtUnlockRequest#nearKeys
Browse files Browse the repository at this point in the history
  • Loading branch information
shishkovilja committed Sep 10, 2024
1 parent 2280dea commit 5b56c65
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 273 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpdateCountersMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
Expand Down Expand Up @@ -232,7 +231,6 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register((short)33, GridDhtTxFinishResponse::new);
factory.register((short)34, GridDhtTxPrepareRequest::new);
factory.register((short)35, GridDhtTxPrepareResponse::new);
factory.register((short)36, GridDhtUnlockRequest::new);
factory.register((short)37, GridDhtAtomicDeferredUpdateResponse::new);
factory.register((short)38, GridDhtAtomicUpdateRequest::new);
factory.register((short)39, GridDhtAtomicUpdateResponse::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ protected GridDhtTransactionalCacheAdapter(GridCacheContext<K, V> ctx, GridCache
ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridNearUnlockRequest.class,
(CI2<UUID, GridNearUnlockRequest>)this::processNearUnlockRequest);

ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtUnlockRequest.class,
(CI2<UUID, GridDhtUnlockRequest>)this::processDhtUnlockRequest);
ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDistributedUnlockRequest.class,
(CI2<UUID, GridDistributedUnlockRequest>)this::processDistributedUnlockRequest);

ctx.io().addCacheHandler(ctx.cacheId(), ctx.startTopologyVersion(), GridDhtForceKeysRequest.class,
new MessageHandler<GridDhtForceKeysRequest>() {
Expand Down Expand Up @@ -578,11 +578,8 @@ private void processDhtLockRequest0(UUID nodeId, GridDhtLockRequest req) {
* @param nodeId Node ID.
* @param req Request.
*/
private void processDhtUnlockRequest(UUID nodeId, GridDhtUnlockRequest req) {
private void processDistributedUnlockRequest(UUID nodeId, GridDistributedUnlockRequest req) {
clearLocks(nodeId, req);

if (isNearEnabled(cacheCfg))
near().clearLocks(nodeId, req);
}

/**
Expand Down Expand Up @@ -1672,7 +1669,7 @@ else if (log.isDebugEnabled())

List<KeyCacheObject> keyBytes = entry.getValue();

GridDhtUnlockRequest req = new GridDhtUnlockRequest(ctx.cacheId(), keyBytes.size(),
GridDistributedUnlockRequest req = new GridDistributedUnlockRequest(ctx.cacheId(), keyBytes.size(),
ctx.deploymentEnabled());

req.version(dhtVer);
Expand All @@ -1681,12 +1678,6 @@ else if (log.isDebugEnabled())
for (KeyCacheObject key : keyBytes)
req.addKey(key);

keyBytes = nearMap.get(n);

if (keyBytes != null)
for (KeyCacheObject key : keyBytes)
req.addNearKey(key);

req.completedVersions(committed, rolledback);

ctx.io().send(n, req, ctx.ioPolicy());
Expand All @@ -1707,15 +1698,12 @@ else if (log.isDebugEnabled())
if (!dhtMap.containsKey(n)) {
List<KeyCacheObject> keyBytes = entry.getValue();

GridDhtUnlockRequest req = new GridDhtUnlockRequest(ctx.cacheId(), keyBytes.size(),
GridDistributedUnlockRequest req = new GridDistributedUnlockRequest(ctx.cacheId(), keyBytes.size(),
ctx.deploymentEnabled());

req.version(dhtVer);

try {
for (KeyCacheObject key : keyBytes)
req.addNearKey(key);

req.completedVersions(committed, rolledback);

ctx.io().send(n, req, ctx.ioPolicy());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
Expand All @@ -39,7 +38,6 @@
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
Expand Down Expand Up @@ -204,67 +202,6 @@ IgniteInternalFuture<Map<K, V>> txLoadAsync(GridNearTxLocal tx,
return fut;
}

/**
* @param nodeId Node ID.
* @param req Request.
*/
public void clearLocks(UUID nodeId, GridDhtUnlockRequest req) {
assert nodeId != null;

GridCacheVersion obsoleteVer = nextVersion();

List<KeyCacheObject> keys = req.nearKeys();

if (keys != null) {
AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();

for (KeyCacheObject key : keys) {
while (true) {
GridDistributedCacheEntry entry = peekExx(key);

try {
if (entry != null) {
entry.doneRemote(
req.version(),
req.version(),
null,
req.committedVersions(),
req.rolledbackVersions(),
/*system invalidate*/false);

// Note that we don't reorder completed versions here,
// as there is no point to reorder relative to the version
// we are about to remove.
if (entry.removeLock(req.version())) {
if (log.isDebugEnabled())
log.debug("Removed lock [lockId=" + req.version() + ", key=" + key + ']');

// Try to evict near entry dht-mapped locally.
evictNearEntry(entry, obsoleteVer, topVer);
}
else {
if (log.isDebugEnabled())
log.debug("Received unlock request for unknown candidate " +
"(added to cancelled locks set): " + req);
}

entry.touch();
}
else if (log.isDebugEnabled())
log.debug("Received unlock request for entry that could not be found: " + req);

break;
}
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Received remove lock request for removed entry (will retry) [entry=" + entry +
", req=" + req + ']');
}
}
}
}
}

/**
* @param nodeId Node ID.
* @param res Response.
Expand Down Expand Up @@ -312,36 +249,6 @@ private void processLockResponse(UUID nodeId, GridNearLockResponse res) {
return fut;
}

/**
* @param e Transaction entry.
* @param topVer Topology version.
* @return {@code True} if entry is locally mapped as a primary or back up node.
*/
protected boolean isNearLocallyMapped(GridCacheEntryEx e, AffinityTopologyVersion topVer) {
return ctx.affinity().partitionBelongs(ctx.localNode(), e.partition(), topVer);
}

/**
*
* @param e Entry to evict if it qualifies for eviction.
* @param obsoleteVer Obsolete version.
* @param topVer Topology version.
* @return {@code True} if attempt was made to evict the entry.
*/
protected boolean evictNearEntry(GridCacheEntryEx e, GridCacheVersion obsoleteVer, AffinityTopologyVersion topVer) {
assert e != null;
assert obsoleteVer != null;

if (isNearLocallyMapped(e, topVer)) {
if (log.isDebugEnabled())
log.debug("Evicting dht-local entry from near cache [entry=" + e + ", tx=" + this + ']');

return e.markObsolete(obsoleteVer);
}

return false;
}

/** {@inheritDoc} */
@Override public void unlockAll(Collection<? extends K> keys) {
if (keys.isEmpty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,6 @@ org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFutu
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture$4
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException
org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture$1
org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture$1
Expand Down

0 comments on commit 5b56c65

Please sign in to comment.