-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathread.go
137 lines (119 loc) · 3.25 KB
/
read.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package warc
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"strconv"
"github.com/CorentinB/warc/pkg/spooledtempfile"
)
// Reader reads WARC records sequentially from a buffered input stream.
// Create one with NewReader and call ReadRecord repeatedly.
type Reader struct {
// bufReader is the buffered stream all record data is read from.
bufReader *bufio.Reader
// record is the last record whose header and content ReadRecord parsed.
record *Record
// threshold is forwarded to spooledtempfile.NewSpooledTempFile for every
// record's content buffer. NewReader takes it from the WARCMaxInMemorySize
// environment variable and uses -1 when that variable is unset.
// NOTE(review): presumably the maximum number of bytes kept in memory
// before spooling to disk — confirm against the spooledtempfile package.
threshold int
}
// reader is the minimal interface readUntilDelim needs from its input.
// ReadRecord passes a *bufio.Reader as one.
type reader interface {
// ReadString returns the input up to and including the next delim byte.
// NOTE(review): readUntilDelim keeps whatever string comes back alongside a
// non-nil error, so implementations are expected to return the partial data
// they read in that case (as bufio.Reader.ReadString does).
ReadString(delim byte) (line string, err error)
}
// NewReader returns a new WARC reader that reads records from reader.
//
// The input is passed through NewDecompressionReader and then buffered. If the
// WARCMaxInMemorySize environment variable is set, it must be a base-10
// integer; its value becomes the threshold handed to the spooled temp file
// that holds each record's content. When the variable is unset or empty, the
// threshold is -1.
//
// An error is returned if the decompression reader cannot be created or if
// WARCMaxInMemorySize is not a valid integer.
func NewReader(reader io.ReadCloser) (*Reader, error) {
	decReader, err := NewDecompressionReader(reader)
	if err != nil {
		return nil, err
	}

	threshold := -1
	if v := os.Getenv("WARCMaxInMemorySize"); v != "" {
		threshold, err = strconv.Atoi(v)
		if err != nil {
			// Name the variable: a bare strconv error gives the caller no
			// hint that the bad value came from the environment.
			return nil, fmt.Errorf("parsing WARCMaxInMemorySize: %w", err)
		}
	}

	return &Reader{
		bufReader: bufio.NewReader(decReader),
		threshold: threshold,
	}, nil
}
// readUntilDelim reads from r until the accumulated data ends with delim and
// returns that data with the trailing delim removed.
//
// Input is consumed in chunks that end at the last byte of delim; a chunk that
// ends with that byte but not with the whole delim is kept and reading
// continues. If r reports an error, everything read so far (including the
// partial chunk returned with the error) is returned together with that error,
// and delim is not stripped.
//
// delim must not be empty.
func readUntilDelim(r reader, delim []byte) (line []byte, err error) {
	terminator := delim[len(delim)-1]
	for {
		var chunk string
		chunk, err = r.ReadString(terminator)
		// Keep the chunk whether or not the read failed: callers get the
		// partial data alongside the error.
		line = append(line, chunk...)
		if err != nil {
			return line, err
		}
		if bytes.HasSuffix(line, delim) {
			return line[:len(line)-len(delim)], nil
		}
	}
}
// ReadRecord reads the next record from the opened WARC file.
//
// A record is read as: a version line, header lines up to an empty line, a
// content block of exactly Content-Length bytes, and two empty lines that
// close the record.
//
// Returns:
//   - *Record: the parsed record. It is nil when eol is true and whenever the
//     failure happens before the content block has been read completely. It
//     is non-nil, together with an error, when only the closing record
//     boundary is missing or malformed.
//   - bool (eol): true when the input ended while reading the version line,
//     meaning there are no more records. Any bytes read before that EOF are
//     discarded.
//   - error: non-nil when the record could not be read or is malformed,
//     including a missing, unparsable or negative Content-Length.
func (r *Reader) ReadRecord() (*Record, bool, error) {
	// Read everything through r.bufReader itself. Wrapping it in a second
	// bufio.Reader is only harmless while bufio.NewReader hands back its
	// argument unchanged; a genuinely separate buffer would swallow bytes that
	// the boundary check below expects to find in r.bufReader.
	br := r.bufReader

	// First line: WARC version.
	warcVer, err := readUntilDelim(br, []byte("\r\n"))
	if err != nil {
		if err == io.EOF {
			return nil, true, nil // EOF, no error
		}
		return nil, false, fmt.Errorf("reading WARC version: %w", err)
	}

	// Parse the record headers; an empty line ends the header section.
	header := NewHeader()
	for {
		line, err := readUntilDelim(br, []byte("\r\n"))
		if err != nil {
			return nil, false, fmt.Errorf("reading header: %w", err)
		}
		if len(line) == 0 {
			break
		}
		// Lines for which splitKeyValue yields no key are skipped.
		if key, value := splitKeyValue(string(line)); key != "" {
			header.Set(key, value)
		}
	}

	// Get the Content-Length.
	length, err := strconv.ParseInt(header.Get("Content-Length"), 10, 64)
	if err != nil {
		return nil, false, fmt.Errorf("parsing Content-Length: %w", err)
	}
	// io.CopyN copies nothing and reports success for a negative count, so a
	// negative length would otherwise pass as an empty content block and the
	// real content bytes would then be misread as the record boundary.
	if length < 0 {
		return nil, false, fmt.Errorf("parsing Content-Length: negative value %d", length)
	}

	// reading doesn't really need to be in TempDir, nor can we access it as it's on the client.
	buf := spooledtempfile.NewSpooledTempFile("warc", "", r.threshold, false, -1)
	if _, err := io.CopyN(buf, br, length); err != nil {
		// The record is never handed to the caller on this path, so release
		// the buffer here; nobody else can close it. The Close error is
		// dropped on purpose: the copy failure is the one worth reporting.
		_ = buf.Close()
		return nil, false, fmt.Errorf("copying record content: %w", err)
	}

	r.record = &Record{
		Header:  header,
		Content: buf,
		Version: string(warcVer),
	}

	// Skip the two empty lines that close the record.
	for i := 0; i < 2; i++ {
		boundary, _, err := br.ReadLine()
		if err != nil {
			if err == io.EOF {
				// record shall consist of a record header followed by a record content block and two newlines
				return r.record, false, fmt.Errorf("early EOF record boundary: %w", err)
			}
			return r.record, false, fmt.Errorf("reading record boundary: %w", err)
		}
		if len(boundary) != 0 {
			return r.record, false, fmt.Errorf("non-empty record boundary [boundary: %s]", boundary)
		}
	}

	return r.record, false, nil // ok
}