forked from thinkst/canarytokens
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchannel_output_webhook.py
70 lines (55 loc) · 2.36 KB
/
channel_output_webhook.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
"""
Output channel that sends to webhooks.
"""
from zope.interface import implementer
from twisted.web.iweb import IBodyProducer
from twisted.internet.defer import succeed
from twisted.logger import Logger
log = Logger()
from twisted.internet import reactor
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
import simplejson
from channel import OutputChannel
from constants import OUTPUT_CHANNEL_WEBHOOK
@implementer(IBodyProducer)
class BytesProducer(object):
    """Body producer that hands a single in-memory body to the consumer.

    The whole body is written in one ``consumer.write`` call from
    ``startProducing``, so pausing and stopping have nothing to do.
    """

    def __init__(self, body):
        """Store the request body and record its length.

        :param body: The request body. Text is encoded to UTF-8 so that
            ``length`` is a byte count and the consumer receives bytes;
            byte strings are stored unchanged.
        """
        # ``type(u"")`` is the text type on both Python 2 (unicode) and
        # Python 3 (str); Python 2 byte strings are left untouched.
        if isinstance(body, type(u"")):
            body = body.encode("utf-8")
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer):
        """Write the whole body to ``consumer`` and return a fired Deferred."""
        consumer.write(self.body)
        return succeed(None)

    def pauseProducing(self):
        """Do nothing: the body is written in a single call."""
        pass

    def stopProducing(self):
        """Do nothing: there is no ongoing production to stop."""
        pass
class WebhookOutputChannel(OutputChannel):
    """Output channel that POSTs an alert as JSON to the drop's webhook URL."""

    CHANNEL = OUTPUT_CHANNEL_WEBHOOK

    # Webhook URLs starting with this prefix get the Slack-formatted payload.
    SLACK_WEBHOOK_PREFIX = "https://hooks.slack.com"

    def do_send_alert(self, input_channel=None, canarydrop=None, **kwargs):
        """Build the alert payload and send it to the drop's webhook.

        :param input_channel: Object providing ``format_slack_canaryalert``
            and ``format_webhook_canaryalert``; the one matching the webhook
            URL is called with ``canarydrop`` and ``**kwargs``.
        :param canarydrop: Mapping that holds ``'alert_webhook_url'``.
        :param kwargs: Passed through unchanged to the payload formatter.

        Any exception raised while building or dispatching the alert is
        logged and swallowed, so a failing webhook never propagates to the
        caller.
        """
        try:
            url = canarydrop['alert_webhook_url']
            # Prefix match: a URL that merely contains the Slack address in
            # its path or query string is not a Slack webhook.
            if url.startswith(self.SLACK_WEBHOOK_PREFIX):
                build_payload = input_channel.format_slack_canaryalert
            else:
                build_payload = input_channel.format_webhook_canaryalert
            payload = build_payload(canarydrop=canarydrop, **kwargs)
            self.generic_webhook_send(simplejson.dumps(payload), canarydrop)
        except Exception as e:
            # The Logger expects a format string as its first argument; the
            # exception is passed as a field so it is rendered correctly.
            log.error("Failed to send webhook alert: {error}", error=e)

    def generic_webhook_send(self, payload=None, canarydrop=None):
        """POST ``payload`` to the drop's webhook URL as ``application/json``.

        :param payload: Serialized JSON body; text is encoded to UTF-8.
        :param canarydrop: Mapping that holds ``'alert_webhook_url'``.
        :returns: The Deferred for the request, after the logging callbacks
            have been attached. Failures are logged and consumed, so the
            Deferred does not carry an unhandled error.
        """
        url = canarydrop['alert_webhook_url']

        def handle_response(response):
            # Any 2xx status means the webhook accepted the request.
            if 200 <= response.code < 300:
                log.info('Webhook sent to {url}', url=url)
            else:
                log.error(
                    "Failed sending request to webhook {url} with code {error}",
                    url=url, error=response.code)
            # NOTE(review): the response body is never read; confirm whether
            # that keeps the underlying connection open longer than needed.

        def handle_error(result):
            log.error(
                "Failed sending request to webhook {url} with error {error}",
                url=url, error=result)

        # The request method, URI and body must be bytes; ``type(u"")`` is
        # the text type on both Python 2 and Python 3.
        text_type = type(u"")
        if isinstance(payload, text_type):
            payload = payload.encode("utf-8")
        request_uri = url.encode("utf-8") if isinstance(url, text_type) else url

        agent = Agent(reactor)
        body = BytesProducer(payload)
        d = agent.request(
            b"POST",
            request_uri,
            Headers({'content-type': ['application/json']}),
            body)
        d.addCallback(handle_response)
        d.addErrback(handle_error)
        return d