forked from wavded/graygelf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.js
98 lines (79 loc) · 2.7 KB
/
server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
var zlib = require('zlib')
var dgram = require('dgram')
var Stream = require('stream')
/**
 * Stream-based server for GELF messages received over UDP.
 *
 * May be called with or without `new`. Each instance starts with an empty
 * table of partially received chunked messages (`pendingChunks`, keyed by
 * message id) and is flagged as a readable stream.
 *
 * @constructor
 * @returns {GrayGelfServer}
 */
var GrayGelfServer = function() {
  if (!(this instanceof GrayGelfServer)) {
    return new GrayGelfServer()
  }
  // Run the parent constructor so the emitter state lives on the instance.
  Stream.call(this)
  // Prototype-less object: message ids can never clash with Object.prototype keys.
  this.pendingChunks = Object.create(null)
  this.readable = true
}

// Inherit from Stream and restore `constructor`, which is otherwise lost when
// the prototype object is replaced (it would resolve to Stream instead).
GrayGelfServer.prototype = Object.create(Stream.prototype, {
  constructor: {
    value: GrayGelfServer,
    writable: true,
    configurable: true
  }
})
/**
 * Socket error handler: re-emits the given error on this server as an
 * 'error' event.
 *
 * @param {Error} er - error reported by the underlying socket
 */
GrayGelfServer.prototype._checkError = function(er) {
  var eventName = 'error'
  this.emit(eventName, er)
}
/**
 * Opens a UDP (udp4) socket, starts receiving messages on it and schedules
 * the periodic sweep of incomplete chunked messages.
 *
 * @param {number} [port] - port to bind; any falsy value selects 12201
 * @param {string} [address] - address to bind; any falsy value selects '0.0.0.0'
 * @returns {GrayGelfServer} this server, for chaining
 * @throws {Error} if the server already holds a socket
 */
GrayGelfServer.prototype.listen = function(port, address) {
  var server = this

  if (server._udp) throw new Error('GrayGelf is already listening on a port')

  server.port = port || 12201
  server.address = address || '0.0.0.0'

  var socket = server._udp = dgram.createSocket('udp4')
  socket.on('error', server._checkError.bind(server))
  socket.on('message', server._message.bind(server))
  socket.bind(server.port, server.address)

  // once a minute, drop chunked messages that never completed
  server._chunkInterval = setInterval(server._checkExpired.bind(server), 60000)

  return server
}
/**
 * Calls `unref()` on the UDP socket and then on the chunk-sweep timer.
 * Both are created by `listen()`, so this must be called after it.
 *
 * @returns {GrayGelfServer} this server, for chaining
 */
GrayGelfServer.prototype.unref = function() {
  var socket = this._udp
  socket.unref()

  var sweepTimer = this._chunkInterval
  sweepTimer.unref()

  return this
}
/**
 * Stops the server: cancels the chunk-sweep timer and closes the UDP socket.
 *
 * Safe to call when the server is not listening (before `listen()` or after
 * a previous `close()`); in that case it is a no-op. State is reset before
 * the socket is closed, so the server is left consistent (and can `listen()`
 * again) even if closing the socket throws.
 *
 * @returns {GrayGelfServer} this server, for chaining
 */
GrayGelfServer.prototype.close = function() {
  // clearInterval ignores undefined/null, so no guard is needed here
  clearInterval(this._chunkInterval)
  this._chunkInterval = null

  var socket = this._udp
  if (socket) {
    this._udp = null
    socket.close()
  }
  return this
}
/**
 * Sweeps `pendingChunks`, discarding every partially received message whose
 * most recent chunk arrived 60 seconds ago or earlier.
 */
