Skip to content

Commit

Permalink
Finally got all of the SignalR notifications working again; properly …
Browse files Browse the repository at this point in the history
…configured for Event Hubs.
  • Loading branch information
ScottArbeit committed Dec 28, 2023
1 parent 1a9b66a commit 0e8eabf
Show file tree
Hide file tree
Showing 15 changed files with 81 additions and 81 deletions.
17 changes: 12 additions & 5 deletions src/Grace.Actors/Branch.Actor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ open Grace.Shared.Dto.Reference

module Branch =

let GetActorId (branchId: BranchId) = ActorId($"{branchId}")

// Branch should support logical deletes with physical deletes set using a Dapr Timer based on a repository-level setting.
// Branch Deletion should enumerate and delete each reference in the branch.

Expand Down Expand Up @@ -94,13 +96,18 @@ module Branch =
let activateStartTime = getCurrentInstant()
let stateManager = this.StateManager
task {
let mutable message = String.Empty
let! retrievedDto = Storage.RetrieveState<BranchDto> stateManager dtoStateName
match retrievedDto with
| Some retrievedDto -> branchDto <- retrievedDto
| None -> branchDto <- BranchDto.Default
| Some retrievedDto ->
branchDto <- retrievedDto
message <- "Retrieved from database."
| None ->
branchDto <- BranchDto.Default
message <- "Not found in database."

let duration_ms = getCurrentInstant().Minus(activateStartTime).TotalMilliseconds.ToString("F3")
log.LogInformation("{CurrentInstant}: Activated {ActorType} {ActorId}. Retrieved from storage in {duration_ms}ms.", getCurrentInstantExtended(), actorName, host.Id, duration_ms)
log.LogInformation("{CurrentInstant}: Activated {ActorType} {ActorId}. {message} Duration: {duration_ms}ms.", getCurrentInstantExtended(), actorName, host.Id, message, duration_ms)
} :> Task

member private this.SetMaintenanceReminder() =
Expand Down Expand Up @@ -153,8 +160,8 @@ module Branch =

// Publish the event to the rest of the world.
let graceEvent = Events.GraceEvent.BranchEvent branchEvent
let message = graceEvent |> serialize
do! daprClient.PublishEventAsync(Constants.GracePubSubService, Constants.GraceEventStreamTopic, message)
let message = serialize graceEvent
do! daprClient.PublishEventAsync(Constants.GracePubSubService, Constants.GraceEventStreamTopic, graceEvent)

//let httpClient = getHttpClient branchEvent.Metadata.CorrelationId
//httpClient.BaseAddress <- Uri $"http://127.0.0.1:5000"
Expand Down
10 changes: 7 additions & 3 deletions src/Grace.Actors/DirectoryVersion.Actor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,22 @@ module DirectoryVersion =
let activateStartTime = getCurrentInstant()
let stateManager = this.StateManager
task {
let mutable message = String.Empty
try
let! retrievedDto = Storage.RetrieveState<DirectoryVersion> stateManager dtoStateName
match retrievedDto with
| Some retrievedDto -> directoryVersion <- retrievedDto
| None -> ()
| Some retrievedDto ->
directoryVersion <- retrievedDto
message <- "Retrieved from database."
| None ->
message <- "Not found in database."
with ex ->
let exc = createExceptionResponse ex
log.LogError("{CurrentInstant} Error in {ActorType} {ActorId}.", getCurrentInstantExtended(), this.GetType().Name, host.Id)
log.LogError("{CurrentInstant} {ExceptionDetails}", getCurrentInstantExtended(), exc.ToString())

let duration_ms = getCurrentInstant().Minus(activateStartTime).TotalMilliseconds.ToString("F3")
log.LogInformation("{CurrentInstant}: Activated {ActorType} {ActorId}. Retrieved from storage in {duration_ms}ms.", getCurrentInstantExtended(), actorName, host.Id, duration_ms)
log.LogInformation("{CurrentInstant}: Activated {ActorType} {ActorId}. {message} Duration: {duration_ms}ms.", getCurrentInstantExtended(), actorName, host.Id, message, duration_ms)
} :> Task

override this.OnPreActorMethodAsync(context) =
Expand Down
4 changes: 2 additions & 2 deletions src/Grace.Actors/Organization.Actor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ module Organization =

// Publish the event to the rest of the world.
let graceEvent = Events.GraceEvent.OrganizationEvent organizationEvent
let message = graceEvent |> serialize
do! daprClient.PublishEventAsync(GracePubSubService, GraceEventStreamTopic, message)
let message = serialize graceEvent
do! daprClient.PublishEventAsync(GracePubSubService, GraceEventStreamTopic, graceEvent)

