diff --git a/cli/parsefile.go b/cli/parsefile.go index 1e1a48d..014afa9 100644 --- a/cli/parsefile.go +++ b/cli/parsefile.go @@ -26,6 +26,7 @@ SOFTWARE. package main import ( + "bufio" "bytes" "flag" "fmt" @@ -61,18 +62,13 @@ func main() { } }(tsFile) // Verify if sync-byte is present and seek to the first sync-byte - syncIndex, err := sync(tsFile) - if err == nil { - _, err = tsFile.Seek(syncIndex, 0) - if err != nil { - fmt.Println(err) - return - } - } else { + reader := bufio.NewReader(tsFile) + _, err = packet.Sync(reader) + if err != nil { fmt.Println(err) return } - pat, err := extractPat(tsFile) + pat, err := psi.ReadPAT(reader) if err != nil { println(err) return @@ -82,7 +78,7 @@ func main() { if *showPmt { pm := pat.ProgramMap() for pn, pid := range pm { - pmt, err := extractPmt(tsFile, pid) + pmt, err := psi.ReadPMT(reader, pid) if err != nil { panic(err) } @@ -90,16 +86,17 @@ func main() { } } - pkt := make([]byte, packet.PacketSize, packet.PacketSize) - var offset int64 + pkt := make(packet.Packet, packet.PacketSize) var numPackets uint64 ebps := make(map[uint64]ebp.EncoderBoundaryPoint) for { - read, err := tsFile.ReadAt(pkt, offset) - if err == io.EOF || read == 0 { - break + if _, err := io.ReadFull(reader, pkt); err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + println(err) + return } - offset += packet.PacketSize numPackets++ if *showEbp { ebpBytes, err := adaptationfield.EncoderBoundaryPoint(pkt) @@ -150,99 +147,6 @@ func printPat(pat psi.PAT) { printlnf("\tNumber of Programs %v", pat.NumPrograms()) } -func extractPat(buf io.Reader) (psi.PAT, error) { - pkt := make([]byte, packet.PacketSize) - var pat psi.PAT - for read, err := buf.Read(pkt); pat == nil; read, err = buf.Read(pkt) { - if err != nil { - return nil, err - } - if read <= 0 { - return nil, fmt.Errorf("Reached EOF without PAT") - } - pid, err := packet.Pid(pkt) - if err != nil { - return nil, err - } - if pid == 0 { - pay, err := packet.Payload(pkt) - if err != nil { - println(err) - continue - } - cp := make([]byte, len(pay)) - copy(cp, pay) - pat, err := psi.NewPAT(cp) - if err != nil { - println(err) - continue - } - return pat, nil - } - } - return nil, fmt.Errorf("No pat found") -} func printlnf(format string, a ...interface{}) { fmt.Printf(format+"\n", a...) } -func extractPmt(buf io.Reader, pid uint16) (psi.PMT, error) { - pkt := make([]byte, packet.PacketSize) - pmtAcc := packet.NewAccumulator(psi.PmtAccumulatorDoneFunc) - var pmt psi.PMT - for read, err := buf.Read(pkt); pmt == nil && read > 0; read, err = buf.Read(pkt) { - if err != nil { - return nil, err - } - currPid, err := packet.Pid(pkt) - if err != nil { - return nil, err - } - if currPid == pid { - done, err := pmtAcc.Add(pkt) - if err != nil { - return nil, err - } - if done { - b, err := pmtAcc.Parse() - if err != nil { - return nil, err - } - pmt, err = psi.NewPMT(b) - if err != nil { - return nil, err - } - - } - - } - } - return pmt, nil -} -func sync(buf io.Reader) (int64, error) { - // function find the first sync byte of the array - data := make([]byte, 1) - for i := int64(0); ; i++ { - read, err := buf.Read(data) - if err != nil && err != io.EOF { - println(err) - } - if read == 0 { - break - } - if int(data[0]) == packet.SyncByte { - // check next 188th byte - nextData := make([]byte, packet.PacketSize) - nextRead, err := buf.Read(nextData) - if err != nil && err != io.EOF { - println(err) - } - if nextRead == 0 { - break - } - if nextData[187] == packet.SyncByte { - return i, nil - } - } - } - return 0, fmt.Errorf("Sync-byte not found.") -} diff --git a/errors.go b/errors.go index c142a35..de65697 100644 --- a/errors.go +++ b/errors.go @@ -43,6 +43,12 @@ var ( ErrNoPCR = errors.New("adaptation field has no Program Clock Reference") // ErrNoOPCR is returned when an attempt is made to access an adaptation field OPCR that does not exist ErrNoOPCR = errors.New("adaptation field has no Original Program Clock Reference") + // ErrPATNotFound is returned when expected PAT packet is not found when + // reading TS packets. + ErrPATNotFound = errors.New("No PAT was found while reading TS") + // ErrPMTNotFound is returned when expected PMT packet(s) are not found when + // reading TS packets. + ErrPMTNotFound = errors.New("No PMT was found while reading TS") // ErrParsePMTDescriptor is returned when a PMT descriptor cannot be parsed ErrParsePMTDescriptor = errors.New("unable to parse PMT descriptor") // ErrInvalidPATLength is returned when a PAT cannot be parsed because there are not enough bytes @@ -70,4 +76,7 @@ var ( ErrSCTE35DescriptorNotFound = errors.New("Cannot close descriptor that's not in the open list") // ErrNilPAT is returned when a PAT is passed into a function for which it cannot be nil. ErrNilPAT = errors.New("Nil PAT not allowed here.") + // ErrSyncByteNotFound is returned when a packet sync byte could not be found + // when reading. + ErrSyncByteNotFound = errors.New("Sync-byte not found.") ) diff --git a/packet/io.go b/packet/io.go new file mode 100644 index 0000000..a96d5b4 --- /dev/null +++ b/packet/io.go @@ -0,0 +1,65 @@ +/* +MIT License + +Copyright 2016 Comcast Cable Communications Management, LLC + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +package packet + +import ( + "bufio" + "io" + + "github.com/Comcast/gots" +) + +// Sync finds the offset of the next packet sync byte and advances the reader +// to the packet start. It also checks the next 188th byte to ensure a sync is +// found. It returns the offset of the sync w.r.t. the original reader +// position. +func Sync(r *bufio.Reader) (int64, error) { + data := make([]byte, 1) + for i := int64(0); ; i++ { + _, err := io.ReadFull(r, data) + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + if err != nil { + return 0, err + } + if int(data[0]) == SyncByte { + // check next 188th byte + rp := bufio.NewReaderSize(r, PacketSize) // extends only if needed + nextData, err := rp.Peek(PacketSize) + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + if err != nil { + return 0, err + } + if nextData[187] == SyncByte { + r.UnreadByte() + return i, nil + } + } + } + return 0, gots.ErrSyncByteNotFound +} diff --git a/packet/io_test.go b/packet/io_test.go new file mode 100644 index 0000000..b081b94 --- /dev/null +++ b/packet/io_test.go @@ -0,0 +1,107 @@ +/* +MIT License + +Copyright 2016 Comcast Cable Communications Management, LLC + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +package packet + +import ( + "bufio" + "bytes" + "encoding/hex" + "testing" +) + +func TestSyncForSmoke(t *testing.T) { + bs, _ := hex.DecodeString("474000100000b00d0001c100000001e256f803e71bfffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ff4742") + r := bufio.NewReader(bytes.NewReader(bs)) + + offset, err := Sync(r) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if offset != 0 { + t.Errorf("Incorrect offset returned for next sync marker") + } +} + +func TestSyncNonZeroOffset(t *testing.T) { + bs, _ := hex.DecodeString("ffffff474000100000b00d0001c100000001e256f803e71bfffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ff4742") + r := bufio.NewReader(bytes.NewReader(bs)) + + offset, err := Sync(r) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if offset != 3 { + t.Errorf("Incorrect offset returned for next sync marker") + } +} + +func TestSyncNotFound(t *testing.T) { + // no sync byte here + bs, _ := hex.DecodeString("ff4000100000b00d0001c100000001e256f803e71bfffffff") + r := bufio.NewReader(bytes.NewReader(bs)) + + _, err := Sync(r) + if err == nil { + t.Errorf("Expected there to be an error, but there was not") + } +} + +func TestSyncReaderPosAtPacketStart(t *testing.T) { + bs, _ := hex.DecodeString("ffffff474000100000b00d0001c100000001e256f803e71bfffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ff4742") + r := bufio.NewReader(bytes.NewReader(bs)) + + _, err := Sync(r) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + bytes := make([]byte, 3) + _, err = r.Read(bytes) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + wantHex := "474000" + gotHex := hex.EncodeToString(bytes) + if gotHex != wantHex { + t.Errorf("Reader not left in correct spot. Wanted next read %v, got %v", wantHex, gotHex) + } +} diff --git a/psi/pat.go b/psi/pat.go index 63ea46e..a3efb3d 100644 --- a/psi/pat.go +++ b/psi/pat.go @@ -25,6 +25,8 @@ SOFTWARE. package psi import ( + "io" + "github.com/Comcast/gots" "github.com/Comcast/gots/packet" ) @@ -124,3 +126,38 @@ func (pat pat) ProgramMap() map[uint16]uint16 { return m } + +// ReadPAT extracts a PAT from a reader of a TS stream. It will read until a +// PAT packet is found or EOF is reached. +// It returns a new PAT object parsed from the packet, if found, and otherwise +// returns an error. +func ReadPAT(r io.Reader) (PAT, error) { + pkt := make(packet.Packet, packet.PacketSize) + var pat PAT + for pat == nil { + if _, err := io.ReadFull(r, pkt); err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + return nil, err + } + isPat, err := packet.IsPat(pkt) + if err != nil { + return nil, err + } + if isPat { + pay, err := packet.Payload(pkt) + if err != nil { + return nil, err + } + cp := make([]byte, len(pay)) + copy(cp, pay) + pat, err := NewPAT(cp) + if err != nil { + return nil, err + } + return pat, nil + } + } + return nil, gots.ErrPATNotFound +} diff --git a/psi/pat_test.go b/psi/pat_test.go index 83570d3..ae53842 100644 --- a/psi/pat_test.go +++ b/psi/pat_test.go @@ -24,6 +24,7 @@ SOFTWARE. package psi import ( + "bytes" "encoding/hex" "reflect" "testing" @@ -105,3 +106,40 @@ func TestProgramMap(t *testing.T) { } } } + +func TestReadPATForSmoke(t *testing.T) { + // requires full packets so cannot use test data above + bs, _ := hex.DecodeString("474000100000b00d0001c100000001e256f803e71bfffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ff474256100002b0300001c10000e131f0060504435545491be121f0042a027e1" + + "f86e225f00f52012a9700e9080c001f41850fa041ee3f6580ffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffff") + r := bytes.NewReader(bs) + pat, err := ReadPAT(r) + if err != nil { + t.Errorf("Unexpected error reading PAT: %v", err) + } + // sanity check (tests integration a bit) + gotMap := pat.ProgramMap() + wantMap := map[uint16]uint16{1: 598} + if !reflect.DeepEqual(wantMap, gotMap) { + t.Errorf("PAT read is invalid, did not have expected program map") + } +} + +func TestReadPATIncomplete(t *testing.T) { + bs, _ := hex.DecodeString("47400") // incomplete PAT packet + r := bytes.NewReader(bs) + + _, err := ReadPAT(r) + if err == nil { + t.Errorf("Expected to get error reading PAT, but did not") + } +} diff --git a/psi/pmt.go b/psi/pmt.go index be17eda..38d2368 100644 --- a/psi/pmt.go +++ b/psi/pmt.go @@ -28,6 +28,7 @@ import ( "bytes" "encoding/binary" "fmt" + "io" "github.com/Comcast/gots" "github.com/Comcast/gots/packet" @@ -304,3 +305,44 @@ func pidIn(pids []uint16, target uint16) bool { return false } + +// ReadPMT extracts a PMT from a reader of a TS stream. It will read until PMT +// packet(s) are found or EOF is reached. +// It returns a new PMT object parsed from the packet(s), if found, and +// otherwise returns an error. +func ReadPMT(r io.Reader, pid uint16) (PMT, error) { + pkt := make(packet.Packet, packet.PacketSize) + pmtAcc := packet.NewAccumulator(PmtAccumulatorDoneFunc) + done := false + var pmt PMT + for !done { + if _, err := io.ReadFull(r, pkt); err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + return nil, gots.ErrPMTNotFound + } + return nil, err + } + currPid, err := packet.Pid(pkt) + if err != nil { + return nil, err + } + if currPid != pid { + continue + } + done, err = pmtAcc.Add(pkt) + if err != nil { + return nil, err + } + if done { + b, err := pmtAcc.Parse() + if err != nil { + return nil, err + } + pmt, err = NewPMT(b) + if err != nil { + return nil, err + } + } + } + return pmt, nil +} diff --git a/psi/pmt_test.go b/psi/pmt_test.go index f6dc6b1..153941a 100644 --- a/psi/pmt_test.go +++ b/psi/pmt_test.go @@ -24,6 +24,7 @@ SOFTWARE. package psi import ( + "bytes" "encoding/hex" "fmt" "testing" @@ -627,3 +628,44 @@ func TestIsPMTErrorConditions(t *testing.T) { t.Error("Bad PMT Length should return an error, probably invalid packet length") } } +func TestReadPMTForSmoke(t *testing.T) { + bs, _ := hex.DecodeString("474000100000b00d0001c100000001e256f803e71bfffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ff474256100002b0300001c10000e131f0060504435545491be121f0042a027e1" + + "f86e225f00f52012a9700e9080c001f41850fa041ee3f6580ffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffff") + r := bytes.NewReader(bs) + + pid := uint16(598) + pmt, err := ReadPMT(r, pid) + if err != nil { + t.Errorf("Unexpected error reading PMT: %v", err) + } + // sanity check (tests integration a bit) + if len(pmt.ElementaryStreams()) != 2 { + t.Errorf("PMT read is invalid, did not have expected number of streams") + } +} +func TestReadPMTIncomplete(t *testing.T) { + bs, _ := hex.DecodeString("474000100000b00d0001c100000001e256f803e71bfffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ff4742") // incomplete PMT packet + r := bytes.NewReader(bs) + + pid := uint16(598) + _, err := ReadPMT(r, pid) + if err == nil { + t.Errorf("Expected to get error reading PMT, but did not") + } +}