Skip to content

Commit

Permalink
foundation for grpc system task
Browse files Browse the repository at this point in the history
  • Loading branch information
v1r3n committed Nov 23, 2024
1 parent 194d181 commit b7a703c
Show file tree
Hide file tree
Showing 12 changed files with 597 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
*/
package com.netflix.conductor.core.execution;

import java.util.List;

import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.tasks.TaskResult;
import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest;
import com.netflix.conductor.common.metadata.workflow.SkipTaskRequest;
Expand All @@ -23,6 +22,9 @@
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;

import java.util.List;
import java.util.Map;

public interface WorkflowExecutor {

/**
Expand Down Expand Up @@ -105,6 +107,12 @@ void restart(String workflowId, boolean useLatestDefinitions)
*/
WorkflowModel decide(String workflowId);

/**
* @param workflow the workflow to be evaluated
* @return updated workflow
*/
WorkflowModel decide(WorkflowModel workflow);

/**
* @param workflowId id of the workflow to be terminated
* @param reason termination reason to be recorded
Expand Down Expand Up @@ -159,4 +167,23 @@ void skipTaskFromWorkflow(
* @return id of the workflow
*/
String startWorkflow(StartWorkflowInput input);

default String startWorkflow(String name, Integer version,
String correlationId,
Map<String, Object> workflowInput,
String externalInputPayloadStoragePath, String event,
Map<String, String> taskToDomain) {
StartWorkflowInput input = new StartWorkflowInput();
input.setName(name);
input.setVersion(version);
input.setCorrelationId(correlationId);
input.setWorkflowInput(workflowInput);
input.setExternalInputPayloadStoragePath(externalInputPayloadStoragePath);
input.setEvent(event);
input.setTaskToDomain(taskToDomain);
return startWorkflow(input);
}

TaskDef getTaskDefinition(TaskModel task);
public void addTaskToQueue(TaskModel task);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,8 @@ public WorkflowModel decide(String workflowId) {
* method does not acquire the lock on the workflow and should ony be called / overridden if
* No locking is required or lock is acquired externally
*/
private WorkflowModel decide(WorkflowModel workflow) {
@Override
public WorkflowModel decide(WorkflowModel workflow) {
if (workflow.getStatus().isTerminal()) {
if (!workflow.getStatus().isSuccessful()) {
cancelNonTerminalTasks(workflow);
Expand Down Expand Up @@ -1371,7 +1372,8 @@ public WorkflowModel getWorkflow(String workflowId, boolean includeTasks) {
return executionDAOFacade.getWorkflowModel(workflowId, includeTasks);
}

private void addTaskToQueue(TaskModel task) {
@Override
public void addTaskToQueue(TaskModel task) {
// put in queue
String taskQueueName = QueueUtils.getQueueName(task);
if (task.getCallbackAfterSeconds() > 0) {
Expand Down Expand Up @@ -1767,7 +1769,8 @@ public void scheduleNextIteration(TaskModel loopTask, WorkflowModel workflow) {
workflow.getTasks().addAll(scheduledLoopOverTasks);
}

private TaskDef getTaskDefinition(TaskModel task) {
@Override
public TaskDef getTaskDefinition(TaskModel task) {
return task.getTaskDefinition()
.orElseGet(
() ->
Expand Down
1 change: 1 addition & 0 deletions dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ ext {
revHealth = '1.1.4'
revPostgres = '42.7.2'
revProtoBuf = '3.25.5'
revProtoBufUtils = '4.28.3'
revJakartaAnnotation = '2.1.1'
revJAXB = '4.0.1'
revJAXRS = '4.0.0'
Expand Down
18 changes: 18 additions & 0 deletions grpc-task/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
dependencies {
implementation project(':conductor-common')
implementation project(':conductor-core')

compileOnly 'org.springframework.boot:spring-boot-starter'

implementation "io.grpc:grpc-netty:${revGrpc}"
implementation "io.grpc:grpc-services:${revGrpc}"
implementation "io.grpc:grpc-protobuf:${revGrpc}"
implementation "com.google.protobuf:protobuf-java:${revProtoBuf}"
implementation "com.google.protobuf:protobuf-java-util:${revProtoBufUtils}"

implementation "com.fasterxml.jackson.core:jackson-databind"
implementation "com.fasterxml.jackson.core:jackson-core"
implementation "com.fasterxml.jackson.core:jackson-annotations"

testImplementation "io.grpc:grpc-protobuf:${revGrpc}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.conductoross.tasks.grpc;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.grpc.MethodDescriptor;

import java.io.IOException;
import java.io.InputStream;

public class DynamicMessageMarshaller implements MethodDescriptor.Marshaller<DynamicMessage> {
private final Descriptors.Descriptor descriptor;

public DynamicMessageMarshaller(Descriptors.Descriptor descriptor) {
this.descriptor = descriptor;
}

@Override
public InputStream stream(DynamicMessage value) {
return value.toByteString().newInput();
}

@Override
public DynamicMessage parse(InputStream stream) {
try {
return DynamicMessage.parseFrom(descriptor, stream);
} catch (IOException e) {
throw new RuntimeException("Failed to parse message", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package org.conductoross.tasks.grpc;

import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Descriptors;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DynamicServiceDescriptorBuilder {

private final Map<String, Descriptors.FileDescriptor> fileDescriptorMap = new HashMap<>();

/**
* Loads a descriptor set file and initializes the FileDescriptors.
*
* @param descriptorSetPath Path to the .desc file.
* @throws IOException If the file cannot be read.
* @throws Descriptors.DescriptorValidationException If descriptor validation fails.
*/
public void loadDescriptorSet(String descriptorSetPath) throws IOException, Descriptors.DescriptorValidationException {
DescriptorProtos.FileDescriptorSet descriptorSet =
DescriptorProtos.FileDescriptorSet.parseFrom(new FileInputStream(descriptorSetPath));

// Add well-known types to the descriptor cache
addWellKnownTypes();

// Build each FileDescriptor and cache it in the map
for (DescriptorProtos.FileDescriptorProto fileDescriptorProto : descriptorSet.getFileList()) {
// Resolve dependencies
Descriptors.FileDescriptor[] dependencies = fileDescriptorProto.getDependencyList().stream()
.map(fileDescriptorMap::get)
.toArray(Descriptors.FileDescriptor[]::new);

// Build the FileDescriptor
Descriptors.FileDescriptor fileDescriptor =
Descriptors.FileDescriptor.buildFrom(fileDescriptorProto, dependencies);

// Cache the FileDescriptor for resolving future imports
fileDescriptorMap.put(fileDescriptor.getName(), fileDescriptor);
}
}

/**
* Adds well-known Protobuf types (e.g., Timestamp, Any) to the descriptor map.
*/
private void addWellKnownTypes() {
// Add each well-known type to the file descriptor map
fileDescriptorMap.put("google/protobuf/timestamp.proto", com.google.protobuf.TimestampProto.getDescriptor());
fileDescriptorMap.put("google/protobuf/any.proto", com.google.protobuf.AnyProto.getDescriptor());
fileDescriptorMap.put("google/protobuf/struct.proto", com.google.protobuf.StructProto.getDescriptor());
fileDescriptorMap.put("google/protobuf/duration.proto", com.google.protobuf.DurationProto.getDescriptor());
fileDescriptorMap.put("google/protobuf/empty.proto", com.google.protobuf.EmptyProto.getDescriptor());
fileDescriptorMap.put("google/protobuf/field_mask.proto", com.google.protobuf.FieldMaskProto.getDescriptor());
}

public Descriptors.Descriptor getMessageType(String name) {
for (Descriptors.FileDescriptor fd : fileDescriptorMap.values()) {
for (Descriptors.Descriptor messageType : fd.getMessageTypes()) {
String messageTypeName = messageType.getName();
String fullName = messageType.getFullName();
System.out.println(messageTypeName + " == " + fullName);
if(fullName.equals(name)) {
return messageType;
}
}
}
return null;
}

public String getMessageProto(String name) {
var descriptor = getMessageType(name);
return getProtoDefinition(descriptor, "\t");
}

private String getProtoDefinition(Descriptors.Descriptor descriptor, String indent) {
StringBuilder protoDef = new StringBuilder();
protoDef.append(indent).append("message ").append(descriptor.getName()).append(" {\n");

for (Descriptors.FieldDescriptor field : descriptor.getFields()) {
protoDef.append(indent).append(" ");
protoDef.append(getFieldDefinition(field)).append("\n");
}

for (Descriptors.Descriptor nestedDescriptor : descriptor.getNestedTypes()) {
protoDef.append(getProtoDefinition(nestedDescriptor, indent + " "));
}

protoDef.append(indent).append("}\n");
return protoDef.toString();
}

private String getFieldDefinition(Descriptors.FieldDescriptor field) {
StringBuilder fieldDef = new StringBuilder();

// Add field type
if (field.isRepeated()) {
fieldDef.append("repeated ");
}

switch (field.getType()) {
case MESSAGE:
fieldDef.append(field.getMessageType().getName());
break;
case ENUM:
fieldDef.append(field.getEnumType().getName());
break;
default:
fieldDef.append(field.getType().name().toLowerCase());
}

// Add field name and number
fieldDef.append(" ").append(field.getName()).append(" = ").append(field.getNumber()).append(";");

return fieldDef.toString();
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package org.conductoross.tasks.grpc;

import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.stub.ClientCalls;

import java.util.Iterator;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class GrpcDynamicCaller {

private final ManagedChannel channel;

public GrpcDynamicCaller(ManagedChannel channel) {
this.channel = channel;
}

public DynamicMessage callUnaryMethod(
String fullMethodName,
Descriptors.Descriptor inputDescriptor,
Descriptors.Descriptor outputDescriptor,
DynamicMessage requestMessage
) {
// Build the gRPC MethodDescriptor dynamically
MethodDescriptor<DynamicMessage, DynamicMessage> grpcMethodDescriptor =
MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(fullMethodName)
.setRequestMarshaller(new DynamicMessageMarshaller(inputDescriptor))
.setResponseMarshaller(new DynamicMessageMarshaller(outputDescriptor))
.build();

// Make the call
return ClientCalls.blockingUnaryCall(channel, grpcMethodDescriptor, io.grpc.CallOptions.DEFAULT, requestMessage);
}

public Stream<DynamicMessage> callResponseStreamMethod(
String fullMethodName,
Descriptors.Descriptor inputDescriptor,
Descriptors.Descriptor outputDescriptor,
DynamicMessage requestMessage
) {
// Build the gRPC MethodDescriptor dynamically
MethodDescriptor<DynamicMessage, DynamicMessage> grpcMethodDescriptor =
MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()
.setType(MethodDescriptor.MethodType.SERVER_STREAMING)
.setFullMethodName(fullMethodName)
.setRequestMarshaller(new DynamicMessageMarshaller(inputDescriptor))
.setResponseMarshaller(new DynamicMessageMarshaller(outputDescriptor))
.build();

// Make the call
var stream = ClientCalls.blockingServerStreamingCall(channel, grpcMethodDescriptor, io.grpc.CallOptions.DEFAULT, requestMessage);
return toStream(stream);
}

public DynamicMessage callUnaryMethod(
Descriptors.ServiceDescriptor serviceDescriptor,
String methodName,
DynamicMessage requestMessage
) {
// Find the method descriptor
Descriptors.MethodDescriptor methodDescriptor = serviceDescriptor.findMethodByName(methodName);
if (methodDescriptor == null) {
throw new IllegalArgumentException("Method " + methodName + " not found in service " + serviceDescriptor.getFullName());
}

// Build the gRPC method descriptor
MethodDescriptor<DynamicMessage, DynamicMessage> grpcMethodDescriptor =
MethodDescriptor.<DynamicMessage, DynamicMessage>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(
MethodDescriptor.generateFullMethodName(serviceDescriptor.getFullName(), methodName))
.setRequestMarshaller(new DynamicMessageMarshaller(methodDescriptor.getInputType()))
.setResponseMarshaller(new DynamicMessageMarshaller(methodDescriptor.getOutputType()))
.build();

// Make the call
return ClientCalls.blockingUnaryCall(channel, grpcMethodDescriptor, io.grpc.CallOptions.DEFAULT, requestMessage);
}

public static DynamicMessage jsonToProto(String json, Descriptors.Descriptor descriptor) throws InvalidProtocolBufferException {
DynamicMessage.Builder builder = DynamicMessage.newBuilder(descriptor);
JsonFormat.parser().merge(json, builder);
return builder.build();
}

private static <T> Stream<T> toStream(Iterator<T> iterator) {
return StreamSupport.stream(
((Iterable<T>) () -> iterator).spliterator(),
false
);
}






}
Loading

0 comments on commit b7a703c

Please sign in to comment.