GrayGelfServer.prototype._checkExpired = function() {
  var pending = this.pendingChunks
  var now = Date.now()

  Object.keys(pending).forEach(function(id) {
    var idleFor = now - pending[id].lastReceived
    if (idleFor >= 60000) {
      delete pending[id]
    }
  })
}
/**
 * Stores one chunk of a chunked GELF message and, once every chunk of that
 * message has arrived, reassembles the payload and hands it to `_message`.
 *
 * Chunk layout (12-byte header, then payload):
 *   bytes 0-1   magic bytes
 *   bytes 2-9   message id (8 bytes)
 *   byte  10    sequence number of this chunk (0-based)
 *   byte  11    total number of chunks in the message
 *
 * Truncated packets and chunks whose sequence number is not below the
 * announced total are ignored.
 *
 * @param {Buffer} chunk - raw datagram, including the chunk header
 */
GrayGelfServer.prototype._handleChunk = function(chunk) {
  if (chunk.length < 12) return // header incomplete, nothing usable

  // Use all 8 id bytes, hex-encoded: 'ascii' decoding drops the high bit and
  // a shorter range ignores trailing id bytes, both of which make distinct
  // messages share a key and get spliced together.
  var id = chunk.toString('hex', 2, 10)
  var index = chunk[10]
  var total = chunk[11]
  if (index >= total) return // malformed: could never complete

  var chunks = this.pendingChunks[id] || {data: []}
  chunks.data[index] = chunk.slice(12) // store without chunk header
  chunks.lastReceived = Date.now()
  this.pendingChunks[id] = chunks

  // The array only reaches `total` entries once the last index was written.
  if (chunks.data.length !== total) return

  // Earlier slots may still be holes if chunks arrived out of order.
  for (var i = 0; i < total; i++) {
    if (!Buffer.isBuffer(chunks.data[i])) return
  }

  // Forget the message before dispatching so a throwing handler cannot
  // leave the completed entry behind.
  delete this.pendingChunks[id]
  this._message(Buffer.concat(chunks.data))
}
/**
 * Routes an incoming buffer according to its first byte:
 *   0x78 - zlib (deflate) payload, inflated and passed to `_broadcast`
 *   0x1f - gzip payload, gunzipped and passed to `_broadcast`
 *   0x1e - chunk of a chunked message, passed to `_handleChunk`
 *   0x7b - plain JSON ('{'), passed to `_broadcast` as is
 * Any other leading byte is ignored.
 *
 * @param {Buffer} buf - raw datagram or reassembled payload
 * @param {Object} [details] - present only when invoked by the socket's
 *   'message' event; in that case the raw buffer is first re-emitted as 'data'
 */
GrayGelfServer.prototype._message = function(buf, details) {
  // only datagrams coming straight from the socket carry `details`
  if (details) this.emit('data', buf)

  var magic = buf[0]

  if (magic === 0x78) {
    zlib.inflate(buf, this._broadcast.bind(this))
  } else if (magic === 0x1f) {
    zlib.gunzip(buf, this._broadcast.bind(this))
  } else if (magic === 0x1e) {
    this._handleChunk(buf)
  } else if (magic === 0x7b) {
    this._broadcast(null, buf)
  }
  // anything else: unknown message, dropped
}
/**
 * Final stage of message handling: parses a decoded payload as JSON and
 * emits it as a 'message' event.
 *
 * Decompression failures and malformed JSON are both reported through the
 * 'error' event. Payloads arrive over UDP from arbitrary senders, so a bad
 * one must not throw out of the socket or zlib callback.
 *
 * @param {?Error} er - error from the decompression step, if any
 * @param {Buffer} buf - decoded (uncompressed) JSON payload
 */
GrayGelfServer.prototype._broadcast = function(er, buf) {
  /* istanbul ignore next */
  if (er) return this.emit('error', er)

  var data
  try {
    data = JSON.parse(buf.toString())
  } catch (parseError) {
    return this.emit('error', parseError)
  }

  // Emitted outside the try block so exceptions thrown by 'message'
  // listeners are not misreported as parse errors.
  this.emit('message', data)
}
module.exports = GrayGelfServer