-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdirreader.go
151 lines (131 loc) · 3.34 KB
/
dirreader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package dedup
import (
"path/filepath"
"sync"
)
// dirReader concurrently reads the directory located at root and sends file
// paths on out, errors on err.
type dirReader struct {
root string // Path of directory to be read.
opts *Options
numProcs int // Number of worker goroutines to start.
busyProcs sync.WaitGroup // Coordinate active worker goroutines.
queue chan string // Paths of directories to be read.
busyDirs sync.WaitGroup // Coordinate active directories.
out chan string // Outgoing file paths.
err chan error // Outgoing errors.
done chan struct{} // Signal worker goroutines to return.
cancel *signal // Signal cancellation.
}
func newDirReader(path string, numProcs int, opts *Options) *dirReader {
r := new(dirReader)
r.root = path
r.opts = opts
r.numProcs = numProcs
r.queue = make(chan string, r.numProcs)
r.out = make(chan string, r.numProcs)
r.err = make(chan error)
r.done = make(chan struct{})
r.cancel = newSignal()
return r
}
// Start launches worker goroutines and begins reading the configured
// root directory. Not to be called more than once on the same instance.
func (r *dirReader) Start() {
r.busyProcs.Add(r.numProcs)
for i := 0; i < r.numProcs; i++ {
go r.worker()
}
go func() {
r.enqueue(r.root)
r.busyDirs.Wait()
close(r.done) // r.queue is empty: signal worker goroutines to return
r.busyProcs.Wait() // and wait for them.
close(r.queue)
close(r.out)
close(r.err)
}()
}
// Cancel signals worker goroutines to return immediately the first time it is
// called. Subsequent calls to Cancel have no effect.
func (r *dirReader) Cancel() {
r.cancel.Once()
r.busyDirs.Wait()
r.busyProcs.Wait()
}
func (r *dirReader) worker() {
defer r.busyProcs.Done()
for {
select {
case <-r.cancel.C():
return
case <-r.done:
return
case path := <-r.queue:
r.handle(path)
}
}
}
func (r *dirReader) enqueue(path string) {
r.busyDirs.Add(1)
select {
case <-r.cancel.C():
r.busyDirs.Done()
case r.queue <- path:
default: // r.queue is full: visit path synchronously.
r.handle(path)
}
}
// handle reads file names from the directory located at path and sends file
// paths on r.out. If path is "/dir" and a file is named "file1", "/dir/file1"
// is sent on r.out. If the Recursive option is set and a sub-directory is
// encountered, it is enqueued for reading. If path is the location of a
// regular file instead of a directory, that file is sent on r.out and handle
// returns.
func (r *dirReader) handle(path string) {
defer r.busyDirs.Done()
info, path, err := lstat(r.opts.fs, path, r.opts.FollowSymlinks)
if err != nil {
r.emitErr(err)
return
}
if !info.IsDir() {
r.emit(path)
return
}
names, err := r.opts.fs.Readdirnames(path)
if err != nil {
r.emitErr(err)
return
}
for _, name := range names {
select {
case <-r.cancel.C():
return
default:
}
fullPath := filepath.Join(path, name)
info, fullPath, err = lstat(r.opts.fs, fullPath, r.opts.FollowSymlinks)
if err != nil {
r.emitErr(err)
continue
}
if !info.IsDir() {
r.emit(fullPath)
} else if r.opts.Recursive {
r.enqueue(fullPath)
}
}
}
func (r *dirReader) emit(path string) {
select {
case <-r.cancel.C():
case r.out <- path:
}
}
func (r *dirReader) emitErr(err error) {
select {
case <-r.cancel.C():
case r.err <- err:
}
}