// Update the Dto based on the current event.
organizationDto <- organizationDto |> updateDto organizationEvent.Event
Expand Down
4 changes: 2 additions & 2 deletions src/Grace.Actors/Owner.Actor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ module Owner =

// Publish the event to the rest of the world.
let graceEvent = Events.GraceEvent.OwnerEvent ownerEvent
let message = graceEvent |> serialize
do! daprClient.PublishEventAsync(GracePubSubService, GraceEventStreamTopic, message)
let message = serialize graceEvent
do! daprClient.PublishEventAsync(GracePubSubService, GraceEventStreamTopic, graceEvent)

let returnValue = GraceReturnValue.Create "Owner command succeeded." ownerEvent.Metadata.CorrelationId
returnValue.Properties.Add(nameof(OwnerId), $"{ownerDto.OwnerId}")
Expand Down
2 changes: 1 addition & 1 deletion src/Grace.Actors/Reference.Actor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ open Types

module Reference =

let GetActorId (referenceId) = ActorId($"{referenceId}")
let GetActorId (referenceId: ReferenceId) = ActorId($"{referenceId}")

type ReferenceActor(host: ActorHost) =
inherit Actor (host)
Expand Down
4 changes: 2 additions & 2 deletions src/Grace.Actors/Repository.Actor.fs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ module Repository =
| Ok (branchId, referenceId) ->
// Publish the event to the rest of the world.
let graceEvent = Events.GraceEvent.RepositoryEvent repositoryEvent
let message = graceEvent |> serialize
do! daprClient.PublishEventAsync(GracePubSubService, GraceEventStreamTopic, message)
let message = serialize graceEvent
do! daprClient.PublishEventAsync(GracePubSubService, GraceEventStreamTopic, graceEvent)

let returnValue = GraceReturnValue.Create $"Repository command succeeded." repositoryEvent.Metadata.CorrelationId
returnValue.Properties.Add(nameof(OwnerId), $"{repositoryDto.OwnerId}")
Expand Down
3 changes: 2 additions & 1 deletion src/Grace.CLI/Command/Services.CLI.fs
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ module Services =
// }

/// The full path of the inter-process communication file that grace watch uses to communicate with other invocations of Grace.
let IpcFileName() = Path.Combine(Path.GetTempPath(), Constants.IpcFileName)
let IpcFileName() = Path.Combine(Path.GetTempPath(), "Grace", Current().BranchName, Constants.IpcFileName)

/// Updates the contents of the `grace watch` status inter-process communication file.
let updateGraceWatchInterprocessFile (graceStatus: GraceStatus) =
Expand All @@ -766,6 +766,7 @@ module Services =
//logToAnsiConsole Colors.Important $"In updateGraceWatchStatus. newGraceWatchStatus.UpdatedAt: {newGraceWatchStatus.UpdatedAt.ToString(InstantPattern.ExtendedIso.PatternText, CultureInfo.InvariantCulture)}."
//logToAnsiConsole Colors.Highlighted $"{Markup.Escape(EnhancedStackTrace.Current().ToString())}"

Directory.CreateDirectory(Path.GetDirectoryName(IpcFileName())) |> ignore
use fileStream = new FileStream(IpcFileName(), FileMode.Create, FileAccess.Write, FileShare.None)
do! serializeAsync fileStream newGraceWatchStatus
graceWatchStatusUpdateTime <- newGraceWatchStatus.UpdatedAt
Expand Down
8 changes: 0 additions & 8 deletions src/Grace.CLI/Command/Watch.CLI.fs
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,6 @@ module Watch =
else
logToAnsiConsole Colors.Important $"{updateInProgressFileName} should already exist, but it doesn't."

let OnGraceUpdateInProgressChanged (args: FileSystemEventArgs) =
if args.FullPath = updateInProgressFileName then
if updateInProgress() then
logToAnsiConsole Colors.Important $"Update is in progress from another Grace instance."
else
logToAnsiConsole Colors.Important $"{updateInProgressFileName} should already exist, but it doesn't."

