Skip to content

Commit

Permalink
fix(cancellation): handle message cancellation properly
Browse files Browse the repository at this point in the history
a previous fix #391, attempted to address a lock that occurred on context cancel. however in doing
so, it introduced a new lock. essentially, if a message was not sent to the
requestmanager/responsemanager go routine, waiting for a response to that message could last
indefinitely. the previous fix therefore stopped waiting when the calling context cancelled. However
once a message reaches the go routine of the requestmanager/responsemanager, it's important that
it's processed to completion, so that the the message loop doesn't lock. The proper fix is to detect
when the message is sent successfully, and if so, wait for it to be processed. If it isn't sent, we
can safely abort the go routine immediately.
  • Loading branch information
hannahhoward committed Aug 1, 2023
1 parent 6a2fa5c commit 8637464
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 41 deletions.
48 changes: 28 additions & 20 deletions requestmanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,14 @@ func (rm *RequestManager) NewRequest(ctx context.Context,

inProgressRequestChan := make(chan inProgressRequest)

rm.send(&newRequestMessage{requestID, span, p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done())
err := rm.send(&newRequestMessage{requestID, span, p, root, selectorNode, extensions, inProgressRequestChan}, ctx.Done())
if err != nil {
return rm.emptyResponse()
}
var receivedInProgressRequest inProgressRequest
select {
case <-rm.ctx.Done():
return rm.emptyResponse()
case <-ctx.Done():
return rm.emptyResponse()
case receivedInProgressRequest = <-inProgressRequestChan:
}

Expand Down Expand Up @@ -283,12 +284,13 @@ func (rm *RequestManager) cancelRequestAndClose(requestID graphsync.RequestID,
// CancelRequest cancels the given request ID and waits for the request to terminate
func (rm *RequestManager) CancelRequest(ctx context.Context, requestID graphsync.RequestID) error {
terminated := make(chan error, 1)
rm.send(&cancelRequestMessage{requestID, terminated, graphsync.RequestClientCancelledErr{}}, ctx.Done())
err := rm.send(&cancelRequestMessage{requestID, terminated, graphsync.RequestClientCancelledErr{}}, ctx.Done())
if err != nil {
return err
}

Check warning on line 290 in requestmanager/client.go

View check run for this annotation

Codecov / codecov/patch

requestmanager/client.go#L289-L290

Added lines #L289 - L290 were not covered by tests
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
case <-ctx.Done():
return ctx.Err()
case err := <-terminated:
return err
}
Expand All @@ -300,19 +302,20 @@ func (rm *RequestManager) ProcessResponses(p peer.ID,
responses []gsmsg.GraphSyncResponse,
blks []blocks.Block) {

rm.send(&processResponsesMessage{p, responses, blks}, nil)
_ = rm.send(&processResponsesMessage{p, responses, blks}, nil)
}

// UnpauseRequest unpauses a request that was paused in a block hook based request ID
// Can also send extensions with unpause
func (rm *RequestManager) UnpauseRequest(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
response := make(chan error, 1)
rm.send(&unpauseRequestMessage{requestID, extensions, response}, ctx.Done())
err := rm.send(&unpauseRequestMessage{requestID, extensions, response}, ctx.Done())
if err != nil {
return err
}

Check warning on line 315 in requestmanager/client.go

View check run for this annotation

Codecov / codecov/patch

requestmanager/client.go#L314-L315

Added lines #L314 - L315 were not covered by tests
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
case <-ctx.Done():
return ctx.Err()
case err := <-response:
return err
}
Expand All @@ -321,12 +324,13 @@ func (rm *RequestManager) UnpauseRequest(ctx context.Context, requestID graphsyn
// PauseRequest pauses an in progress request (may take 1 or more blocks to process)
func (rm *RequestManager) PauseRequest(ctx context.Context, requestID graphsync.RequestID) error {
response := make(chan error, 1)
rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
err := rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
if err != nil {
return err
}

Check warning on line 330 in requestmanager/client.go

View check run for this annotation

Codecov / codecov/patch

requestmanager/client.go#L329-L330

Added lines #L329 - L330 were not covered by tests
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
case <-ctx.Done():
return ctx.Err()
case err := <-response:
return err
}
Expand All @@ -335,26 +339,27 @@ func (rm *RequestManager) PauseRequest(ctx context.Context, requestID graphsync.
// UpdateRequest updates an in progress request
func (rm *RequestManager) UpdateRequest(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
response := make(chan error, 1)
rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
err := rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
if err != nil {
return err
}

Check warning on line 345 in requestmanager/client.go

View check run for this annotation

Codecov / codecov/patch

requestmanager/client.go#L344-L345

Added lines #L344 - L345 were not covered by tests
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
case <-ctx.Done():
return ctx.Err()
case err := <-response:
return err
}
}

// GetRequestTask gets data for the given task in the request queue
func (rm *RequestManager) GetRequestTask(p peer.ID, task *peertask.Task, requestExecutionChan chan executor.RequestTask) {
rm.send(&getRequestTaskMessage{p, task, requestExecutionChan}, nil)
_ = rm.send(&getRequestTaskMessage{p, task, requestExecutionChan}, nil)
}

// ReleaseRequestTask releases a task request the requestQueue
func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err error) {
done := make(chan struct{}, 1)
rm.send(&releaseRequestTaskMessage{p, task, err, done}, nil)
_ = rm.send(&releaseRequestTaskMessage{p, task, err, done}, nil)
select {
case <-rm.ctx.Done():
case <-done:
Expand All @@ -364,7 +369,7 @@ func (rm *RequestManager) ReleaseRequestTask(p peer.ID, task *peertask.Task, err
// PeerState gets stats on all outgoing requests for a given peer
func (rm *RequestManager) PeerState(p peer.ID) peerstate.PeerState {
response := make(chan peerstate.PeerState)
rm.send(&peerStateMessage{p, response}, nil)
_ = rm.send(&peerStateMessage{p, response}, nil)
select {
case <-rm.ctx.Done():
return peerstate.PeerState{}
Expand Down Expand Up @@ -392,11 +397,14 @@ func (rm *RequestManager) Shutdown() {
rm.cancel()
}

func (rm *RequestManager) send(message requestManagerMessage, done <-chan struct{}) {
func (rm *RequestManager) send(message requestManagerMessage, done <-chan struct{}) error {
select {
case <-rm.ctx.Done():
return rm.ctx.Err()
case <-done:
return errors.New("unable to send message before cancellation")
case rm.messages <- message:
return nil
}
}

Expand Down
49 changes: 28 additions & 21 deletions responsemanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,19 @@ func New(ctx context.Context,

// ProcessRequests processes incoming requests for the given peer
func (rm *ResponseManager) ProcessRequests(ctx context.Context, p peer.ID, requests []gsmsg.GraphSyncRequest) {
rm.send(&processRequestsMessage{p, requests}, ctx.Done())
_ = rm.send(&processRequestsMessage{p, requests}, ctx.Done())
}

// UnpauseResponse unpauses a response that was previously paused
func (rm *ResponseManager) UnpauseResponse(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
response := make(chan error, 1)
rm.send(&unpauseRequestMessage{requestID, response, extensions}, ctx.Done())
err := rm.send(&unpauseRequestMessage{requestID, response, extensions}, ctx.Done())
if err != nil {
return err
}

Check warning on line 169 in responsemanager/client.go

View check run for this annotation

Codecov / codecov/patch

responsemanager/client.go#L168-L169

Added lines #L168 - L169 were not covered by tests
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
case <-ctx.Done():
return ctx.Err()
case err := <-response:
return err
}
Expand All @@ -177,12 +178,13 @@ func (rm *ResponseManager) UnpauseResponse(ctx context.Context, requestID graphs
// PauseResponse pauses an in progress response (may take 1 or more blocks to process)
func (rm *ResponseManager) PauseResponse(ctx context.Context, requestID graphsync.RequestID) error {
response := make(chan error, 1)
rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
err := rm.send(&pauseRequestMessage{requestID, response}, ctx.Done())
if err != nil {
return err
}

Check warning on line 184 in responsemanager/client.go

View check run for this annotation

Codecov / codecov/patch

responsemanager/client.go#L183-L184

Added lines #L183 - L184 were not covered by tests
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
case <-ctx.Done():
return ctx.Err()
case err := <-response:
return err
}
Expand All @@ -191,12 +193,13 @@ func (rm *ResponseManager) PauseResponse(ctx context.Context, requestID graphsyn
// CancelResponse cancels an in progress response
func (rm *ResponseManager) CancelResponse(ctx context.Context, requestID graphsync.RequestID) error {
response := make(chan error, 1)
rm.send(&errorRequestMessage{requestID, queryexecutor.ErrCancelledByCommand, response}, ctx.Done())
err := rm.send(&errorRequestMessage{requestID, queryexecutor.ErrCancelledByCommand, response}, ctx.Done())
if err != nil {
return err
}

Check warning on line 199 in responsemanager/client.go

View check run for this annotation

Codecov / codecov/patch

responsemanager/client.go#L198-L199

Added lines #L198 - L199 were not covered by tests
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
case <-ctx.Done():
return ctx.Err()
case err := <-response:
return err
}
Expand All @@ -205,12 +208,13 @@ func (rm *ResponseManager) CancelResponse(ctx context.Context, requestID graphsy
// UpdateRequest updates an in progress response
func (rm *ResponseManager) UpdateResponse(ctx context.Context, requestID graphsync.RequestID, extensions ...graphsync.ExtensionData) error {
response := make(chan error, 1)
rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
err := rm.send(&updateRequestMessage{requestID, extensions, response}, ctx.Done())
if err != nil {
return err
}

Check warning on line 214 in responsemanager/client.go

View check run for this annotation

Codecov / codecov/patch

responsemanager/client.go#L213-L214

Added lines #L213 - L214 were not covered by tests
select {
case <-rm.ctx.Done():
return errors.New("context cancelled")
case <-ctx.Done():
return ctx.Err()
case err := <-response:
return err
}
Expand All @@ -219,7 +223,7 @@ func (rm *ResponseManager) UpdateResponse(ctx context.Context, requestID graphsy
// Synchronize is a utility method that blocks until all current messages are processed
func (rm *ResponseManager) synchronize() {
sync := make(chan error)
rm.send(&synchronizeMessage{sync}, nil)
_ = rm.send(&synchronizeMessage{sync}, nil)
select {
case <-rm.ctx.Done():
case <-sync:
Expand All @@ -228,18 +232,18 @@ func (rm *ResponseManager) synchronize() {

// StartTask starts the given task from the peer task queue
func (rm *ResponseManager) StartTask(task *peertask.Task, p peer.ID, responseTaskChan chan<- queryexecutor.ResponseTask) {
rm.send(&startTaskRequest{task, p, responseTaskChan}, nil)
_ = rm.send(&startTaskRequest{task, p, responseTaskChan}, nil)
}

// GetUpdates is called to read pending updates for a task and clear them
func (rm *ResponseManager) GetUpdates(requestID graphsync.RequestID, updatesChan chan<- []gsmsg.GraphSyncRequest) {
rm.send(&responseUpdateRequest{requestID, updatesChan}, nil)
_ = rm.send(&responseUpdateRequest{requestID, updatesChan}, nil)
}

// FinishTask marks a task from the task queue as done
func (rm *ResponseManager) FinishTask(task *peertask.Task, p peer.ID, err error) {
done := make(chan struct{}, 1)
rm.send(&finishTaskRequest{task, p, err, done}, nil)
_ = rm.send(&finishTaskRequest{task, p, err, done}, nil)
select {
case <-rm.ctx.Done():
case <-done:
Expand All @@ -249,7 +253,7 @@ func (rm *ResponseManager) FinishTask(task *peertask.Task, p peer.ID, err error)
// CloseWithNetworkError closes a request due to a network error
func (rm *ResponseManager) CloseWithNetworkError(requestID graphsync.RequestID) {
done := make(chan error, 1)
rm.send(&errorRequestMessage{requestID, queryexecutor.ErrNetworkError, done}, nil)
_ = rm.send(&errorRequestMessage{requestID, queryexecutor.ErrNetworkError, done}, nil)
select {
case <-rm.ctx.Done():
case <-done:
Expand All @@ -259,7 +263,7 @@ func (rm *ResponseManager) CloseWithNetworkError(requestID graphsync.RequestID)
// TerminateRequest indicates a request has finished sending data and should no longer be tracked
func (rm *ResponseManager) TerminateRequest(requestID graphsync.RequestID) {
done := make(chan struct{}, 1)
rm.send(&terminateRequestMessage{requestID, done}, nil)
_ = rm.send(&terminateRequestMessage{requestID, done}, nil)
select {
case <-rm.ctx.Done():
case <-done:
Expand All @@ -269,7 +273,7 @@ func (rm *ResponseManager) TerminateRequest(requestID graphsync.RequestID) {
// PeerState gets current state of the outgoing responses for a given peer
func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
response := make(chan peerstate.PeerState)
rm.send(&peerStateMessage{p, response}, nil)
_ = rm.send(&peerStateMessage{p, response}, nil)
select {
case <-rm.ctx.Done():
return peerstate.PeerState{}
Expand All @@ -278,11 +282,14 @@ func (rm *ResponseManager) PeerState(p peer.ID) peerstate.PeerState {
}
}

func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) {
func (rm *ResponseManager) send(message responseManagerMessage, done <-chan struct{}) error {
select {
case <-rm.ctx.Done():
return rm.ctx.Err()
case <-done:
return errors.New("unable to send message before cancellation")

Check warning on line 290 in responsemanager/client.go

View check run for this annotation

Codecov / codecov/patch

responsemanager/client.go#L290

Added line #L290 was not covered by tests
case rm.messages <- message:
return nil
}
}

Expand Down

0 comments on commit 8637464

Please sign in to comment.