Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ktor-io/common/src/io/ktor/utils/io/ByteChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,16 @@ public class ByteChannel(public override val autoFlush: Boolean = false) : ByteR
sleepWhile(Slot::Read) {
flushBufferSize + _readBuffer.size < min && _closedCause.value == null
}
rethrowCloseCauseIfNeeded()

if (_readBuffer.size < CHANNEL_MAX_SIZE) moveFlushToReadBuffer()
return _readBuffer.size >= min
}

private fun rethrowCloseCauseIfNeeded() {
closedCause?.let { throw it }
}

@OptIn(InternalAPI::class)
private fun moveFlushToReadBuffer() {
synchronized(flushBufferMutex) {
Expand Down
52 changes: 52 additions & 0 deletions ktor-io/common/test/ByteReadChannelOperationsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.yield
import kotlinx.io.IOException
import kotlinx.io.InternalIoApi
import kotlinx.io.bytestring.ByteString
Expand Down Expand Up @@ -115,6 +116,57 @@ class ByteReadChannelOperationsTest {
}
}

@Test
fun `copyTo propagates closedCause cancelled mid-await`() = runTest {
val src = ByteChannel()
val dst = ByteChannel()
launch {
yield()
src.cancel(IOException("source cancelled"))
}
assertFailsWith<IOException> {
src.copyTo(dst)
}
assertTrue(src.isClosedForRead)
}

@Test
fun `copyTo with limit propagates closedCause cancelled mid-await`() = runTest {
val src = ByteChannel()
val dst = ByteChannel()
launch {
yield()
src.cancel(IOException("source cancelled"))
}
assertFailsWith<IOException> {
src.copyTo(dst, limit = 1024L)
}
assertTrue(src.isClosedForRead)
}

@Test
fun `copyTo does not throw on normal close`() = runTest {
val src = ByteChannel()
val dst = ByteChannel()
src.writeFully(byteArrayOf(1, 2, 3))
src.flushAndClose()
val copied = src.copyTo(dst)
assertEquals(3, copied)
}

@Test
fun `awaitContent rethrows closedCause after suspension`() = runTest {
val channel = ByteChannel()
launch {
yield()
channel.cancel(IOException("cancelled mid-await"))
}
assertFailsWith<IOException> {
channel.awaitContent()
}
assertTrue(channel.isClosedForRead)
}

@Test
fun readFully() = runTest {
val expected = ByteArray(10) { it.toByte() }
Expand Down