let OnGraceUpdateInProgressDeleted (args: FileSystemEventArgs) =
if args.FullPath = updateInProgressFileName then
if updateNotInProgress() then
Expand Down Expand Up @@ -319,7 +312,6 @@ module Watch =
Directory.CreateDirectory(Path.GetDirectoryName(updateInProgressFileName)) |> ignore
use updateInProgressFileSystemWatcher = createFileSystemWatcher (Path.GetDirectoryName(updateInProgressFileName))
use updateInProgressChanged = Observable.FromEventPattern<FileSystemEventArgs>(updateInProgressFileSystemWatcher, "Created").Select(fun e -> e.EventArgs).Subscribe(OnGraceUpdateInProgressCreated)
use updateInProgressChanged = Observable.FromEventPattern<FileSystemEventArgs>(updateInProgressFileSystemWatcher, "Changed").Select(fun e -> e.EventArgs).Subscribe(OnGraceUpdateInProgressChanged)
use updateInProgressDeleted = Observable.FromEventPattern<FileSystemEventArgs>(updateInProgressFileSystemWatcher, "Deleted").Select(fun e -> e.EventArgs).Subscribe(OnGraceUpdateInProgressDeleted)

// Load the Grace Index file.
Expand Down
4 changes: 2 additions & 2 deletions src/Grace.Server/Branch.Server.fs
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ module Branch =
| None ->
return List<DirectoryVersion>()
elif not <| String.IsNullOrEmpty(listContentsParameters.ReferenceId) then
let referenceActorId = Reference.GetActorId listContentsParameters.ReferenceId
let referenceActorId = ActorId(listContentsParameters.ReferenceId)
let referenceActorProxy = actorProxyFactory.CreateActorProxy<IReferenceActor>(referenceActorId, ActorName.Reference)
let! referenceDto = referenceActorProxy.Get()
let directoryActorId = DirectoryVersion.GetActorId referenceDto.DirectoryId
Expand Down Expand Up @@ -931,7 +931,7 @@ module Branch =
| None -> () // This should never happen because it would get caught in validations.
elif not <| String.IsNullOrEmpty(parameters.ReferenceId) then
logToConsole $"In Branch.GetVersion: parameters.ReferenceId: {parameters.ReferenceId}"
let referenceActorId = Reference.GetActorId(parameters.ReferenceId)
let referenceActorId = ActorId(parameters.ReferenceId)
let referenceActorProxy = ApplicationContext.actorProxyFactory.CreateActorProxy<IReferenceActor>(referenceActorId, ActorName.Reference)
let! referenceDto = referenceActorProxy.Get()
logToConsole $"referenceDto.ReferenceId: {referenceDto.ReferenceId}"
Expand Down
7 changes: 5 additions & 2 deletions src/Grace.Server/Middleware/ValidateIds.Middleware.fs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,13 @@ type ValidateIdsMiddleware(next: RequestDelegate) =
/// Holds the property info for each parameter type.
let propertyLookup = ConcurrentDictionary<Type, EntityProperties>()

/// Paths that we want to ignore, because they won't have Ids and Names in the body.
let ignorePaths = ["/healthz"; "/actors"; "/dapr"; "/notifications"]

/// Gets the parameter type for the endpoint from the endpoint metadata created in Startup.Server.fs.
let getBodyType (context: HttpContext) =
let path = context.Request.Path.ToString()
if not <| path.StartsWith("/healthz") && not <| path.StartsWith("/actors") && not <| path.StartsWith("/dapr") && not <| path.StartsWith("/notifications") then
if not <| (ignorePaths |> Seq.exists(fun ignorePath -> path.StartsWith(ignorePath, StringComparison.InvariantCultureIgnoreCase))) then
let endpoint = context.GetEndpoint()
if isNull(endpoint) then
log.LogDebug("{currentInstant}: Path: {context.Request.Path}; Endpoint: null.", getCurrentInstantExtended(), context.Request.Path)
Expand Down Expand Up @@ -256,7 +259,7 @@ type ValidateIdsMiddleware(next: RequestDelegate) =
context.Request.Headers["X-MiddlewareTraceOut"] <- $"{middlewareTraceOutHeader}{nameof(ValidateIdsMiddleware)} --> ";

let elapsed = getCurrentInstant().Minus(startTime).TotalMilliseconds
if not <| path.StartsWith("/healthz") && not <| path.StartsWith("/actors") && not <| path.StartsWith("/dapr") && not <| path.StartsWith("/notifications") then
if not <| (ignorePaths |> Seq.exists(fun ignorePath -> path.StartsWith(ignorePath, StringComparison.InvariantCultureIgnoreCase))) then
log.LogDebug("{currentInstant}: Path: {path}; Elapsed: {elapsed}ms; Status code: {statusCode}; graceIds: {graceIds}",
getCurrentInstantExtended(), context.Request.Path, elapsed, context.Response.StatusCode, serialize graceIds)
#endif
Expand Down
Loading

0 comments on commit 0e8eabf

Please sign in to comment.