Skip to content

Commit

Permalink
Merge pull request #655 from xenon-middleware/named-pool-threads
Browse files Browse the repository at this point in the history
Named pool threads
  • Loading branch information
sverhoeven authored Jul 30, 2019
2 parents 9053314 + 66448f1 commit 292ff24
Show file tree
Hide file tree
Showing 10 changed files with 406 additions and 432 deletions.
2 changes: 1 addition & 1 deletion gradle/common.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defaultTasks 'clean', 'assemble', 'test'

// METADATA
// ==============
version = '3.0.0'
version = '3.0.1'
description = 'Xenon: a middleware abstraction library that provides a simple programming interface to various compute and storage resources.'

// will generate a warning with JDK 8, since the runtime jar (rt.jar) of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
Expand All @@ -41,21 +40,10 @@
import nl.esciencecenter.xenon.schedulers.QueueStatus;
import nl.esciencecenter.xenon.schedulers.Scheduler;
import nl.esciencecenter.xenon.schedulers.Streams;
import nl.esciencecenter.xenon.utils.DaemonThreadFactory;

public class JobQueueScheduler extends Scheduler {

/**
* Simple thread factory which returns daemon threads instead of normal threads
*
*/
private class DaemonThreadFactory implements ThreadFactory {
public Thread newThread(Runnable runnable) {
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
thread.setDaemon(true);
return thread;
}
}

private static final Logger LOGGER = LoggerFactory.getLogger(JobQueueScheduler.class);

private static final String SINGLE_QUEUE_NAME = "single";
Expand Down Expand Up @@ -125,11 +113,9 @@ public JobQueueScheduler(String uniqueID, String adaptorName, String location, C
throw new BadParameterException(adaptorName, "Polling delay must be between " + MIN_POLLING_DELAY + " and " + MAX_POLLING_DELAY + "!");
}

ThreadFactory threadFactory = new DaemonThreadFactory();

unlimitedExecutor = Executors.newCachedThreadPool(threadFactory);
singleExecutor = Executors.newSingleThreadExecutor(threadFactory);
multiExecutor = Executors.newFixedThreadPool(multiQThreads, threadFactory);
unlimitedExecutor = Executors.newCachedThreadPool(new DaemonThreadFactory("JobExecutorThread." + uniqueID + "." + UNLIMITED_QUEUE_NAME));
singleExecutor = Executors.newSingleThreadExecutor(new DaemonThreadFactory("JobExecutorThread." + uniqueID + "." + SINGLE_QUEUE_NAME));
multiExecutor = Executors.newFixedThreadPool(multiQThreads, new DaemonThreadFactory("JobExecutorThread." + uniqueID + "." + MULTI_QUEUE_NAME));
}

public long getCurrentJobID() {
Expand Down Expand Up @@ -471,18 +457,15 @@ public QueueStatus[] getQueueStatuses(String... queueNames) throws XenonExceptio
return result;
}

public void end() {
singleExecutor.shutdownNow();
multiExecutor.shutdownNow();
unlimitedExecutor.shutdownNow();
}

public FileSystem getFileSystem() throws XenonException {
return filesystem;
}

