Skip to content
This repository has been archived by the owner on Sep 1, 2023. It is now read-only.

Commit

Permalink
0.0.6: Various bugfixes
Browse files Browse the repository at this point in the history
- Fixed a bug where server would keep files open after opening job outputs,
  which would result in a gradual resource leak
- Fixed the job queue such that it dequeues as many jobs as possible (below
  the configuration-defined upper limit) once a running job finishes (previously:
  advancing only happened on submission, which was a bug).
  • Loading branch information
Adam Kewley committed Jan 9, 2018
2 parents f83d065 + 44bc6ce commit b3d442f
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 53 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.github.jobson</groupId>
<artifactId>jobson</artifactId>
<version>0.0.5</version>
<version>0.0.6</version>
<name>jobson</name>
<description>
A web server that can turn command-line applications into a job system.
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/com/github/jobson/Helpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.*;
import java.util.stream.Collector;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ private static AuthenticationConfig loadAuthenticationConfig(
}
}


private final AuthenticationConfig loadedConfig;
private final String className;
private final Optional<String> classPath;
private final Optional<JsonNode> properties;


public CustomAuthenticatorConfig(String className) {
Expand All @@ -110,21 +111,19 @@ public CustomAuthenticatorConfig(
@JsonProperty("className") String className,
@JsonProperty("classPath") Optional<String> classPath,
@JsonProperty("properties") Optional<JsonNode> properties) {
this.className = className;
this.classPath = classPath;
this.properties = properties;
}

@Override
public AuthFilter<?, Principal> createAuthFilter(AuthenticationBootstrap bootstrap) {
final ClassLoader classLoader = getClassLoader(classPath);
final Class<?> klass = loadClass(classLoader, className);
final Class<AuthenticationConfig> authConfigClass = toAuthConfigClass(klass);

this.loadedConfig = loadAuthenticationConfig(properties, authConfigClass);
}


public AuthenticationConfig getLoadedConfig() {
return loadedConfig;
}
final AuthenticationConfig loadedConfig = loadAuthenticationConfig(properties, authConfigClass);

@Override
public AuthFilter<?, Principal> createAuthFilter(AuthenticationBootstrap bootstrap) {
return loadedConfig.createAuthFilter(bootstrap);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ private void createIfDoesNotExist(Path p) {

private void writeJobOutputToDisk(JobOutput jobOutput, Path outputPath) {
try {
IOUtils.copy(jobOutput.getData().getData(), new FileOutputStream(outputPath.toFile(), false));
IOUtils.copyLarge(jobOutput.getData().getData(), new FileOutputStream(outputPath.toFile(), false));
jobOutput.getData().getData().close();
} catch (IOException ex) {
throw new RuntimeException(outputPath + ": cannot write: " + ex);
}
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/com/github/jobson/jobs/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,11 @@ public Pair<JobId, CancelablePromise<FinalizedJob>> submit(ValidJobRequest valid
}

private void tryAdvancingJobQueue() {
if (executingJobs.size() >= maxRunningJobs) return;
while (jobQueue.size() > 0 && executingJobs.size() < maxRunningJobs)
advanceJobQueue();
}

private void advanceJobQueue() {
final QueuedJob queuedJob = jobQueue.poll();

if (queuedJob == null) return;
Expand Down Expand Up @@ -192,6 +195,7 @@ private void onExecutionFinished(ExecutingJob executingJob, JobExecutionResult j
FinalizedJob.fromExecutingJob(executingJob, jobExecutionResult.getFinalStatus());

executingJob.getCompletionPromise().complete(finalizedJob);
tryAdvancingJobQueue();
}

public Map<String, HealthCheck> getHealthChecks() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,10 @@ private Response generateBinaryDataResponse(JobId jobId, Optional<BinaryData> ma
if (maybeBinaryData.isPresent()) {
final BinaryData binaryData = maybeBinaryData.get();

final StreamingOutput body = outputStream -> IOUtils.copy(binaryData.getData(), outputStream);
final StreamingOutput body = outputStream -> {
IOUtils.copyLarge(binaryData.getData(), outputStream);
binaryData.getData().close();
};

final Response.ResponseBuilder b =
Response.ok(body, binaryData.getMimeType())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,18 +49,18 @@ public ObservableSocket(String name, Observable<T> events) {

this.eventsSubscription = events.subscribe(
this::onMessage,
this::onStderrError,
this::onStderrClosed);
this::onObservableError,
this::onObservableClosed);
}

protected abstract void onMessage(T messageData) throws IOException;

private void onStderrError(Throwable ex) {
private void onObservableError(Throwable ex) {
log.debug("Closing websocket because an error was thrown by the observable. Error: " + ex);
this.session.close(SERVER_UNEXPECTED_CONDITION_STATUS, "Internal server error");
}

private void onStderrClosed() {
private void onObservableClosed() {
log.debug("Closing websocket because observable closed");
this.session.close(NORMAL_SOCKET_CLOSURE_STATUS, "Sever event stream ended");
}
Expand All @@ -74,4 +75,11 @@ public void onWebSocketConnect(Session session) {
public void onWebSocketClose(Session session, int closeCode, String closeReason) {
this.eventsSubscription.dispose();
}

@OnWebSocketError
public void onWebSocketError(Session session, Throwable ex) {
log.error(ex.getMessage());
this.eventsSubscription.dispose();
session.close();
}
}
1 change: 1 addition & 0 deletions src/test/java/com/github/jobson/TestConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@

public final class TestConstants {
public static final int DEFAULT_TIMEOUT = 1000;
public static final int NUMBER_OF_BURN_IN_API_CALLS = 10_000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package com.github.jobson.auth.custom;

import com.fasterxml.jackson.databind.JsonNode;
import com.github.jobson.auth.AuthenticationBootstrap;
import com.github.jobson.dao.users.UserDAO;
import io.dropwizard.jersey.DropwizardResourceConfig;
Expand All @@ -30,11 +29,8 @@
import org.junit.Test;

import javax.servlet.Servlet;
import java.io.IOException;

import static com.github.jobson.Helpers.readJSON;
import static com.github.jobson.TestHelpers.generateClassName;
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
import static org.mockito.Mockito.mock;

public final class CustomAuthenticatorConfigTest {
Expand All @@ -50,41 +46,30 @@ private static AuthenticationBootstrap createTypicalAuthBootstrap() {


@Test(expected = NullPointerException.class)
public void testCtorThrowsIfClassNameWasNull() {
public void testCreateAuthFilterThrowsIfClassNameIsNull() {
final CustomAuthenticatorConfig config =
new CustomAuthenticatorConfig(null);
config.createAuthFilter(createTypicalAuthBootstrap());
}

@Test(expected = RuntimeException.class)
public void testCtorThrowsIfClassNameDoesNotExistOnClassPath() {
public void testCreateAuthFilterIfClassNameDoesNotExistOnClassPath() {
final CustomAuthenticatorConfig config =
new CustomAuthenticatorConfig(generateClassName());
config.createAuthFilter(createTypicalAuthBootstrap());
}

@Test(expected = RuntimeException.class)
public void testCtorThrowsIfClassDoesNotDeriveFromAuthenticationConfig() {
public void testCreateAuthFilterIfClassDoesNotDeriveFromAuthenticationConfig() {
final CustomAuthenticatorConfig config =
new CustomAuthenticatorConfig(Object.class.getName());
config.createAuthFilter(createTypicalAuthBootstrap());
}

@Test
public void testCtorDoesNotThrowIfClassDoesDeriveFromAuthenticationConfig() {
public void testCreateAuthFilterDoesNotThrowIfClassDoesDeriveFromAuthenticationConfig() {
final CustomAuthenticatorConfig config =
new CustomAuthenticatorConfig(NullCustomAuthConfig.class.getName());
}

@Test
public void testEnableWithPropertiesPutsThePropetiesOnTheLoadedCustomConfig() throws IOException {
final JsonNode n =
readJSON("{ \"prop1\": \"val1\", \"prop2\": \"val2\" }", JsonNode.class);

final CustomAuthenticatorConfig config =
new CustomAuthenticatorConfig(CustomAuthConfigWithProperties.class.getName(), n);

final CustomAuthConfigWithProperties createdConfig =
(CustomAuthConfigWithProperties)config.getLoadedConfig();

assertThat(createdConfig.getProp1()).isEqualTo("val1");
assertThat(createdConfig.getProp2()).isEqualTo("val2");
config.createAuthFilter(createTypicalAuthBootstrap());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static com.github.jobson.Constants.JOB_MANAGER_JOB_QUEUE_OVERFLOW_HEALTHCHECK;
import static com.github.jobson.Constants.JOB_MANAGER_MAX_JOB_QUEUE_OVERFLOW_THRESHOLD;
Expand Down Expand Up @@ -74,12 +77,20 @@ private static JobManager createManagerWith(JobExecutor jobExecutor) {
return createManagerWith(new MockInMemoryJobWriter(), jobExecutor);
}

private static JobManager createManagerWith(JobExecutor jobExecutor, int maxRunningJobs) {
return createManagerWith(new MockInMemoryJobWriter(), jobExecutor, maxRunningJobs);
}

private static JobManager createManagerWith(WritingJobDAO dao) {
return createManagerWith(dao, MockJobExecutor.thatResolvesWith(new JobExecutionResult(FINISHED)));
}

private static JobManager createManagerWith(WritingJobDAO dao, JobExecutor executor) {
return new JobManager(dao, executor, Constants.MAX_CONCURRENT_JOBS);
return createManagerWith(dao, executor, Constants.MAX_CONCURRENT_JOBS);
}

private static JobManager createManagerWith(WritingJobDAO dao, JobExecutor executor, int maxRunningJobs) {
return new JobManager(dao, executor, maxRunningJobs);
}


Expand Down Expand Up @@ -536,4 +547,29 @@ public void testGetHealthChecksReturnsAHealthCheckForJobQueueOverflowing() {

assertThat(jobQueueHealthCheck.execute().isHealthy()).isFalse();
}

@Test
public void testJobManagerAdvancesJobQueueOnceAJobFinishesExecuting() throws InterruptedException, ExecutionException, TimeoutException {
final AtomicBoolean isFirst = new AtomicBoolean(true);
final SimpleCancelablePromise<JobExecutionResult> firstExcutionPromise = new SimpleCancelablePromise<>();
final SimpleCancelablePromise<JobExecutionResult> secondExecutionPromise = new SimpleCancelablePromise<>();

final Supplier<CancelablePromise<JobExecutionResult>> promiseSupplier = () ->
isFirst.getAndSet(false) ? firstExcutionPromise : secondExecutionPromise;

final MockJobExecutor mockJobExecutor = MockJobExecutor.thatUses(promiseSupplier);

final int maxRunningJobs = 1;

final JobManager jobManager = createManagerWith(mockJobExecutor, maxRunningJobs);

final CancelablePromise<FinalizedJob> firstJobPromise = jobManager.submit(STANDARD_VALID_REQUEST).getRight();
final CancelablePromise<FinalizedJob> secondJobPromise = jobManager.submit(STANDARD_VALID_REQUEST).getRight();

firstExcutionPromise.complete(new JobExecutionResult(FINISHED));
firstJobPromise.get(1, TimeUnit.SECONDS);

secondExecutionPromise.complete(new JobExecutionResult(FINISHED));
secondJobPromise.get(1, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
import com.github.jobson.utils.CancelablePromise;
import com.github.jobson.utils.SimpleCancelablePromise;
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

import java.util.function.Supplier;

import static com.github.jobson.TestHelpers.generateRandomBytes;

Expand All @@ -39,7 +43,7 @@ public static MockJobExecutor thatResolvesWith(JobExecutionResult result) {
public static MockJobExecutor thatResolvesWith(JobExecutionResult result, byte[] stdout, byte[] stderr) {
final CancelablePromise<JobExecutionResult> ret = new SimpleCancelablePromise<>();
ret.complete(result);
return new MockJobExecutor(ret, Observable.just(stdout), Observable.just(stderr));
return new MockJobExecutor(() -> ret, Observable.just(stdout), Observable.just(stderr));
}

public static MockJobExecutor thatUses(CancelablePromise<JobExecutionResult> promise) {
Expand All @@ -49,25 +53,30 @@ public static MockJobExecutor thatUses(CancelablePromise<JobExecutionResult> pro
public static MockJobExecutor thatUses(Observable<byte[]> stdout, Observable<byte[]> stderr) {
final CancelablePromise<JobExecutionResult> p = new SimpleCancelablePromise<>();
p.complete(new JobExecutionResult(JobStatus.FINISHED));
return new MockJobExecutor(p, stdout, stderr);
return new MockJobExecutor(() -> p, stdout, stderr);
}

public static MockJobExecutor thatUses(CancelablePromise<JobExecutionResult> promise, Observable<byte[]> stdout, Observable<byte[]> stderr) {
return new MockJobExecutor(promise, stdout, stderr);
return new MockJobExecutor(() -> promise, stdout, stderr);
}

public static MockJobExecutor thatUses(Supplier<CancelablePromise<JobExecutionResult>> promiseSupplier) {
return new MockJobExecutor(promiseSupplier, Observable.just(generateRandomBytes()), Observable.just(generateRandomBytes()));
}


private final CancelablePromise<JobExecutionResult> ret;
private final Subject<PersistedJob> executionCalls = PublishSubject.create();
private final Supplier<CancelablePromise<JobExecutionResult>> promiseSupplier;
private final Observable<byte[]> stdout;
private final Observable<byte[]> stderr;


public MockJobExecutor(
CancelablePromise<JobExecutionResult> ret,
Supplier<CancelablePromise<JobExecutionResult>> promiseSupplier,
Observable<byte[]> stdout,
Observable<byte[]> stderr) {

this.ret = ret;
this.promiseSupplier = promiseSupplier;
this.stdout = stdout;
this.stderr = stderr;
}
Expand All @@ -76,6 +85,11 @@ public MockJobExecutor(
public CancelablePromise<JobExecutionResult> execute(PersistedJob persistedJob, JobEventListeners jobEventListeners) {
stdout.subscribe(jobEventListeners.getOnStdoutListener());
stderr.subscribe(jobEventListeners.getOnStderrListener());
return ret;
this.executionCalls.onNext(persistedJob);
return this.promiseSupplier.get();
}

public Observable<PersistedJob> getExecutionCalls() {
return this.executionCalls;
}
}
Loading

0 comments on commit b3d442f

Please sign in to comment.