Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions data.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,3 +182,42 @@ func isPSIComplete(ps []*Packet) bool {

return i.Len() >= i.Offset()
}

// isPESComplete checks whether payload fully contains PES packet
func isPESComplete(ps []*Packet) bool {
// Get payload length
var l int
for _, p := range ps {
l += len(p.Payload)
}

// Get the slice for payload from pool
payload := bytesPool.get(l)
defer bytesPool.put(payload)

// Append payload
var o int
for _, p := range ps {
o += copy(payload.s[o:], p.Payload)
}

// Create reader
i := astikit.NewBytesIterator(payload.s)

// Skip first 3 bytes that are there to identify the PES payload
i.Seek(3)

// Parse header
h, _, dataEnd, err := parsePESHeader(i)
if err != nil {
err = fmt.Errorf("astits: parsing PES header failed: %w", err)
return false
}

if h.PacketLength == 0 {
// There's no other way to know whether the packet is complete
return false
}

return i.Len() >= dataEnd
}
3 changes: 3 additions & 0 deletions packet_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func (b *packetAccumulator) add(p *Packet) (ps []*Packet) {
isPSIComplete(mps) {
ps = mps
mps = nil
} else if isPESPayload(mps[0].Payload) && isPESComplete(mps) { // Check if PES payload is complete
ps = mps
mps = nil
}

b.q = mps
Expand Down
35 changes: 35 additions & 0 deletions packet_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,38 @@ func TestPacketPool(t *testing.T) {
ps = b.dumpUnlocked()
assert.Len(t, ps, 0)
}

func TestPacketPoolWithRarePackets(t *testing.T) {
payloadDVBTeletext := hexToBytes(`000001bd00b2848024293c972af5ffffffffffffffffffffffffffffffff
ffffffffffffffffffffffffffffff10032cf5e4a8a80b0ba80ba80b2692
040404040404040404040404040404040404040404040404040404040404
0404032cd5e4a8a85757a8a8a80b26a80404040404040404040404040404
040404040404040404040404040404040404ff2cffffffffffffffffffff
ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
ffffffff`)
b := newPacketPool(nil)
ps := b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 0, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
assert.Len(t, ps, 1)
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 1, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
assert.Len(t, ps, 1)
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 2, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
assert.Len(t, ps, 1)
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
assert.Len(t, ps, 1)
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 3, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
assert.Len(t, ps, 1)
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 4, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
assert.Len(t, ps, 1)
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 6, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
assert.Len(t, ps, 1)
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 7, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
assert.Len(t, ps, 1)
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 7, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
assert.Len(t, ps, 1)
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 9, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
assert.Len(t, ps, 1)
ps = b.addUnlocked(&Packet{Header: PacketHeader{ContinuityCounter: 10, HasPayload: true, PayloadUnitStartIndicator: true, PID: 1004}, Payload: payloadDVBTeletext})
assert.Len(t, ps, 1)
ps = b.dumpUnlocked()
assert.Len(t, ps, 0)
}