From 972e0d6315b6b72303dfcf4c7d665caa14f5b864 Mon Sep 17 00:00:00 2001 From: cobeq Date: Wed, 3 Dec 2025 15:59:06 +0300 Subject: [PATCH 1/2] fix(packet_pool): reducing latency for single-packet PES by using a heuristic to determine when a single-packet PES is received and to send eventually --- data.go | 23 +++++++++++++++++++++++ packet_pool.go | 21 ++++++++++++++++++++- packet_pool_test.go | 35 +++++++++++++++++++++++++++++++++++ 3 files changed, 78 insertions(+), 1 deletion(-) diff --git a/data.go b/data.go index 47cd619..53f289a 100644 --- a/data.go +++ b/data.go @@ -182,3 +182,26 @@ func isPSIComplete(ps []*Packet) bool { return i.Len() >= i.Offset() } + +// isPESComplete checks whether payload fully contains PES packet +func isPESComplete(payload []byte) bool { + i := astikit.NewBytesIterator(payload) + + i.Seek(4) + + // Get next bytes + var bs []byte + var err error + if bs, err = i.NextBytesNoCopy(2); err != nil { + return false + } + + pesLength := uint16(bs[0])<<8 | uint16(bs[1]) + + if pesLength == 0 { + // any length + return false + } + + return int(pesLength)+pesHeaderLength <= len(payload) +} diff --git a/packet_pool.go b/packet_pool.go index 166fe00..6617188 100644 --- a/packet_pool.go +++ b/packet_pool.go @@ -23,6 +23,7 @@ func newPacketAccumulator(pid uint16, programMap *programMap) *packetAccumulator func (b *packetAccumulator) add(p *Packet) (ps []*Packet) { mps := b.q + var needSkipBufferMPS bool // Empty buffer if we detect a discontinuity if hasDiscontinuity(mps, p) { // Reset current slice or make new @@ -31,6 +32,8 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) { } else { mps = make([]*Packet, 0, 10) } + } else { + needSkipBufferMPS = len(mps) == 0 || isPacketsAlreadySent(mps) } // Throw away packet if it's the same as the previous one @@ -40,7 +43,9 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) { // Flush buffer if new payload starts here if p.Header.PayloadUnitStartIndicator { - ps = mps + if !needSkipBufferMPS { + ps = mps + } mps = make([]*Packet, 0, cap(mps)) } @@ -52,6 +57,8 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) { isPSIComplete(mps) { ps = mps mps = nil + } else if needSkipBufferMPS && isPayloadCompletePES(p) { + ps = mps } b.q = mps @@ -110,6 +117,10 @@ func (b *packetPool) dumpUnlocked() (ps []*Packet) { ps = b.b[uint32(k)].q delete(b.b, uint32(k)) if len(ps) > 0 { + if isPacketsAlreadySent(ps) { + ps = nil + continue + } return } } @@ -150,3 +161,11 @@ func isSameAsPrevious(ps []*Packet, p *Packet) bool { l := len(ps) return l > 0 && p.Header.HasPayload && p.Header.ContinuityCounter == ps[l-1].Header.ContinuityCounter } + +func isPayloadCompletePES(packet *Packet) bool { + return packet.Header.PayloadUnitStartIndicator && isPESPayload(packet.Payload) && isPESComplete(packet.Payload) +} + +func isPacketsAlreadySent(ps []*Packet) bool { + return len(ps) == 1 && isPayloadCompletePES(ps[0]) +} diff --git a/packet_pool_test.go b/packet_pool_test.go index 79eafee..41ef4c8 100644 --- a/packet_pool_test.go +++ b/packet_pool_test.go @@ -49,3 +49,38 @@ func TestPacketPool(t *testing.T) { ps = b.dumpUnlocked() assert.Len(t, ps, 0) } + +func TestPacketPoolWithRarePackets(t *testing.T) { + payloadDVBTeletext := hexToBytes(`000001bd00b2848024293c972af5ffffffffffffffffffffffffffffffff + ffffffffffffffffffffffffffffff10032cf5e4a8a80b0ba80ba80b2692 + 040404040404040404040404040404040404040404040404040404040404 + 0404032cd5e4a8a85757a8a8a80b26a80404040404040404040404040404 + 040404040404040404040404040404040404ff2cffffffffffffffffffff + ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff + ffffffff`) + b := newPacketPool(nil) + ps := b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 0, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 1, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 2, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 0) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 4, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 6, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 0) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 7, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 7, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 0) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 9, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 0) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 10, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.dumpUnlocked() + assert.Len(t, ps, 0) +} From ddf8a4a98c47b1c75e2431e305bf147390c2480e Mon Sep 17 00:00:00 2001 From: cobeq Date: Thu, 4 Dec 2025 23:37:56 +0300 Subject: [PATCH 2/2] fix issues and simplify --- data.go | 40 ++++++++++++++++++++++++++++------------ packet_pool.go | 22 +++------------------- packet_pool_test.go | 8 ++++---- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/data.go b/data.go index 53f289a..5e68378 100644 --- a/data.go +++ b/data.go @@ -184,24 +184,40 @@ func isPSIComplete(ps []*Packet) bool { } // isPESComplete checks whether payload fully contains PES packet -func isPESComplete(payload []byte) bool { - i := astikit.NewBytesIterator(payload) +func isPESComplete(ps []*Packet) bool { + // Get payload length + var l int + for _, p := range ps { + l += len(p.Payload) + } - i.Seek(4) + // Get the slice for payload from pool + payload := bytesPool.get(l) + defer bytesPool.put(payload) - // Get next bytes - var bs []byte - var err error - if bs, err = i.NextBytesNoCopy(2); err != nil { - return false + // Append payload + var o int + for _, p := range ps { + o += copy(payload.s[o:], p.Payload) } - pesLength := uint16(bs[0])<<8 | uint16(bs[1]) + // Create reader + i := astikit.NewBytesIterator(payload.s) + + // Skip first 3 bytes that are there to identify the PES payload + i.Seek(3) + + // Parse header + h, _, dataEnd, err := parsePESHeader(i) + if err != nil { + err = fmt.Errorf("astits: parsing PES header failed: %w", err) + return false + } - if pesLength == 0 { - // any length + if h.PacketLength == 0 { + // There's no other way to know whether the packet is complete return false } - return int(pesLength)+pesHeaderLength <= len(payload) + return i.Len() >= dataEnd } diff --git a/packet_pool.go b/packet_pool.go index 6617188..1f7402d 100644 --- a/packet_pool.go +++ b/packet_pool.go @@ -23,7 +23,6 @@ func newPacketAccumulator(pid uint16, programMap *programMap) *packetAccumulator func (b *packetAccumulator) add(p *Packet) (ps []*Packet) { mps := b.q - var needSkipBufferMPS bool // Empty buffer if we detect a discontinuity if hasDiscontinuity(mps, p) { // Reset current slice or make new @@ -32,8 +31,6 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) { } else { mps = make([]*Packet, 0, 10) } - } else { - needSkipBufferMPS = len(mps) == 0 || isPacketsAlreadySent(mps) } // Throw away packet if it's the same as the previous one @@ -43,9 +40,7 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) { // Flush buffer if new payload starts here if p.Header.PayloadUnitStartIndicator { - if !needSkipBufferMPS { - ps = mps - } + ps = mps mps = make([]*Packet, 0, cap(mps)) } @@ -57,8 +52,9 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) { isPSIComplete(mps) { ps = mps mps = nil - } else if needSkipBufferMPS && isPayloadCompletePES(p) { + } else if isPESPayload(mps[0].Payload) && isPESComplete(mps) { // Check if PES payload is complete ps = mps + mps = nil } b.q = mps @@ -117,10 +113,6 @@ func (b *packetPool) dumpUnlocked() (ps []*Packet) { ps = b.b[uint32(k)].q delete(b.b, uint32(k)) if len(ps) > 0 { - if isPacketsAlreadySent(ps) { - ps = nil - continue - } return } } @@ -161,11 +153,3 @@ func isSameAsPrevious(ps []*Packet, p *Packet) bool { l := len(ps) return l > 0 && p.Header.HasPayload && p.Header.ContinuityCounter == ps[l-1].Header.ContinuityCounter } - -func isPayloadCompletePES(packet *Packet) bool { - return packet.Header.PayloadUnitStartIndicator && isPESPayload(packet.Payload) && isPESComplete(packet.Payload) -} - -func isPacketsAlreadySent(ps []*Packet) bool { - return len(ps) == 1 && isPayloadCompletePES(ps[0]) -} diff --git a/packet_pool_test.go b/packet_pool_test.go index 41ef4c8..a9fe9ac 100644 --- a/packet_pool_test.go +++ b/packet_pool_test.go @@ -68,17 +68,17 @@ func TestPacketPoolWithRarePackets(t *testing.T) { ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) assert.Len(t, ps, 1) ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) - assert.Len(t, ps, 0) + assert.Len(t, ps, 1) ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 4, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) assert.Len(t, ps, 1) ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 6, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) - assert.Len(t, ps, 0) + assert.Len(t, ps, 1) ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 7, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) assert.Len(t, ps, 1) ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 7, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) - assert.Len(t, ps, 0) + assert.Len(t, ps, 1) ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 9, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) - assert.Len(t, ps, 0) + assert.Len(t, ps, 1) ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 10, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) assert.Len(t, ps, 1) ps = b.dumpUnlocked()