From 304b4429c4e57247c5cc92d91a19461a908b334f Mon Sep 17 00:00:00 2001 From: Dave Thompson Date: Tue, 29 Nov 2016 23:14:34 -0500 Subject: [PATCH 1/8] move cli PAT and PMT extract functions into psi package --- cli/parsefile.go | 68 ++---------------------------------------------- psi/pat.go | 40 ++++++++++++++++++++++++++++ psi/pmt.go | 38 +++++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 66 deletions(-) diff --git a/cli/parsefile.go b/cli/parsefile.go index 1e1a48d..36847b1 100644 --- a/cli/parsefile.go +++ b/cli/parsefile.go @@ -72,7 +72,7 @@ func main() { fmt.Println(err) return } - pat, err := extractPat(tsFile) + pat, err := psi.ReadPAT(tsFile) if err != nil { println(err) return @@ -82,7 +82,7 @@ func main() { if *showPmt { pm := pat.ProgramMap() for pn, pid := range pm { - pmt, err := extractPmt(tsFile, pid) + pmt, err := psi.ReadPMT(tsFile, pid) if err != nil { panic(err) } @@ -150,74 +150,10 @@ func printPat(pat psi.PAT) { printlnf("\tNumber of Programs %v", pat.NumPrograms()) } -func extractPat(buf io.Reader) (psi.PAT, error) { - pkt := make([]byte, packet.PacketSize) - var pat psi.PAT - for read, err := buf.Read(pkt); pat == nil; read, err = buf.Read(pkt) { - if err != nil { - return nil, err - } - if read <= 0 { - return nil, fmt.Errorf("Reached EOF without PAT") - } - pid, err := packet.Pid(pkt) - if err != nil { - return nil, err - } - if pid == 0 { - pay, err := packet.Payload(pkt) - if err != nil { - println(err) - continue - } - cp := make([]byte, len(pay)) - copy(cp, pay) - pat, err := psi.NewPAT(cp) - if err != nil { - println(err) - continue - } - return pat, nil - } - } - return nil, fmt.Errorf("No pat found") -} func printlnf(format string, a ...interface{}) { fmt.Printf(format+"\n", a...) } -func extractPmt(buf io.Reader, pid uint16) (psi.PMT, error) { - pkt := make([]byte, packet.PacketSize) - pmtAcc := packet.NewAccumulator(psi.PmtAccumulatorDoneFunc) - var pmt psi.PMT - for read, err := buf.Read(pkt); pmt == nil && read > 0; read, err = buf.Read(pkt) { - if err != nil { - return nil, err - } - currPid, err := packet.Pid(pkt) - if err != nil { - return nil, err - } - if currPid == pid { - done, err := pmtAcc.Add(pkt) - if err != nil { - return nil, err - } - if done { - b, err := pmtAcc.Parse() - if err != nil { - return nil, err - } - pmt, err = psi.NewPMT(b) - if err != nil { - return nil, err - } - - } - } - } - return pmt, nil -} func sync(buf io.Reader) (int64, error) { // function find the first sync byte of the array data := make([]byte, 1) diff --git a/psi/pat.go b/psi/pat.go index 63ea46e..cdce84f 100644 --- a/psi/pat.go +++ b/psi/pat.go @@ -25,6 +25,9 @@ SOFTWARE. package psi import ( + "fmt" + "io" + "github.com/Comcast/gots" "github.com/Comcast/gots/packet" ) @@ -124,3 +127,40 @@ func (pat pat) ProgramMap() map[uint16]uint16 { return m } + +// ReadPAT extracts a PAT from a reader. It will read until a PAT packet +// is found or EOF is reached. +// It returns a new PAT object parsed from the packet, if found, and otherwise +// returns an error. +func ReadPAT(buf io.Reader) (PAT, error) { + pkt := make([]byte, packet.PacketSize) + var pat PAT + for read, err := buf.Read(pkt); pat == nil; read, err = buf.Read(pkt) { + if err != nil { + return nil, err + } + if read <= 0 { + return nil, fmt.Errorf("Reached EOF without PAT") + } + pid, err := packet.Pid(pkt) + if err != nil { + return nil, err + } + if pid == 0 { + pay, err := packet.Payload(pkt) + if err != nil { + //println(err) TODO ? + continue + } + cp := make([]byte, len(pay)) + copy(cp, pay) + pat, err := NewPAT(cp) + if err != nil { + //println(err) TODO ? + continue + } + return pat, nil + } + } + return nil, fmt.Errorf("No pat found") +} diff --git a/psi/pmt.go b/psi/pmt.go index be17eda..6bbeb61 100644 --- a/psi/pmt.go +++ b/psi/pmt.go @@ -28,6 +28,7 @@ import ( "bytes" "encoding/binary" "fmt" + "io" "github.com/Comcast/gots" "github.com/Comcast/gots/packet" @@ -304,3 +305,40 @@ func pidIn(pids []uint16, target uint16) bool { return false } + +// ReadPMT extracts a PMT from a reader. It will read until PMT +// packet(s) are found or EOF is reached. +// It returns a new PMT object parsed from the packet(s). +func ReadPMT(buf io.Reader, pid uint16) (PMT, error) { + pkt := make([]byte, packet.PacketSize) + pmtAcc := packet.NewAccumulator(PmtAccumulatorDoneFunc) + var pmt PMT + for read, err := buf.Read(pkt); pmt == nil && read > 0; read, err = buf.Read(pkt) { + if err != nil { + return nil, err + } + currPid, err := packet.Pid(pkt) + if err != nil { + return nil, err + } + if currPid == pid { + done, err := pmtAcc.Add(pkt) + if err != nil { + return nil, err + } + if done { + b, err := pmtAcc.Parse() + if err != nil { + return nil, err + } + pmt, err = NewPMT(b) + if err != nil { + return nil, err + } + + } + + } + } + return pmt, nil +} From e2811fd2da1302a9495ebc3e3b6ee4179d8bd721 Mon Sep 17 00:00:00 2001 From: Dave Thompson Date: Wed, 30 Nov 2016 10:01:13 -0500 Subject: [PATCH 2/8] cleanup PMT/PAT reader functions In particular, use io.ReadFull to ensure we don't have partial reads. --- errors.go | 6 ++++++ psi/pat.go | 31 ++++++++++++++----------------- psi/pmt.go | 42 +++++++++++++++++++++++------------------- 3 files changed, 43 insertions(+), 36 deletions(-) diff --git a/errors.go b/errors.go index c142a35..93e75d2 100644 --- a/errors.go +++ b/errors.go @@ -43,6 +43,12 @@ var ( ErrNoPCR = errors.New("adaptation field has no Program Clock Reference") // ErrNoOPCR is returned when an attempt is made to access an adaptation field OPCR that does not exist ErrNoOPCR = errors.New("adaptation field has no Original Program Clock Reference") + // ErrPATNotFound is returned when expected PAT packet is not found when + // reading TS packets. + ErrPATNotFound = errors.New("No PAT was found while reading TS") + // ErrPMTNotFound is returned when expected PMT packet(s) are not found when + // reading TS packets. + ErrPMTNotFound = errors.New("No PMT was found while reading TS") // ErrParsePMTDescriptor is returned when a PMT descriptor cannot be parsed ErrParsePMTDescriptor = errors.New("unable to parse PMT descriptor") // ErrInvalidPATLength is returned when a PAT cannot be parsed because there are not enough bytes diff --git a/psi/pat.go b/psi/pat.go index cdce84f..e8e2787 100644 --- a/psi/pat.go +++ b/psi/pat.go @@ -25,7 +25,6 @@ SOFTWARE. package psi import ( - "fmt" "io" "github.com/Comcast/gots" @@ -128,39 +127,37 @@ func (pat pat) ProgramMap() map[uint16]uint16 { return m } -// ReadPAT extracts a PAT from a reader. It will read until a PAT packet -// is found or EOF is reached. +// ReadPAT extracts a PAT from a reader of a TS stream. It will read until a +// PAT packet is found or EOF is reached. // It returns a new PAT object parsed from the packet, if found, and otherwise // returns an error. -func ReadPAT(buf io.Reader) (PAT, error) { - pkt := make([]byte, packet.PacketSize) +func ReadPAT(r io.Reader) (PAT, error) { + pkt := make(packet.Packet, packet.PacketSize) var pat PAT - for read, err := buf.Read(pkt); pat == nil; read, err = buf.Read(pkt) { - if err != nil { + for pat == nil { + if _, err := io.ReadFull(r, pkt); err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + return nil, gots.ErrPATNotFound + } return nil, err } - if read <= 0 { - return nil, fmt.Errorf("Reached EOF without PAT") - } - pid, err := packet.Pid(pkt) + isPat, err := packet.IsPat(pkt) if err != nil { return nil, err } - if pid == 0 { + if isPat { pay, err := packet.Payload(pkt) if err != nil { - //println(err) TODO ? - continue + return nil, err } cp := make([]byte, len(pay)) copy(cp, pay) pat, err := NewPAT(cp) if err != nil { - //println(err) TODO ? - continue + return nil, err } return pat, nil } } - return nil, fmt.Errorf("No pat found") + return nil, gots.ErrPATNotFound } diff --git a/psi/pmt.go b/psi/pmt.go index 6bbeb61..38d2368 100644 --- a/psi/pmt.go +++ b/psi/pmt.go @@ -306,38 +306,42 @@ func pidIn(pids []uint16, target uint16) bool { return false } -// ReadPMT extracts a PMT from a reader. It will read until PMT +// ReadPMT extracts a PMT from a reader of a TS stream. It will read until PMT // packet(s) are found or EOF is reached. -// It returns a new PMT object parsed from the packet(s). -func ReadPMT(buf io.Reader, pid uint16) (PMT, error) { - pkt := make([]byte, packet.PacketSize) +// It returns a new PMT object parsed from the packet(s), if found, and +// otherwise returns an error. +func ReadPMT(r io.Reader, pid uint16) (PMT, error) { + pkt := make(packet.Packet, packet.PacketSize) pmtAcc := packet.NewAccumulator(PmtAccumulatorDoneFunc) + done := false var pmt PMT - for read, err := buf.Read(pkt); pmt == nil && read > 0; read, err = buf.Read(pkt) { - if err != nil { + for !done { + if _, err := io.ReadFull(r, pkt); err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + return nil, gots.ErrPMTNotFound + } return nil, err } currPid, err := packet.Pid(pkt) if err != nil { return nil, err } - if currPid == pid { - done, err := pmtAcc.Add(pkt) + if currPid != pid { + continue + } + done, err = pmtAcc.Add(pkt) + if err != nil { + return nil, err + } + if done { + b, err := pmtAcc.Parse() if err != nil { return nil, err } - if done { - b, err := pmtAcc.Parse() - if err != nil { - return nil, err - } - pmt, err = NewPMT(b) - if err != nil { - return nil, err - } - + pmt, err = NewPMT(b) + if err != nil { + return nil, err } - } } return pmt, nil From d6f3f792ab0ff8571c778004d0cb6e11de17478e Mon Sep 17 00:00:00 2001 From: Dave Thompson Date: Wed, 30 Nov 2016 11:18:02 -0500 Subject: [PATCH 3/8] move cli sync function to packet package --- cli/parsefile.go | 31 +------------------------------ packet/packet.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 30 deletions(-) diff --git a/cli/parsefile.go b/cli/parsefile.go index 36847b1..0428ed5 100644 --- a/cli/parsefile.go +++ b/cli/parsefile.go @@ -61,7 +61,7 @@ func main() { } }(tsFile) // Verify if sync-byte is present and seek to the first sync-byte - syncIndex, err := sync(tsFile) + syncIndex, err := packet.FindNextSync(tsFile) if err == nil { _, err = tsFile.Seek(syncIndex, 0) if err != nil { @@ -153,32 +153,3 @@ func printPat(pat psi.PAT) { func printlnf(format string, a ...interface{}) { fmt.Printf(format+"\n", a...) } - -func sync(buf io.Reader) (int64, error) { - // function find the first sync byte of the array - data := make([]byte, 1) - for i := int64(0); ; i++ { - read, err := buf.Read(data) - if err != nil && err != io.EOF { - println(err) - } - if read == 0 { - break - } - if int(data[0]) == packet.SyncByte { - // check next 188th byte - nextData := make([]byte, packet.PacketSize) - nextRead, err := buf.Read(nextData) - if err != nil && err != io.EOF { - println(err) - } - if nextRead == 0 { - break - } - if nextData[187] == packet.SyncByte { - return i, nil - } - } - } - return 0, fmt.Errorf("Sync-byte not found.") -} diff --git a/packet/packet.go b/packet/packet.go index 57a02f9..2cfb806 100644 --- a/packet/packet.go +++ b/packet/packet.go @@ -26,6 +26,8 @@ package packet import ( "bytes" + "fmt" + "io" "github.com/Comcast/gots" ) @@ -228,3 +230,34 @@ func Header(packet Packet) ([]byte, error) { func Equal(a, b Packet) bool { return bytes.Equal(a, b) } + +// Sync finds the offset of the next packet sync byte and returns the offset of +// the sync w.r.t. the original reader position. It also checks the next 188th +// byte to ensure a sync is found. +func FindNextSync(buf io.Reader) (int64, error) { + data := make([]byte, 1) + for i := int64(0); ; i++ { + read, err := buf.Read(data) + if err != nil && err != io.EOF { + println(err) + } + if read == 0 { + break + } + if int(data[0]) == SyncByte { + // check next 188th byte + nextData := make([]byte, PacketSize) + nextRead, err := buf.Read(nextData) + if err != nil && err != io.EOF { + println(err) + } + if nextRead == 0 { + break + } + if nextData[187] == SyncByte { + return i, nil + } + } + } + return 0, fmt.Errorf("Sync-byte not found.") +} From 78eb3e93c5db7c1b175899628bdbea52f706a841 Mon Sep 17 00:00:00 2001 From: Dave Thompson Date: Wed, 30 Nov 2016 11:20:42 -0500 Subject: [PATCH 4/8] cleanup packet FindNextSync --- errors.go | 3 +++ packet/packet.go | 25 ++++++++++++------------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/errors.go b/errors.go index 93e75d2..de65697 100644 --- a/errors.go +++ b/errors.go @@ -76,4 +76,7 @@ var ( ErrSCTE35DescriptorNotFound = errors.New("Cannot close descriptor that's not in the open list") // ErrNilPAT is returned when a PAT is passed into a function for which it cannot be nil. ErrNilPAT = errors.New("Nil PAT not allowed here.") + // ErrSyncByteNotFound is returned when a packet sync byte could not be found + // when reading. + ErrSyncByteNotFound = errors.New("Sync-byte not found.") ) diff --git a/packet/packet.go b/packet/packet.go index 2cfb806..d980519 100644 --- a/packet/packet.go +++ b/packet/packet.go @@ -26,7 +26,6 @@ package packet import ( "bytes" - "fmt" "io" "github.com/Comcast/gots" @@ -234,30 +233,30 @@ func Equal(a, b Packet) bool { // Sync finds the offset of the next packet sync byte and returns the offset of // the sync w.r.t. the original reader position. It also checks the next 188th // byte to ensure a sync is found. -func FindNextSync(buf io.Reader) (int64, error) { +func FindNextSync(r io.Reader) (int64, error) { data := make([]byte, 1) for i := int64(0); ; i++ { - read, err := buf.Read(data) - if err != nil && err != io.EOF { - println(err) - } - if read == 0 { + _, err := io.ReadFull(r, data) + if err == io.EOF || err == io.ErrUnexpectedEOF { break } + if err != nil { + return 0, err + } if int(data[0]) == SyncByte { // check next 188th byte nextData := make([]byte, PacketSize) - nextRead, err := buf.Read(nextData) - if err != nil && err != io.EOF { - println(err) - } - if nextRead == 0 { + _, err := io.ReadFull(r, nextData) + if err == io.EOF || err == io.ErrUnexpectedEOF { break } + if err != nil { + return 0, err + } if nextData[187] == SyncByte { return i, nil } } } - return 0, fmt.Errorf("Sync-byte not found.") + return 0, gots.ErrSyncByteNotFound } From eb806eb53411ca307007e2e452e122e6b10a04f8 Mon Sep 17 00:00:00 2001 From: Dave Thompson Date: Wed, 30 Nov 2016 13:26:37 -0500 Subject: [PATCH 5/8] add tests for functions moved out of cli --- packet/packet_test.go | 19 +++++++++++++++++++ psi/pat_test.go | 38 ++++++++++++++++++++++++++++++++++++++ psi/pmt_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+) diff --git a/packet/packet_test.go b/packet/packet_test.go index fd007ee..0700c78 100644 --- a/packet/packet_test.go +++ b/packet/packet_test.go @@ -351,3 +351,22 @@ func TestIsPat(t *testing.T) { t.Error("Non PAT Packet shouldn't be counted as a PAT") } } + +func TestFindNextSyncForSmoke(t *testing.T) { + bs, _ := hex.DecodeString("474000100000b00d0001c100000001e256f803e71bfffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ff4742") + r := bytes.NewReader(bs) + + offset, err := FindNextSync(r) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if offset != 0 { + t.Errorf("Incorrect offset returned for next sync marker") + } +} diff --git a/psi/pat_test.go b/psi/pat_test.go index 83570d3..ae53842 100644 --- a/psi/pat_test.go +++ b/psi/pat_test.go @@ -24,6 +24,7 @@ SOFTWARE. package psi import ( + "bytes" "encoding/hex" "reflect" "testing" @@ -105,3 +106,40 @@ func TestProgramMap(t *testing.T) { } } } + +func TestReadPATForSmoke(t *testing.T) { + // requires full packets so cannot use test data above + bs, _ := hex.DecodeString("474000100000b00d0001c100000001e256f803e71bfffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ff474256100002b0300001c10000e131f0060504435545491be121f0042a027e1" + + "f86e225f00f52012a9700e9080c001f41850fa041ee3f6580ffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffff") + r := bytes.NewReader(bs) + pat, err := ReadPAT(r) + if err != nil { + t.Errorf("Unexpected error reading PAT: %v", err) + } + // sanity check (tests integration a bit) + gotMap := pat.ProgramMap() + wantMap := map[uint16]uint16{1: 598} + if !reflect.DeepEqual(wantMap, gotMap) { + t.Errorf("PAT read is invalid, did not have expected program map") + } +} + +func TestReadPATIncomplete(t *testing.T) { + bs, _ := hex.DecodeString("47400") // incomplete PAT packet + r := bytes.NewReader(bs) + + _, err := ReadPAT(r) + if err == nil { + t.Errorf("Expected to get error reading PAT, but did not") + } +} diff --git a/psi/pmt_test.go b/psi/pmt_test.go index f6dc6b1..153941a 100644 --- a/psi/pmt_test.go +++ b/psi/pmt_test.go @@ -24,6 +24,7 @@ SOFTWARE. package psi import ( + "bytes" "encoding/hex" "fmt" "testing" @@ -627,3 +628,44 @@ func TestIsPMTErrorConditions(t *testing.T) { t.Error("Bad PMT Length should return an error, probably invalid packet length") } } +func TestReadPMTForSmoke(t *testing.T) { + bs, _ := hex.DecodeString("474000100000b00d0001c100000001e256f803e71bfffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ff474256100002b0300001c10000e131f0060504435545491be121f0042a027e1" + + "f86e225f00f52012a9700e9080c001f41850fa041ee3f6580ffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffff") + r := bytes.NewReader(bs) + + pid := uint16(598) + pmt, err := ReadPMT(r, pid) + if err != nil { + t.Errorf("Unexpected error reading PMT: %v", err) + } + // sanity check (tests integration a bit) + if len(pmt.ElementaryStreams()) != 2 { + t.Errorf("PMT read is invalid, did not have expected number of streams") + } +} +func TestReadPMTIncomplete(t *testing.T) { + bs, _ := hex.DecodeString("474000100000b00d0001c100000001e256f803e71bfffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ff4742") // incomplete PMT packet + r := bytes.NewReader(bs) + + pid := uint16(598) + _, err := ReadPMT(r, pid) + if err == nil { + t.Errorf("Expected to get error reading PMT, but did not") + } +} From 464a66b9ae56b38e9e92fca1c96c08b5d16643d2 Mon Sep 17 00:00:00 2001 From: Dave Thompson Date: Wed, 30 Nov 2016 16:20:59 -0500 Subject: [PATCH 6/8] move FindNextSync to a new file, add more tests --- packet/io.go | 62 +++++++++++++++++++++++++++++++++ packet/io_test.go | 80 +++++++++++++++++++++++++++++++++++++++++++ packet/packet.go | 32 ----------------- packet/packet_test.go | 19 ---------- 4 files changed, 142 insertions(+), 51 deletions(-) create mode 100644 packet/io.go create mode 100644 packet/io_test.go diff --git a/packet/io.go b/packet/io.go new file mode 100644 index 0000000..4e538f2 --- /dev/null +++ b/packet/io.go @@ -0,0 +1,62 @@ +/* +MIT License + +Copyright 2016 Comcast Cable Communications Management, LLC + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +package packet + +import ( + "io" + + "github.com/Comcast/gots" +) + +// Sync finds the offset of the next packet sync byte and returns the offset of +// the sync w.r.t. the original reader position. It also checks the next 188th +// byte to ensure a sync is found. +func FindNextSync(r io.Reader) (int64, error) { + data := make([]byte, 1) + for i := int64(0); ; i++ { + _, err := io.ReadFull(r, data) + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + if err != nil { + return 0, err + } + if int(data[0]) == SyncByte { + // check next 188th byte + nextData := make([]byte, PacketSize) + _, err := io.ReadFull(r, nextData) + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + if err != nil { + return 0, err + } + if nextData[187] == SyncByte { + return i, nil + } + } + } + return 0, gots.ErrSyncByteNotFound +} diff --git a/packet/io_test.go b/packet/io_test.go new file mode 100644 index 0000000..5ec2a3e --- /dev/null +++ b/packet/io_test.go @@ -0,0 +1,80 @@ +/* +MIT License + +Copyright 2016 Comcast Cable Communications Management, LLC + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +package packet + +import ( + "bytes" + "encoding/hex" + "testing" +) + +func TestFindNextSyncForSmoke(t *testing.T) { + bs, _ := hex.DecodeString("474000100000b00d0001c100000001e256f803e71bfffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ff4742") + r := bytes.NewReader(bs) + + offset, err := FindNextSync(r) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if offset != 0 { + t.Errorf("Incorrect offset returned for next sync marker") + } +} + +func TestFindNextSyncNonZeroOffset(t *testing.T) { + bs, _ := hex.DecodeString("ffffff474000100000b00d0001c100000001e256f803e71bfffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ff4742") + r := bytes.NewReader(bs) + + offset, err := FindNextSync(r) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if offset != 3 { + t.Errorf("Incorrect offset returned for next sync marker") + } +} + +func TestFindNextSyncNotFound(t *testing.T) { + // no sync byte here + bs, _ := hex.DecodeString("ff4000100000b00d0001c100000001e256f803e71bfffffff") + r := bytes.NewReader(bs) + + _, err := FindNextSync(r) + if err == nil { + t.Errorf("Expected there to be an error, but there was not") + } +} diff --git a/packet/packet.go b/packet/packet.go index d980519..57a02f9 100644 --- a/packet/packet.go +++ b/packet/packet.go @@ -26,7 +26,6 @@ package packet import ( "bytes" - "io" "github.com/Comcast/gots" ) @@ -229,34 +228,3 @@ func Header(packet Packet) ([]byte, error) { func Equal(a, b Packet) bool { return bytes.Equal(a, b) } - -// Sync finds the offset of the next packet sync byte and returns the offset of -// the sync w.r.t. the original reader position. It also checks the next 188th -// byte to ensure a sync is found. -func FindNextSync(r io.Reader) (int64, error) { - data := make([]byte, 1) - for i := int64(0); ; i++ { - _, err := io.ReadFull(r, data) - if err == io.EOF || err == io.ErrUnexpectedEOF { - break - } - if err != nil { - return 0, err - } - if int(data[0]) == SyncByte { - // check next 188th byte - nextData := make([]byte, PacketSize) - _, err := io.ReadFull(r, nextData) - if err == io.EOF || err == io.ErrUnexpectedEOF { - break - } - if err != nil { - return 0, err - } - if nextData[187] == SyncByte { - return i, nil - } - } - } - return 0, gots.ErrSyncByteNotFound -} diff --git a/packet/packet_test.go b/packet/packet_test.go index 0700c78..fd007ee 100644 --- a/packet/packet_test.go +++ b/packet/packet_test.go @@ -351,22 +351,3 @@ func TestIsPat(t *testing.T) { t.Error("Non PAT Packet shouldn't be counted as a PAT") } } - -func TestFindNextSyncForSmoke(t *testing.T) { - bs, _ := hex.DecodeString("474000100000b00d0001c100000001e256f803e71bfffffff" + - "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + - "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + - "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + - "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + - "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + - "ff4742") - r := bytes.NewReader(bs) - - offset, err := FindNextSync(r) - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - if offset != 0 { - t.Errorf("Incorrect offset returned for next sync marker") - } -} From 6f36c125e53c34f28c82b7f9b8992df17fb295f5 Mon Sep 17 00:00:00 2001 From: Dave Thompson Date: Fri, 2 Dec 2016 11:07:24 -0500 Subject: [PATCH 7/8] change sync method to place reader at pkt start --- cli/parsefile.go | 29 +++++++++++++---------------- packet/io.go | 15 +++++++++------ packet/io_test.go | 45 ++++++++++++++++++++++++++++++++++++--------- 3 files changed, 58 insertions(+), 31 deletions(-) diff --git a/cli/parsefile.go b/cli/parsefile.go index 0428ed5..014afa9 100644 --- a/cli/parsefile.go +++ b/cli/parsefile.go @@ -26,6 +26,7 @@ SOFTWARE. package main import ( + "bufio" "bytes" "flag" "fmt" @@ -61,18 +62,13 @@ func main() { } }(tsFile) // Verify if sync-byte is present and seek to the first sync-byte - syncIndex, err := packet.FindNextSync(tsFile) - if err == nil { - _, err = tsFile.Seek(syncIndex, 0) - if err != nil { - fmt.Println(err) - return - } - } else { + reader := bufio.NewReader(tsFile) + _, err = packet.Sync(reader) + if err != nil { fmt.Println(err) return } - pat, err := psi.ReadPAT(tsFile) + pat, err := psi.ReadPAT(reader) if err != nil { println(err) return @@ -82,7 +78,7 @@ func main() { if *showPmt { pm := pat.ProgramMap() for pn, pid := range pm { - pmt, err := psi.ReadPMT(tsFile, pid) + pmt, err := psi.ReadPMT(reader, pid) if err != nil { panic(err) } @@ -90,16 +86,17 @@ func main() { } } - pkt := make([]byte, packet.PacketSize, packet.PacketSize) - var offset int64 + pkt := make(packet.Packet, packet.PacketSize) var numPackets uint64 ebps := make(map[uint64]ebp.EncoderBoundaryPoint) for { - read, err := tsFile.ReadAt(pkt, offset) - if err == io.EOF || read == 0 { - break + if _, err := io.ReadFull(reader, pkt); err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF { + break + } + println(err) + return } - offset += packet.PacketSize numPackets++ if *showEbp { ebpBytes, err := adaptationfield.EncoderBoundaryPoint(pkt) diff --git a/packet/io.go b/packet/io.go index 4e538f2..a96d5b4 100644 --- a/packet/io.go +++ b/packet/io.go @@ -25,15 +25,17 @@ SOFTWARE. package packet import ( + "bufio" "io" "github.com/Comcast/gots" ) -// Sync finds the offset of the next packet sync byte and returns the offset of -// the sync w.r.t. the original reader position. It also checks the next 188th -// byte to ensure a sync is found. -func FindNextSync(r io.Reader) (int64, error) { +// Sync finds the offset of the next packet sync byte and advances the reader +// to the packet start. It also checks the next 188th byte to ensure a sync is +// found. It returns the offset of the sync w.r.t. the original reader +// position. +func Sync(r *bufio.Reader) (int64, error) { data := make([]byte, 1) for i := int64(0); ; i++ { _, err := io.ReadFull(r, data) @@ -45,8 +47,8 @@ func FindNextSync(r io.Reader) (int64, error) { } if int(data[0]) == SyncByte { // check next 188th byte - nextData := make([]byte, PacketSize) - _, err := io.ReadFull(r, nextData) + rp := bufio.NewReaderSize(r, PacketSize) // extends only if needed + nextData, err := rp.Peek(PacketSize) if err == io.EOF || err == io.ErrUnexpectedEOF { break } @@ -54,6 +56,7 @@ func FindNextSync(r io.Reader) (int64, error) { return 0, err } if nextData[187] == SyncByte { + r.UnreadByte() return i, nil } } diff --git a/packet/io_test.go b/packet/io_test.go index 5ec2a3e..b081b94 100644 --- a/packet/io_test.go +++ b/packet/io_test.go @@ -25,12 +25,13 @@ SOFTWARE. package packet import ( + "bufio" "bytes" "encoding/hex" "testing" ) -func TestFindNextSyncForSmoke(t *testing.T) { +func TestSyncForSmoke(t *testing.T) { bs, _ := hex.DecodeString("474000100000b00d0001c100000001e256f803e71bfffffff" + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + @@ -38,9 +39,9 @@ func TestFindNextSyncForSmoke(t *testing.T) { "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + "ff4742") - r := bytes.NewReader(bs) + r := bufio.NewReader(bytes.NewReader(bs)) - offset, err := FindNextSync(r) + offset, err := Sync(r) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -49,7 +50,7 @@ func TestFindNextSyncForSmoke(t *testing.T) { } } -func TestFindNextSyncNonZeroOffset(t *testing.T) { +func TestSyncNonZeroOffset(t *testing.T) { bs, _ := hex.DecodeString("ffffff474000100000b00d0001c100000001e256f803e71bfffffff" + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + @@ -57,9 +58,9 @@ func TestFindNextSyncNonZeroOffset(t *testing.T) { "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + "ff4742") - r := bytes.NewReader(bs) + r := bufio.NewReader(bytes.NewReader(bs)) - offset, err := FindNextSync(r) + offset, err := Sync(r) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -68,13 +69,39 @@ func TestFindNextSyncNonZeroOffset(t *testing.T) { } } -func TestFindNextSyncNotFound(t *testing.T) { +func TestSyncNotFound(t *testing.T) { // no sync byte here bs, _ := hex.DecodeString("ff4000100000b00d0001c100000001e256f803e71bfffffff") - r := bytes.NewReader(bs) + r := bufio.NewReader(bytes.NewReader(bs)) - _, err := FindNextSync(r) + _, err := Sync(r) if err == nil { t.Errorf("Expected there to be an error, but there was not") } } + +func TestSyncReaderPosAtPacketStart(t *testing.T) { + bs, _ := hex.DecodeString("ffffff474000100000b00d0001c100000001e256f803e71bfffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + + "ff4742") + r := bufio.NewReader(bytes.NewReader(bs)) + + _, err := Sync(r) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + bytes := make([]byte, 3) + _, err = r.Read(bytes) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + wantHex := "474000" + gotHex := hex.EncodeToString(bytes) + if gotHex != wantHex { + t.Errorf("Reader not left in correct spot. Wanted next read %v, got %v", wantHex, gotHex) + } +} From e34e1171d3cfaeff894ce72162aa73ede0aa24d7 Mon Sep 17 00:00:00 2001 From: Dave Thompson Date: Fri, 2 Dec 2016 11:14:49 -0500 Subject: [PATCH 8/8] read pat: break from loop so required return not useless --- psi/pat.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/psi/pat.go b/psi/pat.go index e8e2787..a3efb3d 100644 --- a/psi/pat.go +++ b/psi/pat.go @@ -137,7 +137,7 @@ func ReadPAT(r io.Reader) (PAT, error) { for pat == nil { if _, err := io.ReadFull(r, pkt); err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF { - return nil, gots.ErrPATNotFound + break } return nil, err }