From 8a5c358af45586383d87873952407c646f81f4c6 Mon Sep 17 00:00:00 2001 From: Albert Louis Rossi Date: Mon, 2 Oct 2023 11:26:50 -0500 Subject: [PATCH] dcache-bulk: container rewrite to optimize threading Motivation: Benchmarking Bulk PIN requests against SRM's bringonline requests revealed that the current bulk threading implementation was nearly an order of magnitude slower (e.g., 10K target paths in 10 minutes rather than 75 seconds). Investigation determined there were two main issues: Bulk was executing preparatory calls to the PnfsManager synchronously on the same thread as that allocated for the main activity operation. Thread pools were unnecessarily multiple, and all of the BoundedCachedExecutor variety, which does not allow for unbounded pool expansion (unlike SRM, which uses the unbounded cached thread pool). The second point actually makes a huge difference in performance. All of this suggested that the container job needed significant rewriting to get the performance SRM does. Modification: The container job classes have been collapsed into a single concrete implementation class. This has been refactored to use ContainerTask objects. All jobs are given to the unbounded cached pool for execution, but they must acquire a semphore permit before running, releasing it when they have finished. PnfsManager communication is now asynchronous. Callbacks are handled by the calling thread. The callback from the activity itself is resubmitted to the executor pool since it involves a bit more work, and always a database update. Activity semaphores and executors have been removed, along with their related properties. There is one minor internal interface change (parameter type) on the activity perform method. The properties and documentation have been updated. The defaults are set from the testing reported here, though further tweaking may be desirable after thorough stress-testing has been done. Result: See summary under testing. Performance is now very good. Code is also cleaner (IMO). Target: master Request: 9.2 Patch: https://rb.dcache.org/r/14115 Requires-book: yes (changes included) Requires-notes: yes Acked-by: Tigran --- docs/TheBook/src/main/markdown/config-bulk.md | 19 +- .../org/dcache/services/bulk/BulkService.java | 2 +- .../services/bulk/BulkServiceCommands.java | 15 +- .../services/bulk/activity/BulkActivity.java | 50 +- .../bulk/activity/BulkActivityFactory.java | 23 - .../bulk/activity/BulkActivityProvider.java | 10 - .../plugin/delete/DeleteActivity.java | 5 +- .../plugin/log/LogTargetActivity.java | 3 +- .../plugin/pin/PinManagerActivity.java | 3 +- .../plugin/qos/UpdateQoSActivity.java | 3 +- .../bulk/handler/BulkRequestHandler.java | 4 +- .../bulk/job/AbstractRequestContainerJob.java | 611 ---------- .../bulk/job/BulkRequestContainerJob.java | 1008 +++++++++++++++++ .../bulk/job/RequestContainerJob.java | 312 ----- .../bulk/job/RequestContainerJobFactory.java | 35 +- .../bulk/manager/BulkRequestManager.java | 4 +- .../manager/ConcurrentRequestManager.java | 53 +- .../services/bulk/util/BatchedResult.java | 91 -- .../bulk/util/BulkServiceStatistics.java | 16 + .../org/dcache/services/bulk/bulk.xml | 72 +- skel/share/defaults/bulk.properties | 45 +- skel/share/services/bulk.batch | 11 +- 22 files changed, 1127 insertions(+), 1268 deletions(-) delete mode 100644 modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/AbstractRequestContainerJob.java create mode 100644 modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java delete mode 100644 modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJob.java delete mode 100644 modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BatchedResult.java diff --git a/docs/TheBook/src/main/markdown/config-bulk.md b/docs/TheBook/src/main/markdown/config-bulk.md index 71361983627..650503a0d9a 100644 --- a/docs/TheBook/src/main/markdown/config-bulk.md +++ b/docs/TheBook/src/main/markdown/config-bulk.md @@ -90,26 +90,9 @@ a different id. The default lifetime is five minutes (the same as for the NFS do the [QoS Engine](config-qos-engine.md). - **LOG_TARGET** : logs metadata for each target at the INFO level. -Each activity is associated with - -- a permit count (used in connection with a semaphore for throttling execution); -- two thread queues, one for the execution of the container job, -and the other for the execution of callbacks on activity futures; -- a retry policy (currently the only retry policy is a NOP, i.e., no retry). - -The permits are configurable using either the property or the admin shell -command ``request policy``. - +Each activity is associated with a retry policy (currently the only retry policy is a NOP, i.e., no retry). Should other retry policies become available, these can be set via a property. -The number and distribution of thread executors is hard-coded for the activities, but their -respective sizes can be adjusted using the properties: - - ``` - bulk.limits.container-processing-threads=110 - bulk.limits.activity-callback-threads=50 - ``` - ## Container Design Version 2 of the bulk service has introduced improvements for better scalability and recovery. diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java index 55a0f03a112..822debac7b0 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkService.java @@ -59,7 +59,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING */ package org.dcache.services.bulk; -import static org.dcache.services.bulk.job.AbstractRequestContainerJob.findAbsolutePath; +import static org.dcache.services.bulk.job.BulkRequestContainerJob.findAbsolutePath; import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath; import com.google.common.base.Strings; diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java index ad74f3c546c..6ec8bd1bcdc 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/BulkServiceCommands.java @@ -93,6 +93,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.security.auth.Subject; @@ -181,7 +182,7 @@ public final class BulkServiceCommands implements CellCommandListener { /** * name | class | type | permits */ - private static final String FORMAT_ACTIVITY = "%-20s | %100s | %7s | %10s "; + private static final String FORMAT_ACTIVITY = "%-20s | %100s | %7s "; /** * name | required | description @@ -267,8 +268,7 @@ private static String formatActivity(Entry entry) return String.format(FORMAT_ACTIVITY, entry.getKey(), provider.getActivityClass(), - provider.getTargetType(), - provider.getMaxPermits()); + provider.getTargetType()); } private static String formatArgument(BulkActivityArgumentDescriptor descriptor) { @@ -550,7 +550,7 @@ public String call() throws Exception { return "There are no mapped activities!"; } - return String.format(FORMAT_ACTIVITY, "NAME", "CLASS", "TYPE", "PERMITS") + return String.format(FORMAT_ACTIVITY, "NAME", "CLASS", "TYPE") + "\n" + activities; } } @@ -1371,7 +1371,7 @@ public PagedTargetResult call() throws Exception { private BulkActivityFactory activityFactory; private BulkTargetStore targetStore; private BulkServiceStatistics statistics; - private ExecutorService executor; + private ExecutorService executor = Executors.newSingleThreadExecutor(); private JdbcBulkArchiveDao archiveDao; @@ -1390,11 +1390,6 @@ public void setArchiveDao(JdbcBulkArchiveDao archiveDao) { this.archiveDao = archiveDao; } - @Required - public void setExecutor(ExecutorService executor) { - this.executor = executor; - } - @Required public void setRequestManager(BulkRequestManager requestManager) { this.requestManager = requestManager; diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java index fa88a53a16b..8ab79b81e58 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivity.java @@ -65,14 +65,13 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.EnumSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import javax.security.auth.Subject; import org.dcache.auth.attributes.Restriction; import org.dcache.namespace.FileAttribute; import org.dcache.services.bulk.BulkServiceException; import org.dcache.services.bulk.activity.retry.BulkTargetRetryPolicy; import org.dcache.services.bulk.activity.retry.NoRetryPolicy; -import org.dcache.services.bulk.util.BatchedResult; import org.dcache.services.bulk.util.BulkRequestTarget; import org.dcache.vehicles.FileAttributes; @@ -98,25 +97,17 @@ public enum TargetType { private static final BulkTargetRetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy(); - private static final int DEFAULT_PERMITS = 50; - protected final String name; protected final TargetType targetType; protected Subject subject; protected Restriction restriction; - protected Set requiredAttributes; - protected int maxPermits; - protected ExecutorService activityExecutor; - protected ExecutorService callbackExecutor; protected BulkTargetRetryPolicy retryPolicy; protected Set descriptors; protected BulkActivity(String name, TargetType targetType) { this.name = name; this.targetType = targetType; - requiredAttributes = MINIMALLY_REQUIRED_ATTRIBUTES; - maxPermits = DEFAULT_PERMITS; retryPolicy = DEFAULT_RETRY_POLICY; } @@ -124,10 +115,6 @@ public void cancel(BulkRequestTarget target) { target.cancel(); } - public int getMaxPermits() { - return maxPermits; - } - public String getName() { return name; } @@ -144,10 +131,6 @@ public TargetType getTargetType() { return targetType; } - public Set getRequiredAttributes() { - return requiredAttributes; - } - public Subject getSubject() { return subject; } @@ -164,39 +147,10 @@ public void setRestriction(Restriction restriction) { this.restriction = restriction; } - public ExecutorService getActivityExecutor() { - return activityExecutor; - } - - public void setActivityExecutor(ExecutorService activityExecutor) { - this.activityExecutor = activityExecutor; - } - - public ExecutorService getCallbackExecutor() { - return callbackExecutor; - } - - public void setCallbackExecutor(ExecutorService callbackExecutor) { - this.callbackExecutor = callbackExecutor; - } - public void setDescriptors(Set descriptors) { this.descriptors = descriptors; } - public void setMaxPermits(int maxPermits) { - this.maxPermits = maxPermits; - } - - /** - * Completion handler method. Calls the internal implementation. - * - * @param result of the targeted activity. - */ - public void handleCompletion(BatchedResult result) { - handleCompletion(result.getTarget(), result.getFuture()); - } - /** * Performs the activity. * @@ -223,5 +177,5 @@ public abstract ListenableFuture perform(String rid, long tid, FsPath path, F * @param target which has terminate. * @param future the future returned by the activity call to perform(); */ - protected abstract void handleCompletion(BulkRequestTarget target, ListenableFuture future); + public abstract void handleCompletion(BulkRequestTarget target, Future future); } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java index 27f3e34711f..2904da6b8f0 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityFactory.java @@ -70,7 +70,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; -import java.util.concurrent.ExecutorService; import javax.security.auth.Subject; import org.dcache.auth.Subjects; import org.dcache.auth.attributes.Restriction; @@ -102,9 +101,6 @@ public final class BulkActivityFactory implements CellMessageSender, Environment new HashMap<>()); private Map retryPolicies; - private Map activityExecutors; - private Map callbackExecutors; - private Map maxPermits; private Map environment; private CellStub pnfsManager; @@ -142,8 +138,6 @@ public BulkActivity createActivity(BulkRequest request, Subject subject, bulkActivity.setSubject(subject); bulkActivity.setRestriction(restriction); - bulkActivity.setActivityExecutor(activityExecutors.get(activity)); - bulkActivity.setCallbackExecutor(callbackExecutors.get(activity)); BulkTargetRetryPolicy retryPolicy = retryPolicies.get(activity); if (retryPolicy != null) { bulkActivity.setRetryPolicy(retryPolicy); @@ -163,8 +157,6 @@ public void initialize() { ServiceLoader serviceLoader = ServiceLoader.load(BulkActivityProvider.class); for (BulkActivityProvider provider : serviceLoader) { - String activity = provider.getActivity(); - provider.setMaxPermits(maxPermits.get(activity)); provider.configure(environment); providers.put(provider.getActivity(), provider); } @@ -215,26 +207,11 @@ public void setQoSResponseReceiver(QoSResponseReceiver qoSResponseReceiver) { this.qoSResponseReceiver = qoSResponseReceiver; } - @Required - public void setMaxPermits(Map maxPermits) { - this.maxPermits = maxPermits; - } - @Required public void setRetryPolicies(Map retryPolicies) { this.retryPolicies = retryPolicies; } - @Required - public void setActivityExecutors(Map activityExecutors) { - this.activityExecutors = activityExecutors; - } - - @Required - public void setCallbackExecutors(Map callbackExecutors) { - this.callbackExecutors = callbackExecutors; - } - @Override public void setEnvironment(Map environment) { this.environment = environment; diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityProvider.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityProvider.java index 80f1fb0e184..9ea066eaac4 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityProvider.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/BulkActivityProvider.java @@ -73,7 +73,6 @@ public abstract class BulkActivityProvider { protected final String activity; protected final TargetType targetType; - protected int maxPermits; protected BulkActivityProvider(String activity, TargetType targetType) { this.activity = activity; @@ -88,14 +87,6 @@ public TargetType getTargetType() { return targetType; } - public int getMaxPermits() { - return maxPermits; - } - - public void setMaxPermits(int maxPermits) { - this.maxPermits = maxPermits; - } - /** * @return an instance of the specific activity type to be configured by factory. * @@ -103,7 +94,6 @@ public void setMaxPermits(int maxPermits) { */ public J createActivity() throws BulkServiceException { J activity = activityInstance(); - activity.setMaxPermits(maxPermits); activity.setDescriptors(getDescriptors()); return activity; } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/delete/DeleteActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/delete/DeleteActivity.java index 5a3ce9ab66d..8c3687326b6 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/delete/DeleteActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/delete/DeleteActivity.java @@ -71,6 +71,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.dcache.namespace.FileType; import org.dcache.services.bulk.activity.BulkActivity; import org.dcache.services.bulk.util.BulkRequestTarget; @@ -105,8 +106,8 @@ public void setNamespaceHandler(PnfsHandler pnfsHandler) { } @Override - protected void handleCompletion(BulkRequestTarget target, - ListenableFuture future) { + public void handleCompletion(BulkRequestTarget target, + Future future) { PnfsDeleteEntryMessage reply; try { reply = getUninterruptibly(future); diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/log/LogTargetActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/log/LogTargetActivity.java index 2185e7601a7..330ac0158ba 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/log/LogTargetActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/log/LogTargetActivity.java @@ -63,6 +63,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import com.google.common.util.concurrent.ListenableFuture; import diskCacheV111.util.FsPath; import java.util.Map; +import java.util.concurrent.Future; import org.dcache.services.bulk.activity.BulkActivity; import org.dcache.services.bulk.util.BulkRequestTarget; import org.dcache.services.bulk.util.BulkRequestTarget.State; @@ -110,7 +111,7 @@ public ListenableFuture perform(String ruid, long tid, FsPath } @Override - protected void handleCompletion(BulkRequestTarget target, ListenableFuture future) { + public void handleCompletion(BulkRequestTarget target, Future future) { target.setState(State.COMPLETED); } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinManagerActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinManagerActivity.java index e9a17a0b845..02019cfd741 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinManagerActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/pin/PinManagerActivity.java @@ -75,6 +75,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.Optional; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.dcache.cells.CellStub; import org.dcache.pinmanager.PinManagerAware; import org.dcache.pinmanager.PinManagerPinMessage; @@ -104,7 +105,7 @@ public void setNamespaceHandler(PnfsHandler pnfsHandler) { } @Override - protected void handleCompletion(BulkRequestTarget target, ListenableFuture future) { + public void handleCompletion(BulkRequestTarget target, Future future) { Message reply; try { reply = getUninterruptibly(future); diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java index 9050c4944cb..e61b169dab0 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/activity/plugin/qos/UpdateQoSActivity.java @@ -76,6 +76,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.dcache.cells.CellStub; import org.dcache.qos.QoSException; import org.dcache.qos.data.FileQoSRequirements; @@ -197,7 +198,7 @@ protected void configure(Map arguments) throws BulkServiceExcept @Override public void handleCompletion(BulkRequestTarget target, - ListenableFuture future) { + Future future) { QoSTransitionCompletedMessage message; try { message = getUninterruptibly(future); diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/handler/BulkRequestHandler.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/handler/BulkRequestHandler.java index 2311ef5470c..9561b88c719 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/handler/BulkRequestHandler.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/handler/BulkRequestHandler.java @@ -74,7 +74,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.services.bulk.BulkRequestStatus; import org.dcache.services.bulk.BulkServiceException; import org.dcache.services.bulk.BulkStorageException; -import org.dcache.services.bulk.job.AbstractRequestContainerJob; +import org.dcache.services.bulk.job.BulkRequestContainerJob; import org.dcache.services.bulk.job.RequestContainerJobFactory; import org.dcache.services.bulk.manager.BulkRequestManager; import org.dcache.services.bulk.store.BulkRequestStore; @@ -210,7 +210,7 @@ public void setTargetStore(BulkTargetStore targetStore) { @Override public void submitRequestJob(BulkRequest request) throws BulkServiceException { - AbstractRequestContainerJob job = jobFactory.createRequestJob(request); + BulkRequestContainerJob job = jobFactory.createRequestJob(request); if (storeJobTarget(job.getTarget())) { requestManager.submit(job); } diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/AbstractRequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/AbstractRequestContainerJob.java deleted file mode 100644 index 9aef072df61..00000000000 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/AbstractRequestContainerJob.java +++ /dev/null @@ -1,611 +0,0 @@ -/* -COPYRIGHT STATUS: -Dec 1st 2001, Fermi National Accelerator Laboratory (FNAL) documents and -software are sponsored by the U.S. Department of Energy under Contract No. -DE-AC02-76CH03000. Therefore, the U.S. Government retains a world-wide -non-exclusive, royalty-free license to publish or reproduce these documents -and software for U.S. Government purposes. All documents and software -available from this server are protected under the U.S. and Foreign -Copyright Laws, and FNAL reserves all rights. - -Distribution of the software available from this server is free of -charge subject to the user following the terms of the Fermitools -Software Legal Information. - -Redistribution and/or modification of the software shall be accompanied -by the Fermitools Software Legal Information (including the copyright -notice). - -The user is asked to feed back problems, benefits, and/or suggestions -about the software to the Fermilab Software Providers. - -Neither the name of Fermilab, the URA, nor the names of the contributors -may be used to endorse or promote products derived from this software -without specific prior written permission. - -DISCLAIMER OF LIABILITY (BSD): - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS -FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL FERMILAB, -OR THE URA, OR THE U.S. DEPARTMENT of ENERGY, OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT -OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR -BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -Liabilities of the Government: - -This software is provided by URA, independent from its Prime Contract -with the U.S. Department of Energy. URA is acting independently from -the Government and in its own private capacity and is not acting on -behalf of the U.S. Government, nor as its contractor nor its agent. -Correspondingly, it is understood and agreed that the U.S. Government -has no connection to this software and in no manner whatsoever shall -be liable for nor assume any responsibility or obligation for any claim, -cost, or damages arising out of or resulting from the use of the software -available from this server. - -Export Control: - -All documents and software available from this server are subject to U.S. -export control laws. Anyone downloading information from this server is -obligated to secure any necessary Government licenses before exporting -documents or software obtained from this server. - */ -package org.dcache.services.bulk.job; - -import static org.dcache.services.bulk.activity.BulkActivity.MINIMALLY_REQUIRED_ATTRIBUTES; -import static org.dcache.services.bulk.util.BulkRequestTarget.State.CANCELLED; -import static org.dcache.services.bulk.util.BulkRequestTarget.State.COMPLETED; -import static org.dcache.services.bulk.util.BulkRequestTarget.State.FAILED; -import static org.dcache.services.bulk.util.BulkRequestTarget.State.READY; -import static org.dcache.services.bulk.util.BulkRequestTarget.State.SKIPPED; -import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath; - -import com.google.common.base.Throwables; -import com.google.common.collect.Range; -import diskCacheV111.util.CacheException; -import diskCacheV111.util.FsPath; -import diskCacheV111.util.NamespaceHandlerAware; -import diskCacheV111.util.PnfsHandler; -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; -import javax.security.auth.Subject; -import org.dcache.auth.attributes.Restriction; -import org.dcache.services.bulk.BulkRequest; -import org.dcache.services.bulk.BulkRequest.Depth; -import org.dcache.services.bulk.BulkServiceException; -import org.dcache.services.bulk.BulkStorageException; -import org.dcache.services.bulk.activity.BulkActivity; -import org.dcache.services.bulk.activity.BulkActivity.TargetType; -import org.dcache.services.bulk.store.BulkTargetStore; -import org.dcache.services.bulk.util.BatchedResult; -import org.dcache.services.bulk.util.BulkRequestTarget; -import org.dcache.services.bulk.util.BulkRequestTarget.PID; -import org.dcache.services.bulk.util.BulkRequestTarget.State; -import org.dcache.services.bulk.util.BulkRequestTargetBuilder; -import org.dcache.services.bulk.util.BulkServiceStatistics; -import org.dcache.util.BoundedExecutor; -import org.dcache.util.SignalAware; -import org.dcache.util.list.DirectoryEntry; -import org.dcache.util.list.DirectoryStream; -import org.dcache.util.list.ListDirectoryHandler; -import org.dcache.vehicles.FileAttributes; -import org.dcache.vehicles.PnfsResolveSymlinksMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Base class for the implementations. Acts as a container for a list of targets which may or may - * not be associated with each other via a common parent. It handles all targets by calling - * activity.perform() on them serially using the activity's semaphore, and then holding them in a - * map as waiting tasks with a callback listener. - */ -public abstract class AbstractRequestContainerJob - implements Runnable, NamespaceHandlerAware, Comparable, - UncaughtExceptionHandler { - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRequestContainerJob.class); - - public static FsPath findAbsolutePath(String prefix, String path) { - FsPath absPath = computeFsPath(null, path); - if (prefix == null) { - return absPath; - } - - FsPath pref = FsPath.create(prefix); - - if (!absPath.hasPrefix(pref)) { - absPath = computeFsPath(prefix, path); - } - - return absPath; - } - - abstract class DirListTask implements Runnable { - - protected Consumer errorHandler = e -> uncaughtException(Thread.currentThread(), e); - - public void run() { - try { - doList(); - } catch (InterruptedException e) { - containerState = ContainerState.STOP; - target.setErrorObject(e); - update(CANCELLED); - } catch (Throwable e) { - errorHandler.accept(e); - Throwables.throwIfUnchecked(e); - } - } - - protected abstract void doList() throws Throwable; - } - - enum ContainerState { - START, PROCESS_FILES, WAIT, PROCESS_DIRS, STOP - } - - protected final BulkRequest request; - protected final BulkActivity activity; - protected final Long rid; - protected final String ruid; - protected final Depth depth; - protected final String targetPrefix; - protected final Map waiting; - - /** - * A temporary placeholder for tracking purposes; it will not be the same as the actual - * autogenerated key in the database. - */ - protected final AtomicLong id; - protected final BulkServiceStatistics statistics; - protected final AtomicInteger dirExpansionCount; - - protected BulkTargetStore targetStore; - protected PnfsHandler pnfsHandler; - protected Semaphore semaphore; - - protected volatile ContainerState containerState; - - private final TargetType targetType; - private final BulkRequestTarget target; - private final Subject subject; - private final Restriction restriction; - private final Set cancelledPaths; - - private ListDirectoryHandler listHandler; - private SignalAware callback; - - private BoundedExecutor dirListExecutor; - private Thread runThread; - - AbstractRequestContainerJob(BulkActivity activity, BulkRequestTarget target, - BulkRequest request, BulkServiceStatistics statistics) { - id = new AtomicLong(0L); - this.request = request; - this.activity = activity; - this.target = target; - this.subject = activity.getSubject(); - this.restriction = activity.getRestriction(); - this.statistics = statistics; - waiting = new HashMap<>(); - cancelledPaths = new HashSet<>(); - rid = request.getId(); - ruid = request.getUid(); - depth = request.getExpandDirectories(); - targetPrefix = request.getTargetPrefix(); - targetType = activity.getTargetType(); - semaphore = new Semaphore(activity.getMaxPermits()); - containerState = ContainerState.START; - dirExpansionCount = new AtomicInteger(0); - } - - public void cancel() { - containerState = ContainerState.STOP; - - target.cancel(); - - LOGGER.debug("cancel {}: target state is now {}.", ruid, target.getState()); - - semaphore.drainPermits(); - - interruptRunThread(); - - synchronized (waiting) { - LOGGER.debug("cancel {}: waiting {}.", ruid, waiting.size()); - waiting.values().forEach(r -> r.cancel(activity)); - LOGGER.debug("cancel {}: waiting targets cancelled.", ruid); - waiting.clear(); - } - - LOGGER.debug("cancel {}: calling cancel all on target store.", ruid); - targetStore.cancelAll(rid); - - signalStateChange(); - } - - public void cancel(long id) { - synchronized (waiting) { - for (Iterator i = waiting.values().iterator(); i.hasNext(); ) { - BatchedResult result = i.next(); - if (result.getTarget().getId() == id) { - result.cancel(activity); - i.remove(); - break; - } - } - } - - try { - targetStore.update(id, CANCELLED, null, null); - } catch (BulkStorageException e) { - LOGGER.error("Failed to cancel {}::{}: {}.", ruid, id, e.toString()); - } - } - - public void cancel(String path) { - FsPath toCancel = findAbsolutePath(targetPrefix, path); - - Optional found; - - synchronized (waiting) { - found = waiting.values().stream().filter(r -> r.getTarget().getPath().equals(toCancel)) - .findAny(); - } - - if (found.isPresent()) { - cancel(found.get().getTarget().getId()); - } else { - synchronized (cancelledPaths) { - cancelledPaths.add(toCancel); - } - } - } - - @Override - public int compareTo(AbstractRequestContainerJob other) { - return target.getKey().compareTo(other.target.getKey()); - } - - public BulkActivity getActivity() { - return activity; - } - - public BulkRequestTarget getTarget() { - return target; - } - - public void initialize() { - LOGGER.trace("BulkJob {}, initialize() called ...", target.getKey()); - target.setState(READY); - containerState = ContainerState.PROCESS_FILES; - } - - public synchronized boolean isReady() { - switch (target.getState()) { - case READY: - case CREATED: - return true; - default: - return false; - } - } - - @Override - public void run() { - setRunThread(Thread.currentThread()); - try { - switch (containerState) { - case PROCESS_FILES: - preprocessTargets(); - checkForRequestCancellation(); - processFileTargets(); - checkForRequestCancellation(); - containerState = ContainerState.WAIT; - break; - case PROCESS_DIRS: - checkForRequestCancellation(); - semaphore = new Semaphore(1); /* synchronous */ - processDirTargets(); - containerState = ContainerState.STOP; - update(COMPLETED); - break; - case STOP: - LOGGER.debug("run {} was prematurely stopped; exiting", ruid); - update(CANCELLED); - setRunThread(null); - return; - default: - throw new RuntimeException( - "run container called with container in wrong state " + containerState - + "; this is a bug."); - } - } catch (InterruptedException e) { - LOGGER.debug("run {} interrupted", ruid); - /* - * If the state has not already been set to terminal, do so. - */ - containerState = ContainerState.STOP; - update(CANCELLED); - } - setRunThread(null); - checkTransitionToDirs(); - } - - public void setListHandler(ListDirectoryHandler listHandler) { - this.listHandler = listHandler; - } - - public void setDirListExecutor(BoundedExecutor dirListExecutor) { - this.dirListExecutor = dirListExecutor; - } - - @Override - public void setNamespaceHandler(PnfsHandler pnfsHandler) { - this.pnfsHandler = pnfsHandler; - } - - public void setTargetStore(BulkTargetStore targetStore) { - this.targetStore = targetStore; - } - - public void setCallback(SignalAware callback) { - this.callback = callback; - } - - @Override - public void uncaughtException(Thread t, Throwable e) { - /* - * Don't leave the request in non-terminal state in case of uncaught exception. - * We also try to handle uncaught exceptions here, so as not to kill the - * manager thread. - */ - containerState = ContainerState.STOP; - target.setErrorObject(e); - update(FAILED); - ThreadGroup group = t.getThreadGroup(); - if (group != null) { - group.uncaughtException(t, e); - } else { - LOGGER.error("Uncaught exception: please report to team@dcache.org", e); - } - } - - public void update(State state) { - if (target.setState(state)) { - try { - targetStore.update(target.getId(), target.getState(), target.getErrorType(), - target.getErrorMessage()); - } catch (BulkStorageException e) { - LOGGER.error("{}, updateJobState: {}", ruid, e.toString()); - } - signalStateChange(); - } - } - - protected void checkForRequestCancellation() throws InterruptedException { - if (isRunThreadInterrupted() || containerState == ContainerState.STOP - || target.isTerminated()) { - throw new InterruptedException(); - } - } - - protected DirectoryStream getDirectoryListing(FsPath path) - throws CacheException, InterruptedException { - LOGGER.trace("getDirectoryListing {}, path {}, calling list ...", ruid, path); - return listHandler.list(subject, restriction, path, null, - Range.closedOpen(0, Integer.MAX_VALUE), MINIMALLY_REQUIRED_ATTRIBUTES); - } - - protected void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttributes) - throws BulkServiceException, CacheException, InterruptedException { - checkForRequestCancellation(); - DirListTask task = new DirListTask() { - @Override - public void doList() throws Throwable { - try { - DirectoryStream stream = getDirectoryListing(path); - for (DirectoryEntry entry : stream) { - LOGGER.trace("expandDepthFirst {}, directory {}, entry {}", ruid, path, - entry.getName()); - FsPath childPath = path.child(entry.getName()); - FileAttributes childAttributes = entry.getFileAttributes(); - - switch (childAttributes.getFileType()) { - case DIR: - switch (depth) { - case ALL: - LOGGER.debug("expandDepthFirst {}, found directory {}, " - + "expand ALL.", ruid, childPath); - expandDepthFirst(null, PID.DISCOVERED, childPath, - childAttributes); - break; - case TARGETS: - switch (targetType) { - case BOTH: - case DIR: - handleDirTarget(null, PID.DISCOVERED, childPath, - childAttributes); - } - break; - } - break; - case LINK: - case REGULAR: - handleFileTarget(PID.DISCOVERED, childPath, childAttributes); - break; - case SPECIAL: - default: - LOGGER.trace("expandDepthFirst {}, cannot handle special " - + "file {}.", ruid, childPath); - break; - } - - checkForRequestCancellation(); - } - - switch (targetType) { - case BOTH: - case DIR: - handleDirTarget(id, pid, path, dirAttributes); - break; - case FILE: - /* - * Because we now store all initial targets immediately, - * we need to mark such a directory as SKIPPED; otherwise - * the request will not complete on the basis of querying for - * completed targets and finding this one unhandled. - */ - if (pid == PID.INITIAL) { - targetStore.storeOrUpdate( - toTarget(id, pid, path, Optional.of(dirAttributes), - SKIPPED, null)); - } - } - } finally { - dirExpansionCount.decrementAndGet(); - checkTransitionToDirs(); - } - } - }; - - dirExpansionCount.incrementAndGet(); - - /* - * The executor is shared among containers. To avoid total inactivity should - * the running container be starved by other containers, we allow it - * to execute on its own thread if all other threads are currently occupied. - */ - if (dirListExecutor.getWorkQueueSize() >= dirListExecutor.getMaximumPoolSize()) { - task.run(); - } else { - dirListExecutor.execute(task); - } - } - - protected List getInitialTargets() { - return targetStore.getInitialTargets(rid, true); - } - - protected boolean hasBeenCancelled(Long id, PID pid, FsPath path, FileAttributes attributes) { - synchronized (cancelledPaths) { - if (cancelledPaths.remove(path.toString())) { - BulkRequestTarget target = toTarget(id, pid, path, Optional.of(attributes), - CANCELLED, null); - try { - if (id == null) { - targetStore.store(target); - } else { - targetStore.update(target.getId(), CANCELLED, null, null); - } - } catch (BulkServiceException | UnsupportedOperationException e) { - LOGGER.error("hasBeenCancelled {}, failed for {}: {}", ruid, path, e.toString()); - } - return true; - } - } - - return false; - } - - protected void preprocessTargets() throws InterruptedException { - // NOP default - } - - protected void removeTarget(BulkRequestTarget target) { - synchronized (waiting) { - waiting.remove(target.getPath()); - } - - semaphore.release(); - - checkTransitionToDirs(); - } - - protected FsPath resolvePath(String targetPath) throws CacheException { - PnfsResolveSymlinksMessage message = new PnfsResolveSymlinksMessage(targetPath, null); - message = pnfsHandler.request(message); - return FsPath.create(message.getResolvedPath()); - } - - private void checkTransitionToDirs() { - if (dirExpansionCount.get() <= 0 && semaphore.availablePermits() == activity.getMaxPermits()) { - synchronized (this) { - if (containerState == ContainerState.WAIT) { - containerState = ContainerState.PROCESS_DIRS; - activity.getActivityExecutor().submit(this); - } - } - } - } - - protected BulkRequestTarget toTarget(Long id, PID pid, FsPath path, Optional attributes, - State state, Throwable throwable) { - String errorType = null; - String errorMessage = null; - Throwable root = null; - if (throwable != null) { - root = Throwables.getRootCause(throwable); - errorType = root.getClass().getCanonicalName(); - errorMessage = root.getMessage(); - } - - return BulkRequestTargetBuilder.builder(statistics).attributes(attributes.orElse(null)) - .activity(activity.getName()).id(id).pid(pid).rid(rid).ruid(ruid).state(state) - .createdAt(System.currentTimeMillis()).errorType(errorType) - .errorMessage(errorMessage).path(path).build(); - } - - protected abstract void handleFileTarget(PID pid, FsPath path, FileAttributes attributes) - throws InterruptedException; - - protected abstract void handleDirTarget(Long id, PID pid, FsPath path, FileAttributes attributes) - throws InterruptedException; - - protected abstract void processFileTargets() throws InterruptedException; - - protected abstract void processDirTargets() throws InterruptedException; - - protected abstract void retryFailed(BatchedResult result, FileAttributes attributes) - throws BulkStorageException; - - private void signalStateChange() { - if (callback != null) { - callback.signal(); - } - } - - private synchronized void interruptRunThread() { - if (runThread != null) { - runThread.interrupt(); - LOGGER.debug("cancel {}: container job interrupted.", ruid); - } - } - - private synchronized boolean isRunThreadInterrupted() { - return runThread != null && runThread.isInterrupted(); - } - - private synchronized void setRunThread(Thread runThread) { - this.runThread = runThread; - if (runThread != null) { - this.runThread.setUncaughtExceptionHandler(this); - } - } -} \ No newline at end of file diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java new file mode 100644 index 00000000000..53cd8198fe8 --- /dev/null +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/BulkRequestContainerJob.java @@ -0,0 +1,1008 @@ +/* +COPYRIGHT STATUS: +Dec 1st 2001, Fermi National Accelerator Laboratory (FNAL) documents and +software are sponsored by the U.S. Department of Energy under Contract No. +DE-AC02-76CH03000. Therefore, the U.S. Government retains a world-wide +non-exclusive, royalty-free license to publish or reproduce these documents +and software for U.S. Government purposes. All documents and software +available from this server are private under the U.S. and Foreign +Copyright Laws, and FNAL reserves all rights. + +Distribution of the software available from this server is free of +charge subject to the user following the terms of the Fermitools +Software Legal Information. + +Redistribution and/or modification of the software shall be accompanied +by the Fermitools Software Legal Information (including the copyright +notice). + +The user is asked to feed back problems, benefits, and/or suggestions +about the software to the Fermilab Software Providers. + +Neither the name of Fermilab, the URA, nor the names of the contributors +may be used to endorse or promote products derived from this software +without specific prior written permission. + +DISCLAIMER OF LIABILITY (BSD): + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL FERMILAB, +OR THE URA, OR THE U.S. DEPARTMENT of ENERGY, OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT +OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR +BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +Liabilities of the Government: + +This software is provided by URA, independent from its Prime Contract +with the U.S. Department of Energy. URA is acting independently from +the Government and in its own private capacity and is not acting on +behalf of the U.S. Government, nor as its contractor nor its agent. +Correspondingly, it is understood and agreed that the U.S. Government +has no connection to this software and in no manner whatsoever shall +be liable for nor assume any responsibility or obligation for any claim, +cost, or damages arising out of or resulting from the use of the software +available from this server. + +Export Control: + +All documents and software available from this server are subject to U.S. +export control laws. Anyone downloading information from this server is +obligated to secure any necessary Government licenses before exporting +documents or software obtained from this server. + */ +package org.dcache.services.bulk.job; + +import static org.dcache.services.bulk.activity.BulkActivity.MINIMALLY_REQUIRED_ATTRIBUTES; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.CANCELLED; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.COMPLETED; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.CREATED; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.FAILED; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.READY; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.RUNNING; +import static org.dcache.services.bulk.util.BulkRequestTarget.State.SKIPPED; +import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath; + +import com.google.common.base.Throwables; +import com.google.common.collect.Range; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import diskCacheV111.util.CacheException; +import diskCacheV111.util.FsPath; +import diskCacheV111.util.NamespaceHandlerAware; +import diskCacheV111.util.PnfsHandler; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import javax.security.auth.Subject; +import org.dcache.auth.attributes.Restriction; +import org.dcache.cells.AbstractMessageCallback; +import org.dcache.cells.CellStub; +import org.dcache.namespace.FileType; +import org.dcache.services.bulk.BulkRequest; +import org.dcache.services.bulk.BulkRequest.Depth; +import org.dcache.services.bulk.BulkServiceException; +import org.dcache.services.bulk.BulkStorageException; +import org.dcache.services.bulk.activity.BulkActivity; +import org.dcache.services.bulk.activity.BulkActivity.TargetType; +import org.dcache.services.bulk.store.BulkTargetStore; +import org.dcache.services.bulk.util.BulkRequestTarget; +import org.dcache.services.bulk.util.BulkRequestTarget.PID; +import org.dcache.services.bulk.util.BulkRequestTarget.State; +import org.dcache.services.bulk.util.BulkRequestTargetBuilder; +import org.dcache.services.bulk.util.BulkServiceStatistics; +import org.dcache.util.CacheExceptionFactory; +import org.dcache.util.SignalAware; +import org.dcache.util.list.DirectoryEntry; +import org.dcache.util.list.DirectoryStream; +import org.dcache.util.list.ListDirectoryHandler; +import org.dcache.vehicles.FileAttributes; +import org.dcache.vehicles.PnfsGetFileAttributes; +import org.dcache.vehicles.PnfsResolveSymlinksMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Container job for a list of targets which may or may not be associated with each other via a + * common parent. It handles all file targets asynchronously, recurs if directory listing is enabled, + * and processes directory targets serially last in depth-first reverse order. + */ +public final class BulkRequestContainerJob + implements Runnable, NamespaceHandlerAware, Comparable, + UncaughtExceptionHandler { + + private static final Logger LOGGER = LoggerFactory.getLogger(BulkRequestContainerJob.class); + + private static final DirTargetSorter SORTER = new DirTargetSorter(); + + static final AtomicLong taskCounter = new AtomicLong(0L); + + public static FsPath findAbsolutePath(String prefix, String path) { + FsPath absPath = computeFsPath(null, path); + if (prefix == null) { + return absPath; + } + + FsPath pref = FsPath.create(prefix); + + if (!absPath.hasPrefix(pref)) { + absPath = computeFsPath(prefix, path); + } + + return absPath; + } + + /** + * Directories that serve as targets. These are stored in memory, sorted and processed last. + */ + static class DirTarget { + final FsPath path; + final FileAttributes attributes; + final PID pid; + final Long id; + final int depth; + + DirTarget(Long id, PID pid, FsPath path, FileAttributes attributes) { + this.id = id; + this.pid = pid; + this.attributes = attributes; + this.path = path; + depth = (int)path.toString().chars().filter(c -> c == '/').count(); + } + } + + /** + * Depth-first (descending order). + */ + static class DirTargetSorter implements Comparator { + @Override + public int compare(DirTarget o1, DirTarget o2) { + return Integer.compare(o2.depth, o1.depth); /* DESCENDING ORDER */ + } + } + + /** + * Container delays processing directory targets until the final step. + */ + enum ContainerState { + START, PROCESS_FILES, WAIT, PROCESS_DIRS, STOP + } + + /** + * Only INITIAL targets go through all three states. DISCOVERED targets + * already have their proper paths and attributes from listing. + */ + enum TaskState { + RESOLVE_PATH, FETCH_ATTRIBUTES, HANDLE_TARGET, HANDLE_DIR_TARGET + } + + /** + * Wrapper task for directory listing and target processing. + */ + abstract class ContainerTask implements Runnable { + final Consumer errorHandler = e -> uncaughtException(Thread.currentThread(), e); + final long seqNo; + + Future taskFuture; + + ContainerTask() { + seqNo = taskCounter.getAndIncrement(); + } + + public void run() { + try { + doInner(); + } catch (InterruptedException e) { + remove(); + containerState = ContainerState.STOP; + jobTarget.setErrorObject(e); + update(CANCELLED); + } catch (Throwable e) { + remove(); + errorHandler.accept(e); + Throwables.throwIfUnchecked(e); + } + } + + void cancel() { + if (taskFuture != null) { + taskFuture.cancel(true); + } + remove(); + } + + void expandDepthFirst(Long id, PID pid, FsPath path, FileAttributes dirAttributes) + throws BulkServiceException, CacheException, InterruptedException { + LOGGER.debug("{} - expandDepthFirst, {}, {}, {}, {}", ruid, id, pid, path, dirAttributes); + new DirListTask(id, pid, path, dirAttributes).submitAsync(); + } + + void submitAsync() throws InterruptedException { + checkForRequestCancellation(); + + synchronized (running) { + running.put(seqNo, this); + LOGGER.debug("{} - submitAsync {}, task count is now {}.", ruid, seqNo, running.size()); + } + taskFuture = executor.submit(this); + } + + void remove() { + synchronized (running) { + running.remove(seqNo); + LOGGER.debug("{} - remove task {}, task count now {}.", ruid, seqNo, running.size()); + } + + checkTransitionToDirs(); + } + + abstract void doInner() throws Throwable; + } + + class DirListTask extends ContainerTask { + final Long id; + final PID pid; + final FsPath path; + final FileAttributes dirAttributes; + + DirListTask(Long id, PID pid, FsPath path, FileAttributes dirAttributes) { + this.id = id; + this.pid = pid; + this.path = path; + this.dirAttributes = dirAttributes; + } + + void doInner() throws Throwable { + try { + DirectoryStream stream = getDirectoryListing(path); + for (DirectoryEntry entry : stream) { + LOGGER.debug("{} - DirListTask, directory {}, entry {}", ruid, path, + entry.getName()); + FsPath childPath = path.child(entry.getName()); + FileAttributes childAttributes = entry.getFileAttributes(); + + switch (childAttributes.getFileType()) { + case DIR: + switch (depth) { + case ALL: + expandDepthFirst(null, PID.DISCOVERED, childPath, + childAttributes); + break; + case TARGETS: + switch (targetType) { + case BOTH: + case DIR: + addDirTarget(null, PID.DISCOVERED, childPath, + childAttributes); + } + break; + } + break; + case LINK: + case REGULAR: + new TargetTask( + toTarget(null, PID.DISCOVERED, childPath, + Optional.of(childAttributes), CREATED, null), + TaskState.HANDLE_TARGET).submitAsync(); + break; + case SPECIAL: + default: + LOGGER.trace("{} - DirListTask, cannot handle special file {}.", + ruid, childPath); + break; + } + } + + switch (targetType) { + case BOTH: + case DIR: + addDirTarget(id, pid, path, dirAttributes); + break; + case FILE: + /* + * Because we now store all initial targets immediately, + * we need to mark such a directory as SKIPPED; otherwise + * the request will not complete on the basis of querying for + * completed targets and finding this one unhandled. + */ + if (pid == PID.INITIAL) { + targetStore.storeOrUpdate( + toTarget(id, pid, path, Optional.of(dirAttributes), + SKIPPED, null)); + } + } + } finally { + remove(); + } + } + + private void addDirTarget(Long id, PID pid, FsPath path, FileAttributes attributes) { + LOGGER.debug("{} - DirListTask, addDirTarget, adding directory {} ...", ruid, path); + dirs.add(new DirTarget(id, pid, path, attributes)); + } + + private DirectoryStream getDirectoryListing(FsPath path) + throws CacheException, InterruptedException { + dirListSemaphore.acquire(); + try { + LOGGER.debug("{} - DirListTask, getDirectoryListing for path {}, calling list ...", + ruid, path); + return listHandler.list(subject, restriction, path, null, + Range.closedOpen(0, Integer.MAX_VALUE), MINIMALLY_REQUIRED_ATTRIBUTES); + } finally { + dirListSemaphore.release(); + } + } + } + + class TargetTask extends ContainerTask { + + final BulkRequestTarget target; + + /* + * From activity.perform() + */ + ListenableFuture activityFuture; + + /* + * Determines the doInner() switch + */ + TaskState state; + + boolean holdingPermit; + + TargetTask(BulkRequestTarget target, TaskState initialState) { + this.target = target; + state = initialState; + } + + void cancel() { + if (target != null) { + activity.cancel(target); + } + + super.cancel(); + } + + @Override + void doInner() throws Throwable { + switch (state) { + case RESOLVE_PATH: + resolvePath(); + break; + case FETCH_ATTRIBUTES: + fetchAttributes(); + break; + case HANDLE_DIR_TARGET: + performActivity(); + break; + case HANDLE_TARGET: + default: + switch (depth) { + case NONE: + performActivity(); + break; + default: + handleTarget(); + } + break; + } + } + + @Override + void submitAsync() throws InterruptedException { + if (!holdingPermit) { + inFlightSemaphore.acquire(); + holdingPermit = true; + } + super.submitAsync(); + } + + void remove() { + super.remove(); + if (holdingPermit) { + inFlightSemaphore.release(); + holdingPermit = false; + } + } + + void performSync() throws InterruptedException { + performActivity(false); + + try { + activityFuture.get(); + } catch (ExecutionException e) { + activityFuture = Futures.immediateFailedFuture(e.getCause()); + } + + handleCompletion(); + } + + /** + * (1) symlink resolution on initial targets; bypassed for discovered targets. + */ + private void resolvePath() throws InterruptedException { + LOGGER.debug("{} - resolvePath, resolving {}", ruid, target.getPath()); + PnfsResolveSymlinksMessage message = new PnfsResolveSymlinksMessage( + target.getPath().toString(), null); + ListenableFuture requestFuture = pnfsHandler.requestAsync( + message); + CellStub.addCallback(requestFuture, new AbstractMessageCallback<>() { + @Override + public void success(PnfsResolveSymlinksMessage message) { + LOGGER.debug("{} - resolvePath {}, callback success.", ruid, target.getPath()); + FsPath path = FsPath.create(message.getResolvedPath()); + if (targetPrefix != null && !path.contains(targetPrefix)) { + path = computeFsPath(targetPrefix, path.toString()); + } + LOGGER.debug("{} - resolvePath, resolved path {}", ruid, path); + target.setPath(path); + state = TaskState.FETCH_ATTRIBUTES; + taskFuture = executor.submit(TargetTask.this); + } + + @Override + public void failure(int rc, Object error) { + LOGGER.error("{} - resolvePath, callback failure for {}.", ruid, target); + try { + storeOrUpdate(CacheExceptionFactory.exceptionOf( + rc, Objects.toString(error, null))); + } catch (InterruptedException e) { + errorHandler.accept(e); + } finally { + remove(); + } + } + }, MoreExecutors.directExecutor()); + } + + /** + * (2) retrieval of required file attributes. + */ + private void fetchAttributes() throws InterruptedException { + LOGGER.debug("{} - fetchAttributes for path {}", ruid, target.getPath()); + PnfsGetFileAttributes message = new PnfsGetFileAttributes(target.getPath().toString(), + MINIMALLY_REQUIRED_ATTRIBUTES); + ListenableFuture requestFuture = pnfsHandler.requestAsync(message); + CellStub.addCallback(requestFuture, new AbstractMessageCallback<>() { + @Override + public void success(PnfsGetFileAttributes message) { + LOGGER.debug("{} - fetchAttributes for path {}, callback success.", + ruid, target.getPath()); + FileAttributes attributes = message.getFileAttributes(); + target.setAttributes(attributes); + state = TaskState.HANDLE_TARGET; + taskFuture = executor.submit(TargetTask.this); + } + + @Override + public void failure(int rc, Object error) { + LOGGER.error("{} - fetchAttributes, callback failure for {}.", ruid, target); + try { + storeOrUpdate(CacheExceptionFactory.exceptionOf( + rc, Objects.toString(error, null))); + } catch (InterruptedException e) { + errorHandler.accept(e); + } finally { + remove(); + } + } + }, MoreExecutors.directExecutor()); + } + + /** + * (3b) either recurs on directory or performs activity on file. + */ + private void handleTarget() throws InterruptedException { + LOGGER.debug("{} - handleTarget for {}, path {}.", ruid, target.getActivity(), + target.getPath()); + FileAttributes attributes = target.getAttributes(); + FileType type = attributes.getFileType(); + try { + if (type == FileType.DIR) { + storeOrUpdate(null); + expandDepthFirst(target.getId(), target.getPid(), target.getPath(), attributes); + /* + * Swap out for the directory listing task. + * (We must do this AFTER the directory task has been added to running.) + */ + remove(); + } else if (type != FileType.SPECIAL) { + performActivity(); + } + } catch (BulkServiceException | CacheException e) { + LOGGER.error("handleTarget {}, path {}, error {}.", ruid, target.getPath(), + e.getMessage()); + storeOrUpdate(e); + } + } + + /** + * (3a) Performs activity on either file or directory target. + */ + private void performActivity() throws InterruptedException { + performActivity(true); + } + + private void performActivity(boolean async) throws InterruptedException { + Long id = target.getId(); + FsPath path = target.getPath(); + FileAttributes attributes = target.getAttributes(); + LOGGER.debug("{} - performActivity {} on {}.", ruid, activity, path); + + storeOrUpdate(null); + + if (hasBeenCancelled(this)) { + LOGGER.debug("{} - performActivity hasBeenCancelled for {}.", ruid, path); + remove(); + } + + try { + activityFuture = activity.perform(ruid, id == null ? seqNo : id, path, attributes); + if (async) { + activityFuture.addListener(() -> handleCompletion(), executor); + } + } catch (BulkServiceException | UnsupportedOperationException e) { + LOGGER.error("{}, perform failed for {}: {}", ruid, target, e.getMessage()); + activityFuture = Futures.immediateFailedFuture(e); + if (async) { + handleCompletion(); + } + } + } + + private void handleCompletion() { + LOGGER.debug("{} - handleCompletion {}", ruid, target.getPath()); + + State state = RUNNING; + try { + activity.handleCompletion(target, activityFuture); + state = target.getState(); + + if (state == FAILED && activity.getRetryPolicy().shouldRetry(target)) { + retryFailed(); + return; + } + + targetStore.update(target.getId(), state, target.getErrorType(), + target.getErrorMessage()); + } catch (BulkStorageException e) { + LOGGER.error("{}, could not store target from result {}, {}, {}: {}.", ruid, + target.getId(), target.getPath(), target.getAttributes(), e.toString()); + } + + if (state == FAILED && request.isCancelOnFailure()) { + cancel(); + } else { + remove(); + } + } + + private void retryFailed() throws BulkStorageException { + LOGGER.debug("{} - retryFailed {}.", ruid, target); + target.resetToReady(); + try { + performActivity(); + } catch (InterruptedException e) { + LOGGER.debug("{}. retryFailed {}, interrupted.", ruid, target); + activityFuture = Futures.immediateFailedFuture(e); + handleCompletion(); + } + } + + private void storeOrUpdate(Throwable error) throws InterruptedException { + LOGGER.debug("{} - storeOrUpdate {}.", ruid, target); + + if (hasBeenCancelled(this)) { + LOGGER.debug("{} - storeOrUpdate, hasBeenCancelled {}.", ruid, target.getPath()); + return; + } + + target.setState(error == null ? RUNNING : FAILED); + target.setErrorObject(error); + + try { + /* + * If this is an insert (id == null), the target id will be updated to what is + * returned from the database. + */ + targetStore.storeOrUpdate(target); + LOGGER.debug("{} - storeOrUpdate, target id {}", ruid, target.getId()); + } catch (BulkStorageException e) { + LOGGER.error("{}, could not store or update target from result {}, {}, {}: {}.", ruid, + target.getId(), target.getPath(), target.getAttributes(), e.toString()); + error = e; + } + + if (error != null) { + remove(); + } + } + } + + private final BulkRequest request; + private final BulkActivity activity; + private final Long rid; + private final String ruid; + private final Depth depth; + private final String targetPrefix; + private final BulkServiceStatistics statistics; + private final TargetType targetType; + private final BulkRequestTarget jobTarget; + private final Subject subject; + private final Restriction restriction; + + private final Map running; + private final Set cancelledPaths; + private final Queue dirs; + + private BulkTargetStore targetStore; + private PnfsHandler pnfsHandler; + private ListDirectoryHandler listHandler; + private SignalAware callback; + private Thread runThread; + private ExecutorService executor; + private Semaphore dirListSemaphore; + private Semaphore inFlightSemaphore; + + private volatile ContainerState containerState; + + public BulkRequestContainerJob(BulkActivity activity, BulkRequestTarget jobTarget, + BulkRequest request, BulkServiceStatistics statistics) { + this.request = request; + this.activity = activity; + this.jobTarget = jobTarget; + this.subject = activity.getSubject(); + this.restriction = activity.getRestriction(); + this.statistics = statistics; + + rid = request.getId(); + ruid = request.getUid(); + depth = request.getExpandDirectories(); + targetPrefix = request.getTargetPrefix(); + targetType = activity.getTargetType(); + + running = new HashMap<>(); + cancelledPaths = new HashSet<>(); + dirs = new ConcurrentLinkedQueue<>(); + + containerState = ContainerState.START; + } + + public void cancel() { + containerState = ContainerState.STOP; + + jobTarget.cancel(); + + LOGGER.debug("{} - cancel: target state is now {}.", ruid, jobTarget.getState()); + + interruptRunThread(); + + synchronized (running) { + LOGGER.debug("{} - cancel: running {}.", ruid, running.size()); + running.values().forEach(ContainerTask::cancel); + LOGGER.debug("{} - cancel: running targets cancelled.", ruid); + running.clear(); + } + + LOGGER.debug("{} - cancel: calling cancel all on target store.", ruid); + targetStore.cancelAll(rid); + + signalStateChange(); + } + + public void cancel(long targetId) { + synchronized (running) { + for (Iterator i = running.values().iterator(); i.hasNext(); ) { + ContainerTask task = i.next(); + if (task instanceof TargetTask + && targetId == ((TargetTask) task).target.getId()) { + task.cancel(); + i.remove(); + break; + } + } + } + + try { + targetStore.update(targetId, CANCELLED, null, null); + } catch (BulkStorageException e) { + LOGGER.error("Failed to cancel {}::{}: {}.", ruid, targetId, e.toString()); + } + } + + public void cancel(String targetPath) { + LOGGER.debug("{} - cancel path {}.", ruid, targetPath); + FsPath toCancel = findAbsolutePath(targetPrefix, targetPath); + + Optional found; + + synchronized (running) { + found = running.values().stream().filter(TargetTask.class::isInstance) + .map(TargetTask.class::cast).filter(t -> t.target.getPath().equals(toCancel)) + .findAny(); + } + + if (found.isPresent()) { + cancel(found.get().target.getId()); + } else { + synchronized (cancelledPaths) { + cancelledPaths.add(toCancel); + } + } + } + + @Override + public int compareTo(BulkRequestContainerJob other) { + return jobTarget.getKey().compareTo(other.jobTarget.getKey()); + } + + public BulkActivity getActivity() { + return activity; + } + + public BulkRequestTarget getTarget() { + return jobTarget; + } + + public void initialize() { + LOGGER.trace("BulkRequestContainerJob {}, initialize() called ...", jobTarget.getKey()); + jobTarget.setState(READY); + containerState = ContainerState.PROCESS_FILES; + } + + public synchronized boolean isReady() { + switch (jobTarget.getState()) { + case READY: + case CREATED: + return true; + default: + return false; + } + } + + @Override + public void run() { + setRunThread(Thread.currentThread()); + try { + checkForRequestCancellation(); + switch (containerState) { + case PROCESS_FILES: + LOGGER.debug("{} - run: PROCESS FILES", ruid); + processFileTargets(); + containerState = ContainerState.WAIT; + break; + case PROCESS_DIRS: + LOGGER.debug("{} - run: PROCESS DIRS", ruid); + processDirTargets(); + containerState = ContainerState.STOP; + update(COMPLETED); + break; + case STOP: + LOGGER.debug("{} - run: prematurely stopped; exiting", ruid); + update(CANCELLED); + setRunThread(null); + return; + default: + throw new RuntimeException( + "run container called with container in wrong state " + containerState + + "; this is a bug."); + } + } catch (InterruptedException e) { + LOGGER.debug("{} - run: interrupted", ruid); + /* + * If the state has not already been set to terminal, do so. + */ + containerState = ContainerState.STOP; + update(CANCELLED); + } + setRunThread(null); + checkTransitionToDirs(); + } + + public void setDirListSemaphore(Semaphore dirListSemaphore) { + this.dirListSemaphore = dirListSemaphore; + } + + public void setInFlightSemaphore(Semaphore inFlightSemaphore) { + this.inFlightSemaphore = inFlightSemaphore; + } + + public void setListHandler(ListDirectoryHandler listHandler) { + this.listHandler = listHandler; + } + + public void setExecutor(ExecutorService executor) { + this.executor = executor; + } + + public void setNamespaceHandler(PnfsHandler pnfsHandler) { + this.pnfsHandler = pnfsHandler; + } + + public void setTargetStore(BulkTargetStore targetStore) { + this.targetStore = targetStore; + } + + public void setCallback(SignalAware callback) { + this.callback = callback; + } + + @Override + public void uncaughtException(Thread t, Throwable e) { + /* + * Won't leave the request in non-terminal state in case of uncaught exception. + * We also try to handle uncaught exceptions here, so as not to kill the + * manager thread. + */ + containerState = ContainerState.STOP; + jobTarget.setErrorObject(e); + update(FAILED); + ThreadGroup group = t.getThreadGroup(); + if (group != null) { + group.uncaughtException(t, e); + } else { + LOGGER.error("Uncaught exception: please report to team@dcache.org", e); + } + } + + public void update(State state) { + if (jobTarget.setState(state)) { + try { + targetStore.update(jobTarget.getId(), jobTarget.getState(), jobTarget.getErrorType(), + jobTarget.getErrorMessage()); + } catch (BulkStorageException e) { + LOGGER.error("{}, updateJobState: {}", ruid, e.toString()); + } + signalStateChange(); + } + } + + private void checkForRequestCancellation() throws InterruptedException { + if (isRunThreadInterrupted() || containerState == ContainerState.STOP + || jobTarget.isTerminated()) { + throw new InterruptedException(); + } + } + + private void checkTransitionToDirs() { + synchronized (running) { + if (!running.isEmpty()) { + LOGGER.debug("{} - checkTransitionToDirs, running {}", ruid, running.size()); + return; + } + } + + synchronized (this) { + if (containerState == ContainerState.WAIT) { + containerState = ContainerState.PROCESS_DIRS; + executor.submit(this); + } + } + } + + private boolean hasBeenCancelled(TargetTask task) { + synchronized (cancelledPaths) { + BulkRequestTarget target = task.target; + if (cancelledPaths.remove(target.getPath().toString())) { + target = toTarget(target.getId(), target.getPid(), target.getPath(), + Optional.of(target.getAttributes()), CANCELLED, null); + try { + if (target.getId() == null) { + targetStore.store(target); + } else { + targetStore.update(target.getId(), CANCELLED, null, null); + } + } catch (BulkServiceException | UnsupportedOperationException e) { + LOGGER.error("hasBeenCancelled {}, failed for {}: {}", ruid, target.getPath(), + e.toString()); + } + return true; + } + } + + return false; + } + + private synchronized void interruptRunThread() { + if (runThread != null) { + runThread.interrupt(); + LOGGER.debug("{} - container job interrupted.", ruid); + } + } + + private synchronized boolean isRunThreadInterrupted() { + return runThread != null && runThread.isInterrupted(); + } + + private void processDirTargets() throws InterruptedException { + if (dirs.isEmpty()) { + LOGGER.debug("{} - processDirTargets, nothing to do.", ruid); + return; + } + + LOGGER.debug("{} - processDirTargets, size {}.", ruid, dirs.size()); + + DirTarget[] sorted = dirs.toArray(new DirTarget[0]); + Arrays.sort(sorted, SORTER); + + /* + * Process serially in this thread + */ + for (DirTarget dirTarget : sorted) { + new TargetTask(toTarget(dirTarget.id, dirTarget.pid, dirTarget.path, + Optional.of(dirTarget.attributes), CREATED, null), + TaskState.HANDLE_DIR_TARGET).performSync(); + } + } + + private void processFileTargets() throws InterruptedException { + List requestTargets = targetStore.getInitialTargets(rid, true); + + LOGGER.debug("{} - processFileTargets, initial size {}.", ruid, requestTargets.size()); + + if (requestTargets.isEmpty()) { + LOGGER.error("{} - processFileTargets, no initial targets!.", ruid); + containerState = ContainerState.STOP; + update(FAILED); + return; + } + + for (BulkRequestTarget target : requestTargets) { + new TargetTask(target, TaskState.RESOLVE_PATH).submitAsync(); + } + } + + private synchronized void setRunThread(Thread runThread) { + this.runThread = runThread; + if (runThread != null) { + this.runThread.setUncaughtExceptionHandler(this); + } + } + + private void signalStateChange() { + if (callback != null) { + callback.signal(); + } + } + + private BulkRequestTarget toTarget(Long id, PID pid, FsPath path, + Optional attributes, State state, Throwable throwable) { + String errorType = null; + String errorMessage = null; + Throwable root; + if (throwable != null) { + root = Throwables.getRootCause(throwable); + errorType = root.getClass().getCanonicalName(); + errorMessage = root.getMessage(); + } + + return BulkRequestTargetBuilder.builder(statistics).attributes(attributes.orElse(null)) + .activity(activity.getName()).id(id).pid(pid).rid(rid).ruid(ruid).state(state) + .createdAt(System.currentTimeMillis()).errorType(errorType) + .errorMessage(errorMessage).path(path).build(); + } +} \ No newline at end of file diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJob.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJob.java deleted file mode 100644 index 9f5909e59a8..00000000000 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJob.java +++ /dev/null @@ -1,312 +0,0 @@ -/* -COPYRIGHT STATUS: -Dec 1st 2001, Fermi National Accelerator Laboratory (FNAL) documents and -software are sponsored by the U.S. Department of Energy under Contract No. -DE-AC02-76CH03000. Therefore, the U.S. Government retains a world-wide -non-exclusive, royalty-free license to publish or reproduce these documents -and software for U.S. Government purposes. All documents and software -available from this server are protected under the U.S. and Foreign -Copyright Laws, and FNAL reserves all rights. - -Distribution of the software available from this server is free of -charge subject to the user following the terms of the Fermitools -Software Legal Information. - -Redistribution and/or modification of the software shall be accompanied -by the Fermitools Software Legal Information (including the copyright -notice). - -The user is asked to feed back problems, benefits, and/or suggestions -about the software to the Fermilab Software Providers. - -Neither the name of Fermilab, the URA, nor the names of the contributors -may be used to endorse or promote products derived from this software -without specific prior written permission. - -DISCLAIMER OF LIABILITY (BSD): - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS -FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL FERMILAB, -OR THE URA, OR THE U.S. DEPARTMENT of ENERGY, OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT -OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR -BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -Liabilities of the Government: - -This software is provided by URA, independent from its Prime Contract -with the U.S. Department of Energy. URA is acting independently from -the Government and in its own private capacity and is not acting on -behalf of the U.S. Government, nor as its contractor nor its agent. -Correspondingly, it is understood and agreed that the U.S. Government -has no connection to this software and in no manner whatsoever shall -be liable for nor assume any responsibility or obligation for any claim, -cost, or damages arising out of or resulting from the use of the software -available from this server. - -Export Control: - -All documents and software available from this server are subject to U.S. -export control laws. Anyone downloading information from this server is -obligated to secure any necessary Government licenses before exporting -documents or software obtained from this server. - */ -package org.dcache.services.bulk.job; - -import static org.dcache.services.bulk.activity.BulkActivity.MINIMALLY_REQUIRED_ATTRIBUTES; -import static org.dcache.services.bulk.util.BulkRequestTarget.State.FAILED; -import static org.dcache.services.bulk.util.BulkRequestTarget.State.RUNNING; -import static org.dcache.services.bulk.util.BulkRequestTarget.computeFsPath; - -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import diskCacheV111.util.CacheException; -import diskCacheV111.util.FsPath; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import org.dcache.namespace.FileType; -import org.dcache.services.bulk.BulkRequest; -import org.dcache.services.bulk.BulkServiceException; -import org.dcache.services.bulk.BulkStorageException; -import org.dcache.services.bulk.activity.BulkActivity; -import org.dcache.services.bulk.util.BatchedResult; -import org.dcache.services.bulk.util.BulkRequestTarget; -import org.dcache.services.bulk.util.BulkRequestTarget.PID; -import org.dcache.services.bulk.util.BulkRequestTarget.State; -import org.dcache.services.bulk.util.BulkServiceStatistics; -import org.dcache.vehicles.FileAttributes; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This version of the container does no preprocessing, storing the targets as they go live. It thus - * offers a much faster pathway toward target completion with potentially greater throughput. - */ -public final class RequestContainerJob extends AbstractRequestContainerJob { - - private static final Logger LOGGER = LoggerFactory.getLogger(RequestContainerJob.class); - - private static final DirTargetSorter SORTER = new DirTargetSorter(); - - static class DirTarget { - final FsPath path; - final FileAttributes attributes; - final PID pid; - final Long id; - final int depth; - - DirTarget(Long id, PID pid, FsPath path, FileAttributes attributes) { - this.id = id; - this.pid = pid; - this.attributes = attributes; - this.path = path; - depth = (int)path.toString().chars().filter(c -> c == '/').count(); - } - } - - static class DirTargetSorter implements Comparator { - @Override - public int compare(DirTarget o1, DirTarget o2) { - return Integer.compare(o2.depth, o1.depth); /* DESCENDING ORDER */ - } - } - - private final Queue dirs; - - public RequestContainerJob(BulkActivity activity, BulkRequestTarget target, - BulkRequest request, BulkServiceStatistics statistics) { - super(activity, target, request, statistics); - dirs = new ConcurrentLinkedQueue<>(); - } - - @Override - protected void processFileTargets() throws InterruptedException { - List requestTargets = getInitialTargets(); - - if (requestTargets.isEmpty()) { - containerState = ContainerState.STOP; - update(FAILED); - return; - } - - for (BulkRequestTarget tgt : requestTargets) { - checkForRequestCancellation(); - Long id = tgt.getId(); - try { - FsPath path = resolvePath(tgt.getPath().toString()); - if (targetPrefix != null && !path.contains(targetPrefix)) { - path = computeFsPath(targetPrefix, tgt.getPath().toString()); - } - - switch (depth) { - case NONE: - perform(id, PID.INITIAL, path, null); - break; - default: - handleTarget(id, PID.INITIAL, path); - } - } catch (CacheException e) { - LOGGER.error("problem handling target {}: {}.", tgt, e.toString()); - tgt.setState(FAILED); - tgt.setErrorObject(e); - try { - targetStore.storeOrUpdate(tgt); - } catch (BulkStorageException ex) { - LOGGER.error("processFileTargets {}, path {}, could not store, error {}.", ruid, - tgt.getPath(), - ex.getMessage()); - } - } - } - } - - @Override - protected void processDirTargets() throws InterruptedException { - DirTarget[] sorted = dirs.toArray(new DirTarget[0]); - Arrays.sort(sorted, SORTER); - for (DirTarget dirTarget : sorted) { - checkForRequestCancellation(); - perform(dirTarget.id, dirTarget.pid, dirTarget.path, dirTarget.attributes); - } - } - - @Override - protected void handleDirTarget(Long id, PID pid, FsPath path, FileAttributes attributes) { - dirs.add(new DirTarget(id, pid, path, attributes)); - } - - @Override - protected void handleFileTarget(PID pid, FsPath path, FileAttributes attributes) - throws InterruptedException { - perform(null, pid, path, attributes); - } - - @Override - protected void retryFailed(BatchedResult result, FileAttributes attributes) - throws BulkStorageException { - BulkRequestTarget completedTarget = result.getTarget(); - Long id = completedTarget.getId(); - FsPath path = completedTarget.getPath(); - PID pid = completedTarget.getPid(); - completedTarget.resetToReady(); - try { - perform(id, pid, path, attributes); - } catch (InterruptedException e) { - LOGGER.debug("{}. retryFailed interrupted", ruid); - targetStore.update(result.getTarget().getId(), FAILED, - InterruptedException.class.getCanonicalName(), - "retryFailed interrupted for " + ruid); - } - } - - private void handleCompletion(BatchedResult result, FileAttributes attributes) { - activity.handleCompletion(result); - - BulkRequestTarget completedTarget = result.getTarget(); - State state = completedTarget.getState(); - - try { - if (state == FAILED && activity.getRetryPolicy().shouldRetry(completedTarget)) { - retryFailed(result, attributes); - return; - } - - targetStore.update(completedTarget.getId(), state, completedTarget.getErrorType(), - completedTarget.getErrorMessage()); - } catch (BulkStorageException e) { - LOGGER.error("{} could not store target from result: {}, {}: {}.", ruid, result, - attributes, e.toString()); - } - - removeTarget(completedTarget); /* RELEASES SEMAPHORE */ - - if (state == FAILED && request.isCancelOnFailure()) { - cancel(); - } - } - - private void handleTarget(Long id, PID pid, FsPath path) throws InterruptedException { - checkForRequestCancellation(); - FileAttributes attributes = null; - LOGGER.debug("handleTarget {}, path {}.", ruid, path); - try { - attributes = pnfsHandler.getFileAttributes(path, MINIMALLY_REQUIRED_ATTRIBUTES); - if (attributes.getFileType() == FileType.DIR) { - expandDepthFirst(id, pid, path, attributes); - } else if (attributes.getFileType() != FileType.SPECIAL) { - perform(id, pid, path, attributes); - } - } catch (BulkServiceException | CacheException e) { - LOGGER.error("handleTarget {}, path {}, error {}.", ruid, path, e.getMessage()); - register(id, pid, path, Futures.immediateFailedFuture(e), attributes, e); - } - } - - private ListenableFuture perform(Long id, PID pid, FsPath path, FileAttributes attributes) - throws InterruptedException { - checkForRequestCancellation(); - - if (hasBeenCancelled(id, pid, path, attributes)) { - return Futures.immediateCancelledFuture(); - } - - semaphore.acquire(); - - ListenableFuture future; - try { - future = activity.perform(ruid, id == null ? this.id.getAndIncrement() : id, path, - attributes); - } catch (BulkServiceException | UnsupportedOperationException e) { - LOGGER.error("{}, perform failed for {}: {}", ruid, path, e.getMessage()); - future = Futures.immediateFailedFuture(e); - register(id, pid, path, future, attributes, e); - return future; - } - - register(id, pid, path, future, attributes, null); - return future; - } - - private void register(Long id, PID pid, FsPath path, ListenableFuture future, FileAttributes attributes, - Throwable error) throws InterruptedException { - checkForRequestCancellation(); - - if (hasBeenCancelled(id, pid, path, attributes)) { - return; - } - - BulkRequestTarget target = toTarget(id, pid, path, Optional.ofNullable(attributes), - error == null ? RUNNING : FAILED, error); - - BatchedResult result = new BatchedResult(target, future); - - if (error == null) { - try { - /* - * If this is an insert (id == null), the target id will be updated to what is - * returned from the database. - */ - targetStore.storeOrUpdate(target); - } catch (BulkStorageException e) { - LOGGER.error("{}, could not store target from result {}, {}, {}: {}.", ruid, result, - attributes, e.toString()); - } - } - - synchronized (waiting) { - waiting.put(path, result); - future.addListener(() -> handleCompletion(result, attributes), - activity.getCallbackExecutor()); - } - } -} \ No newline at end of file diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java index 12b85f7609a..28a997e5160 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/job/RequestContainerJobFactory.java @@ -64,6 +64,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import diskCacheV111.util.PnfsHandler; import java.util.Optional; +import java.util.concurrent.Semaphore; import javax.security.auth.Subject; import org.dcache.auth.attributes.Restriction; import org.dcache.cells.CellStub; @@ -79,7 +80,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.services.bulk.util.BulkRequestTarget.PID; import org.dcache.services.bulk.util.BulkRequestTargetBuilder; import org.dcache.services.bulk.util.BulkServiceStatistics; -import org.dcache.util.BoundedExecutor; import org.dcache.util.list.ListDirectoryHandler; import org.dcache.vehicles.FileAttributes; import org.slf4j.Logger; @@ -100,9 +100,10 @@ public final class RequestContainerJobFactory { private ListDirectoryHandler listHandler; private BulkTargetStore targetStore; private BulkServiceStatistics statistics; - private BoundedExecutor dirListExecutor; + private Semaphore dirListSemaphore; + private Semaphore inFlightSemaphore; - public AbstractRequestContainerJob createRequestJob(BulkRequest request) + public BulkRequestContainerJob createRequestJob(BulkRequest request) throws BulkServiceException { String rid = request.getUid(); LOGGER.trace("createRequestJob {}", rid); @@ -124,29 +125,43 @@ public AbstractRequestContainerJob createRequestJob(BulkRequest request) pnfsHandler.setSubject(activity.getSubject()); LOGGER.trace("createRequestJob {}, creating batch request job.", request.getUid()); - AbstractRequestContainerJob containerJob - = new RequestContainerJob(activity, target, request, statistics); + BulkRequestContainerJob containerJob + = new BulkRequestContainerJob(activity, target, request, statistics); containerJob.setNamespaceHandler(pnfsHandler); containerJob.setTargetStore(targetStore); containerJob.setListHandler(listHandler); - containerJob.setDirListExecutor(dirListExecutor); + containerJob.setDirListSemaphore(dirListSemaphore); + containerJob.setInFlightSemaphore(inFlightSemaphore); containerJob.initialize(); return containerJob; } + public int getDirListSemaphoreAvailable() { + return dirListSemaphore.availablePermits(); + } + + public int getInFlightSemaphoreAvailable() { + return inFlightSemaphore.availablePermits(); + } + @Required public void setActivityFactory(BulkActivityFactory activityFactory) { this.activityFactory = activityFactory; } @Required - public void setDirListExecutor(BoundedExecutor dirListExecutor) { - this.dirListExecutor = dirListExecutor; + public void setListHandler(ListDirectoryHandler listHandler) { + this.listHandler = listHandler; } @Required - public void setListHandler(ListDirectoryHandler listHandler) { - this.listHandler = listHandler; + public void setDirListSemaphore(int permits) { + dirListSemaphore = new Semaphore(permits); + } + + @Required + public void setInFlightSemaphore(int permits) { + inFlightSemaphore = new Semaphore(permits); } @Required diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java index 6bdf4fa40c8..fae5b4acf59 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/BulkRequestManager.java @@ -60,7 +60,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING package org.dcache.services.bulk.manager; import java.util.List; -import org.dcache.services.bulk.job.AbstractRequestContainerJob; +import org.dcache.services.bulk.job.BulkRequestContainerJob; import org.dcache.services.bulk.util.BulkRequestTarget; import org.dcache.util.SignalAware; @@ -104,7 +104,7 @@ public interface BulkRequestManager extends SignalAware { * * @param job to be submitted to the queue. */ - void submit(AbstractRequestContainerJob job); + void submit(BulkRequestContainerJob job); /** * @param maxActiveRequests max number of requests that can be in the active state at a given diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java index 168b979d7da..7179b18d0df 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/manager/ConcurrentRequestManager.java @@ -72,6 +72,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -83,7 +84,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.services.bulk.BulkStorageException; import org.dcache.services.bulk.handler.BulkRequestCompletionHandler; import org.dcache.services.bulk.handler.BulkSubmissionHandler; -import org.dcache.services.bulk.job.AbstractRequestContainerJob; +import org.dcache.services.bulk.job.BulkRequestContainerJob; import org.dcache.services.bulk.manager.scheduler.BulkRequestScheduler; import org.dcache.services.bulk.manager.scheduler.BulkSchedulerProvider; import org.dcache.services.bulk.store.BulkRequestStore; @@ -168,15 +169,15 @@ private void await() throws InterruptedException { private void broadcastTargetCancel() { List toCancel; - synchronized (cancelledJobs) { - toCancel = cancelledJobs.stream().collect(Collectors.toList()); - cancelledJobs.clear(); + synchronized (cancelledTargets) { + toCancel = cancelledTargets.stream().collect(Collectors.toList()); + cancelledTargets.clear(); } synchronized (requestJobs) { toCancel.forEach(key -> { String[] ridId = BulkRequestTarget.parse(key); - AbstractRequestContainerJob job = requestJobs.get(ridId[0]); + BulkRequestContainerJob job = requestJobs.get(ridId[0]); if (job != null) { long id = Long.valueOf(ridId[1]); job.cancel(id); @@ -216,7 +217,7 @@ private void doRun() throws InterruptedException { @GuardedBy("requestJobs") private boolean isTerminated(String rid) { - AbstractRequestContainerJob job = requestJobs.get(rid); + BulkRequestContainerJob job = requestJobs.get(rid); if (job != null && job.getTarget().getState() == State.CANCELLED) { return completionHandler.checkTerminated(rid, true); } @@ -245,7 +246,7 @@ private void processNextRequests() { * immediately started. */ synchronized (requestJobs) { - requestJobs.values().stream().filter(AbstractRequestContainerJob::isReady) + requestJobs.values().stream().filter(BulkRequestContainerJob::isReady) .forEach(ConcurrentRequestManager.this::startJob); } } @@ -308,6 +309,11 @@ private ListMultimap userRequests() { */ private ExecutorService processorExecutorService; + /** + * Thread dedicated to jobs. + */ + private ExecutorService pooledExecutorService; + /** * Records number of jobs and requests processed. */ @@ -327,12 +333,12 @@ private ListMultimap userRequests() { /** * Held for the lifetime of the container run. */ - private Map requestJobs; + private Map requestJobs; /** * Ids of jobs cancelled individually. To avoid doing cancellation on the calling thread. */ - private Collection cancelledJobs; + private Collection cancelledTargets; /** * Handles the promotion of jobs to running. @@ -347,16 +353,18 @@ private ListMultimap userRequests() { @Override public void initialize() throws Exception { requestJobs = new LinkedHashMap<>(); - cancelledJobs = new HashSet<>(); + cancelledTargets = new HashSet<>(); schedulerProvider.initialize(); processor = new ConcurrentRequestProcessor(schedulerProvider.getRequestScheduler()); + processorExecutorService = Executors.newSingleThreadScheduledExecutor(); + pooledExecutorService = Executors.newCachedThreadPool(); processorFuture = processorExecutorService.submit(processor); } @Override public void cancel(BulkRequestTarget target) { - synchronized (cancelledJobs) { - cancelledJobs.add(target.getKey()); + synchronized (cancelledTargets) { + cancelledTargets.add(target.getKey()); } processor.signal(); } @@ -364,7 +372,7 @@ public void cancel(BulkRequestTarget target) { @Override public boolean cancelRequest(String requestId) { synchronized (requestJobs) { - AbstractRequestContainerJob job = requestJobs.get(requestId); + BulkRequestContainerJob job = requestJobs.get(requestId); if (job != null) { job.cancel(); processor.signal(); @@ -380,7 +388,7 @@ public boolean cancelRequest(String requestId) { @Override public void cancelTargets(String id, List targetPaths) { synchronized (requestJobs) { - AbstractRequestContainerJob job = requestJobs.get(id); + BulkRequestContainerJob job = requestJobs.get(id); if (job != null) { targetPaths.forEach(job::cancel); LOGGER.trace("{} request targets cancelled for {}.", targetPaths.size(), id); @@ -398,7 +406,7 @@ public void shutdown() throws Exception { processorFuture.cancel(true); } requestJobs = null; - cancelledJobs = null; + cancelledTargets = null; requestStore.clearCache(); } @@ -427,11 +435,6 @@ public void setMaxActiveRequests(int maxActiveRequests) { this.maxActiveRequests = maxActiveRequests; } - @Required - public void setProcessorExecutorService(ExecutorService processorExecutorService) { - this.processorExecutorService = processorExecutorService; - } - @Required public void setRequestStore(BulkRequestStore requestStore) { this.requestStore = requestStore; @@ -474,7 +477,7 @@ public void signal() { } @Override - public void submit(AbstractRequestContainerJob job) { + public void submit(BulkRequestContainerJob job) { synchronized (requestJobs) { requestJobs.put(job.getTarget().getRuid(), job); } @@ -511,16 +514,16 @@ void activateRequest(BulkRequest request) { } } - void startJob(AbstractRequestContainerJob job) { + void startJob(BulkRequestContainerJob job) { String key = job.getTarget().getKey(); - long id = job.getTarget().getId(); LOGGER.trace("submitting job {} to executor, target {}.", key, job.getTarget()); + job.setExecutor(pooledExecutorService); job.setCallback(this); try { if (isJobValid(job)) { /* possibly cancelled in flight */ job.update(State.RUNNING); - job.getActivity().getActivityExecutor().submit(new FireAndForgetTask(job)); + pooledExecutorService.submit(new FireAndForgetTask(job)); } } catch (RuntimeException e) { job.getTarget().setErrorObject(e); @@ -535,7 +538,7 @@ void startJob(AbstractRequestContainerJob job) { * This is here mostly in order to catch jobs which have changed state on the fly * during cancellation. It is only called by the processor thread (when it invokes startJob). */ - private boolean isJobValid(AbstractRequestContainerJob job) { + private boolean isJobValid(BulkRequestContainerJob job) { BulkRequestTarget target = job.getTarget(); if (target.isTerminated()) { diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BatchedResult.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BatchedResult.java deleted file mode 100644 index fc3cf9fb385..00000000000 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BatchedResult.java +++ /dev/null @@ -1,91 +0,0 @@ -/* -COPYRIGHT STATUS: -Dec 1st 2001, Fermi National Accelerator Laboratory (FNAL) documents and -software are sponsored by the U.S. Department of Energy under Contract No. -DE-AC02-76CH03000. Therefore, the U.S. Government retains a world-wide -non-exclusive, royalty-free license to publish or reproduce these documents -and software for U.S. Government purposes. All documents and software -available from this server are protected under the U.S. and Foreign -Copyright Laws, and FNAL reserves all rights. - -Distribution of the software available from this server is free of -charge subject to the user following the terms of the Fermitools -Software Legal Information. - -Redistribution and/or modification of the software shall be accompanied -by the Fermitools Software Legal Information (including the copyright -notice). - -The user is asked to feed back problems, benefits, and/or suggestions -about the software to the Fermilab Software Providers. - -Neither the name of Fermilab, the URA, nor the names of the contributors -may be used to endorse or promote products derived from this software -without specific prior written permission. - -DISCLAIMER OF LIABILITY (BSD): - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS -FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL FERMILAB, -OR THE URA, OR THE U.S. DEPARTMENT of ENERGY, OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT -OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR -BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -Liabilities of the Government: - -This software is provided by URA, independent from its Prime Contract -with the U.S. Department of Energy. URA is acting independently from -the Government and in its own private capacity and is not acting on -behalf of the U.S. Government, nor as its contractor nor its agent. -Correspondingly, it is understood and agreed that the U.S. Government -has no connection to this software and in no manner whatsoever shall -be liable for nor assume any responsibility or obligation for any claim, -cost, or damages arising out of or resulting from the use of the software -available from this server. - -Export Control: - -All documents and software available from this server are subject to U.S. -export control laws. Anyone downloading information from this server is -obligated to secure any necessary Government licenses before exporting -documents or software obtained from this server. - */ -package org.dcache.services.bulk.util; - -import com.google.common.util.concurrent.ListenableFuture; -import org.dcache.services.bulk.activity.BulkActivity; - -/** - * In-memory placeholder for results of batched operations which associates the target with the - * activity future. - */ -public final class BatchedResult { - - private final BulkRequestTarget target; - private final ListenableFuture future; - - public BatchedResult(BulkRequestTarget target, ListenableFuture future) { - this.future = future; - this.target = target; - } - - public void cancel(BulkActivity activity) { - future.cancel(true); - activity.cancel(target); - } - - public ListenableFuture getFuture() { - return future; - } - - public BulkRequestTarget getTarget() { - return target; - } -} diff --git a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkServiceStatistics.java b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkServiceStatistics.java index 09e94997b5e..461c1173777 100644 --- a/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkServiceStatistics.java +++ b/modules/dcache-bulk/src/main/java/org/dcache/services/bulk/util/BulkServiceStatistics.java @@ -78,6 +78,8 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.dcache.services.bulk.job.RequestContainerJobFactory; +import org.springframework.beans.factory.annotation.Required; /** * Provides activity statistics via the CellInfo interface. @@ -105,6 +107,7 @@ public static Long getTimestamp(String datetime) throws ParseException { private static final String LAST_SWEEP = "Last job sweep at %s"; private static final String LAST_SWEEP_DURATION = "Last job sweep took %s seconds"; private static final String STATS_FORMAT = "%-20s : %10s"; + private static final String CONCURRENCY_FORMAT = "%-45s : %10s"; private final Date started = new Date(); @@ -119,6 +122,8 @@ public static Long getTimestamp(String datetime) throws ParseException { FAILED.name(), new AtomicLong(0L), SKIPPED.name(), new AtomicLong(0L)); + + private RequestContainerJobFactory factory; private long lastSweep = started.getTime(); private long lastSweepDuration = 0; @@ -172,6 +177,12 @@ public void getInfo(PrintWriter pw) { pw.println("---------------- REQUESTS (current) -----------------"); pw.println(String.format(STATS_FORMAT, "Active", activeRequests.get())); pw.println(); + + pw.println(String.format(CONCURRENCY_FORMAT, "Available permits for directory listing", + factory.getDirListSemaphoreAvailable())); + pw.println(String.format(CONCURRENCY_FORMAT, "Available permits for in-flight targets", + factory.getInFlightSemaphoreAvailable())); + pw.println(); } public String getOwnerCounts() { @@ -216,6 +227,11 @@ public void setActive(int count) { activeRequests.set(count); } + @Required + public void setRequestContainerJobFactory(RequestContainerJobFactory factory) { + this.factory = factory; + } + public void sweepFinished(long duration) { lastSweep = System.currentTimeMillis(); lastSweepDuration = duration; diff --git a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml index 08076c1e0d7..097bea036be 100644 --- a/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml +++ b/modules/dcache-bulk/src/main/resources/org/dcache/services/bulk/bulk.xml @@ -76,15 +76,6 @@ - - Used to execute the future callbacks to jobs which send and wait for replies. - - - - - - - Used to cancel requests. @@ -94,26 +85,6 @@ - - - - - - - - - - - - - - - - - - - - Encapsulates the bulk database connection pool and properties. @@ -215,39 +186,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -269,11 +207,13 @@ - + + Tracks request and target states (counts), sweeper state, etc. + @@ -302,7 +242,6 @@ - @@ -336,7 +275,6 @@ - @@ -346,11 +284,7 @@ - - - - diff --git a/skel/share/defaults/bulk.properties b/skel/share/defaults/bulk.properties index e1f09cd01b8..8a2ef72d8c6 100644 --- a/skel/share/defaults/bulk.properties +++ b/skel/share/defaults/bulk.properties @@ -64,16 +64,12 @@ bulk.request-scheduler=org.dcache.services.bulk.manager.scheduler.LeastRecentFir # adjusted via an admin command to oversubscribe to # this preset number, which may allow for some speed up # if high latency requests such as pinning predominate. -# - activity-callback threads: for running the completion handling on activities # - incoming-request threads: for handling requests received on the message queue # - cancellation threads: for handling cancellation requests # -bulk.limits.container-processing-threads=200 -bulk.limits.activity-callback-threads=50 +bulk.limits.container-processing-threads=100 bulk.limits.incoming-request-threads=10 -bulk.limits.cancellation-threads=20 -bulk.limits.dir-list-threads=20 -(deprecated)bulk.limits.delay-clear-threads= +bulk.limits.cancellation-threads=10 # ---- Expiration of the cache serving to front the request storage. # @@ -104,6 +100,15 @@ bulk.limits.max.targets-per-shallow-request=10 # bulk.limits.max.targets-per-recursive-request=1 +# ---- The maximum number of directory listings that can occur concurrently. +# This is necessary to avoid timing out calls to the PnfsManager. +# +bulk.limits.dir-list-semaphore=20 + +# ---- The maximum number of target tasks that can occur concurrently. +# +bulk.limits.in-flight-semaphore=2000 + # ---- Interval of inactivity by the request manager consumer if not signalled # internally (as for instance when a request job completes). The consumer checks # for request readiness and completion. @@ -197,22 +202,8 @@ bulk.db.url=jdbc:postgresql://${bulk.db.host}/${bulk.db.name}?targetServerType=m bulk.db.fetch-size=1000 # ---- Activity plugin properties -# -# Max permits is the number permits for the semaphore used by the request container to -# perform the activity on individual targets. This number is on a container-by-container -# basis; for instance, if the bulk.limits.container-processing-threads is set to 100, -# this means that the number of concurrent requests to pin manager, e.g., would be 100000. -# These numbers should be adjusted up or down depending on concurrency requirements -# so as not to generate DOS attacks on the other dCache services such as PnfsManager, -# PinManager or the QoSEngine. -# -bulk.plugin!delete.max-permits=100 -bulk.plugin!pin.max-permits=1000 -bulk.plugin!stage.max-permits=1000 -bulk.plugin!unpin.max-permits=1000 -bulk.plugin!release.max-permits=1000 -bulk.plugin!update-qos.max-permits=1000 -bulk.plugin!log-target.max-permits=100 + +bulk.plugin!delete.default-batch-size=100 # ---- Algorithm for determining what action to take on job failures. # @@ -257,3 +248,13 @@ bulk.qos-transition-topic=${dcache.qos.transition-topic} (deprecated)bulk.service.ping=no longer used (deprecated)bulk.service.ping.timeout=no longer used (deprecated)bulk.service.ping.timeout.unit=no longer used +(deprecated)bulk.limits.dir-list-threads=use bulk.limits.dir-list-semaphore +(deprecated)bulk.limits.delay-clear-threads=no longer used +(deprecated)bulk.limits.activity-callback-threads=no longer used +(deprecated)bulk.plugin!delete.max-permits=no longer used; see bulk.limits.in-flight-semaphore +(deprecated)bulk.plugin!pin.max-permits=no longer used; see bulk.limits.in-flight-semaphore +(deprecated)bulk.plugin!stage.max-permits=no longer used; see bulk.limits.in-flight-semaphore +(deprecated)bulk.plugin!unpin.max-permits=no longer used; see bulk.limits.in-flight-semaphore +(deprecated)bulk.plugin!release.max-permits=no longer used; see bulk.limits.in-flight-semaphore +(deprecated)bulk.plugin!update-qos.max-permits=no longer used; see bulk.limits.in-flight-semaphore +(deprecated)bulk.plugin!log-target.max-permits=no longer used; see bulk.limits.in-flight-semaphore \ No newline at end of file diff --git a/skel/share/services/bulk.batch b/skel/share/services/bulk.batch index 2acaa4ba60b..edc406269df 100644 --- a/skel/share/services/bulk.batch +++ b/skel/share/services/bulk.batch @@ -8,10 +8,8 @@ check -strong bulk.cell.subscribe check -strong bulk.allowed-directory-expansion check -strong bulk.request-scheduler check -strong bulk.limits.container-processing-threads -check -strong bulk.limits.activity-callback-threads check -strong bulk.limits.incoming-request-threads check -strong bulk.limits.cancellation-threads -check -strong bulk.limits.dir-list-threads check -strong bulk.limits.request-cache-expiration check -strong bulk.limits.request-cache-expiration.unit check -strong bulk.limits.max-requests-per-user @@ -26,6 +24,8 @@ check -strong bulk.limits.archiver-period check -strong bulk.limits.archiver-period.unit check -strong bulk.limits.archiver-window check -strong bulk.limits.archiver-window.unit +check -strong bulk.limits.dir-list-semaphore +check -strong bulk.limits.in-flight-semaphore check -strong bulk.service.pnfsmanager check -strong bulk.service.pnfsmanager.timeout check -strong bulk.service.pnfsmanager.timeout.unit @@ -51,17 +51,10 @@ check -strong bulk.db.password check -strong bulk.db.password.file check -strong bulk.db.schema.auto check -strong bulk.db.fetch-size -check -strong bulk.plugin!delete.max-permits -check -strong bulk.plugin!pin.max-permits check -strong bulk.plugin!pin.default-lifetime check -strong bulk.plugin!pin.default-lifetime.unit -check -strong bulk.plugin!stage.max-permits check -strong bulk.plugin!stage.default-lifetime check -strong bulk.plugin!stage.default-lifetime.unit -check -strong bulk.plugin!unpin.max-permits -check -strong bulk.plugin!release.max-permits -check -strong bulk.plugin!update-qos.max-permits -check -strong bulk.plugin!log-target.max-permits check -strong bulk.plugin!delete.retry-policy check -strong bulk.plugin!pin.retry-policy check -strong bulk.plugin!stage.retry-policy