This repository has been archived by the owner on Sep 14, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmaster.go
145 lines (118 loc) · 2.76 KB
/
master.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
import (
"launchpad.net/tomb"
_ "log"
"sync"
"time"
)
type SummaryEmitter interface {
PublishSummaryEvent(d time.Duration, throughput, responseTime, efficiency float64)
}
type SummaryEvent struct {
Duration time.Duration
MeanResponseTimeMs float64
OpsPerSecond float64
Efficiency float64
}
type master struct {
t tomb.Tomb
conf *AppConfig
t0 time.Time
wg *sync.WaitGroup
tm *taskmaster
hosts []*sandbox
stats *calculator
statsChan chan *SummaryEvent
factory BehaviorFactory
}
func NewMaster(conf *AppConfig, factory BehaviorFactory) *master {
wg := &sync.WaitGroup{}
return &master{
conf: conf,
wg: wg,
tm: nil,
hosts: make([]*sandbox, conf.Clients),
stats: nil,
statsChan: make(chan *SummaryEvent),
factory: factory,
}
}
func (this *master) Start() {
go this.loop()
}
func (this *master) Stop() (err error) {
this.t.Kill(nil)
return this.t.Wait()
}
func (this *master) SummaryEvents() <-chan *SummaryEvent {
return this.statsChan
}
func (this *master) PublishSummaryEvent(d time.Duration, throughput, responseTime, activeLoad float64) {
this.statsChan <- &SummaryEvent{d, responseTime, throughput, activeLoad}
}
// Only call this after the goroutine is dead.
func (this *master) Statistics() Statistics {
return this.stats
}
func (this *master) loop() {
const (
ProgressInterval = 1 * time.Second
MaxChannelReads = 64
)
defer this.t.Done()
this.setup()
prChan := time.After(ProgressInterval)
for {
select {
case <-this.t.Dying():
this.shutdown()
return
case <-this.tm.t.Dead():
this.t.Kill(nil)
case <-prChan:
this.stats.summarize()
prChan = time.After(ProgressInterval)
default:
// Attempt to read from the channel a bunch of times
// between each death check.
this.stats.capture(MaxChannelReads)
}
}
}
func (this *master) setup() {
// Record approximate start time
this.t0 = time.Now()
// Initialize the taskmaster
this.tm = NewTaskMaster(&TaskMasterInfo{
WaitGroup: this.wg,
Properties: this.conf.Properties,
})
// Initialize the stats recorder
this.stats = NewCalculator(
this.conf, this.tm.ResponseTimes(), this, this.t0)
// Initialize client sandboxes
count := this.conf.Clients
for i := 0; i < count; i += 1 {
info := &SandboxInfo{
Id: i,
Properties: this.conf.Properties,
Duration: this.conf.d,
StartTime: this.t0,
Emitter: this.tm,
WaitGroup: this.wg,
Factory: this.factory,
}
this.hosts[i] = NewSandbox(info)
this.wg.Add(1)
}
// Spawn the taskmaster
this.tm.Start()
// Spawn each sandbox
for _, host := range this.hosts {
host.Start()
}
}
func (this *master) shutdown() {
this.stats.summarize()
close(this.statsChan)
}