job.go
package corral

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/bcongdon/corral/internal/pkg/corfs"

	humanize "github.com/dustin/go-humanize"
	log "github.com/sirupsen/logrus"
	"golang.org/x/sync/semaphore"
)
// Job is the logical container for a MapReduce job
type Job struct {
	Map           Mapper
	Reduce        Reducer
	PartitionFunc PartitionFunc

	fileSystem       corfs.FileSystem
	config           *config
	intermediateBins uint
	outputPath       string

	bytesRead    int64
	bytesWritten int64
}
// Logic for running a single map task
func (j *Job) runMapper(mapperID uint, splits []inputSplit) error {
	emitter := newMapperEmitter(j.intermediateBins, mapperID, j.outputPath, j.fileSystem)
	if j.PartitionFunc != nil {
		emitter.partitionFunc = j.PartitionFunc
	}

	for _, split := range splits {
		err := j.runMapperSplit(split, &emitter)
		if err != nil {
			return err
		}
	}

	atomic.AddInt64(&j.bytesWritten, emitter.bytesWritten())

	return emitter.close()
}
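
// When no PartitionFunc is supplied, the mapper emitter falls back to a
// default partitioner defined elsewhere in this package. A minimal sketch of
// such a partitioner (an illustration using hash/fnv, not necessarily the
// package's actual implementation) hashes the key modulo the number of
// intermediate bins:
//
//	func hashPartition(key string, numBins uint) uint {
//		h := fnv.New64()
//		h.Write([]byte(key))
//		return uint(h.Sum64() % uint64(numBins))
//	}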
// splitInputRecord parses a tab-delimited record into a keyValue. A record
// with exactly one tab is split into key and value (e.g. "foo\tbar" yields
// {Key: "foo", Value: "bar"}); anything else is treated as value-only.
func splitInputRecord(record string) *keyValue {
	fields := strings.Split(record, "\t")
	if len(fields) == 2 {
		return &keyValue{
			Key:   fields[0],
			Value: fields[1],
		}
	}
	return &keyValue{
		Value: record,
	}
}
// runMapperSplit runs the mapper on a single inputSplit
func (j *Job) runMapperSplit(split inputSplit, emitter Emitter) error {
	inputSource, err := j.fileSystem.OpenReader(split.Filename, split.StartOffset)
	if err != nil {
		return err
	}
	defer inputSource.Close()

	scanner := bufio.NewScanner(inputSource)
	var bytesRead int64
	splitter := countingSplitFunc(bufio.ScanLines, &bytesRead)
	scanner.Split(splitter)

	// If the split starts mid-file, the first record is assumed to belong to
	// the previous split (which reads one record past its boundary), so skip it.
	if split.StartOffset != 0 {
		scanner.Scan()
	}

	for scanner.Scan() {
		record := scanner.Text()
		kv := splitInputRecord(record)
		j.Map.Map(kv.Key, kv.Value, emitter)

		// Stop reading when the end of the inputSplit is reached
		if split.Size() > 0 && bytesRead > split.Size() {
			break
		}
	}

	atomic.AddInt64(&j.bytesRead, bytesRead)

	return scanner.Err()
}
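
// countingSplitFunc is defined elsewhere in this package; it wraps a
// bufio.SplitFunc so that the bytes consumed by the scanner accumulate into a
// counter. A minimal sketch of such a wrapper (an illustration under the
// standard bufio.SplitFunc contract, not necessarily the package's exact
// code):
//
//	func countingSplitFunc(split bufio.SplitFunc, counter *int64) bufio.SplitFunc {
//		return func(data []byte, atEOF bool) (int, []byte, error) {
//			advance, token, err := split(data, atEOF)
//			*counter += int64(advance)
//			return advance, token, err
//		}
//	}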
// Logic for running a single reduce task
func (j *Job) runReducer(binID uint) error {
	// Determine the intermediate data files this reducer is responsible for
	path := j.fileSystem.Join(j.outputPath, fmt.Sprintf("map-bin%d-*", binID))
	files, err := j.fileSystem.ListFiles(path)
	if err != nil {
		return err
	}

	// Open emitter for output data
	path = j.fileSystem.Join(j.outputPath, fmt.Sprintf("output-part-%d", binID))
	emitWriter, err := j.fileSystem.OpenWriter(path)
	if err != nil {
		return err
	}
	defer emitWriter.Close()

	data := make(map[string][]string)
	var bytesRead int64

	for _, file := range files {
		reader, err := j.fileSystem.OpenReader(file.Name, 0)
		if err != nil {
			return err
		}
		bytesRead += file.Size

		// Feed intermediate data into reducers
		decoder := json.NewDecoder(reader)
		for decoder.More() {
			var kv keyValue
			if err := decoder.Decode(&kv); err != nil {
				reader.Close()
				return err
			}
			data[kv.Key] = append(data[kv.Key], kv.Value)
		}
		reader.Close()

		// Delete intermediate map data
		if j.config.Cleanup {
			err := j.fileSystem.Delete(file.Name)
			if err != nil {
				log.Error(err)
			}
		}
	}

	var waitGroup sync.WaitGroup
	sem := semaphore.NewWeighted(10)

	emitter := newReducerEmitter(emitWriter)
	for key, values := range data {
		sem.Acquire(context.Background(), 1)
		waitGroup.Add(1)
		go func(key string, values []string) {
			defer sem.Release(1)

			keyChan := make(chan string)
			keyIter := newValueIterator(keyChan)

			go func() {
				defer waitGroup.Done()
				j.Reduce.Reduce(key, keyIter, emitter)
			}()

			for _, value := range values {
				// Pass current value to the appropriate key channel
				keyChan <- value
			}
			close(keyChan)
		}(key, values)
	}

	waitGroup.Wait()

	atomic.AddInt64(&j.bytesWritten, emitter.bytesWritten())
	atomic.AddInt64(&j.bytesRead, bytesRead)

	return nil
}
// inputSplits calculates all input files' inputSplits.
// inputSplits also determines and saves the number of intermediate bins that will be used during the shuffle.
func (j *Job) inputSplits(inputs []string, maxSplitSize int64) []inputSplit {
	files := make([]string, 0)
	for _, inputPath := range inputs {
		fileInfos, err := j.fileSystem.ListFiles(inputPath)
		if err != nil {
			log.Warn(err)
			continue
		}

		for _, fInfo := range fileInfos {
			files = append(files, fInfo.Name)
		}
	}

	splits := make([]inputSplit, 0)
	var totalSize int64
	for _, inputFileName := range files {
		fInfo, err := j.fileSystem.Stat(inputFileName)
		if err != nil {
			log.Warnf("Unable to load input file: %s (%s)", inputFileName, err)
			continue
		}
		totalSize += fInfo.Size

		splits = append(splits, splitInputFile(fInfo, maxSplitSize)...)
	}
	if len(splits) > 0 {
		log.Debugf("Average split size: %s", humanize.Bytes(uint64(totalSize)/uint64(len(splits))))
	}

	j.intermediateBins = uint(float64(totalSize/j.config.ReduceBinSize) * 1.25)
	if j.intermediateBins == 0 {
		j.intermediateBins = 1
	}

	return splits
}
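
// splitInputFile is defined alongside inputSplit elsewhere in this package;
// it chops one file into contiguous splits of at most maxSplitSize bytes. A
// minimal sketch of that behavior (an illustration that assumes inputSplit
// carries Filename, StartOffset, and EndOffset fields, not the package's
// exact code):
//
//	func splitInputFile(file corfs.FileInfo, maxSplitSize int64) []inputSplit {
//		splits := make([]inputSplit, 0)
//		for start := int64(0); start < file.Size; start += maxSplitSize {
//			end := start + maxSplitSize
//			if end > file.Size {
//				end = file.Size
//			}
//			splits = append(splits, inputSplit{
//				Filename:    file.Name,
//				StartOffset: start,
//				EndOffset:   end,
//			})
//		}
//		return splits
//	}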
// NewJob creates a new job from a Mapper and Reducer.
func NewJob(mapper Mapper, reducer Reducer) *Job {
	return &Job{
		Map:    mapper,
		Reduce: reducer,
		config: &config{},
	}
}
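
// A minimal usage sketch. The Map and Reduce signatures are taken from the
// calls made in this file; the wordCount type, the Emitter.Emit method, and
// the ValueIterator.Iter method are illustrative assumptions about the rest
// of the package, not code defined here:
//
//	type wordCount struct{}
//
//	func (w wordCount) Map(key, value string, emitter Emitter) {
//		for _, word := range strings.Fields(value) {
//			emitter.Emit(word, "1")
//		}
//	}
//
//	func (w wordCount) Reduce(key string, values ValueIterator, emitter Emitter) {
//		count := 0
//		for range values.Iter() {
//			count++
//		}
//		emitter.Emit(key, strconv.Itoa(count))
//	}
//
//	job := NewJob(wordCount{}, wordCount{})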