Skip to content

Commit

Permalink
introduce new module rsocket-internal-io which is a support module …
Browse files Browse the repository at this point in the history
…for core and transport implementations

This allows to drop using internal declarations, as well as not exposing it to library consumers, until explicitly requested
  • Loading branch information
whyoleg committed Mar 2, 2024
1 parent e5e292b commit ce3426b
Show file tree
Hide file tree
Showing 25 changed files with 151 additions and 74 deletions.
2 changes: 2 additions & 0 deletions rsocket-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ kotlin {
sourceSets {
commonMain {
dependencies {
implementation(projects.rsocketInternalIo)

api(libs.kotlinx.coroutines.core)
api(libs.ktor.io)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,16 +19,17 @@ package io.rsocket.kotlin.frame.io
import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.payload.*

internal fun ByteReadPacket.readMetadata(pool: ObjectPool<ChunkBuffer>): ByteReadPacket {
val length = readLength()
val length = readInt24()
return readPacket(pool, length)
}

internal fun BytePacketBuilder.writeMetadata(metadata: ByteReadPacket?) {
metadata?.let {
writeLength(it.remaining.toInt())
writeInt24(it.remaining.toInt())
writePacket(it)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,17 +28,6 @@ internal inline fun <T : Closeable, R> T.closeOnError(block: (T) -> R): R {
}
}

private val onUndeliveredCloseable: (Closeable) -> Unit = Closeable::close

@Suppress("FunctionName")
internal fun <E : Closeable> SafeChannel(capacity: Int): Channel<E> =
Channel(capacity, onUndeliveredElement = onUndeliveredCloseable)

internal fun <E : Closeable> SendChannel<E>.safeTrySend(element: E) {
trySend(element).onFailure { element.close() }
}

internal fun Channel<out Closeable>.fullClose(cause: Throwable?) {
close(cause) // close channel to provide right cause
cancel() // force call of onUndeliveredElement to release buffered elements
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,15 +17,16 @@
package io.rsocket.kotlin.internal

import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.io.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

private val selectFrame: suspend (Frame) -> Frame = { it }

internal class Prioritizer {
private val priorityChannel = SafeChannel<Frame>(Channel.UNLIMITED)
private val commonChannel = SafeChannel<Frame>(Channel.UNLIMITED)
private val priorityChannel = channelForCloseable<Frame>(Channel.UNLIMITED)
private val commonChannel = channelForCloseable<Frame>(Channel.UNLIMITED)

suspend fun send(frame: Frame) {
currentCoroutineContext().ensureActive()
Expand All @@ -43,7 +44,7 @@ internal class Prioritizer {
}

fun close(error: Throwable?) {
priorityChannel.fullClose(error)
commonChannel.fullClose(error)
priorityChannel.cancelWithCause(error)
commonChannel.cancelWithCause(error)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@ import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.handler.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
Expand Down Expand Up @@ -77,7 +78,7 @@ internal class RSocketRequester(

val id = streamsStorage.nextId()

val channel = SafeChannel<Payload>(Channel.UNLIMITED)
val channel = channelForCloseable<Payload>(Channel.UNLIMITED)
val handler = RequesterRequestStreamFrameHandler(id, streamsStorage, channel, pool)
streamsStorage.save(id, handler)

Expand All @@ -93,7 +94,7 @@ internal class RSocketRequester(

val id = streamsStorage.nextId()

val channel = SafeChannel<Payload>(Channel.UNLIMITED)
val channel = channelForCloseable<Payload>(Channel.UNLIMITED)
val limiter = Limiter(0)
val payloadsJob = Job(this@RSocketRequester.coroutineContext.job)
val handler = RequesterRequestChannelFrameHandler(id, streamsStorage, limiter, payloadsJob, channel, pool)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@ package io.rsocket.kotlin.internal.handler
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
Expand All @@ -42,7 +43,7 @@ internal class RequesterRequestChannelFrameHandler(

override fun handleError(cause: Throwable) {
streamsStorage.remove(id)
channel.fullClose(cause)
channel.cancelWithCause(cause)
sender.cancel("Request failed", cause)
}

Expand All @@ -55,7 +56,7 @@ internal class RequesterRequestChannelFrameHandler(
}

override fun cleanup(cause: Throwable?) {
channel.fullClose(cause)
channel.cancelWithCause(cause)
sender.cancel("Connection closed", cause)
}

Expand All @@ -78,7 +79,7 @@ internal class RequesterRequestChannelFrameHandler(
if (sender.isCancelled) return false

val isFailed = streamsStorage.remove(id) != null
if (isFailed) channel.fullClose(cause)
if (isFailed) channel.cancelWithCause(cause)
return isFailed
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@ package io.rsocket.kotlin.internal.handler
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.channels.*

Expand All @@ -39,11 +40,11 @@ internal class RequesterRequestStreamFrameHandler(

override fun handleError(cause: Throwable) {
streamsStorage.remove(id)
channel.fullClose(cause)
channel.cancelWithCause(cause)
}

override fun cleanup(cause: Throwable?) {
channel.fullClose(cause)
channel.cancelWithCause(cause)
}

override fun onReceiveComplete() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@ import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
Expand All @@ -32,7 +33,7 @@ internal class ResponderRequestChannelFrameHandler(
pool: ObjectPool<ChunkBuffer>
) : ResponderFrameHandler(pool), ReceiveFrameHandler {
val limiter = Limiter(initialRequest)
val channel = SafeChannel<Payload>(Channel.UNLIMITED)
val channel = channelForCloseable<Payload>(Channel.UNLIMITED)

@OptIn(ExperimentalStreamsApi::class)
override fun start(payload: Payload): Job = responder.handleRequestChannel(payload, id, this)
Expand All @@ -47,13 +48,13 @@ internal class ResponderRequestChannelFrameHandler(

override fun handleError(cause: Throwable) {
streamsStorage.remove(id)
channel.fullClose(cause)
channel.cancelWithCause(cause)
}

override fun handleCancel() {
streamsStorage.remove(id)
val cancelError = CancellationException("Request cancelled")
channel.fullClose(cancelError)
channel.cancelWithCause(cancelError)
job?.cancel(cancelError)
}

Expand All @@ -62,7 +63,7 @@ internal class ResponderRequestChannelFrameHandler(
}

override fun cleanup(cause: Throwable?) {
channel.fullClose(cause)
channel.cancelWithCause(cause)
}

override fun onSendComplete() {
Expand All @@ -72,7 +73,7 @@ internal class ResponderRequestChannelFrameHandler(

override fun onSendFailed(cause: Throwable): Boolean {
val isFailed = streamsStorage.remove(id) != null
if (isFailed) channel.fullClose(cause)
if (isFailed) channel.cancelWithCause(cause)
return isFailed
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@ import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.io.*

@ExperimentalMetadataApi
public fun CompositeMetadata(vararg entries: Metadata): CompositeMetadata =
Expand All @@ -39,7 +40,7 @@ public sealed interface CompositeMetadata : Metadata {
override fun BytePacketBuilder.writeSelf() {
entries.forEach {
writeMimeType(it.mimeType)
writeLength(it.content.remaining.toInt()) //write metadata length
writeInt24(it.content.remaining.toInt()) //write metadata length
writePacket(it.content) //write metadata content
}
}
Expand All @@ -58,7 +59,7 @@ public sealed interface CompositeMetadata : Metadata {
val list = mutableListOf<Entry>()
while (isNotEmpty) {
val type = readMimeType()
val length = readLength()
val length = readInt24()
val packet = readPacket(pool, length)
list.add(Entry(type, packet))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.test.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*
Expand All @@ -37,13 +37,13 @@ class TestConnection : Connection, ClientTransport {
override val coroutineContext: CoroutineContext =
Job() + Dispatchers.Unconfined + TestExceptionHandler

private val sendChannel = Channel<ByteReadPacket>(Channel.UNLIMITED)
private val receiveChannel = Channel<ByteReadPacket>(Channel.UNLIMITED)
private val sendChannel = channelForCloseable<ByteReadPacket>(Channel.UNLIMITED)
private val receiveChannel = channelForCloseable<ByteReadPacket>(Channel.UNLIMITED)

init {
coroutineContext.job.invokeOnCompletion {
sendChannel.close(it)
receiveChannel.fullClose(it)
receiveChannel.cancelWithCause(it)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,17 +18,18 @@ package io.rsocket.kotlin.frame

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.test.*
import kotlin.test.*

internal fun Frame.toPacketWithLength(): ByteReadPacket = buildPacket(InUseTrackingPool) {
val packet = toPacket(InUseTrackingPool)
writeLength(packet.remaining.toInt())
writeInt24(packet.remaining.toInt())
writePacket(packet)
}

internal fun ByteReadPacket.toFrameWithLength(): Frame {
val length = readLength()
val length = readInt24()
assertEquals(length, remaining.toInt())
return readFrame(InUseTrackingPool)
}
Expand Down
10 changes: 10 additions & 0 deletions rsocket-internal-io/api/rsocket-internal-io.api
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
public final class io/rsocket/kotlin/internal/io/ChannelsKt {
public static final fun cancelWithCause (Lkotlinx/coroutines/channels/Channel;Ljava/lang/Throwable;)V
public static final fun channelForCloseable (I)Lkotlinx/coroutines/channels/Channel;
}

public final class io/rsocket/kotlin/internal/io/Int24Kt {
public static final fun readInt24 (Lio/ktor/utils/io/core/ByteReadPacket;)I
public static final fun writeInt24 (Lio/ktor/utils/io/core/BytePacketBuilder;I)V
}

33 changes: 33 additions & 0 deletions rsocket-internal-io/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins {
id("rsocket.template.library")
id("rsocket.target.all")
}

kotlin {
sourceSets {
commonMain {
dependencies {
api(libs.kotlinx.coroutines.core)
api(libs.ktor.io)
}
}
}
}

description = "rsocket-kotlin internal IO support"
Loading

0 comments on commit ce3426b

Please sign in to comment.