Skip to content

Commit

Permalink
Merge pull request #69 from orkes-io/cleanup
Browse files Browse the repository at this point in the history
Cleaned up repo by removing unused code
  • Loading branch information
gardusig authored Jan 17, 2023
2 parents 0cde862 + 0d4c7e6 commit a8499e9
Show file tree
Hide file tree
Showing 24 changed files with 30 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.Arrays;

import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest;
import com.netflix.conductor.common.run.Workflow;

import io.orkes.conductor.client.OrkesClients;
import io.orkes.conductor.client.WorkflowClient;
Expand Down Expand Up @@ -56,7 +55,7 @@ private void workflowOperations() {
// Start the workflow
String workflowId = workflowClient.startWorkflow(startWorkflowRequest);
// Get the workflow execution status
Workflow workflow = workflowClient.getWorkflow(workflowId, true);
workflowClient.getWorkflow(workflowId, true);
// Pause the workflow
workflowClient.pauseWorkflow(workflowId);
// Resume the workflow
Expand Down
1 change: 0 additions & 1 deletion src/main/java/io/orkes/conductor/client/ApiClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,6 @@ public X509Certificate[] getAcceptedIssuers() {
return null;
}
};
SSLContext sslContext = SSLContext.getInstance("TLS");
trustManagers = new TrustManager[] {trustAll};
hostnameVerifier =
new HostnameVerifier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class TaskRunner {
private final int updateRetryCount;
private final ThreadPoolExecutor executorService;
private final Map<String /* taskType */, String /* domain */> taskToDomain;
private final int threadCount;
private final int taskPollTimeout;

public static final String DOMAIN = "domain";
Expand All @@ -75,7 +74,6 @@ class TaskRunner {
this.taskClient = taskClient;
this.updateRetryCount = updateRetryCount;
this.taskToDomain = taskToDomain;
this.threadCount = threadCount;
this.taskPollTimeout = taskPollTimeout;
this.permits = new Semaphore(threadCount);
this.executorService =
Expand Down Expand Up @@ -170,7 +168,6 @@ private List<Task> pollTasksForWorker() {
String domain = Optional.ofNullable(PropertyFactory.getString(taskType, DOMAIN, null)).orElseGet(() -> Optional.ofNullable(PropertyFactory.getString(ALL_WORKERS, DOMAIN, null)).orElse(taskToDomain.get(taskType)));
LOGGER.trace("Polling task of type: {} in domain: '{}' with size {}", taskType, domain, pollCount);
Stopwatch stopwatch = Stopwatch.createStarted();
long now = System.currentTimeMillis();
int tasksToPoll = pollCount;
tasks = MetricsContainer.getPollTimer(taskType).record(() -> pollTask(domain, tasksToPoll));
stopwatch.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.slf4j.LoggerFactory;

import com.netflix.conductor.client.config.ConductorClientConfiguration;
import com.netflix.conductor.client.config.DefaultConductorClientConfiguration;
import com.netflix.conductor.client.config.PropertyFactory;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.discovery.EurekaClient;
Expand All @@ -31,7 +30,6 @@
import io.orkes.conductor.client.http.OrkesTaskClient;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import static io.orkes.conductor.client.automator.TaskRunner.ALL_WORKERS;
import static io.orkes.conductor.client.automator.TaskRunner.DOMAIN;
Expand Down Expand Up @@ -106,9 +104,6 @@ public static class Builder {

private Map<String /* taskType */, Integer /* timeoutInMillisecond */> taskPollCount = new HashMap<>();

private ConductorClientConfiguration conductorClientConfiguration =
new DefaultConductorClientConfiguration();

public Builder(TaskClient taskClient, Iterable<Worker> workers) {
Preconditions.checkNotNull(taskClient, "TaskClient cannot be null");
Preconditions.checkNotNull(workers, "Workers cannot be null");
Expand Down Expand Up @@ -152,7 +147,6 @@ public TaskRunnerConfigurer.Builder withUpdateRetryCount(int updateRetryCount) {
*/
public TaskRunnerConfigurer.Builder withConductorClientConfiguration(
ConductorClientConfiguration conductorClientConfiguration) {
this.conductorClientConfiguration = conductorClientConfiguration;
return this;
}

Expand Down Expand Up @@ -288,25 +282,6 @@ public synchronized void init() {
}
}

private ThreadPoolExecutor getExecutor(int threadPoolSize) {
return new ThreadPoolExecutor(
threadPoolSize,
threadPoolSize,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(threadPoolSize) {
@Override
public boolean offer(Runnable runnable) {
try {
return super.offer(runnable, 200, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
return false;
}
}
},
new ThreadFactoryBuilder().setNameFormat("task-poll-execute-thread-%d").build());
}

/**
* Invoke this method within a PreDestroy block within your application to facilitate a graceful
* shutdown of your worker, during process termination.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,25 @@

import io.orkes.conductor.client.ApiClient;
import io.orkes.conductor.client.http.ApiException;
import io.orkes.grpc.service.WorkflowServiceStreamGrpc;

import com.google.common.util.concurrent.Uninterruptibles;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;

import static io.orkes.conductor.client.grpc.ChannelManager.getChannel;

@Slf4j
public class ExecuteWorkflowStream {

private final WorkflowServiceStreamGrpc.WorkflowServiceStreamStub stub;

private final ApiClient apiClient;
private final HeaderClientInterceptor headerInterceptor;
private StreamObserver<StartWorkflowRequestPb.StartWorkflowRequest> requests;

private volatile boolean ready;

private int reconnectBackoff = 10;

public ExecuteWorkflowStream(ApiClient apiClient) {
this.apiClient = apiClient;
this.headerInterceptor = new HeaderClientInterceptor(apiClient);
if (apiClient.useSecurity()) {
apiClient.getToken();
}
this.stub =
WorkflowServiceStreamGrpc.newStub(getChannel(apiClient))
.withInterceptors(headerInterceptor);
connect();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class PooledPoller implements StreamObserver<TaskPb.Task> {
private final TaskServiceGrpc.TaskServiceStub taskPollClient;
private final Worker worker;
private final String domain;
private final Integer taskPollTimeout;
private ThreadPoolExecutor executor;
private Integer threadCountForTask;
private final ArrayBlockingQueue<Holder> latchesForOrder = new ArrayBlockingQueue<>(10000);
Expand All @@ -63,7 +62,6 @@ public PooledPoller(ApiClient apiClient, Worker worker, String domain, int taskP
this.taskPollClient = TaskServiceGrpc.newStub(channel).withInterceptors(new HeaderClientInterceptor(apiClient));
this.worker = worker;
this.domain = domain;
this.taskPollTimeout = taskPollTimeout;
this.executor = executor;
this.threadCountForTask = threadCountForTask;
this.taskPollCount = taskPollCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public CompletableFuture<WorkflowRun> executeWorkflow(StartWorkflowRequest start
requestBuilder.setWaitUntilTask(waitUntilTask);
}
requestBuilder.setRequest(protoMappingHelper.toProto(startWorkflowRequest));
CompletableFuture future = executionMonitor.monitorRequest(requestId);
CompletableFuture<WorkflowRun> future = executionMonitor.monitorRequest(requestId);
synchronized (requestStream) {
requestStream.onNext(requestBuilder.build());
}
Expand Down
21 changes: 0 additions & 21 deletions src/main/java/io/orkes/conductor/client/http/JSON.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.text.ParseException;
import java.text.ParsePosition;
import java.util.Date;
import java.util.Map;

import org.threeten.bp.LocalDate;
import org.threeten.bp.OffsetDateTime;
Expand All @@ -44,26 +43,6 @@ public static GsonBuilder createGson() {
return fireBuilder.createGsonBuilder();
}

private static String getDiscriminatorValue(
JsonElement readElement, String discriminatorField) {
JsonElement element = readElement.getAsJsonObject().get(discriminatorField);
if (null == element) {
throw new IllegalArgumentException(
"missing discriminator field: <" + discriminatorField + ">");
}
return element.getAsString();
}

private static <T> Class<? extends T> getClassByDiscriminator(
Map<String, Class<? extends T>> classByDiscriminatorValue, String discriminatorValue) {
Class<? extends T> clazz = classByDiscriminatorValue.get(discriminatorValue.toUpperCase());
if (null == clazz) {
throw new IllegalArgumentException(
"cannot determine model class of name: <" + discriminatorValue + ">");
}
return clazz;
}

public JSON() {
gson = createGson()
.setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call addRoleToApplicationUserValidateBeforeCall(
String applicationId,
String role,
Expand Down Expand Up @@ -245,7 +244,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call createAccessKeyValidateBeforeCall(
String id,
final ProgressResponseBody.ProgressListener progressListener,
Expand Down Expand Up @@ -359,7 +357,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call createApplicationValidateBeforeCall(
CreateOrUpdateApplicationRequest createOrUpdateApplicationRequest,
final ProgressResponseBody.ProgressListener progressListener,
Expand Down Expand Up @@ -489,7 +486,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call deleteAccessKeyValidateBeforeCall(
String applicationId,
String keyId,
Expand Down Expand Up @@ -615,7 +611,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call deleteApplicationValidateBeforeCall(
String id,
final ProgressResponseBody.ProgressListener progressListener,
Expand Down Expand Up @@ -730,7 +725,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call getAccessKeysValidateBeforeCall(
String id,
final ProgressResponseBody.ProgressListener progressListener,
Expand Down Expand Up @@ -848,7 +842,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call getApplicationValidateBeforeCall(
String id,
final ProgressResponseBody.ProgressListener progressListener,
Expand Down Expand Up @@ -962,7 +955,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call listApplicationsValidateBeforeCall(
final ProgressResponseBody.ProgressListener progressListener,
final ProgressRequestBody.ProgressRequestListener progressRequestListener)
Expand Down Expand Up @@ -1078,7 +1070,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call removeRoleFromApplicationUserValidateBeforeCall(
String applicationId,
String role,
Expand Down Expand Up @@ -1207,7 +1198,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call toggleAccessKeyStatusValidateBeforeCall(
String applicationId,
String keyId,
Expand Down Expand Up @@ -1338,7 +1328,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call updateApplicationValidateBeforeCall(
CreateOrUpdateApplicationRequest createOrUpdateApplicationRequest,
String id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call getPermissionsValidateBeforeCall(
String type,
String id,
Expand Down Expand Up @@ -245,7 +244,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call grantPermissionsValidateBeforeCall(
AuthorizationRequest authorizationRequest,
final ProgressResponseBody.ProgressListener progressListener,
Expand Down Expand Up @@ -360,7 +358,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call removePermissionsValidateBeforeCall(
AuthorizationRequest authorizationRequest,
final ProgressResponseBody.ProgressListener progressListener,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call addEventHandlerValidateBeforeCall(
EventHandler eventHandler,
final ProgressResponseBody.ProgressListener progressListener,
Expand Down Expand Up @@ -238,7 +237,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call deleteQueueConfigValidateBeforeCall(
String queueType,
String queueName,
Expand Down Expand Up @@ -362,7 +360,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call getEventHandlersValidateBeforeCall(
final ProgressResponseBody.ProgressListener progressListener,
final ProgressRequestBody.ProgressRequestListener progressRequestListener)
Expand Down Expand Up @@ -476,7 +473,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call getEventHandlersForEventValidateBeforeCall(
String event,
Boolean activeOnly,
Expand Down Expand Up @@ -608,7 +604,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call getQueueConfigValidateBeforeCall(
String queueType,
String queueName,
Expand Down Expand Up @@ -731,7 +726,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call getQueueNamesValidateBeforeCall(
final ProgressResponseBody.ProgressListener progressListener,
final ProgressRequestBody.ProgressRequestListener progressRequestListener)
Expand Down Expand Up @@ -848,7 +842,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call putQueueConfigValidateBeforeCall(
String body,
String queueType,
Expand Down Expand Up @@ -987,7 +980,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call removeEventHandlerStatusValidateBeforeCall(
String name,
final ProgressResponseBody.ProgressListener progressListener,
Expand Down Expand Up @@ -1101,7 +1093,6 @@ public com.squareup.okhttp.Response intercept(
progressRequestListener);
}

@SuppressWarnings("rawtypes")
private com.squareup.okhttp.Call updateEventHandlerValidateBeforeCall(
EventHandler eventHandler,
final ProgressResponseBody.ProgressListener progressListener,
Expand Down
Loading

0 comments on commit a8499e9

Please sign in to comment.