Skip to content

Commit

Permalink
Improve logging and memory usage (#181)
Browse files Browse the repository at this point in the history
* Improve logging and memory usage

* Handle API exceptions
  • Loading branch information
boma96 authored Apr 3, 2024
1 parent 4a5dc2d commit 1109229
Showing 1 changed file with 31 additions and 30 deletions.
61 changes: 31 additions & 30 deletions src/ConductorSharp.Engine/ExecutionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,40 +100,41 @@ private static Type GetInputType(Type workerType)

private async Task PollAndHandle(TaskToWorker scheduledWorker, CancellationToken cancellationToken)
{
var workerId = Guid.NewGuid().ToString();

var pollResponse = await _taskManager.PollAsync(scheduledWorker.TaskName, workerId, _configuration.Domain, cancellationToken);

if (pollResponse == null)
return;

if (!string.IsNullOrEmpty(pollResponse.ExternalInputPayloadStoragePath))
Client.Generated.Task pollResponse = null;
try
{
_logger.LogDebug("Fetching storage {location}", pollResponse.ExternalInputPayloadStoragePath);
// TODO: Check what the operation and payload type are
var externalStorageLocation = await _taskManager.GetExternalStorageLocationAsync(
pollResponse.ExternalInputPayloadStoragePath,
"",
"",
cancellationToken
);

// TODO: iffy
var file = await _externalPayloadService.GetExternalStorageDataAsync(externalStorageLocation.Path, cancellationToken);
var workerId = Guid.NewGuid().ToString();

using TextReader textReader = new StreamReader(file.Stream);
var json = textReader.ReadToEnd();
pollResponse = await _taskManager.PollAsync(scheduledWorker.TaskName, workerId, _configuration.Domain, cancellationToken);

pollResponse.InputData = JsonConvert.DeserializeObject<IDictionary<string, object>>(
json,
ConductorConstants.IoJsonSerializerSettings
);
}
if (!string.IsNullOrEmpty(pollResponse.ExternalInputPayloadStoragePath))
{
_logger.LogDebug("Fetching storage {location}", pollResponse.ExternalInputPayloadStoragePath);
// TODO: Check what the operation and payload type are
var externalStorageLocation = await _taskManager.GetExternalStorageLocationAsync(
pollResponse.ExternalInputPayloadStoragePath,
"",
"",
cancellationToken
);

// TODO: iffy
var file = await _externalPayloadService.GetExternalStorageDataAsync(externalStorageLocation.Path, cancellationToken);

using TextReader textReader = new StreamReader(file.Stream);
var json = await textReader.ReadToEndAsync();

pollResponse.InputData = JsonConvert.DeserializeObject<IDictionary<string, object>>(
json,
ConductorConstants.IoJsonSerializerSettings
);
}

try
{
var inputType = GetInputType(scheduledWorker.TaskType);
var inputData = SerializationHelper.DictonaryToObject(inputType, pollResponse.InputData, ConductorConstants.IoJsonSerializerSettings);
// Poll response data can be huge (if read from external storage)
// We can save memory by not holding reference to pollResponse.InputData after it is parsed
pollResponse.InputData = null;

using var scope = _lifetimeScopeFactory.CreateScope();

Expand Down Expand Up @@ -167,8 +168,8 @@ await _taskManager.UpdateAsync(
catch (Exception exception)
{
_logger.LogError(
"{error} while executing {task} as part of {workflow} with id {workflowId}",
exception.Message,
"{@Exception} while executing {Task} as part of {Workflow} with id {WorkflowId}",
exception,
pollResponse.TaskDefName,
pollResponse.WorkflowType,
pollResponse.WorkflowInstanceId
Expand Down

0 comments on commit 1109229

Please sign in to comment.