@Override
public void close() throws XenonException {
singleExecutor.shutdownNow();
multiExecutor.shutdownNow();
unlimitedExecutor.shutdownNow();
factory.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.io.IOException;

import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.session.ClientSession;
import org.apache.sshd.client.subsystem.sftp.SftpClient;
import org.apache.sshd.client.subsystem.sftp.SftpClientFactory;
Expand All @@ -25,14 +26,16 @@

public class SSHConnection implements AutoCloseable {

private ClientSession[] sessions;
private Tunnel[] tunnels;
private final SshClient client;
private final ClientSession[] sessions;
private final Tunnel[] tunnels;
private final int hops;
private boolean closed = false;

private ClientSession session;

protected SSHConnection(int hops) {
protected SSHConnection(SshClient client, int hops) {
this.client = client;
this.hops = hops;
sessions = new ClientSession[hops];
tunnels = new Tunnel[hops];
Expand Down Expand Up @@ -72,29 +75,33 @@ public void close() {

closed = true;

if (session != null) {
try {
session.close();
} catch (Exception e) {
// ignored?
}
}

for (int i = hops - 1; i >= 0; i--) {
if (tunnels[i] != null) {
try {
if (session != null) {
try {
tunnels[i].close();
session.close();
} catch (Exception e) {
// ignored?
}
}
if (sessions[i] != null) {
try {
sessions[i].close();
} catch (Exception e) {
// ignored?

for (int i = hops - 1; i >= 0; i--) {
if (tunnels[i] != null) {
try {
tunnels[i].close();
} catch (Exception e) {
// ignored?
}
}
if (sessions[i] != null) {
try {
sessions[i].close();
} catch (Exception e) {
// ignored?
}
}
}
} finally {
client.stop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.Map;
import java.util.Set;

import com.google.common.net.HostAndPort;
import org.apache.sshd.agent.local.ProxyAgentFactory;
import org.apache.sshd.client.SshClient;
import org.apache.sshd.client.channel.ChannelDirectTcpip;
Expand All @@ -52,6 +51,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.net.HostAndPort;

import nl.esciencecenter.xenon.InvalidCredentialException;
import nl.esciencecenter.xenon.InvalidLocationException;
import nl.esciencecenter.xenon.XenonException;
Expand Down Expand Up @@ -480,7 +481,7 @@ public static SSHConnection connect(String adaptorName, SshClient client, String
SshdSocketAddress[] locations = extractLocations(adaptorName, location);
UserCredential[] creds = extractCredentials(adaptorName, locations, credential);

SSHConnection connection = new SSHConnection(locations.length - 1);
SSHConnection connection = new SSHConnection(client, locations.length - 1);

// Connect to the last location. This is either the destination (without tunneling) or the first hop.
ClientSession session = connectAndAuthenticate(adaptorName, client, locations[0].getHostName(), locations[0].getPort(), creds[0], timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -46,6 +45,7 @@
import nl.esciencecenter.xenon.adaptors.filesystems.FileAdaptor;
import nl.esciencecenter.xenon.credentials.Credential;
import nl.esciencecenter.xenon.credentials.DefaultCredential;
import nl.esciencecenter.xenon.utils.DaemonThreadFactory;

/**
* FileSystem represent a (possibly remote) file system that can be used to access data.
Expand Down Expand Up @@ -419,14 +419,7 @@ protected FileSystem(String uniqueID, String adaptor, String location, Credentia
this.workingDirectory = workDirectory;
this.properties = properties;
this.bufferSize = bufferSize;

ThreadFactory f = r -> {
Thread t = new Thread(r, "CopyThread-" + adaptor + "-" + uniqueID);
t.setDaemon(true);
return t;
};

this.pool = Executors.newFixedThreadPool(1, f);
this.pool = Executors.newFixedThreadPool(1, new DaemonThreadFactory("CopyThread." + uniqueID));
}

protected int getBufferSize() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2013 Netherlands eScience Center
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package nl.esciencecenter.xenon.utils;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class DaemonThreadFactory implements ThreadFactory {

private final String name;
private int count = 0;

public DaemonThreadFactory(String name) {
this.name = name;
}

private synchronized int getCount() {
return count++;
}

public Thread newThread(Runnable runnable) {
Thread thread = Executors.defaultThreadFactory().newThread(runnable);
thread.setDaemon(true);
thread.setName(name + "-" + getCount());
return thread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ public class MockSSHConnection extends SSHConnection {
MockSftpClient client;

protected MockSSHConnection(MockSftpClient client) {
super(0);
super(null, 0);
this.client = client;
}

public SftpClient createSftpClient() throws IOException {
return client;
}

@Override
public void close() {
// do nothing
}
}
Loading

0 comments on commit 292ff24

Please sign in to comment.