diff --git a/data.go b/data.go index 47cd619..5e68378 100644 --- a/data.go +++ b/data.go @@ -182,3 +182,42 @@ func isPSIComplete(ps []*Packet) bool { return i.Len() >= i.Offset() } + +// isPESComplete checks whether payload fully contains PES packet +func isPESComplete(ps []*Packet) bool { + // Get payload length + var l int + for _, p := range ps { + l += len(p.Payload) + } + + // Get the slice for payload from pool + payload := bytesPool.get(l) + defer bytesPool.put(payload) + + // Append payload + var o int + for _, p := range ps { + o += copy(payload.s[o:], p.Payload) + } + + // Create reader + i := astikit.NewBytesIterator(payload.s) + + // Skip first 3 bytes that are there to identify the PES payload + i.Seek(3) + + // Parse header + h, _, dataEnd, err := parsePESHeader(i) + if err != nil { + err = fmt.Errorf("astits: parsing PES header failed: %w", err) + return false + } + + if h.PacketLength == 0 { + // There's no other way to know whether the packet is complete + return false + } + + return i.Len() >= dataEnd +} diff --git a/packet_pool.go b/packet_pool.go index 166fe00..1f7402d 100644 --- a/packet_pool.go +++ b/packet_pool.go @@ -52,6 +52,9 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) { isPSIComplete(mps) { ps = mps mps = nil + } else if isPESPayload(mps[0].Payload) && isPESComplete(mps) { // Check if PES payload is complete + ps = mps + mps = nil } b.q = mps diff --git a/packet_pool_test.go b/packet_pool_test.go index 79eafee..a9fe9ac 100644 --- a/packet_pool_test.go +++ b/packet_pool_test.go @@ -49,3 +49,38 @@ func TestPacketPool(t *testing.T) { ps = b.dumpUnlocked() assert.Len(t, ps, 0) } + +func TestPacketPoolWithRarePackets(t *testing.T) { + payloadDVBTeletext := hexToBytes(`000001bd00b2848024293c972af5ffffffffffffffffffffffffffffffff + ffffffffffffffffffffffffffffff10032cf5e4a8a80b0ba80ba80b2692 + 040404040404040404040404040404040404040404040404040404040404 + 0404032cd5e4a8a85757a8a8a80b26a80404040404040404040404040404 + 040404040404040404040404040404040404ff2cffffffffffffffffffff + ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff + ffffffff`) + b := newPacketPool(nil) + ps := b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 0, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 1, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 2, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 4, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 6, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 7, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 7, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 9, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 10, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext}) + assert.Len(t, ps, 1) + ps = b.dumpUnlocked() + assert.Len(t, ps, 0) +}