Skip to content

Commit

Permalink
Merge pull request #28 in XP/xp-v-golden-gate from shawn/FC-1525_canc…
Browse files Browse the repository at this point in the history
…el_ongoing_blockwise_coap_req_xp to master

* commit '4be8c171894faddccac2ed55583cb1c0552ccaba':
  FC-1525: code review
  FC-1525: add unit tests
  FC-1525: code review
  FC-1525: support to cancel ongoing blockwise coap request
  • Loading branch information
ShawnW858 committed Dec 8, 2020
2 parents 3dce73c + 4be8c17 commit 0996351
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ typedef struct {
GG_CoapRequestHandle request_handle;
} ResponseListenerBlockwise;

typedef struct {
ResponseListenerBlockwise *responseListener;
jboolean canceled;
} CancelResponseForBlockwiseArgs;

/**
* Send a blockwise request to coap server. Must be called from GetLoop thread
*
Expand Down Expand Up @@ -164,22 +169,28 @@ static GG_Result CoapEndpoint_Cleanup_Wrapper(void *_args) {
* @thread GG Loop
*/
static GG_Result CoapEndpoint_CancelResponseFor_Blockwise(void *_args) {
ResponseListenerBlockwise *args = (ResponseListenerBlockwise *) _args;

// ----------------------------------------------
// *args may have been freed. We can check its fields for null, but if that memory has been reused
// who knows what might happen.
//--------------------------------------------------
if (args->endpoint != NULL && args->request_handle != GG_COAP_INVALID_REQUEST_HANDLE) {
GG_Result result = GG_CoapEndpoint_CancelBlockwiseRequest(
args->endpoint,
args->request_handle);
if (GG_SUCCEEDED(result)) {
CoapEndpoint_OnResponseCompleteCleanup_Blockwise(args);
CancelResponseForBlockwiseArgs *args = (CancelResponseForBlockwiseArgs *) _args;

if (!args->canceled){
// ----------------------------------------------
// args->responseListener may have been freed. We can check its fields for null, but if that memory has been reused
// who knows what might happen.
//--------------------------------------------------
if (args->responseListener->endpoint != NULL &&
args->responseListener->request_handle != GG_COAP_INVALID_REQUEST_HANDLE) {
GG_Result result = GG_CoapEndpoint_CancelBlockwiseRequest(
args->responseListener->endpoint,
args->responseListener->request_handle);
if (GG_SUCCEEDED(result)) {
CoapEndpoint_OnResponseCompleteCleanup_Blockwise(args->responseListener);
}
return result;
}
return result;
return GG_ERROR_INVALID_STATE;
} else {
CoapEndpoint_OnResponseCompleteCleanup_Blockwise(args->responseListener);
return GG_SUCCESS;
}
return GG_ERROR_INVALID_STATE;
}

/**
Expand Down Expand Up @@ -597,20 +608,27 @@ Java_com_fitbit_goldengate_bindings_coap_CoapEndpoint_responseForBlockwise(
* Cancel any pending Coap request
*
* @param _response_listener object holding reference to [CoapResponseListener] creating from responseFor call
* @param _canceled set to true if the ongoing blockwise coap request has been canceled
* @thread any
*/
JNIEXPORT jint
JNICALL
Java_com_fitbit_goldengate_bindings_coap_CoapEndpoint_cancelResponseForBlockwise(
JNIEnv *env,
jobject thiz,
jlong _response_listener
jlong _response_listener,
jboolean _canceled
) {
ResponseListenerBlockwise *response_listener = (ResponseListenerBlockwise *) (intptr_t) _response_listener;
GG_ASSERT(response_listener);

CancelResponseForBlockwiseArgs args = {
.responseListener = response_listener,
.canceled = _canceled
};

GG_Result result;
Loop_InvokeSync(CoapEndpoint_CancelResponseFor_Blockwise, response_listener, &result);
Loop_InvokeSync(CoapEndpoint_CancelResponseFor_Blockwise, &args, &result);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ class CoapEndpoint(customScheduler : Scheduler? = null) : NativeReference, Stack
responseForBlockwise(
selfPtr = thisPointer,
request = request,
responseListener = coapResponseListener
)
responseListener = coapResponseListener)
.also {
coapResponseListener.setCancellable {
cancelResponseForBlockwise(it.nativeResponseListenerReference, !initialized.get())
}
}
} else {
coapResponseListener = SingleCoapResponseListener(request, emitter)
responseFor(
Expand All @@ -78,7 +82,7 @@ class CoapEndpoint(customScheduler : Scheduler? = null) : NativeReference, Stack
*/
if (initialized.get() && responseForResult.resultCode >= 0 && !coapResponseListener.isComplete()) {
if (!request.forceNonBlockwise) {
cancelResponseForBlockwise(responseForResult.nativeResponseListenerReference)
cancelResponseForBlockwise(responseForResult.nativeResponseListenerReference, canceled = false)
} else {
cancelResponseFor(responseForResult.nativeResponseListenerReference)
}
Expand Down Expand Up @@ -218,7 +222,10 @@ class CoapEndpoint(customScheduler : Scheduler? = null) : NativeReference, Stack
responseListener: CoapResponseListener
): ResponseForResult

private external fun cancelResponseForBlockwise(nativeResponseListenerReference: Long)
private external fun cancelResponseForBlockwise(
nativeResponseListenerReference: Long,
canceled: Boolean
)

private external fun addResourceHandler(
selfPtr: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ internal class BlockwiseCoapResponseListener(
private var completed: Boolean = false
private var started = false
private var data = Data(0)
private var cancelable = {}

private val bodyBehaviorSubject = BehaviorSubject.create<Data>()
private val bodySingle = Single.fromObservable(bodyBehaviorSubject.take(1))
Expand Down Expand Up @@ -75,11 +76,6 @@ internal class BlockwiseCoapResponseListener(
}
if (!started) {
started = true

/**
* TODO: FC-1525 - setting complete here means we can only cancel blockwise coap request
* before first block is received and after first block is received canceling will not work.
*/
completed = true

if (exception != null) {
Expand Down Expand Up @@ -107,6 +103,13 @@ internal class BlockwiseCoapResponseListener(

override fun isComplete() = completed

/**
* Sets a Cancellable for incoming coap response body stream
*/
fun setCancellable(cancellable: () -> Unit) {
this.cancelable = cancellable
}

private fun createIncomingResponse(message: RawResponseMessage): IncomingResponse {
return object : IncomingResponse {
override val responseCode: ResponseCode
Expand All @@ -122,6 +125,10 @@ internal class BlockwiseCoapResponseListener(
get() = object : IncomingBody {
override fun asData(): Single<Data> {
return bodySingle
.doOnDispose {
cancelable()
Timber.d("cancel ongoing blockwise coap request")
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ class BlockwiseCoapResponseListenerTest {
}
private val mockSingleEmitter = mock<SingleEmitter<IncomingResponse>>()
private val mockDecoder = mock<ExtendedErrorDecoder>()
private val mockCancellable = mock<() -> Unit>()

private val listener = BlockwiseCoapResponseListener(
mockRequest,
mockSingleEmitter,
mockDecoder
)
).also { it.setCancellable(mockCancellable) }

private val testCode = ResponseCode.created
private val testData = "Hello,".toByteArray()
Expand Down Expand Up @@ -165,6 +166,34 @@ class BlockwiseCoapResponseListenerTest {
verify(mockProgressObserver).onError(any())
}

@Test
fun shouldCallCancellableWhenBodyStreamIsCancelledAfterReceivingFirstBlock() {
listener.onNext(testMessage)

val response = verifySuccessSignalOnResponseEmitter()

// cancel the body stream; set cancelled = true
response.body.asData().test(true)

verify(mockCancellable).invoke()
}

@Test
fun shouldCallCancellableWhenBodyStreamIsCancelledAfterReceivingTwoBlocks() {
val testData2 = " World".toByteArray()
val testMessage2 = RawResponseMessage(testCode, testOptions, testData2)

listener.onNext(testMessage)
listener.onNext(testMessage2)

val response = verifySuccessSignalOnResponseEmitter()

// cancel the body stream; set cancelled = true
response.body.asData().test(true)

verify(mockCancellable).invoke()
}

private fun verifySuccessSignalOnResponseEmitter(): IncomingResponse {
val responseCaptor = argumentCaptor<IncomingResponse>()
verify(mockSingleEmitter).onSuccess(responseCaptor.capture())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class NodeMapper internal constructor(
@Synchronized
fun <K: NodeKey<*>> removeNode(nodeKey:K){
nodeMap[nodeKey]?.let {
Timber.i("Removing the exisiting node with key $nodeKey")
Timber.i("Removing the existing node with key $nodeKey")
it.close()
nodeMap.remove(nodeKey)
}
Expand Down

0 comments on commit 0996351

Please sign in to comment.