diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt index 657dc977..50d14535 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Connect.kt @@ -21,6 +21,9 @@ import io.ktor.utils.io.pool.* import io.rsocket.kotlin.* import io.rsocket.kotlin.core.* import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.internal.connection.* +import io.rsocket.kotlin.internal.operation.* +import io.rsocket.kotlin.internal.sequential.* import io.rsocket.kotlin.transport.* import kotlinx.coroutines.* @@ -35,9 +38,7 @@ internal suspend inline fun connect( @Suppress("DEPRECATION") bufferPool: ObjectPool, ): RSocket { val prioritizer = Prioritizer() - val frameSender = FrameSender(prioritizer, bufferPool, maxFragmentSize) - val streamsStorage = StreamsStorage(isServer, bufferPool) - val keepAliveHandler = KeepAliveHandler(connectionConfig.keepAlive, frameSender) + val streamsStorage = StreamsStorage(isServer) val requestJob = SupervisorJob(connection.coroutineContext[Job]) val requestContext = connection.coroutineContext + requestJob @@ -48,12 +49,22 @@ internal suspend inline fun connect( connectionConfig.setupPayload.close() } + val requesterExecutor = SequentialRequesterOperationExecutor( +// requestsScope = CoroutineScope(requestContext + CoroutineName("rSocket-requester")), + streamsStorage = streamsStorage, + prioritizer = prioritizer, + maxFragmentSize = maxFragmentSize, + bufferPool = bufferPool + ) + val connectionOutbound = SequentialConnectionOutbound(prioritizer) + + val keepAliveHandler = KeepAliveHandler(connectionConfig.keepAlive, connectionOutbound) + val requester = interceptors.wrapRequester( - RSocketRequester( + Requester( requestContext + CoroutineName("rSocket-requester"), - frameSender, - streamsStorage, - bufferPool + requesterExecutor, + connectionOutbound ) ) val requestHandler = interceptors.wrapResponder( @@ -79,6 +90,13 @@ internal suspend inline fun connect( requestHandler.cancel("Connection closed", it) } + val connectionInbound = DefaultConnectionInbound( + CoroutineScope(requestContext + CoroutineName("rSocket-responder")), + requestHandler, + keepAliveHandler, + ) + val connectionFrameHandler = ConnectionFrameHandler(connectionInbound) + // start keepalive ticks (connection + CoroutineName("rSocket-connection-keep-alive")).launch { while (isActive) keepAliveHandler.tick() @@ -93,12 +111,15 @@ internal suspend inline fun connect( (connection + CoroutineName("rSocket-connection-receive")).launch { while (isActive) connection.receiveFrame(bufferPool) { frame -> when (frame.streamId) { - 0 -> when (frame) { - is MetadataPushFrame -> responder.handleMetadataPush(frame.metadata) - is ErrorFrame -> connection.cancel("Error frame received on 0 stream", frame.throwable) - is KeepAliveFrame -> keepAliveHandler.mark(frame) - is LeaseFrame -> frame.close().also { error("lease isn't implemented") } - else -> frame.close() + 0 -> { + connectionFrameHandler.handleFrame(frame) +// when (frame) { +// is MetadataPushFrame -> responder.handleMetadataPush(frame.metadata) +// is ErrorFrame -> connection.cancel("Error frame received on 0 stream", frame.throwable) +// is KeepAliveFrame -> keepAliveHandler.mark(frame) +// is LeaseFrame -> frame.close().also { error("lease isn't implemented") } +// else -> frame.close() +// } } else -> streamsStorage.handleFrame(frame, responder) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/KeepAliveHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/KeepAliveHandler.kt index ad8d1760..7554625b 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/KeepAliveHandler.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/KeepAliveHandler.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ package io.rsocket.kotlin.internal import io.ktor.utils.io.core.* import io.rsocket.kotlin.* -import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.internal.connection.* import io.rsocket.kotlin.keepalive.* import kotlinx.atomicfu.* import kotlinx.coroutines.* @@ -26,16 +26,16 @@ import kotlin.time.* internal class KeepAliveHandler( private val keepAlive: KeepAlive, - private val sender: FrameSender, + private val outbound: ConnectionOutbound, ) { private val initial = TimeSource.Monotonic.markNow() private fun currentMillis() = initial.elapsedNow().inWholeMilliseconds private val lastMark = atomic(currentMillis()) // mark initial timestamp for keepalive - suspend fun mark(frame: KeepAliveFrame) { + suspend fun mark(respond: Boolean, data: ByteReadPacket) { lastMark.value = currentMillis() - if (frame.respond) sender.sendKeepAlive(false, 0, frame.data) + if (respond) outbound.sendKeepAlive(false, data, 0) } suspend fun tick() { @@ -43,6 +43,6 @@ internal class KeepAliveHandler( if (currentMillis() - lastMark.value >= keepAlive.maxLifetimeMillis) throw RSocketError.ConnectionError("No keep-alive for ${keepAlive.maxLifetimeMillis} ms") - sender.sendKeepAlive(true, 0, ByteReadPacket.Empty) + outbound.sendKeepAlive(true, ByteReadPacket.Empty, 0) } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt index 59d55511..a0de6729 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/Prioritizer.kt @@ -24,6 +24,7 @@ import kotlinx.coroutines.selects.* private val selectFrame: suspend (Frame) -> Frame = { it } +// TODO: refactor to not use frames but packets internal class Prioritizer { private val priorityChannel = channelForCloseable(Channel.UNLIMITED) private val commonChannel = channelForCloseable(Channel.UNLIMITED) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamId.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamId.kt index 25c3a9f6..e230fa1f 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamId.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamId.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ package io.rsocket.kotlin.internal import kotlinx.atomicfu.* +// TODO: move to internal.connection internal class StreamId(streamId: Int) { private val streamId = atomic(streamId) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt index b33128a4..438fccb8 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/StreamsStorage.kt @@ -16,24 +16,24 @@ package io.rsocket.kotlin.internal -import io.ktor.utils.io.core.internal.* -import io.ktor.utils.io.pool.* +import io.ktor.utils.io.core.* import io.rsocket.kotlin.frame.* -import io.rsocket.kotlin.internal.handler.* +import io.rsocket.kotlin.internal.connection.* +import io.rsocket.kotlin.internal.operation.* import kotlinx.atomicfu.locks.* +// TODO: move to internal.connection internal class StreamsStorage( private val isServer: Boolean, - @Suppress("DEPRECATION") private val pool: ObjectPool, ) : SynchronizedObject() { private val streamId: StreamId = StreamId(isServer) - private val handlers: IntMap = IntMap() + private val handlers: IntMap = IntMap() fun nextId(): Int = synchronized(this) { streamId.next(handlers) } - fun save(id: Int, handler: FrameHandler) = synchronized(this) { handlers[id] = handler } - fun remove(id: Int): FrameHandler? = synchronized(this) { handlers.remove(id) }?.also(FrameHandler::close) + fun save(id: Int, handler: StreamFrameHandler) = synchronized(this) { handlers[id] = handler } + fun remove(id: Int): StreamFrameHandler? = synchronized(this) { handlers.remove(id) }?.also(Closeable::close) fun contains(id: Int): Boolean = synchronized(this) { id in handlers } - private fun get(id: Int): FrameHandler? = synchronized(this) { handlers[id] } + private fun get(id: Int): StreamFrameHandler? = synchronized(this) { handlers[id] } fun cleanup(error: Throwable?) { val values = synchronized(this) { @@ -41,48 +41,24 @@ internal class StreamsStorage( handlers.clear() values } - values.forEach { - it.cleanup(error) - it.close() - } + values.forEach(Closeable::close) } - fun handleFrame(frame: Frame, responder: RSocketResponder) { + fun handleFrame(frame: Frame, handler: ResponderOperationHandler) { val id = frame.streamId when (frame) { - is RequestNFrame -> get(id)?.handleRequestN(frame.requestN) - is CancelFrame -> get(id)?.handleCancel() - is ErrorFrame -> get(id)?.handleError(frame.throwable) - is RequestFrame -> when { - frame.type == FrameType.Payload -> get(id)?.handleRequest(frame) + is RequestFrame -> when { + frame.type == FrameType.Payload -> get(id)?.handleFrame(frame) ?: frame.close() // release on unknown stream id isServer.xor(id % 2 != 0) -> frame.close() // request frame on wrong stream id else -> { - val initialRequest = frame.initialRequest - val handler = when (frame.type) { - FrameType.RequestFnF -> ResponderFireAndForgetFrameHandler(id, this, responder, pool) - FrameType.RequestResponse -> ResponderRequestResponseFrameHandler(id, this, responder, pool) - FrameType.RequestStream -> ResponderRequestStreamFrameHandler( - id, - this, - responder, - initialRequest, - pool - ) - FrameType.RequestChannel -> ResponderRequestChannelFrameHandler( - id, - this, - responder, - initialRequest, - pool - ) - else -> error("Wrong request frame type") // should never happen - } + val handler = StreamFrameHandler(handler.handle(frame, TODO())) save(id, handler) - handler.handleRequest(frame) + handler.handleFrame(frame) } } - else -> frame.close() + + else -> get(id)?.handleFrame(frame) ?: frame.close() // release on unknown stream id } } } diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/ConnectionEstablishmentHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/ConnectionEstablishmentHandler.kt new file mode 100644 index 00000000..33af7c08 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/ConnectionEstablishmentHandler.kt @@ -0,0 +1,84 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.connection + +// TODO: change package to internal.transport? + +// TODO? +//internal abstract class ConnectionEstablishmentHandler { +// abstract suspend fun receiveFrame(): ByteReadPacket +// abstract suspend fun sendFrame(frame: ByteReadPacket) +//} +// +///* +// Client setup: +// 1. configure +// 2. send setup frame (or resume + await resume ok frame) +// 3. done +// +// Server setup: +// 1. await setup frame (or resume) +// 2. configure +// 3. done | send error (or send resume ok frame) +// */ +// +///* +// done means: +// 1. start receiving frames/streams +// 2. create requester +// */ +// +//@RSocketTransportApi +//internal suspend fun RSocketClientTransport.connect( +// isClient: Boolean, +// maxFragmentSize: Int, +// interceptors: Interceptors, +// connectionConfig: ConnectionConfig, +// acceptor: ConnectionAcceptor, +//) { +// suspend fun setup(handler: ConnectionEstablishmentHandler) { +// val setupFrame = SetupFrame( +// version = Version.Current, +// honorLease = false, +// keepAlive = connectionConfig.keepAlive, +// resumeToken = null, +// payloadMimeType = connectionConfig.payloadMimeType, +// payload = connectionConfig.setupPayload.copy() //copy needed, as it can be used in acceptor +// ).toPacket(ChunkBuffer.Pool) +// handler.sendFrame(setupFrame) +// // done +// } +// +// when (val connection = connect()) { +// is RSocketTransportConnection.Multiplexed -> { +// val connectionStream = connection.createStream() +// connectionStream.prioritize() +// +// while (true) { +// val stream = connection.awaitStream() +// launch { +// val firstFrame = stream.receiveFrame() +// } +// } +// } +// +// is RSocketTransportConnection.Sequential -> { +// setup(SequentialConnectionEstablishmentHandler(connection)) +// +// } +// } +//} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/ConnectionFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/ConnectionFrameHandler.kt new file mode 100644 index 00000000..ec5def65 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/ConnectionFrameHandler.kt @@ -0,0 +1,39 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.connection + +import io.rsocket.kotlin.frame.* + +internal class ConnectionFrameHandler( + private val inbound: ConnectionInbound, +) { + suspend fun handleFrame(frame: Frame): Boolean = when (frame) { + is MetadataPushFrame -> inbound.receiveMetadataPush(frame.metadata) + is KeepAliveFrame -> inbound.receiveKeepAlive(frame.respond, frame.data, frame.lastPosition) + is ErrorFrame -> inbound.receiveError(frame.throwable) + is LeaseFrame -> { + frame.close() + TODO("lease is not supported") + } + + else -> { + frame.close() + false // wrong frame + } + } +} + diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/ConnectionInbound.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/ConnectionInbound.kt new file mode 100644 index 00000000..26c6bfa9 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/ConnectionInbound.kt @@ -0,0 +1,50 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.connection + +import io.ktor.utils.io.core.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.internal.* +import kotlinx.coroutines.* + +internal abstract class ConnectionInbound { + abstract suspend fun receiveMetadataPush(metadata: ByteReadPacket): Boolean + abstract suspend fun receiveKeepAlive(respond: Boolean, data: ByteReadPacket, lastPosition: Long): Boolean + abstract fun receiveError(cause: Throwable): Boolean +} + +internal class DefaultConnectionInbound( + private val requestsScope: CoroutineScope, + private val responder: RSocket, + private val keepAliveHandler: KeepAliveHandler, +) : ConnectionInbound() { + override suspend fun receiveMetadataPush(metadata: ByteReadPacket): Boolean { + requestsScope.launch { + responder.metadataPush(metadata) + } + return false + } + + override suspend fun receiveKeepAlive(respond: Boolean, data: ByteReadPacket, lastPosition: Long): Boolean { + keepAliveHandler.mark(respond, data) + return false + } + + override fun receiveError(cause: Throwable): Boolean { + TODO("Not yet implemented") + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/ConnectionOutbound.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/ConnectionOutbound.kt new file mode 100644 index 00000000..e7f8ecc2 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/ConnectionOutbound.kt @@ -0,0 +1,36 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.connection + +import io.ktor.utils.io.core.* +import io.rsocket.kotlin.frame.* + +internal abstract class ConnectionOutbound { + protected abstract suspend fun sendFrame(frame: Frame) + + suspend fun sendMetadataPush(metadata: ByteReadPacket) { + sendFrame(MetadataPushFrame(metadata)) + } + + suspend fun sendKeepAlive(respond: Boolean, data: ByteReadPacket, lastPosition: Long) { + sendFrame(KeepAliveFrame(respond, lastPosition, data)) + } + + suspend fun sendError(cause: Throwable) { + sendFrame(ErrorFrame(0, cause)) + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/StreamFrameHandler.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/StreamFrameHandler.kt new file mode 100644 index 00000000..08c06f3e --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/StreamFrameHandler.kt @@ -0,0 +1,121 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.connection + +import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.internal.transport.* +import io.rsocket.kotlin.internal.transport.StreamInbound +import io.rsocket.kotlin.payload.* + +internal class StreamFrameHandler( + private val inbound: StreamInbound, +) : Closeable { + private var hasPayload: Boolean = false + private var hasMetadata: Boolean = false + private val data = BytePacketBuilder(NoPool) + private val metadata = BytePacketBuilder(NoPool) + + override fun close() { + data.close() + metadata.close() + } + + private fun appendFragment(fragment: Payload) { + hasPayload = true + data.writePacket(fragment.data) + when (val meta = fragment.metadata) { + null -> {} + else -> { + hasMetadata = true + metadata.writePacket(meta) + } + } + } + + private fun assemblePayload(): Payload? { + if (!hasPayload) return null + + val payload = Payload( + data = data.build(), + metadata = when { + hasMetadata -> metadata.build() + else -> null + } + ) + hasMetadata = false + hasPayload = false + return payload + } + + // TODO use BRP here? + fun handleFrame(frame: Frame): ReceiveResult = when (frame) { + is CancelFrame -> inbound.receiveCancel() + is ErrorFrame -> inbound.receiveError(frame.throwable) + is RequestNFrame -> inbound.receiveRequestN(frame.requestN) + is RequestFrame -> receivePayload(frame.payload, frame.complete, frame.follows) + else -> { + frame.close() + ReceiveResult.WrongFrame + } + } + + // todo suspend variant? + // TODO: if there are no fragments saved and there are no following - we can ignore going through buffer + // TODO: really, fragment could be NULL when `complete` is true, but `next` is false + private fun receivePayload( + fragment: Payload, + complete: Boolean, + follows: Boolean, + ): ReceiveResult { + appendFragment(fragment) + return when { + complete -> when (val payload = assemblePayload()) { + null -> inbound.receiveComplete() + else -> inbound.receiveNext(payload, complete = true) + } + + else -> when { + follows -> ReceiveResult.Continue // not all fragments received + else -> when (val payload = assemblePayload()) { + null -> error("protocol violation: next or complete flags should be set") + else -> inbound.receiveNext(payload, complete = false) + } + } + } + } +} + +@Suppress("DEPRECATION") +private object NoPool : ObjectPool { + override val capacity: Int + get() = error("should not be called") + + override fun borrow(): ChunkBuffer { + error("should not be called") + } + + override fun dispose() { + error("should not be called") + } + + override fun recycle(instance: ChunkBuffer) { + error("should not be called") + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/StreamFrameSender.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/StreamFrameSender.kt new file mode 100644 index 00000000..5ba14543 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/StreamFrameSender.kt @@ -0,0 +1,24 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.connection + +import io.rsocket.kotlin.frame.* + +internal abstract class StreamFrameSender { + // TODO use BRP here? + abstract suspend fun sendFrame(frame: Frame) +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/FrameSender.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/StreamOutbound.kt similarity index 61% rename from rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/FrameSender.kt rename to rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/StreamOutbound.kt index 6c70202b..b700d368 100644 --- a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/FrameSender.kt +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/connection/StreamOutbound.kt @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.rsocket.kotlin.internal +package io.rsocket.kotlin.internal.connection import io.ktor.utils.io.core.* import io.ktor.utils.io.core.internal.* @@ -22,7 +22,6 @@ import io.ktor.utils.io.pool.* import io.rsocket.kotlin.frame.* import io.rsocket.kotlin.frame.io.* import io.rsocket.kotlin.payload.* -import kotlinx.coroutines.* import kotlin.math.* private const val lengthSize = 3 @@ -30,50 +29,73 @@ private const val headerSize = 6 private const val fragmentOffset = lengthSize + headerSize private const val fragmentOffsetWithMetadata = fragmentOffset + lengthSize -internal class FrameSender( - private val prioritizer: Prioritizer, - @Suppress("DEPRECATION") private val pool: ObjectPool, +internal class StreamOutbound( + private val sender: StreamFrameSender, + private val streamId: Int, private val maxFragmentSize: Int, + @Suppress("DEPRECATION") private val bufferPool: ObjectPool, ) { + private suspend fun sendFrame(frame: Frame): Unit = sender.sendFrame(frame) - suspend fun sendKeepAlive(respond: Boolean, lastPosition: Long, data: ByteReadPacket): Unit = - prioritizer.send(KeepAliveFrame(respond, lastPosition, data)) - - suspend fun sendMetadataPush(metadata: ByteReadPacket): Unit = prioritizer.send(MetadataPushFrame(metadata)) - - suspend fun sendCancel(id: Int): Unit = withContext(NonCancellable) { prioritizer.send(CancelFrame(id)) } - suspend fun sendError(id: Int, throwable: Throwable): Unit = - withContext(NonCancellable) { prioritizer.send(ErrorFrame(id, throwable)) } + suspend fun sendCancel() { + sendFrame(CancelFrame(streamId)) + } - suspend fun sendRequestN(id: Int, n: Int): Unit = prioritizer.send(RequestNFrame(id, n)) + suspend fun sendError(cause: Throwable) { + sendFrame(ErrorFrame(streamId, cause)) + } - suspend fun sendRequestPayload(type: FrameType, streamId: Int, payload: Payload, initialRequest: Int = 0) { - sendFragmented(type, streamId, payload, false, false, initialRequest) + suspend fun sendComplete() { + sendFrame( + RequestFrame( + type = FrameType.Payload, + streamId = streamId, + follows = false, + complete = true, + next = false, + initialRequest = 0, + payload = Payload.Empty + ) + ) } - suspend fun sendNextPayload(streamId: Int, payload: Payload) { - sendFragmented(FrameType.Payload, streamId, payload, false, true, 0) + suspend fun sendNextPayload(payload: Payload, complete: Boolean) { + sendRequestPayload( + type = FrameType.Payload, + payload = payload, + complete = complete, + initialRequest = 0 + ) {} } - suspend fun sendNextCompletePayload(streamId: Int, payload: Payload) { - sendFragmented(FrameType.Payload, streamId, payload, true, true, 0) + suspend fun sendRequestN(requestN: Int) { + sendFrame(RequestNFrame(streamId, requestN)) } - suspend fun sendCompletePayload(streamId: Int) { - sendFragmented(FrameType.Payload, streamId, Payload.Empty, true, false, 0) + suspend inline fun sendRequest( + type: FrameType, + payload: Payload, + initialRequest: Int, + onFirstFrameSent: () -> Unit, + ) { + sendRequestPayload(type, payload, complete = false, initialRequest, onFirstFrameSent) + //send first + //on first + //send fragments } - private suspend fun sendFragmented( + private suspend inline fun sendRequestPayload( type: FrameType, - streamId: Int, payload: Payload, complete: Boolean, - next: Boolean, - initialRequest: Int + initialRequest: Int, + onFirstFrameSent: () -> Unit, ) { - //TODO release on fail ? + // TODO rework/simplify later + // TODO release on fail ? if (!payload.isFragmentable(type.hasInitialRequest)) { - prioritizer.send(RequestFrame(type, streamId, false, complete, next, initialRequest, payload)) + sendFrame(RequestFrame(type, streamId, false, complete, true, initialRequest, payload)) + onFirstFrameSent() return } @@ -91,13 +113,13 @@ internal class FrameSender( if (!first) remaining -= lengthSize val length = min(metadata.remaining.toInt(), remaining) remaining -= length - metadata.readPacket(pool, length) + metadata.readPacket(bufferPool, length) } else null val dataFragment = if (remaining > 0 && data.isNotEmpty) { val length = min(data.remaining.toInt(), remaining) remaining -= length - data.readPacket(pool, length) + data.readPacket(bufferPool, length) } else { ByteReadPacket.Empty } @@ -105,7 +127,7 @@ internal class FrameSender( val fType = if (first && type.isRequestType) type else FrameType.Payload val fragment = Payload(dataFragment, metadataFragment) val follows = metadata != null && metadata.isNotEmpty || data.isNotEmpty - prioritizer.send( + sendFrame( RequestFrame( type = fType, streamId = streamId, @@ -116,13 +138,14 @@ internal class FrameSender( payload = fragment ) ) + if (first) onFirstFrameSent() first = false remaining = fragmentSize } while (follows) } private fun Payload.isFragmentable(hasInitialRequest: Boolean) = when (maxFragmentSize) { - 0 -> false + 0 -> false else -> when (val meta = metadata) { null -> data.remaining > maxFragmentSize - fragmentOffset - (if (hasInitialRequest) Int.SIZE_BYTES else 0) else -> data.remaining + meta.remaining > maxFragmentSize - fragmentOffsetWithMetadata - (if (hasInitialRequest) Int.SIZE_BYTES else 0) diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/multiplexed/Outbounds.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/multiplexed/Outbounds.kt new file mode 100644 index 00000000..48d54463 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/multiplexed/Outbounds.kt @@ -0,0 +1,42 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.multiplexed + +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.internal.connection.* +import io.rsocket.kotlin.transport.* + +internal class MultiplexedStreamOutbound( + private val prioritizer: Prioritizer, + maxFragmentSize: Int, + @Suppress("DEPRECATION") bufferPool: ObjectPool, + streamId: Int, +) : StreamOutbound(streamId, maxFragmentSize, bufferPool) { + override suspend fun sendFrame(frame: Frame): Unit = prioritizer.send(frame) +} + +@RSocketTransportApi +internal class MultiplexedConnectionOutbound( + private val stream: RSocketTransportSession.Multiplexed.Stream, +) : ConnectionOutbound() { + override suspend fun sendFrame(frame: Frame): Unit { + stream.sendFrame(frame.toPacket()) + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/Operations.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/Operations.kt new file mode 100644 index 00000000..86f5038d --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/Operations.kt @@ -0,0 +1,92 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.operation + +import io.rsocket.kotlin.* +import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.internal.transport.* +import io.rsocket.kotlin.payload.* +import kotlinx.coroutines.* + +internal abstract class RequesterOperation : StreamInbound { + abstract suspend fun execute(outbound: StreamOutbound) +} + +internal abstract class ResponderOperation : StreamInbound { + abstract suspend fun execute( + outbound: StreamOutbound, + responder: RSocket, + requestPayload: Payload, + complete: Boolean, + ) + + abstract fun receiveRequest(complete: Boolean): ReceiveResult + final override fun receiveCancel(): ReceiveResult = ReceiveResult.Done +} + +internal abstract class RequesterOperationExecutor { + abstract suspend fun execute(operation: RequesterOperation) +} + +internal class ResponderOperationHandler( + private val requestsScope: CoroutineScope, + private val responder: RSocket, +) { + fun handle(frame: RequestFrame, outbound: StreamOutbound): StreamInbound { + val operation = when (frame.type) { + FrameType.RequestFnF -> ResponderFireAndForgetOperation + FrameType.RequestResponse -> ResponderRequestResponseOperation + FrameType.RequestStream -> ResponderRequestStreamOperation(frame.initialRequest) + FrameType.RequestChannel -> TODO() + else -> error("wrong frame type") // TODO: enforce it + } + return ResponderOperationWrapper( + requestsScope = requestsScope, + operation = operation, + outbound = outbound, + responder = responder + ) + } +} + +private class ResponderOperationWrapper( + private val requestsScope: CoroutineScope, + private val operation: ResponderOperation, + private val outbound: StreamOutbound, + private val responder: RSocket, +) : StreamInbound() { + private var requestJob: Job? = null + override fun receiveNext(payload: Payload, complete: Boolean): ReceiveResult = when (requestJob) { + null -> { + requestJob = requestsScope.launch { + operation.execute(outbound, responder, payload, complete) + } + operation.receiveRequest(complete) + } + + else -> operation.receiveNext(payload, complete) + } + + override fun receiveCancel(): ReceiveResult { + requestJob?.cancel("Request cancelled") + return ReceiveResult.Done + } + + override fun receiveComplete(): ReceiveResult = operation.receiveComplete() + override fun receiveError(cause: Throwable): ReceiveResult = operation.receiveError(cause) + override fun receiveRequestN(requestN: Int): ReceiveResult = operation.receiveRequestN(requestN) +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/Requester.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/Requester.kt new file mode 100644 index 00000000..a07d76f3 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/Requester.kt @@ -0,0 +1,110 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.operation + +import io.ktor.utils.io.core.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.internal.connection.* +import io.rsocket.kotlin.internal.io.* +import io.rsocket.kotlin.payload.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlin.coroutines.* + +internal class Requester( + override val coroutineContext: CoroutineContext, + private val executor: RequesterOperationExecutor, + private val connectionOutbound: ConnectionOutbound, +) : RSocket { + private fun execute(operation: RequesterOperation): Job = launch { executor.execute(operation) } + + override suspend fun metadataPush(metadata: ByteReadPacket) { + // TODO: launch? + connectionOutbound.sendMetadataPush(metadata) + } + + override suspend fun fireAndForget(payload: Payload) { + ensureActiveOrClose(payload) + + return suspendCancellableCoroutine { cont -> + val requestJob = execute(RequesterFireAndForgetOperation(payload, cont)) + cont.invokeOnCancellation { + requestJob.cancel("Request cancelled", it) + } + } + } + + override suspend fun requestResponse(payload: Payload): Payload { + ensureActiveOrClose(payload) + + val deferred = CompletableDeferred() + + val requestJob = execute(RequesterRequestResponseOperation(payload, deferred)) + try { + deferred.join() + } catch (cause: Throwable) { + deferred.cancel() + requestJob.cancel("Request cancelled", cause) + throw cause + } + + return deferred.await() + } + + @OptIn(ExperimentalStreamsApi::class) + override fun requestStream(payload: Payload): Flow = requestFlow { strategy, initialRequest -> + ensureActiveOrClose(payload) + val channel = channelForCloseable(Channel.UNLIMITED) // TODO: should be configurable + val requests = Channel(Channel.UNLIMITED) + channel.consume { + val requestJob = execute(RequesterRequestStreamOperation(payload, initialRequest, requests, channel)) + val error: Throwable? + try { + while (true) { + val result = channel.receiveCatching() + if (result.isClosed) { + error = result.exceptionOrNull() + break + } + emit(result.getOrThrow()) + + @OptIn(DelicateCoroutinesApi::class) + if (!requests.isClosedForSend) { + val next = strategy.nextRequest() + if (next > 0) requests.trySend(next) + } + } + } catch (cause: Throwable) { + requestJob.cancel("Request cancelled", cause) + throw cause + } finally { + requests.cancel() + } + throw error ?: return@consume + } + } + + // TODO: after removing CoroutineScope from RSocket move this to OperationExecutor + private suspend inline fun ensureActiveOrClose(closeable: Closeable) { + if (currentCoroutineContext().isActive && isActive) return + closeable.close() + currentCoroutineContext().ensureActive() + ensureActive() + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/RequesterFireAndForgetOperation.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/RequesterFireAndForgetOperation.kt new file mode 100644 index 00000000..66c9e983 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/RequesterFireAndForgetOperation.kt @@ -0,0 +1,45 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.operation + +import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.internal.connection.* +import io.rsocket.kotlin.internal.transport.* +import io.rsocket.kotlin.payload.* +import kotlinx.coroutines.* +import kotlin.coroutines.* + +internal class RequesterFireAndForgetOperation( + private val requestPayload: Payload, + private val completionContinuation: CancellableContinuation, +) : RequesterOperation() { + override suspend fun execute(outbound: StreamOutbound) { + try { + outbound.sendRequest(FrameType.RequestFnF, requestPayload, 0) {} + completionContinuation.resume(Unit) + } catch (cause: Throwable) { + completionContinuation.resumeWithException(cause) + throw cause + } + } + + override fun receiveNext(payload: Payload, complete: Boolean): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveError(cause: Throwable): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveComplete(): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveCancel(): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveRequestN(requestN: Int): ReceiveResult = ReceiveResult.WrongFrame +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/RequesterRequestResponseOperation.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/RequesterRequestResponseOperation.kt new file mode 100644 index 00000000..55b5f14e --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/RequesterRequestResponseOperation.kt @@ -0,0 +1,59 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.operation + +import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.internal.connection.* +import io.rsocket.kotlin.internal.transport.* +import io.rsocket.kotlin.payload.* +import kotlinx.coroutines.* + +internal class RequesterRequestResponseOperation( + private val requestPayload: Payload, + private val responsePayloadDeferred: CompletableDeferred, +) : RequesterOperation() { + override suspend fun execute(outbound: StreamOutbound) { + var requestFrameSent = false + try { + outbound.sendRequest(FrameType.RequestResponse, requestPayload, 0) { + requestFrameSent = true + } + responsePayloadDeferred.join() + } catch (cause: Throwable) { + if (requestFrameSent) withContext(NonCancellable) { + outbound.sendCancel() + } + throw cause + } + } + + override fun receiveNext(payload: Payload, complete: Boolean): ReceiveResult { + if (!responsePayloadDeferred.complete(payload)) { + payload.close() + } + return ReceiveResult.Done + } + + override fun receiveError(cause: Throwable): ReceiveResult { + responsePayloadDeferred.completeExceptionally(cause) + return ReceiveResult.Done + } + + override fun receiveComplete(): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveCancel(): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveRequestN(requestN: Int): ReceiveResult = ReceiveResult.WrongFrame +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/RequesterRequestStreamOperation.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/RequesterRequestStreamOperation.kt new file mode 100644 index 00000000..9c4932da --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/RequesterRequestStreamOperation.kt @@ -0,0 +1,82 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.operation + +import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.internal.connection.* +import io.rsocket.kotlin.internal.transport.* +import io.rsocket.kotlin.payload.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlin.coroutines.* + +internal class RequesterRequestStreamOperation( + private val requestPayload: Payload, + private val initialRequestN: Int, + private val requestNs: ReceiveChannel, + private val responsePayloadChannel: SendChannel, +) : RequesterOperation() { + + override suspend fun execute(outbound: StreamOutbound): Unit = coroutineScope { + var requestFrameSent = false + try { + outbound.sendRequest(FrameType.RequestStream, requestPayload, initialRequestN) { + requestFrameSent = true + } + launch { + try { + requestNs.consumeEach { + outbound.sendRequestN(it) + } + } catch (ignored: Throwable) { + // ignore failures + } + } + suspendCancellableCoroutine { cont -> + responsePayloadChannel.invokeOnClose { cont.resume(Unit) } + } + } catch (cause: Throwable) { + if (requestFrameSent) withContext(NonCancellable) { + outbound.sendCancel() + } + throw cause + } + } + + override fun receiveNext(payload: Payload, complete: Boolean): ReceiveResult { + if (responsePayloadChannel.trySend(payload).isFailure) { + payload.close() + return ReceiveResult.Done + } + if (complete) return receiveComplete() + return ReceiveResult.Continue + } + + override fun receiveComplete(): ReceiveResult { + responsePayloadChannel.close() + requestNs.cancel() // no more requestN can be sent + return ReceiveResult.Done + } + + override fun receiveError(cause: Throwable): ReceiveResult { + responsePayloadChannel.close(cause) + return ReceiveResult.Done + } + + override fun receiveCancel(): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveRequestN(requestN: Int): ReceiveResult = ReceiveResult.WrongFrame +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/ResponderFireAndForgetOperation.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/ResponderFireAndForgetOperation.kt new file mode 100644 index 00000000..518ed688 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/ResponderFireAndForgetOperation.kt @@ -0,0 +1,38 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.operation + +import io.rsocket.kotlin.* +import io.rsocket.kotlin.internal.transport.* +import io.rsocket.kotlin.payload.* + +internal object ResponderFireAndForgetOperation : ResponderOperation() { + override suspend fun execute( + outbound: StreamOutbound, + responder: RSocket, + requestPayload: Payload, + complete: Boolean, + ) { + responder.fireAndForget(requestPayload) + } + + override fun receiveRequest(complete: Boolean): ReceiveResult = ReceiveResult.Done + override fun receiveNext(payload: Payload, complete: Boolean): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveComplete(): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveError(cause: Throwable): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveRequestN(requestN: Int): ReceiveResult = ReceiveResult.WrongFrame +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/ResponderRequestResponseOperation.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/ResponderRequestResponseOperation.kt new file mode 100644 index 00000000..7c5c0d1d --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/ResponderRequestResponseOperation.kt @@ -0,0 +1,45 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.operation + +import io.rsocket.kotlin.* +import io.rsocket.kotlin.internal.transport.* +import io.rsocket.kotlin.payload.* +import kotlinx.coroutines.* + +internal object ResponderRequestResponseOperation : ResponderOperation() { + override suspend fun execute( + outbound: StreamOutbound, + responder: RSocket, + requestPayload: Payload, + complete: Boolean, + ) { + try { + val response = responder.requestResponse(requestPayload) + outbound.sendNext(response, true) + } catch (cause: Throwable) { + if (currentCoroutineContext().isActive) outbound.sendError(cause) + throw cause + } + } + + override fun receiveRequest(complete: Boolean): ReceiveResult = ReceiveResult.Done + override fun receiveNext(payload: Payload, complete: Boolean): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveComplete(): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveError(cause: Throwable): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveRequestN(requestN: Int): ReceiveResult = ReceiveResult.WrongFrame +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/ResponderRequestStreamOperation.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/ResponderRequestStreamOperation.kt new file mode 100644 index 00000000..4c96d6f0 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/ResponderRequestStreamOperation.kt @@ -0,0 +1,53 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.operation + +import io.rsocket.kotlin.* +import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.internal.transport.* +import io.rsocket.kotlin.payload.* +import kotlinx.coroutines.* + +internal class ResponderRequestStreamOperation(initialRequest: Int) : ResponderOperation() { + private val limiter = Limiter(initialRequest) + override suspend fun execute( + outbound: StreamOutbound, + responder: RSocket, + requestPayload: Payload, + complete: Boolean, + ) { + try { + responder.requestStream(requestPayload).collectLimiting(limiter) { payload -> + outbound.sendNext(payload, complete = false) + } + outbound.sendComplete() + } catch (cause: Throwable) { + if (currentCoroutineContext().isActive) outbound.sendError(cause) + throw cause + } + } + + override fun receiveRequest(complete: Boolean): ReceiveResult = ReceiveResult.Continue + override fun receiveRequestN(requestN: Int): ReceiveResult { + limiter.updateRequests(requestN) + return ReceiveResult.Continue + } + + override fun receiveNext(payload: Payload, complete: Boolean): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveComplete(): ReceiveResult = ReceiveResult.WrongFrame + override fun receiveError(cause: Throwable): ReceiveResult = ReceiveResult.WrongFrame +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/responder.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/responder.kt new file mode 100644 index 00000000..abb0f906 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/operation/responder.kt @@ -0,0 +1,74 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.operation + +import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.internal.connection.* +import io.rsocket.kotlin.internal.transport.* +import io.rsocket.kotlin.transport.* +import kotlinx.coroutines.* + +@Suppress("UNREACHABLE_CODE") +@OptIn(RSocketTransportApi::class) +internal suspend fun temp( + connection: RSocketTransportSession.Multiplexed, + handler: ResponderOperationHandler, +) { + val stream = connection.awaitStream() + connection.launch { + val firstFrame = stream.receiveFrame().readFrame(ChunkBuffer.Pool) + firstFrame as RequestFrame + val reqeustId = firstFrame.streamId + + handler.handle(firstFrame, TODO()).use { handler -> + when (handler.handleFrame(firstFrame)) { + ReceiveResult.WrongFrame -> TODO("Should not happen: wrong frame on request") + ReceiveResult.Done -> {} + ReceiveResult.Continue -> { + while (true) { + val frame = stream.receiveFrame().readFrame(ChunkBuffer.Pool) + + when (handler.handleFrame(frame)) { + ReceiveResult.WrongFrame -> {} + ReceiveResult.Continue -> {} + ReceiveResult.Done -> return@launch + } + } + } + } + } + } +} + +@OptIn(RSocketTransportApi::class) +internal suspend fun temp( + connection: RSocketTransportConnection.Sequential, + responder: RSocket, +) { + val firstFrame = connection.receiveFrame().readFrame(ChunkBuffer.Pool) + firstFrame as RequestFrame + val reqeustId = firstFrame.streamId + + while (true) { + val frame = connection.receiveFrame().readFrame(ChunkBuffer.Pool) + + + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/sequential/Outbounds.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/sequential/Outbounds.kt new file mode 100644 index 00000000..cd58871c --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/sequential/Outbounds.kt @@ -0,0 +1,33 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.sequential + +import io.rsocket.kotlin.frame.* +import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.internal.connection.* + +internal class SequentialStreamFrameSender( + private val prioritizer: Prioritizer, +) : StreamFrameSender() { + override suspend fun sendFrame(frame: Frame): Unit = prioritizer.send(frame) +} + +internal class SequentialConnectionOutbound( + private val prioritizer: Prioritizer, +) : ConnectionOutbound() { + override suspend fun sendFrame(frame: Frame): Unit = prioritizer.send(frame) +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/sequential/SequentialRequesterOperationExecutor.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/sequential/SequentialRequesterOperationExecutor.kt new file mode 100644 index 00000000..1305ffac --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/internal/sequential/SequentialRequesterOperationExecutor.kt @@ -0,0 +1,47 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.internal.sequential + +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.internal.connection.* +import io.rsocket.kotlin.internal.operation.* + +internal class SequentialRequesterOperationExecutor( + private val streamsStorage: StreamsStorage, + private val prioritizer: Prioritizer, + private val maxFragmentSize: Int, + @Suppress("DEPRECATION") private val bufferPool: ObjectPool, +) : RequesterOperationExecutor() { + override suspend fun execute(operation: RequesterOperation) { + val streamId = streamsStorage.nextId() + streamsStorage.save( + streamId, + StreamFrameHandler(operation) + ) + try { + operation.execute( + StreamOutbound( + SequentialStreamFrameSender(prioritizer), streamId, maxFragmentSize, bufferPool + ) + ) + } finally { + streamsStorage.remove(streamId) + } + } +}