Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,17 @@ public class AndroidWebRtcDataChannel(
override suspend fun send(text: String) {
assertOpen()
val buffer = DataChannel.Buffer(Charsets.UTF_8.encode(text), false)
nativeChannel.send(buffer)
if (!nativeChannel.send(buffer)) {
throw WebRtc.IOException("Failed to send text message over data channel '$label'.")
}
}

override suspend fun send(bytes: ByteArray) {
assertOpen()
val buffer = DataChannel.Buffer(ByteBuffer.wrap(bytes), true)
nativeChannel.send(buffer)
if (!nativeChannel.send(buffer)) {
throw WebRtc.IOException("Failed to send binary message over data channel '$label'.")
}
}

override fun setBufferedAmountLowThreshold(threshold: Long) {
Expand Down
15 changes: 15 additions & 0 deletions ktor-client/ktor-client-webrtc/api/jvm/ktor-client-webrtc.api
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ public final class io/ktor/client/webrtc/WebRtc$DataChannel$State : java/lang/En
public static fun values ()[Lio/ktor/client/webrtc/WebRtc$DataChannel$State;
}

public class io/ktor/client/webrtc/WebRtc$DataChannelClosedException : io/ktor/client/webrtc/WebRtc$IOException {
public fun <init> (Ljava/lang/String;Ljava/lang/Throwable;)V
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
}

public final class io/ktor/client/webrtc/WebRtc$DegradationPreference : java/lang/Enum {
public static final field BALANCED Lio/ktor/client/webrtc/WebRtc$DegradationPreference;
public static final field DISABLED Lio/ktor/client/webrtc/WebRtc$DegradationPreference;
Expand All @@ -190,6 +195,11 @@ public abstract interface class io/ktor/client/webrtc/WebRtc$DtmfSender {
public abstract fun insertDtmf (Ljava/lang/String;II)V
}

public class io/ktor/client/webrtc/WebRtc$IOException : java/io/IOException {
public fun <init> (Ljava/lang/String;Ljava/lang/Throwable;)V
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
}

public final class io/ktor/client/webrtc/WebRtc$IceCandidate {
public static final field Companion Lio/ktor/client/webrtc/WebRtc$IceCandidate$Companion;
public fun <init> (Ljava/lang/String;Ljava/lang/String;I)V
Expand Down Expand Up @@ -487,6 +497,7 @@ public abstract class io/ktor/client/webrtc/WebRtcDataChannel : io/ktor/client/w
public fun <init> (Lio/ktor/client/webrtc/DataChannelReceiveOptions;)V
public fun close ()V
protected final fun emitMessage-JP2dKIU (Lio/ktor/client/webrtc/WebRtc$DataChannel$Message;)Ljava/lang/Object;
public final fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator;
public fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun receiveBinary (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun receiveText (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand All @@ -496,6 +507,10 @@ public abstract class io/ktor/client/webrtc/WebRtcDataChannel : io/ktor/client/w
public fun tryReceiveText ()Ljava/lang/String;
}

public final class io/ktor/client/webrtc/WebRtcDataChannelKt {
public static final fun withIOException (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/ktor/client/webrtc/WebRtcDataChannelOptions {
public fun <init> ()V
public final fun getId ()Ljava/lang/Integer;
Expand Down
10 changes: 10 additions & 0 deletions ktor-client/ktor-client-webrtc/api/ktor-client-webrtc.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ abstract class io.ktor.client.webrtc/WebRtcDataChannel : io.ktor.client.webrtc/W
constructor <init>(io.ktor.client.webrtc/DataChannelReceiveOptions) // io.ktor.client.webrtc/WebRtcDataChannel.<init>|<init>(io.ktor.client.webrtc.DataChannelReceiveOptions){}[0]

final fun emitMessage(io.ktor.client.webrtc/WebRtc.DataChannel.Message): kotlinx.coroutines.channels/ChannelResult<kotlin/Unit> // io.ktor.client.webrtc/WebRtcDataChannel.emitMessage|emitMessage(io.ktor.client.webrtc.WebRtc.DataChannel.Message){}[0]
final fun iterator(): kotlinx.coroutines.channels/ChannelIterator<io.ktor.client.webrtc/WebRtc.DataChannel.Message> // io.ktor.client.webrtc/WebRtcDataChannel.iterator|iterator(){}[0]
final fun stopReceivingMessages() // io.ktor.client.webrtc/WebRtcDataChannel.stopReceivingMessages|stopReceivingMessages(){}[0]
open fun tryReceive(): io.ktor.client.webrtc/WebRtc.DataChannel.Message? // io.ktor.client.webrtc/WebRtcDataChannel.tryReceive|tryReceive(){}[0]
open fun tryReceiveBinary(): kotlin/ByteArray? // io.ktor.client.webrtc/WebRtcDataChannel.tryReceiveBinary|tryReceiveBinary(){}[0]
Expand Down Expand Up @@ -689,6 +690,14 @@ final object io.ktor.client.webrtc/WebRtc { // io.ktor.client.webrtc/WebRtc|null
final fun hashCode(): kotlin/Int // io.ktor.client.webrtc/WebRtc.Stats.hashCode|hashCode(){}[0]
final fun toString(): kotlin/String // io.ktor.client.webrtc/WebRtc.Stats.toString|toString(){}[0]
}

open class DataChannelClosedException : io.ktor.client.webrtc/WebRtc.IOException { // io.ktor.client.webrtc/WebRtc.DataChannelClosedException|null[0]
constructor <init>(kotlin/String, kotlin/Throwable? = ...) // io.ktor.client.webrtc/WebRtc.DataChannelClosedException.<init>|<init>(kotlin.String;kotlin.Throwable?){}[0]
}

open class IOException : kotlinx.io/IOException { // io.ktor.client.webrtc/WebRtc.IOException|null[0]
constructor <init>(kotlin/String, kotlin/Throwable? = ...) // io.ktor.client.webrtc/WebRtc.IOException.<init>|<init>(kotlin.String;kotlin.Throwable?){}[0]
}
}

final object io.ktor.client.webrtc/WebRtcMedia { // io.ktor.client.webrtc/WebRtcMedia|null[0]
Expand Down Expand Up @@ -828,6 +837,7 @@ final object io.ktor.client.webrtc/WebRtcMedia { // io.ktor.client.webrtc/WebRtc
}

final fun <#A: io.ktor.client.webrtc/WebRtcConfig> io.ktor.client.webrtc/WebRtcClient(io.ktor.client.webrtc/WebRtcClientEngineFactory<#A>, kotlin/Function1<#A, kotlin/Unit> = ...): io.ktor.client.webrtc/WebRtcClient // io.ktor.client.webrtc/WebRtcClient|WebRtcClient(io.ktor.client.webrtc.WebRtcClientEngineFactory<0:0>;kotlin.Function1<0:0,kotlin.Unit>){0§<io.ktor.client.webrtc.WebRtcConfig>}[0]
final suspend inline fun <#A: kotlin/Any?> io.ktor.client.webrtc/withIOException(crossinline kotlin.coroutines/SuspendFunction0<#A>): #A // io.ktor.client.webrtc/withIOException|withIOException(kotlin.coroutines.SuspendFunction0<0:0>){0§<kotlin.Any?>}[0]

// Targets: [ios]
abstract interface io.ktor.client.webrtc.media/Capturer : kotlin/AutoCloseable { // io.ktor.client.webrtc.media/Capturer|null[0]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ public object WebRtc {
* [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.webrtc.WebRtc.DataChannel.send)
*
* @param text The text message to send.
* @throws DataChannelClosedException if the channel is closed or not open for sending.
* @throws IOException if the message cannot be sent by the underlying WebRTC transport.
* @see [MDN RTCDataChannel.send()](https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/send)
*/
public suspend fun send(text: String)
Expand All @@ -486,6 +488,8 @@ public object WebRtc {
* [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.webrtc.WebRtc.DataChannel.send)
*
* @param bytes The binary data to send.
* @throws DataChannelClosedException if the channel is closed or not open for sending.
* @throws IOException if the message cannot be sent by the underlying WebRTC transport.
* @see [MDN RTCDataChannel.send()](https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel/send)
*/
public suspend fun send(bytes: ByteArray)
Expand All @@ -496,6 +500,8 @@ public object WebRtc {
* This method will suspend the current coroutine until a message is received.
* The message can be either text or binary data.
*
* @throws DataChannelClosedException if the channel is closed by this or remote peer.
*
* [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.webrtc.WebRtc.DataChannel.receive)
*/
public suspend fun receive(): Message
Expand Down Expand Up @@ -593,4 +599,21 @@ public object WebRtc {
* [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.webrtc.WebRtc.IceException)
*/
public class IceException(message: String?, cause: Throwable? = null) : RuntimeException(message, cause)

/**
* Signals that some I/O exception has occurred.
*
* [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.webrtc.WebRtc.IOException)
*/
public open class IOException(message: String, cause: Throwable? = null) : kotlinx.io.IOException(message, cause)

/**
* Exception thrown when trying to send to or read from a closed [DataChannel].
*
* [Report a problem](https://ktor.io/feedback/?fqname=io.ktor.client.webrtc.WebRtc.DataChannelClosedException)
*/
public open class DataChannelClosedException(
message: String,
cause: Throwable? = null
) : IOException(message, cause)
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,24 @@ public abstract class WebRtcDataChannel private constructor(
receiveChannel = Channel(options = receiveOptions)
)

override suspend fun receive(): WebRtc.DataChannel.Message = receiveChannel.receive()
/**
* Returns an iterator over received messages.
*
* If the data channel is closed, the iterator keeps returning messages already stored in the buffer
* and completes when no buffered messages remain.
*/
public operator fun iterator(): ChannelIterator<WebRtc.DataChannel.Message> {
return receiveChannel.iterator()
}

override suspend fun receive(): WebRtc.DataChannel.Message {
val result = receiveChannel.receiveCatching()
if (result.isSuccess) {
return result.getOrThrow()
}
val message = "Data channel '$label' is closed and no more messages will be received."
throw WebRtc.DataChannelClosedException(message, result.exceptionOrNull())
}

override suspend fun receiveBinary(): ByteArray = receive().binaryOrThrow()

Expand All @@ -225,3 +242,16 @@ public abstract class WebRtcDataChannel private constructor(
receiveChannel.close()
}
}

@InternalAPI
public suspend inline fun <R> withIOException(crossinline block: suspend () -> R): R {
return try {
block()
} catch (cause: kotlinx.coroutines.CancellationException) {
throw cause
} catch (cause: WebRtc.IOException) {
throw cause
} catch (cause: Exception) {
throw WebRtc.IOException("Error in WebRtcDataChannel operation", cause)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.client.webrtc

import kotlinx.coroutines.test.*
import kotlin.test.*

class WebRtcDataChannelIteratorTest {

@Test
fun testIteratorDrainsBufferedMessagesAfterChannelIsClosed() = runTest {
val channel = TestWebRtcDataChannel()
channel.emit(WebRtc.DataChannel.Message.Text("first"))
channel.emit(WebRtc.DataChannel.Message.Text("second"))
channel.closeReceiving()

val messages = channel.toList()

assertEquals(listOf("first", "second"), messages.map { it.textOrThrow() })
}

@Test
fun testIteratorStopsOnClosedChannelWithoutBufferedMessages() = runTest {
val channel = TestWebRtcDataChannel()
channel.closeReceiving()

assertTrue(channel.toList().isEmpty())
}

@Test
fun testIteratorPreservesMessageOrderAndType() = runTest {
val channel = TestWebRtcDataChannel()
val bytes = byteArrayOf(1, 2, 3)
channel.emit(WebRtc.DataChannel.Message.Text("text"))
channel.emit(WebRtc.DataChannel.Message.Binary(bytes))
channel.closeReceiving()

val messages = channel.toList()

assertEquals("text", messages[0].textOrThrow())
assertContentEquals(bytes, messages[1].binaryOrThrow())
}

private suspend fun WebRtcDataChannel.toList(): List<WebRtc.DataChannel.Message> = buildList {
for (message in this@toList) add(message)
}

private class TestWebRtcDataChannel : WebRtcDataChannel(DataChannelReceiveOptions()) {
override val id: Int? = null
override val label: String = "test"
override var state: WebRtc.DataChannel.State = WebRtc.DataChannel.State.OPEN
override val bufferedAmount: Long = 0
override val bufferedAmountLowThreshold: Long = 0
override val maxPacketLifeTime: Int? = null
override val maxRetransmits: Int? = null
override val negotiated: Boolean = false
override val ordered: Boolean = true
override val protocol: String = ""

fun emit(message: WebRtc.DataChannel.Message) {
check(emitMessage(message).isSuccess)
}

override fun setBufferedAmountLowThreshold(threshold: Long) = TODO()

override suspend fun send(text: String) = TODO()

override suspend fun send(bytes: ByteArray) = TODO()

fun closeReceiving() {
state = WebRtc.DataChannel.State.CLOSED
stopReceivingMessages()
}

override fun closeTransport() {
closeReceiving()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,9 @@ class WebRtcDataChannelTest {
dataChannel1.waitForClose(dataChannelEvents1)
dataChannel2.waitForClose(dataChannelEvents2)

assertFails { dataChannel1.send("Hello") }
assertFails { dataChannel2.send("Hello") }
assertFails { dataChannel1.receive() }
assertEquals(null, dataChannel1.tryReceive())
assertFailsWith<WebRtc.DataChannelClosedException> { dataChannel1.send("Hello") }
assertFailsWith<WebRtc.DataChannelClosedException> { dataChannel2.send("Hello") }
assertFailsWith<WebRtc.DataChannelClosedException> { dataChannel1.receive() }
assertEquals(null, dataChannel2.tryReceive())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,22 @@ public class IosWebRtcDataChannel(
override val protocol: String
get() = nativeChannel.protocol()

private fun assertOpen() {
if (!state.canSend()) {
error("Data channel is closed.")
}
private fun requireOpen() {
if (state.canSend()) return
throw WebRtc.DataChannelClosedException("Data channel '$label' cannot send.")
}

override suspend fun send(text: String) {
assertOpen()
requireOpen()
if (!nativeChannel.sendData(data = text.toRTCDataBuffer())) {
error("Failed to send text message over DataChannel.")
throw WebRtc.IOException("Failed to send text message over data channel '$label'.")
}
}

override suspend fun send(bytes: ByteArray) {
assertOpen()
requireOpen()
if (!nativeChannel.sendData(data = bytes.toRTCDataBuffer())) {
error("Failed to send binary message over DataChannel.")
throw WebRtc.IOException("Failed to send binary message over data channel '$label'.")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
/*
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2014-2026 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.client.webrtc.rs

import io.ktor.client.webrtc.*
import io.ktor.utils.io.InternalAPI
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -55,12 +56,21 @@ public class RustWebRtcDataChannel(
override val protocol: String
get() = inner.protocol()

private fun requireOpen() {
if (state.canSend()) return
throw WebRtc.DataChannelClosedException("Data channel '$label' cannot send.")
}

@OptIn(InternalAPI::class)
override suspend fun send(text: String) {
inner.sendText(text)
requireOpen()
withIOException { inner.sendText(text) }
}

@OptIn(InternalAPI::class)
override suspend fun send(bytes: ByteArray) {
inner.send(bytes)
requireOpen()
withIOException { inner.send(bytes) }
}

override fun setBufferedAmountLowThreshold(threshold: Long): Unit = runBlocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.ktor.client.webrtc

import io.ktor.utils.io.InternalAPI
import js.buffer.ArrayBuffer
import js.buffer.toByteArray
import js.typedarrays.toInt8Array
Expand Down Expand Up @@ -60,12 +61,21 @@ public class JsWebRtcDataChannel(
override val protocol: String
get() = channel.protocol

private fun requireOpen() {
if (state.canSend()) return
throw WebRtc.DataChannelClosedException("Data channel '$label' cannot send.")
}

@OptIn(InternalAPI::class)
override suspend fun send(text: String) {
channel.send(text)
requireOpen()
withIOException { channel.send(text) }
}

@OptIn(InternalAPI::class)
override suspend fun send(bytes: ByteArray) {
channel.send(bytes.toInt8Array())
requireOpen()
withIOException { channel.send(bytes.toInt8Array()) }
}

override fun setBufferedAmountLowThreshold(threshold: Long) {
Expand Down