Implement a simple logging facility
heikkiorsila committed Dec 11, 2020
1 parent 183d2aa commit b43b8d8
Showing 1 changed file with 144 additions and 74 deletions.
218 changes: 144 additions & 74 deletions vtsaggregator.py
@@ -1,9 +1,8 @@
 # A monitoring tool for aggregating stats from nginx-module-vts plugin
 #
 # @TODO: Aggregate statistics from multiple backends, i.e. cluster support
-# @TODO: Online plotting for debuggin/development
+# @TODO: Online plotting for debugging/development
 # @TODO: Test aggregation by plotting scraped data from the test cluster
-# @TODO: Logging support
 
 import argparse
 import ast
@@ -14,13 +13,15 @@
 import os
 import pprint
 from sortedcontainers import SortedDict
+import sys
 import time
+import traceback
-from typevalidator import ZERO_OR_MORE, OPTIONAL_KEY, validate2
 import urllib
 import urllib3
 from typing import List, Set
 
+from typevalidator import ZERO_OR_MORE, OPTIONAL_KEY, validate2
 
 DESCRIPTION = """
 A monitoring tool for aggregating stats from Nginx Vhost Traffic Status plugin.
 See https://github.com/vozlt/nginx-module-vts.
@@ -129,11 +130,64 @@
     't_prev': float,
 }
 
+LOG_DIR = None
+
 
 class ConfigError(Exception):
     pass
 
 
+def _write_log_entry(log_entry):
+    if LOG_DIR is None:
+        return
+    log_name = os.path.join(LOG_DIR, 'vtsaggregator.log')
+    log_line = repr(log_entry) + '\n'
+    try:
+        with open(log_name, 'a') as f:
+            f.write(log_line)
+    except OSError as e:
+        print('Unable to open or write a log entry:', e)
+        return
+
+
+def _args_to_prefix(*args):
+    return ' '.join([str(x) for x in args])
+
+
+def log_exception(e, *args):
+    msg = _args_to_prefix(*args)
+    log_entry = {
+        'type': 'exception',
+        'argv': sys.argv,
+        'exc': traceback.format_exc(),
+        'msg': msg,
+    }
+    print('{}\n\n{}'.format(msg, log_entry['exc']))
+    _write_log_entry(log_entry)
+
+
+def log_error(*args):
+    msg = _args_to_prefix(*args)
+    log_entry = {
+        'type': 'error',
+        'argv': sys.argv,
+        'msg': msg,
+    }
+    print('Error: {}'.format(msg))
+    _write_log_entry(log_entry)
+
+
+def log_warning(*args):
+    msg = _args_to_prefix(*args)
+    log_entry = {
+        'type': 'warning',
+        'argv': sys.argv,
+        'msg': msg,
+    }
+    print('Warning: {}'.format(msg))
+    _write_log_entry(log_entry)
+
+
 def key_dict_to_tuple(key_dict):
     return tuple(sorted(key_dict.items()))
 
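Since _write_log_entry() stores each entry as the repr() of a plain dict on its own line, the log can be read back with the standard library alone. A minimal sketch of a reader, assuming the file name hard-coded above; read_log_entries itself is hypothetical and not part of this commit:

    import ast
    import os

    def read_log_entries(log_dir):
        # One entry per line, written by _write_log_entry() as repr(dict);
        # ast.literal_eval() safely parses each line back into a dict.
        entries = []
        with open(os.path.join(log_dir, 'vtsaggregator.log')) as f:
            for line in f:
                entries.append(ast.literal_eval(line))
        return entries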
@@ -403,8 +457,8 @@ def _get_stat_zones(self, backend, backend_stats):
         if len(zones) == 0:
             try:
                 zones = list(backend_stats['serverZones'])
-            except KeyError:
-                print('serverZones is not defined for', backend)
+            except KeyError as e:
+                log_exception(e, 'serverZones is not defined for', backend)
                 zones = None
         return zones

@@ -468,8 +522,7 @@ def load(self):
         try:
             data = open(self._checkpoint).read()
         except FileNotFoundError:
-            print('Warning: Checkpoint {} does not exist.'.format(
-                self._checkpoint))
+            log_warning('Checkpoint', self._checkpoint, 'does not exist')
             return
         try:
             d = ast.literal_eval(data)
@@ -963,7 +1016,77 @@ def _log_stats(stat_dir, backend_data, step):
         with open(dest_name, 'wb') as f:
             f.write(data)
     except OSError as e:
-        print('Unable to write to stats dir', e)
+        log_error('Unable to write to stats dir', e)
 
 
+def run_vtsaggregator(args):
+    if args.interval <= 0:
+        raise ConfigError('--interval value must be a positive integer')
+    late_margin = args.late_margin
+    if late_margin is None:
+        late_margin = min(10.0, args.interval / 2)
+    if late_margin <= 0 or late_margin > (args.interval / 2):
+        raise ConfigError('--late-margin value must be a positive float '
+                          'not greater than interval/2')
+    state = State(args)
+    state.load()
+
+    t = time.time()
+    if args.test_mode:
+        t_next = t + args.interval
+    else:
+        t_next = _get_start_time(t, args.interval)
+
+    CLOCK_DRIFT_MARGIN = 0.1
+
+    step = 0
+    while args.test_limit < 0 or step < args.test_limit:
+        t = time.time()
+
+        if args.test_mode:
+            # No sleeping in test mode
+            t = t_next
+
+        if t < t_next:
+            time.sleep(t_next + CLOCK_DRIFT_MARGIN - t)
+            continue
+
+        t_deadline = t_next + late_margin
+        if t < t_deadline:
+            if args.test_mode:
+                if step >= len(args.urls):
+                    print('No more test data. Stopping.')
+                    break
+                urls = [args.urls[step]]
+            else:
+                urls = args.urls
+
+            # Fetch backend data from monitoring end-points
+            backend_data = state.fetch_backend_data(urls, args)
+            t_end = time.time()
+            if t_end >= t_deadline:
+                log_warning('Scraping was late by {} seconds. '
+                            'Results are not counted.'.format(
+                                t_end - t_deadline))
+
+            _log_stats(args.stat_dir, backend_data, step)
+
+            # Aggregate backend data
+            state.aggregate_backend_data(t, backend_data, args)
+            state.save()
+            if args.verbose:
+                t_analysis_end = time.time()
+                print('Fetch_backend_data() duration: {:.3f}s'.format(
+                    t_end - t))
+                print('Aggregate and save duration: {:.3f}s'.format(
+                    t_analysis_end - t_end))
+        else:
+            log_warning('Missed a minute interval: time_t', t)
+        t_next += args.interval
+        step += 1
+
+    if args.plot:
+        _plot(args, state)
+
 def main():
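In non-test mode the first tick comes from _get_start_time(), which is defined elsewhere in this file and not shown in this diff. Judging by its use above, it aligns t_next to an interval boundary on the wall clock; a hypothetical equivalent, shown only to illustrate the scheduling:

    def aligned_start_time(t, interval):
        # Next multiple of interval after t, e.g. t=125.3, interval=60 -> 180.
        return (int(t) // interval + 1) * interval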
@@ -995,6 +1118,11 @@ def main():
     parser.add_argument(
         '--latency-percentiles', default='0,1,5,10,50,90,95,99,100',
         help='Comma separated floats of latency percentiles to monitor for.')
+    parser.add_argument(
+        '--log-dir',
+        help=('Write logs of errors and exceptions to '
+              '{log_dir}/vtsaggregator.log, where log_dir is the given '
+              'argument.'))
     parser.add_argument(
         '--milliseconds', required=True,
         help=('Write millisecond json to a given target file. The target '
@@ -1041,73 +1169,15 @@
              'Defaults to monitoring all zones.'))
     args = parser.parse_args()
 
-    if args.interval <= 0:
-        raise ConfigError('--interval value must be a positive integer')
-    late_margin = args.late_margin
-    if late_margin is None:
-        late_margin = min(10.0, args.interval / 2)
-    if late_margin <= 0 or late_margin > (args.interval / 2):
-        raise ConfigError('--late-margin value must be a positive float '
-                          'not greater than interval/2')
-    state = State(args)
-    state.load()
-
-    t = time.time()
-    if args.test_mode:
-        t_next = t + args.interval
-    else:
-        t_next = _get_start_time(t, args.interval)
-
-    CLOCK_DRIFT_MARGIN = 0.1
-
-    step = 0
-    while args.test_limit < 0 or step < args.test_limit:
-        t = time.time()
-
-        if args.test_mode:
-            # No sleeping in test mode
-            t = t_next
-
-        if t < t_next:
-            time.sleep(t_next + CLOCK_DRIFT_MARGIN - t)
-            continue
-
-        t_deadline = t_next + late_margin
-        if t < t_deadline:
-            if args.test_mode:
-                if step >= len(args.urls):
-                    print('No more test data. Stopping.')
-                    break
-                urls = [args.urls[step]]
-            else:
-                urls = args.urls
-
-            # Fetch backend data from monitoring end-points
-            backend_data = state.fetch_backend_data(urls, args)
-            t_end = time.time()
-            if t_end >= t_deadline:
-                print('Scraping was late by {} seconds. '
-                      'Results are not counted.'.format(t_end - t_deadline))
-
-            _log_stats(args.stat_dir, backend_data, step)
-
-            # Aggregate backend data
-            state.aggregate_backend_data(t, backend_data, args)
-            state.save()
-            if args.verbose:
-                t_analysis_end = time.time()
-                print('Fetch_backend_data() duration: {:.3f}s'.format(
-                    t_end - t))
-                print('Aggregate and save duration: {:.3f}s'.format(
-                    t_analysis_end - t_end))
-        else:
-            print('Warning: Missed a minute interval: time_t', t)
-        t_next += args.interval
-        step += 1
+    global LOG_DIR
+    LOG_DIR = args.log_dir
 
-    if args.plot:
-        _plot(args, state)
+    try:
+        run_vtsaggregator(args)
+    except Exception as e:
+        log_exception(e, 'Exception not caught')
+    return 0
 
 
 if __name__ == '__main__':
-    main()
+    sys.exit(main())
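With --log-dir set, every warning, error, and uncaught exception appends one repr()'d dict to {log_dir}/vtsaggregator.log. For illustration, a log_error() call would emit a line along these lines (all values hypothetical):

    {'type': 'error', 'argv': ['vtsaggregator.py', '--log-dir', '/var/log/vts'], 'msg': "Unable to write to stats dir [Errno 13] Permission denied: '/var/lib/vts/stats'"}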
