diff --git a/ktor-client/ktor-client-webrtc/android/src/io/ktor/client/webrtc/DataChannel.kt b/ktor-client/ktor-client-webrtc/android/src/io/ktor/client/webrtc/DataChannel.kt index ba29308c152..0d413aa0687 100644 --- a/ktor-client/ktor-client-webrtc/android/src/io/ktor/client/webrtc/DataChannel.kt +++ b/ktor-client/ktor-client-webrtc/android/src/io/ktor/client/webrtc/DataChannel.kt @@ -63,13 +63,17 @@ public class AndroidWebRtcDataChannel( override suspend fun send(text: String) { assertOpen() val buffer = DataChannel.Buffer(Charsets.UTF_8.encode(text), false) - nativeChannel.send(buffer) + if (!nativeChannel.send(buffer)) { + throw WebRtc.IOException("Failed to send text message over data channel '$label'.") + } } override suspend fun send(bytes: ByteArray) { assertOpen() val buffer = DataChannel.Buffer(ByteBuffer.wrap(bytes), true) - nativeChannel.send(buffer) + if (!nativeChannel.send(buffer)) { + throw WebRtc.IOException("Failed to send binary message over data channel '$label'.") + } } override fun setBufferedAmountLowThreshold(threshold: Long) { diff --git a/ktor-client/ktor-client-webrtc/api/jvm/ktor-client-webrtc.api b/ktor-client/ktor-client-webrtc/api/jvm/ktor-client-webrtc.api index 70b2c4b81fe..64f4be07072 100644 --- a/ktor-client/ktor-client-webrtc/api/jvm/ktor-client-webrtc.api +++ b/ktor-client/ktor-client-webrtc/api/jvm/ktor-client-webrtc.api @@ -174,6 +174,11 @@ public final class io/ktor/client/webrtc/WebRtc$DataChannel$State : java/lang/En public static fun values ()[Lio/ktor/client/webrtc/WebRtc$DataChannel$State; } +public class io/ktor/client/webrtc/WebRtc$DataChannelClosedException : io/ktor/client/webrtc/WebRtc$IOException { + public fun (Ljava/lang/String;Ljava/lang/Throwable;)V + public synthetic fun (Ljava/lang/String;Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V +} + public final class io/ktor/client/webrtc/WebRtc$DegradationPreference : java/lang/Enum { public static final field BALANCED Lio/ktor/client/webrtc/WebRtc$DegradationPreference; public static final field DISABLED Lio/ktor/client/webrtc/WebRtc$DegradationPreference; @@ -190,6 +195,11 @@ public abstract interface class io/ktor/client/webrtc/WebRtc$DtmfSender { public abstract fun insertDtmf (Ljava/lang/String;II)V } +public class io/ktor/client/webrtc/WebRtc$IOException : java/io/IOException { + public fun (Ljava/lang/String;Ljava/lang/Throwable;)V + public synthetic fun (Ljava/lang/String;Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V +} + public final class io/ktor/client/webrtc/WebRtc$IceCandidate { public static final field Companion Lio/ktor/client/webrtc/WebRtc$IceCandidate$Companion; public fun (Ljava/lang/String;Ljava/lang/String;I)V @@ -487,6 +497,7 @@ public abstract class io/ktor/client/webrtc/WebRtcDataChannel : io/ktor/client/w public fun (Lio/ktor/client/webrtc/DataChannelReceiveOptions;)V public fun close ()V protected final fun emitMessage-JP2dKIU (Lio/ktor/client/webrtc/WebRtc$DataChannel$Message;)Ljava/lang/Object; + public final fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator; public fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun receiveBinary (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun receiveText (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -496,6 +507,10 @@ public abstract class io/ktor/client/webrtc/WebRtcDataChannel : io/ktor/client/w public fun tryReceiveText ()Ljava/lang/String; } +public final class io/ktor/client/webrtc/WebRtcDataChannelKt { + public static final fun withIOException (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + public final class io/ktor/client/webrtc/WebRtcDataChannelOptions { public fun ()V public final fun getId ()Ljava/lang/Integer; diff --git a/ktor-client/ktor-client-webrtc/api/ktor-client-webrtc.klib.api b/ktor-client/ktor-client-webrtc/api/ktor-client-webrtc.klib.api index a96b47e0b9e..fa7f051ecb5 100644 --- a/ktor-client/ktor-client-webrtc/api/ktor-client-webrtc.klib.api +++ b/ktor-client/ktor-client-webrtc/api/ktor-client-webrtc.klib.api @@ -112,6 +112,7 @@ abstract class io.ktor.client.webrtc/WebRtcDataChannel : io.ktor.client.webrtc/W constructor (io.ktor.client.webrtc/DataChannelReceiveOptions) // io.ktor.client.webrtc/WebRtcDataChannel.|(io.ktor.client.webrtc.DataChannelReceiveOptions){}[0] final fun emitMessage(io.ktor.client.webrtc/WebRtc.DataChannel.Message): kotlinx.coroutines.channels/ChannelResult // io.ktor.client.webrtc/WebRtcDataChannel.emitMessage|emitMessage(io.ktor.client.webrtc.WebRtc.DataChannel.Message){}[0] + final fun iterator(): kotlinx.coroutines.channels/ChannelIterator // io.ktor.client.webrtc/WebRtcDataChannel.iterator|iterator(){}[0] final fun stopReceivingMessages() // io.ktor.client.webrtc/WebRtcDataChannel.stopReceivingMessages|stopReceivingMessages(){}[0] open fun tryReceive(): io.ktor.client.webrtc/WebRtc.DataChannel.Message? // io.ktor.client.webrtc/WebRtcDataChannel.tryReceive|tryReceive(){}[0] open fun tryReceiveBinary(): kotlin/ByteArray? // io.ktor.client.webrtc/WebRtcDataChannel.tryReceiveBinary|tryReceiveBinary(){}[0] @@ -689,6 +690,14 @@ final object io.ktor.client.webrtc/WebRtc { // io.ktor.client.webrtc/WebRtc|null final fun hashCode(): kotlin/Int // io.ktor.client.webrtc/WebRtc.Stats.hashCode|hashCode(){}[0] final fun toString(): kotlin/String // io.ktor.client.webrtc/WebRtc.Stats.toString|toString(){}[0] } + + open class DataChannelClosedException : io.ktor.client.webrtc/WebRtc.IOException { // io.ktor.client.webrtc/WebRtc.DataChannelClosedException|null[0] + constructor (kotlin/String, kotlin/Throwable? = ...) // io.ktor.client.webrtc/WebRtc.DataChannelClosedException.|(kotlin.String;kotlin.Throwable?){}[0] + } + + open class IOException : kotlinx.io/IOException { // io.ktor.client.webrtc/WebRtc.IOException|null[0] + constructor (kotlin/String, kotlin/Throwable? = ...) // io.ktor.client.webrtc/WebRtc.IOException.|(kotlin.String;kotlin.Throwable?){}[0] + } } final object io.ktor.client.webrtc/WebRtcMedia { // io.ktor.client.webrtc/WebRtcMedia|null[0] @@ -828,6 +837,7 @@ final object io.ktor.client.webrtc/WebRtcMedia { // io.ktor.client.webrtc/WebRtc } final fun <#A: io.ktor.client.webrtc/WebRtcConfig> io.ktor.client.webrtc/WebRtcClient(io.ktor.client.webrtc/WebRtcClientEngineFactory<#A>, kotlin/Function1<#A, kotlin/Unit> = ...): io.ktor.client.webrtc/WebRtcClient // io.ktor.client.webrtc/WebRtcClient|WebRtcClient(io.ktor.client.webrtc.WebRtcClientEngineFactory<0:0>;kotlin.Function1<0:0,kotlin.Unit>){0§}[0] +final suspend inline fun <#A: kotlin/Any?> io.ktor.client.webrtc/withIOException(crossinline kotlin.coroutines/SuspendFunction0<#A>): #A // io.ktor.client.webrtc/withIOException|withIOException(kotlin.coroutines.SuspendFunction0<0:0>){0§}[0] // Targets: [ios] abstract interface io.ktor.client.webrtc.media/Capturer : kotlin/AutoCloseable { // io.ktor.client.webrtc.media/Capturer|null[0] diff --git a/ktor-client/ktor-client-webrtc/common/src/io/ktor/client/webrtc/WebRtc.kt b/ktor-client/ktor-client-webrtc/common/src/io/ktor/client/webrtc/WebRtc.kt index 557a3cc2cff..33c97c6cf84 100644 --- a/ktor-client/ktor-client-webrtc/common/src/io/ktor/client/webrtc/WebRtc.kt +++ b/ktor-client/ktor-client-webrtc/common/src/io/ktor/client/webrtc/WebRtc.kt @@ -476,6 +476,8 @@ public object WebRtc { * [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.webrtc.WebRtc.DataChannel.send) * * @param text The text message to send. + * @throws DataChannelClosedException if the channel is closed or not open for sending. + * @throws IOException if the message cannot be sent by the underlying WebRTC transport. * @see [MDN RTCDataChannel.send()](https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/send) */ public suspend fun send(text: String) @@ -486,6 +488,8 @@ public object WebRtc { * [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.webrtc.WebRtc.DataChannel.send) * * @param bytes The binary data to send. + * @throws DataChannelClosedException if the channel is closed or not open for sending. + * @throws IOException if the message cannot be sent by the underlying WebRTC transport. * @see [MDN RTCDataChannel.send()](https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/send) */ public suspend fun send(bytes: ByteArray) @@ -496,6 +500,8 @@ public object WebRtc { * This method will suspend the current coroutine until a message is received. * The message can be either text or binary data. * + * @throws DataChannelClosedException if the channel is closed by this or remote peer. + * * [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.webrtc.WebRtc.DataChannel.receive) */ public suspend fun receive(): Message @@ -593,4 +599,21 @@ public object WebRtc { * [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.webrtc.WebRtc.IceException) */ public class IceException(message: String?, cause: Throwable? = null) : RuntimeException(message, cause) + + /** + * Signals that some I/O exception has occurred. + * + * [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.webrtc.WebRtc.IOException) + */ + public open class IOException(message: String, cause: Throwable? = null) : kotlinx.io.IOException(message, cause) + + /** + * Exception thrown when trying to send to or read from a closed [DataChannel]. + * + * [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.webrtc.WebRtc.DataChannelClosedException) + */ + public open class DataChannelClosedException( + message: String, + cause: Throwable? = null + ) : IOException(message, cause) } diff --git a/ktor-client/ktor-client-webrtc/common/src/io/ktor/client/webrtc/WebRtcDataChannel.kt b/ktor-client/ktor-client-webrtc/common/src/io/ktor/client/webrtc/WebRtcDataChannel.kt index 9b0b4d95d50..8c0faff7636 100644 --- a/ktor-client/ktor-client-webrtc/common/src/io/ktor/client/webrtc/WebRtcDataChannel.kt +++ b/ktor-client/ktor-client-webrtc/common/src/io/ktor/client/webrtc/WebRtcDataChannel.kt @@ -205,7 +205,24 @@ public abstract class WebRtcDataChannel private constructor( receiveChannel = Channel(options = receiveOptions) ) - override suspend fun receive(): WebRtc.DataChannel.Message = receiveChannel.receive() + /** + * Returns an iterator over received messages. + * + * If the data channel is closed, the iterator keeps returning messages already stored in the buffer + * and completes when no buffered messages remain. + */ + public operator fun iterator(): ChannelIterator { + return receiveChannel.iterator() + } + + override suspend fun receive(): WebRtc.DataChannel.Message { + val result = receiveChannel.receiveCatching() + if (result.isSuccess) { + return result.getOrThrow() + } + val message = "Data channel '$label' is closed and no more messages will be received." + throw WebRtc.DataChannelClosedException(message, result.exceptionOrNull()) + } override suspend fun receiveBinary(): ByteArray = receive().binaryOrThrow() @@ -225,3 +242,16 @@ public abstract class WebRtcDataChannel private constructor( receiveChannel.close() } } + +@InternalAPI +public suspend inline fun withIOException(crossinline block: suspend () -> R): R { + return try { + block() + } catch (cause: kotlinx.coroutines.CancellationException) { + throw cause + } catch (cause: WebRtc.IOException) { + throw cause + } catch (cause: Exception) { + throw WebRtc.IOException("Error in WebRtcDataChannel operation", cause) + } +} diff --git a/ktor-client/ktor-client-webrtc/common/test/io/ktor/client/webrtc/WebRtcDataChannelIteratorTest.kt b/ktor-client/ktor-client-webrtc/common/test/io/ktor/client/webrtc/WebRtcDataChannelIteratorTest.kt new file mode 100644 index 00000000000..44430a60d58 --- /dev/null +++ b/ktor-client/ktor-client-webrtc/common/test/io/ktor/client/webrtc/WebRtcDataChannelIteratorTest.kt @@ -0,0 +1,81 @@ +/* + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ + +package io.ktor.client.webrtc + +import kotlinx.coroutines.test.* +import kotlin.test.* + +class WebRtcDataChannelIteratorTest { + + @Test + fun testIteratorDrainsBufferedMessagesAfterChannelIsClosed() = runTest { + val channel = TestWebRtcDataChannel() + channel.emit(WebRtc.DataChannel.Message.Text("first")) + channel.emit(WebRtc.DataChannel.Message.Text("second")) + channel.closeReceiving() + + val messages = channel.toList() + + assertEquals(listOf("first", "second"), messages.map { it.textOrThrow() }) + } + + @Test + fun testIteratorStopsOnClosedChannelWithoutBufferedMessages() = runTest { + val channel = TestWebRtcDataChannel() + channel.closeReceiving() + + assertTrue(channel.toList().isEmpty()) + } + + @Test + fun testIteratorPreservesMessageOrderAndType() = runTest { + val channel = TestWebRtcDataChannel() + val bytes = byteArrayOf(1, 2, 3) + channel.emit(WebRtc.DataChannel.Message.Text("text")) + channel.emit(WebRtc.DataChannel.Message.Binary(bytes)) + channel.closeReceiving() + + val messages = channel.toList() + + assertEquals("text", messages[0].textOrThrow()) + assertContentEquals(bytes, messages[1].binaryOrThrow()) + } + + private suspend fun WebRtcDataChannel.toList(): List = buildList { + for (message in this@toList) add(message) + } + + private class TestWebRtcDataChannel : WebRtcDataChannel(DataChannelReceiveOptions()) { + override val id: Int? = null + override val label: String = "test" + override var state: WebRtc.DataChannel.State = WebRtc.DataChannel.State.OPEN + override val bufferedAmount: Long = 0 + override val bufferedAmountLowThreshold: Long = 0 + override val maxPacketLifeTime: Int? = null + override val maxRetransmits: Int? = null + override val negotiated: Boolean = false + override val ordered: Boolean = true + override val protocol: String = "" + + fun emit(message: WebRtc.DataChannel.Message) { + check(emitMessage(message).isSuccess) + } + + override fun setBufferedAmountLowThreshold(threshold: Long) = TODO() + + override suspend fun send(text: String) = TODO() + + override suspend fun send(bytes: ByteArray) = TODO() + + fun closeReceiving() { + state = WebRtc.DataChannel.State.CLOSED + stopReceivingMessages() + } + + override fun closeTransport() { + closeReceiving() + } + } +} diff --git a/ktor-client/ktor-client-webrtc/common/test/io/ktor/client/webrtc/WebRtcDataChannelTest.kt b/ktor-client/ktor-client-webrtc/common/test/io/ktor/client/webrtc/WebRtcDataChannelTest.kt index 0d7d3ab7630..fdaa59f5a8f 100644 --- a/ktor-client/ktor-client-webrtc/common/test/io/ktor/client/webrtc/WebRtcDataChannelTest.kt +++ b/ktor-client/ktor-client-webrtc/common/test/io/ktor/client/webrtc/WebRtcDataChannelTest.kt @@ -239,10 +239,9 @@ class WebRtcDataChannelTest { dataChannel1.waitForClose(dataChannelEvents1) dataChannel2.waitForClose(dataChannelEvents2) - assertFails { dataChannel1.send("Hello") } - assertFails { dataChannel2.send("Hello") } - assertFails { dataChannel1.receive() } - assertEquals(null, dataChannel1.tryReceive()) + assertFailsWith { dataChannel1.send("Hello") } + assertFailsWith { dataChannel2.send("Hello") } + assertFailsWith { dataChannel1.receive() } assertEquals(null, dataChannel2.tryReceive()) } diff --git a/ktor-client/ktor-client-webrtc/ios/src/io/ktor/client/webrtc/DataChannel.kt b/ktor-client/ktor-client-webrtc/ios/src/io/ktor/client/webrtc/DataChannel.kt index dfcd7c0c279..ad99b4f2bd3 100644 --- a/ktor-client/ktor-client-webrtc/ios/src/io/ktor/client/webrtc/DataChannel.kt +++ b/ktor-client/ktor-client-webrtc/ios/src/io/ktor/client/webrtc/DataChannel.kt @@ -59,23 +59,22 @@ public class IosWebRtcDataChannel( override val protocol: String get() = nativeChannel.protocol() - private fun assertOpen() { - if (!state.canSend()) { - error("Data channel is closed.") - } + private fun requireOpen() { + if (state.canSend()) return + throw WebRtc.DataChannelClosedException("Data channel '$label' cannot send.") } override suspend fun send(text: String) { - assertOpen() + requireOpen() if (!nativeChannel.sendData(data = text.toRTCDataBuffer())) { - error("Failed to send text message over DataChannel.") + throw WebRtc.IOException("Failed to send text message over data channel '$label'.") } } override suspend fun send(bytes: ByteArray) { - assertOpen() + requireOpen() if (!nativeChannel.sendData(data = bytes.toRTCDataBuffer())) { - error("Failed to send binary message over DataChannel.") + throw WebRtc.IOException("Failed to send binary message over data channel '$label'.") } } diff --git a/ktor-client/ktor-client-webrtc/ktor-client-webrtc-rs/common/src/io/ktor/client/webrtc/rs/DataChannel.kt b/ktor-client/ktor-client-webrtc/ktor-client-webrtc-rs/common/src/io/ktor/client/webrtc/rs/DataChannel.kt index 3a38c08fef1..4ed47ac59ed 100644 --- a/ktor-client/ktor-client-webrtc/ktor-client-webrtc-rs/common/src/io/ktor/client/webrtc/rs/DataChannel.kt +++ b/ktor-client/ktor-client-webrtc/ktor-client-webrtc-rs/common/src/io/ktor/client/webrtc/rs/DataChannel.kt @@ -1,10 +1,11 @@ /* - * Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. */ package io.ktor.client.webrtc.rs import io.ktor.client.webrtc.* +import io.ktor.utils.io.InternalAPI import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.launch @@ -55,12 +56,21 @@ public class RustWebRtcDataChannel( override val protocol: String get() = inner.protocol() + private fun requireOpen() { + if (state.canSend()) return + throw WebRtc.DataChannelClosedException("Data channel '$label' cannot send.") + } + + @OptIn(InternalAPI::class) override suspend fun send(text: String) { - inner.sendText(text) + requireOpen() + withIOException { inner.sendText(text) } } + @OptIn(InternalAPI::class) override suspend fun send(bytes: ByteArray) { - inner.send(bytes) + requireOpen() + withIOException { inner.send(bytes) } } override fun setBufferedAmountLowThreshold(threshold: Long): Unit = runBlocking { diff --git a/ktor-client/ktor-client-webrtc/web/src/io/ktor/client/webrtc/DataChannel.kt b/ktor-client/ktor-client-webrtc/web/src/io/ktor/client/webrtc/DataChannel.kt index e15027effe0..cb5b58e1852 100644 --- a/ktor-client/ktor-client-webrtc/web/src/io/ktor/client/webrtc/DataChannel.kt +++ b/ktor-client/ktor-client-webrtc/web/src/io/ktor/client/webrtc/DataChannel.kt @@ -4,6 +4,7 @@ package io.ktor.client.webrtc +import io.ktor.utils.io.InternalAPI import js.buffer.ArrayBuffer import js.buffer.toByteArray import js.typedarrays.toInt8Array @@ -60,12 +61,21 @@ public class JsWebRtcDataChannel( override val protocol: String get() = channel.protocol + private fun requireOpen() { + if (state.canSend()) return + throw WebRtc.DataChannelClosedException("Data channel '$label' cannot send.") + } + + @OptIn(InternalAPI::class) override suspend fun send(text: String) { - channel.send(text) + requireOpen() + withIOException { channel.send(text) } } + @OptIn(InternalAPI::class) override suspend fun send(bytes: ByteArray) { - channel.send(bytes.toInt8Array()) + requireOpen() + withIOException { channel.send(bytes.toInt8Array()) } } override fun setBufferedAmountLowThreshold(threshold: Long) {