Skip to content

Commit

Permalink
pool: re-introduce limit on number of concurrent p2p transfers
Browse files Browse the repository at this point in the history
Motivation:
During massive data migration from multiple sources to a single pool the
later one gets IO starvation, thus all p2p transfers get stalled. The
majority of such transfers get killed by jtm. The migration get cancelled or
suspended, and restarts doesn't help as the behavior repeats itself.

Modification:
Introduce a semaphore to control the concurrency. The default value is
unlimited (Integer.MAX_VALUE). The pool's `pp set max active` command is
re-introduced (undeprecated).

Result:
never ending data migration (canceled after 3 weeks) completed within 16h.

Acked-by: Lea Morschel
Target: master
Require-book: no
Require-notes: yes
  • Loading branch information
kofemann committed Jan 11, 2024
1 parent 0bbf8f5 commit fea615f
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 8 deletions.
26 changes: 23 additions & 3 deletions modules/dcache/src/main/java/org/dcache/pool/p2p/Companion.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.dcache.pool.repository.Repository;
import org.dcache.pool.repository.RepositoryChannel;
import org.dcache.pool.repository.StickyRecord;
import org.dcache.util.AdjustableSemaphore;
import org.dcache.util.Checksum;
import org.dcache.util.FireAndForgetTask;
import org.dcache.util.Version;
Expand Down Expand Up @@ -155,6 +156,11 @@ class Companion {

private SSLContext _sslContext;

/**
* Semaphore to limit the number of concurrent pool-to-pool transfers.
*/
private final AdjustableSemaphore _concurrency;

/**
* Creates a new instance.
*
Expand Down Expand Up @@ -189,7 +195,8 @@ class Companion {
CacheFileAvailable callback,
boolean forceSourceMode,
Long atime,
SSLContext sslContext) {
SSLContext sslContext,
AdjustableSemaphore concurrency) {
_fsm = new CompanionContext(this);

_executor = executor;
Expand Down Expand Up @@ -222,6 +229,7 @@ class Companion {
_stickyRecords = new ArrayList<>(stickyRecords);

_id = _nextId.getAndIncrement();
_concurrency = concurrency;

synchronized (this) {
_fsm.start();
Expand Down Expand Up @@ -564,8 +572,20 @@ void beginTransfer(final String uri) {
new Thread("P2P Transfer - " + _pnfsId + " " + _sourcePoolName) {
@Override
public void run() {
cdc.restore();
transfer(uri);

boolean release = false;
try {
_concurrency.acquire();
release = true;
cdc.restore();
transfer(uri);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if(release) {
_concurrency.release();
}
}
}
}.start();
}
Expand Down
35 changes: 30 additions & 5 deletions modules/dcache/src/main/java/org/dcache/pool/p2p/P2PClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.dcache.pool.repository.ReplicaState;
import org.dcache.pool.repository.Repository;
import org.dcache.pool.repository.StickyRecord;
import org.dcache.util.AdjustableSemaphore;
import org.dcache.vehicles.FileAttributes;

enum TlsMode {
Expand All @@ -62,6 +63,11 @@ public class P2PClient

private Callable<SSLContext> _sslContext;

/**
* Semaphore to limit the number of concurrent pool-to-pool transfers.
*/
private final AdjustableSemaphore _concurrency = new AdjustableSemaphore(Integer.MAX_VALUE);

public synchronized void setExecutor(ScheduledExecutorService executor) {
_executor = executor;
}
Expand All @@ -83,7 +89,7 @@ public synchronized void setPool(CellStub pool) {
}

public synchronized int getActiveJobs() {
return _companions.size();
return _concurrency.getUsedPermits();
}

public synchronized void setSslContext(Callable<SSLContext> sslContext) {
Expand Down Expand Up @@ -256,7 +262,8 @@ public synchronized int newCompanion(String sourcePoolName,
targetState, stickyRecords,
cb, forceSourceMode,
atime,
context
context,
_concurrency
);

int id = addCompanion(companion);
Expand Down Expand Up @@ -296,6 +303,7 @@ public synchronized P2PData getDataObject() {
P2PData info = new P2PData();
info.setLabel("Pool to Pool");
info.setPpInterface(_interface);
info.setMaxActive(_concurrency.getMaxPermits());
return info;
}

Expand All @@ -305,6 +313,9 @@ public synchronized void printSetup(PrintWriter pw) {
if (_interface != null) {
pw.println("pp interface " + _interface.getHostAddress());
}
if (_concurrency.getMaxPermits() != Integer.MAX_VALUE) {
pw.println("pp set max active " + _concurrency.getMaxPermits());
}
}

@Command(name = "pp set pnfs timeout",
Expand All @@ -321,15 +332,29 @@ public String call() {
}
}

@Command(name = "pp set max active")
@Deprecated
@AffectsSetup
@Command(name = "pp set max active",
hint = "set the maximum number of active pool-to-pool client transfers",
description = "Set the maximum number of active pool-to-pool " +
"(client) concurrent transfers allowed. Any further " +
"requests will be queued. The default is unlimited.")
public class PpSetMaxActiveCommand implements Callable<String> {

@Argument
@Argument(usage = "Specify the maximum number of active pool-to-pool " +
"client transfers. The negative value means unlimited.", metaVar = "max active")
int maxActiveAllowed;

@Override
public String call() throws IllegalArgumentException {
if (maxActiveAllowed == 0) {
throw new IllegalArgumentException("Max active must be greater than 0");
}

if (maxActiveAllowed < 0) {
maxActiveAllowed = Integer.MAX_VALUE;
}

_concurrency.setMaxPermits(maxActiveAllowed);
return "";
}
}
Expand Down
11 changes: 11 additions & 0 deletions modules/dcache/src/main/java/org/dcache/pool/p2p/json/P2PData.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class P2PData implements Serializable {
private String label;
private InetAddress ppInterface;

private int maxActive;

public String getLabel() {
return label;
}
Expand All @@ -85,6 +87,11 @@ public void print(PrintWriter pw) {
if (ppInterface != null) {
pw.println(" Interface : " + ppInterface);
}
if (maxActive == Integer.MAX_VALUE) {
pw.println(" MaxActive : unlimited");
} else {
pw.println(" MaxActive : " + maxActive);
}
}

public void setLabel(String label) {
Expand All @@ -94,4 +101,8 @@ public void setLabel(String label) {
public void setPpInterface(InetAddress ppInterface) {
this.ppInterface = ppInterface;
}

public void setMaxActive(int maxActive) {
this.maxActive = maxActive;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ public AdjustableSemaphore() {
// no op
}

public AdjustableSemaphore(int maxPermits) {
setMaxPermits(maxPermits);
}

/*
* Must be synchronized because the underlying int is not thread safe
*/
Expand Down

0 comments on commit fea615f

Please sign in to comment.