-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
98 lines (79 loc) · 2.28 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
var Deque = require('double-ended-queue');
/**
 * Per-flow scheduling state.
 *
 * Each flow owns its own queue of pending entries plus a `deficit` counter,
 * the credit that is compared against an entry's size when the flow is served.
 *
 * @constructor
 */
function Flow() {
    var pending = new Deque();

    // Entries waiting to be served for this flow.
    this.queue = pending;
    // Unspent credit; a freshly created flow starts with none.
    this.deficit = 0;
}
/**
 * A queue that multiplexes entries from several flows.
 *
 * @param {Object} [options] - Optional settings; any falsy value is treated
 *     as "no options".
 * @param {number} [options.quantumSize=1] - Credit added to a flow's deficit
 *     each time the flow is taken off the active list. Falsy values
 *     (missing, 0, NaN, ...) fall back to 1.
 * @param {Function} [options.onUnidle] - Called with no arguments whenever a
 *     push takes the queue from empty to holding one entry. Defaults to a
 *     no-op.
 * @constructor
 */
function DRRQueue(options) {
    var settings = options || {};

    // Total number of entries queued across every flow.
    this.length = 0;

    // Flows that have entries waiting for their next turn.
    this.activeList = new Deque();
    // The flow currently being served, or null when there is none.
    this.activeFlow = null;

    // Lookup table: flow id -> Flow.
    this.flows = {};

    var quantum = settings.quantumSize;
    if (!quantum) {
        quantum = 1;
    }
    this.quantumSize = quantum;

    var unidleHandler = settings.onUnidle;
    if (!unidleHandler) {
        unidleHandler = function () {};
    }
    this.onUnidle = unidleHandler;
}
/**
 * Returns the Flow registered under `flowId`, creating and registering a new
 * one when the queue has no flow for that id yet.
 *
 * Only the table's OWN properties count as registered flows. The table is a
 * plain object, so ids such as "constructor" or "toString" would otherwise
 * resolve to inherited Object.prototype members and be handed back as if they
 * were flows.
 *
 * @param {DRRQueue} drrqueue - Queue whose `flows` table is consulted/updated.
 * @param {*} flowId - Flow identifier; used as a property key.
 * @returns {Flow} The existing or newly created flow.
 */
function lookupFlow(drrqueue, flowId) {
    var flows = drrqueue.flows;
    var isRegistered = Object.prototype.hasOwnProperty.call(flows, flowId);
    var flow = isRegistered ? flows[flowId] : undefined;
    if (!flow) {
        flow = new Flow();
        // defineProperty rather than plain assignment: assigning to the key
        // "__proto__" on an ordinary object would replace the table's
        // prototype instead of storing an entry.
        Object.defineProperty(flows, flowId, {
            value: flow,
            writable: true,
            enumerable: true,
            configurable: true
        });
    }
    return flow;
}
/**
 * Appends an entry to the given flow.
 *
 * @param {*} flowId - Identifier of the flow the entry belongs to; the flow
 *     is created on first use.
 * @param {*} data - Payload stored with the entry.
 * @param {number} size - Cost of the entry; must be a finite number.
 * @throws {Error} If `size` is not a finite number. Nothing is modified in
 *     that case.
 */
DRRQueue.prototype.push = function (flowId, data, size) {
    // Validate before looking up the flow: the lookup registers a new flow
    // as a side effect, and a rejected push must not leave one behind.
    if (typeof(size) !== 'number' || !isFinite(size)) {
        throw new Error('The size must be a finite number');
    }

    var flow = lookupFlow(this, flowId);
    // Relies on the flow queue's push() returning the queue's new length.
    var count = flow.queue.push({data: data, size: size});

    // If the flow is transitioning from 'no data' to 'some data' then it
    // needs a turn, so push it into the active list -- unless it is the flow
    // currently being served, which is handled without going through the list.
    if (count === 1 && this.activeFlow !== flow) {
        this.activeList.push(flow);
    }

    this.length++;
    // The whole queue just went from empty to non-empty.
    if (this.length === 1) {
        this.onUnidle();
    }
};
/**
 * Tries to dequeue one entry from the flow currently being served.
 *
 * Requires `this.activeFlow` to be set. Three outcomes are possible:
 *  - the head entry fits in the flow's deficit: it is removed, its size is
 *    deducted from the deficit, `this.length` is decremented and its data is
 *    returned; the flow stays active;
 *  - the head entry costs more than the deficit: the flow is moved to the
 *    back of the active list (keeping its deficit) and stops being active;
 *  - the flow has nothing queued: it stops being active and its deficit is
 *    reset to zero.
 *
 * @returns {*} The dequeued entry's data, or null when nothing was dequeued.
 *     A stored payload may itself be null or otherwise falsy, so callers that
 *     need to know whether an entry was removed should compare `this.length`
 *     before and after the call.
 */
DRRQueue.prototype._processActiveFlow = function () {
    var activeFlow = this.activeFlow;
    var queue = activeFlow.queue;

    if (queue.isEmpty()) {
        // Deficit round robin zeroes a flow's counter once its queue drains,
        // so credit cannot be saved up across idle periods and spent later at
        // the expense of other flows.
        activeFlow.deficit = 0;
    } else {
        var next = queue.peekFront();
        if (next.size <= activeFlow.deficit) {
            activeFlow.deficit -= next.size;
            queue.shift();
            this.length--;
            return next.data;
        }
        // Not enough credit yet: wait for another turn, keeping what has
        // been accumulated so far.
        this.activeList.push(activeFlow);
    }

    this.activeFlow = null;
    return null;
};
/**
 * Removes and returns the data of the next entry to serve.
 *
 * The flow currently being served is tried first. After that, flows are
 * taken from the front of the active list one at a time; each receives
 * `quantumSize` additional deficit and is then given the chance to dequeue
 * its head entry.
 *
 * @returns {*} The data of the dequeued entry, or undefined if no entry
 *     could be dequeued. Falsy payloads (0, '', false, null) are returned
 *     like any other value.
 */
DRRQueue.prototype.pop = function () {
    // Whether an entry was dequeued is detected through the change in
    // `length`, not through the truthiness of the returned data: a falsy
    // payload is still a successful pop. Treating it as "nothing dequeued"
    // would drop the entry and abandon the flow that was being served.
    var lengthBefore = this.length;
    var result;

    if (this.activeFlow) {
        result = this._processActiveFlow();
        if (this.length !== lengthBefore) {
            return result;
        }
    }

    while (!this.activeList.isEmpty()) {
        this.activeFlow = this.activeList.shift();
        this.activeFlow.deficit += this.quantumSize;
        result = this._processActiveFlow();
        if (this.length !== lengthBefore) {
            return result;
        }
    }

    // If no work was in the queue, then return undefined
    return undefined;
};
module.exports = DRRQueue;