diff --git a/.gitignore b/.gitignore
index 51ea81e..718e640 100644
--- a/.gitignore
+++ b/.gitignore
@@ -547,6 +547,8 @@ ASALocalRun/
 
 # End of https://www.gitignore.io/api/linux,osx,windows,python,vim,visualstudio
 
+metREx/favicon.ico
+
 # Docker
 docker-compose.yml
 
diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml
index 177e26e..bbc2e5d 100644
--- a/docker-compose-dev.yml
+++ b/docker-compose-dev.yml
@@ -8,4 +8,5 @@ services:
     ports:
       - 5000:5000
     volumes:
-      - .:/usr/src
\ No newline at end of file
+      - .:/usr/src
+      - ./favicon.ico:/usr/src/metREx/favicon.ico
\ No newline at end of file
diff --git a/metREx/app/__init__.py b/metREx/app/__init__.py
index b7d2491..726e4a9 100644
--- a/metREx/app/__init__.py
+++ b/metREx/app/__init__.py
@@ -9,7 +9,7 @@ from .main.controller.scheduler_controller import api as scheduler_ns
 
 __title__ = 'metREx'
-__version__ = '0.4.0.post3'
+__version__ = '0.5.0'
 __description__ = 'SQL query and monitoring system metrics exporter for Prometheus'
 
 blueprint = Blueprint('api', __name__)
 
diff --git a/metREx/app/main/__init__.py b/metREx/app/main/__init__.py
index 5275993..1846a78 100644
--- a/metREx/app/main/__init__.py
+++ b/metREx/app/main/__init__.py
@@ -1,4 +1,5 @@
 import json
+import logging
 
 from datetime import datetime, timedelta
 
@@ -51,6 +52,12 @@ def create_app(config_name):
 
     app.config.from_object(config_obj)
 
+    if not app.debug:
+        logger = logging.getLogger('werkzeug')
+        logger.setLevel(logging.ERROR)
+
+        # app.logger.setLevel(logging.ERROR)
+
     db.init_app(app)
     aa.init_app(app)
 
@@ -73,10 +80,14 @@ def create_app(config_name):
 def delete_job(job_name, pushgateways):
     if job_name in registered_collectors.keys():
         if len(pushgateways):
-            options = {}
+            options = {
+                'grouping_key': {
+                    'job': job_name
+                }
+            }
 
             if registered_collectors[job_name].instance:
-                options['grouping_key'] = {'instance': registered_collectors[job_name].instance}
+                options['grouping_key']['instance'] = registered_collectors[job_name].instance
 
             for service, pushgateway in pushgateways.items():
                 try:
@@ -143,10 +154,14 @@ def set_job_collector_metrics(job_name, collector_metrics):
         if registered_collectors[job_name].instance is None:
             generate_latest(job_collector_registry)
 
-        options = {}
+        options = {
+            'grouping_key': {
+                'job': job_name
+            }
+        }
 
         if registered_collectors[job_name].instance:
-            options['grouping_key'] = {'instance': registered_collectors[job_name].instance}
+            options['grouping_key']['instance'] = registered_collectors[job_name].instance
 
         for service, pushgateway in pushgateways.items():
             try:
diff --git a/metREx/app/main/config.py b/metREx/app/main/config.py
index 260c21a..e636bb0 100644
--- a/metREx/app/main/config.py
+++ b/metREx/app/main/config.py
@@ -8,6 +8,8 @@
 
 from dotenv import load_dotenv
 
+from sqlalchemy.pool import NullPool, SingletonThreadPool
+
 from .util import api_helper
 from .util import apscheduler_helper
 from .util.misc_helper import str_to_bool
@@ -87,6 +89,10 @@ class Config:
 
     PUSHGATEWAYS = {}
 
+    SQLALCHEMY_ENGINE_OPTIONS = {
+        'poolclass': NullPool
+    }
+
     SQLALCHEMY_TRACK_MODIFICATIONS = False
 
     SCHEDULER_API_ENABLED = False
@@ -181,6 +187,10 @@ class TestingConfig(Config):
     TESTING = True
     PRESERVE_CONTEXT_ON_EXCEPTION = False
 
+    SQLALCHEMY_ENGINE_OPTIONS = {
+        'poolclass': SingletonThreadPool
+    }
+
     def __init__(self):
         database_service_name = service_prefix['SQLALCHEMY'] + 'TEST'
 
@@ -208,6 +218,8 @@ def __init__(self):
 class ProductionConfig(Config):
     DEBUG = str_to_bool(os.getenv('DEBUG', False))
 
+    ERROR_INCLUDE_MESSAGE = str_to_bool(os.getenv('ERROR_INCLUDE_MESSAGE', False))
+
 
 config_by_name = dict(
     dev=DevelopmentConfig,
diff --git a/metREx/app/main/database/__init__.py b/metREx/app/main/database/__init__.py
index bd16c17..41bed7c 100644
--- a/metREx/app/main/database/__init__.py
+++ b/metREx/app/main/database/__init__.py
@@ -2,26 +2,21 @@
 
 
 class DatabaseAccessLayer:
-    _db_session = None
-    _engine = None
+    _session = None
 
     def __init__(self, db):
        self._db = db
 
     def execute(self, statement):
-        result = self._db_session.execute(text(statement).execution_options(autocommit=False))
-
-        self._db_session.rollback()
+        result = self._session.execute(text(statement).execution_options(autocommit=False))
 
         return result
 
     def init_db(self, bind):
-        self._engine = self._db.get_engine(bind=bind)
+        engine = self._db.get_engine(bind=bind)
 
-        self._db_session = self._db.create_scoped_session({
-            'bind': self._engine
+        self._session = self._db.create_scoped_session({
+            'autocommit': False,
+            'autoflush': False,
+            'bind': engine
         })
-
-    def __del__(self):
-        if self._engine is not None:
-            self._engine.dispose()
diff --git a/metREx/app/main/service/metrics_service.py b/metREx/app/main/service/metrics_service.py
index ca34dd5..6879045 100644
--- a/metREx/app/main/service/metrics_service.py
+++ b/metREx/app/main/service/metrics_service.py
@@ -49,416 +49,417 @@ def format_label(string):
     return label.lower()
 
 
-def generate_metrics(*args):
-    if len(args) >= 2:
-        category = args[0]
-        job_name = args[1]
-
-        func_name = 'get_' + category + '_metrics'
-
-        if func_name in globals():
-            with aps.app.app_context():
-                try:
-                    aps.app.logger.info("Job '" + job_name + "' started.")
-
-                    collector_metrics = globals()[func_name](*args[1:])
-
-                    set_job_collector_metrics(job_name, collector_metrics)
-
-                    aps.app.logger.info("Job '" + job_name + "' completed.")
-                except Exception as e:
-                    aps.app.logger.warning("Job '" + job_name + "' failed. " + type(e).__name__ + ": " + str(e))
-
-                    if aps.app.config.get('SUSPEND_JOB_ON_FAILURE'):
-                        aps.pause_job(job_name)
-
-                        aps.app.logger.warning("Job '" + job_name + "' suspended.")
-
-                        if job_name in registered_collectors.keys():
-                            unregister_collector(job_name, registered_collectors[job_name])
-
-                            del registered_collectors[job_name]
-
-
-def get_appdynamics_metrics(job_name, services, application, metric_path, minutes, static_labels=()):
-    collector_metrics = {}
-
-    pushgateways = aps.app.config.get('PUSHGATEWAYS')
-
-    for service_name in services:
-        prefix, instance = get_metric_info(service_name)
-
-        dal = AppD(aa)
-        dal.init_aa(service_name)
-
-        options = {
-            'time_range_type': 'BEFORE_NOW',
-            'duration_in_mins': int(minutes),
-            'rollup': True
-        }
-
-        result = dal.client.get_metrics(metric_path, application, **options)
-
-        metrics = []
-
-        for metric_obj in result:
-            components = metric_obj.path.split('|')
-
-            if components is not None:
-                label_dict = OrderedDict()
-
-                if len(pushgateways):
-                    if instance is not None:
-                        label_dict['instance'] = instance
-
-                if application == 'Database Monitoring':
-                    label_dict['collector'] = components.pop(1)
-
-                    while len(components) > 3:
-                        node = format_label(components.pop(1))
-
-                        label_dict[node] = components.pop(1)
-                else:
-                    label_dict['application'] = application
-
-                    if components[0] == 'Backends':
-                        pattern = re.compile(r'^(?P<node>Discovered backend call) - (?P<value>.+)$', re.X)
-
-                        m = pattern.match(components.pop(1))
-
-                        if m is not None:
-                            subcomponents = m.groupdict()
-
-                            node = format_label(subcomponents['node'])
-
-                            label_dict[node] = subcomponents['value']
-                    elif components[0] == 'Service Endpoints':
-                        label_dict['tier'] = components.pop(1)
-                        label_dict['service_endpoint'] = components.pop(1)
-
-                        if components[1] == 'Individual Nodes':
-                            node = format_label(components.pop(1))
-
-                            label_dict[node] = components.pop(1)
-                    elif components[0] == 'Overall Application Performance':
-                        if len(components) > 2:
-                            label_dict['tier'] = components.pop(1)
-
-                            if components[1] == 'Individual Nodes':
-                                node = format_label(components.pop(1))
-
-                                label_dict[node] = components.pop(1)
-
-                            while len(components) > 3:
-                                if components[1] == 'External Calls':
-                                    pattern = re.compile(r'^(?P<node>.+)(?= to Discovered) to Discovered backend call - (?P<value>.+)$', re.X)
-
-                                    m = pattern.match(components.pop(2))
-
-                                    if m is not None:
-                                        subcomponents = m.groupdict()
-
-                                        node = format_label(subcomponents['node'])
-
-                                        label_dict[node] = subcomponents['value']
-                                    break
-                                else:
-                                    node = format_label(components.pop(1))
-
-                                    label_dict[node] = components.pop(1)
-                    elif components[0] == 'Business Transaction Performance':
-                        if components[1] == 'Business Transaction Groups':
-                            node = format_label(components.pop(1))
-
-                            label_dict[node] = components.pop(1)
-                        elif components[1] == 'Business Transactions':
-                            node = format_label(components.pop(1))
-
-                            label_dict['tier'] = components.pop(1)
-                            label_dict[node] = components.pop(1)
-
-                            if components[1] == 'Individual Nodes':
-                                node = format_label(components.pop(1))
-
-                                label_dict[node] = components.pop(1)
-
-                            while len(components) > 3:
-                                if components[1] == 'External Calls':
-                                    pattern = re.compile(r'^(?P<node>.+)(?= to Discovered) to Discovered backend call - (?P<value>.+)$', re.X)
-
-                                    m = pattern.match(components.pop(2))
-
-                                    if m is not None:
-                                        subcomponents = m.groupdict()
-
-                                        node = format_label(subcomponents['node'])
-
-                                        label_dict[node] = subcomponents['value']
-                                    break
-                                else:
-                                    node = format_label(components.pop(1))
-
-                                    label_dict[node] = components.pop(1)
-                    elif components[0] == 'Application Infrastructure Performance':
-                        label_dict['tier'] = components.pop(1)
-
-                        if components[1] == 'Individual Nodes':
-                            node = format_label(components.pop(1))
-
-                            label_dict[node] = components.pop(1)
-
-                        while len(components) > 3:
-                            node = format_label(components.pop(1))
-
-                            label_dict[node] = components.pop(1)
-                    else:
-                        break
-
-                metric_profile = '_'.join([
-                    format_label(component) for component in components
-                ])
-
-                if metric_obj.values:
-                    row = metric_obj.values[0].__dict__
-
-                    metric_dict = OrderedDict([
-                        (key, value) for key, value in row.items() if key != 'start_time_ms'
-                    ])
-
-                    if not metrics:
-                        metrics = [
-                            metric for metric, value in metric_dict.items() if is_number(value)
-                        ]
-
-                    label_dict.update(OrderedDict([
-                        (format_label(label), value) for label, value in static_labels if format_label(label) not in label_dict.keys()
-                    ]))
-
-                    timestamp = row['start_time_ms'] / 1000 + (int(minutes) * 60)
-
-                    json_label_data = json.dumps(label_dict)
-
-                    for metric in metrics:
-                        metric_name = prefix + '_' + metric_profile + '_' + metric.lower()
-
-                        if metric_name not in collector_metrics.keys():
-                            collector_metrics[metric_name] = {}
-
-                        if json_label_data not in collector_metrics[metric_name]:
-                            collector_metrics[metric_name][json_label_data] = (int(metric_dict[metric]), timestamp)
-
-    return collector_metrics
-
-
-def get_database_metrics(job_name, services, statement, value_columns, static_labels=(), timestamp_column=None, timezones={}):
-    collector_metrics = {}
-
-    pushgateways = aps.app.config.get('PUSHGATEWAYS')
-
-    for service_name in services:
-        prefix, instance = get_metric_info(service_name)
-
-        dal = DatabaseAccessLayer(db)
-        dal.init_db(service_name)
-
-        result = dal.execute(statement)
-
-        timestamp = datetime.now(timezone.utc).timestamp()
-
-        db_tzinfo = None
-
-        if timestamp_column is not None and service_name in timezones.keys():
-            db_tzinfo = pytz.timezone(timezones[service_name])
-
-        for row in result:
-            label_dict = OrderedDict()
-
-            if len(pushgateways):
-                if instance is not None:
-                    label_dict['instance'] = instance
-
-            label_dict.update(OrderedDict([
-                (format_label(column), to_string(row[column])) for column in row.keys() if column not in value_columns and column != timestamp_column
-            ]))
-
-            label_dict.update(OrderedDict([
-                (format_label(label), value) for label, value in static_labels if format_label(label) not in label_dict.keys()
-            ]))
-
-            if db_tzinfo is not None and timestamp_column in row.keys():
-                if isinstance(row[timestamp_column], datetime):
-                    if row[timestamp_column].tzinfo is not None and row[timestamp_column].tzinfo.utcoffset(row[timestamp_column]) is not None:
-                        timestamp = row[timestamp_column].timestamp()
-                    else:
-                        timestamp = row[timestamp_column].astimezone(db_tzinfo).timestamp()
-
-            json_label_data = json.dumps(label_dict)
-
-            for column in value_columns:
-                metric_name = prefix + '_' + column.lower()
-
-                if metric_name not in collector_metrics.keys():
-                    collector_metrics[metric_name] = {}
-
-                if json_label_data not in collector_metrics[metric_name]:
-                    collector_metrics[metric_name][json_label_data] = (row[column], timestamp)
-
-    return collector_metrics
-
-
-def get_extrahop_metrics(job_name, services, params, metric, aggregation, minutes, static_labels=()):
-    collector_metrics = {}
-
-    pushgateways = aps.app.config.get('PUSHGATEWAYS')
-
-    for service_name in services:
-        prefix, instance = get_metric_info(service_name)
-
-        aggregation = test_aggregation_settings(aggregation, job_name)
-
-        dal = ExtraHop(aa)
-        dal.init_aa(service_name)
-
-        options = {**params, **{
-            'from': '-%sm' % minutes,
-            'until': 0
-        }}
-
-        result = dal.client.get_metrics(**options)
-
-        timestamp = datetime.now(timezone.utc).timestamp()
-
-        if 'stats' in result.keys():
-            metric_dict = OrderedDict()
-
-            for row in result['stats']:
-                if row['values']:
-                    if row['values'][0]:
-                        metric_spec_name = row['values'][0][0]['key']['str']
-
-                        if metric_spec_name not in metric_dict.keys():
-                            metric_dict[metric_spec_name] = []
-
-                        value = int(row['values'][0][0]['value'])
-
-                        if test_aggregation_match(value, aggregation):
-                            metric_dict[metric_spec_name].append(value)
-
-            for metric_spec_name, values in metric_dict.items():
-                label_dict = OrderedDict([
-                    ('metric_spec_name', metric_spec_name.lower())
-                ])
-
-                if len(pushgateways):
-                    if instance is not None:
-                        label_dict['instance'] = instance
-
-                label_dict.update(OrderedDict([
-                    (format_label(label), value) for label, value in static_labels if format_label(label) not in label_dict.keys()
-                ]))
-
-                json_label_data = json.dumps(label_dict)
-
-                for func in aggregation['funcs']:
-                    metric_name = prefix + '_' + metric.lower() + '_' + func.lower()
-
-                    if metric_name not in collector_metrics.keys():
-                        collector_metrics[metric_name] = {}
-
-                    if json_label_data not in collector_metrics[metric_name]:
-                        collector_metrics[metric_name][json_label_data] = (aggregate_values_by_func(func, values), timestamp)
-
-    return collector_metrics
-
-
-def get_metric_info(service):
-    service_name_pattern = re.compile(r'\[(?P<instance>[^\[\]]+)\]', re.X)
-
-    prefix = re.sub(service_name_pattern, '', service)
-
-    instance = None
-
-    m = service_name_pattern.search(service)
-
-    if m is not None:
-        components = m.groupdict()
-
-        instance = components['instance']
-
-    return prefix.lower(), instance
-
-
-def is_number(n):
-    try:
-        float(n)
-    except ValueError:
-        return False
-
-    return True
-
-
-def test_aggregation_match(value, aggregation):
-    if 'threshold' in aggregation.keys():
-        return eval('%d %s %d' % (value, aggregation['threshold']['operator'], int(aggregation['threshold']['value'])))
-
-    return True
-
-
-def test_aggregation_settings(aggregation, job_name):
-    if 'funcs' in aggregation.keys():
-        valid_funcs = [
-            'avg',
-            'count',
-            'min',
-            'max',
-            'sum'
-        ]
-
-        for func in aggregation['funcs']:
-            if func not in valid_funcs:
-                if is_number(func):
-                    if int(func) not in range(0, 101):
-                        raise ValueError("Invalid aggregation percentile '" + func + "' in job '" + job_name + "'.")
-                else:
-                    raise ValueError("Unsupported aggregation function '" + func + "' in job '" + job_name + "'.")
-    else:
-        aggregation['funcs'] = [
-            'count'
-        ]
-
-    if 'threshold' in aggregation.keys():
-        if 'operator' in aggregation['threshold'].keys():
-            valid_operator = [
-                '>',
-                '<',
-                '>=',
-                '<=',
-                '=',
-                '<>',
-                '!='
-            ]
-
-            if aggregation['threshold']['operator'] not in valid_operator:
-                raise ValueError("Unsupported aggregation threshold operator '" + aggregation['threshold']['operator'] + "' in job '" + job_name + "'.")
-        else:
-            raise ValueError("No operator specified for aggregation threshold in job '" + job_name + "'.")
-
-        if 'value' in aggregation['threshold'].keys():
-            if is_number(aggregation['threshold']['value']):
-                aggregation['threshold']['value'] = aggregation['threshold']['value']
-            else:
-                raise ValueError("Invalid value '" + aggregation['threshold']['value'] + "' specified for aggregation threshold in job '" + job_name + "'.")
-        else:
-            raise ValueError("No value specified for aggregation threshold in job '" + job_name + "'.")
-
-    return aggregation
-
-
-def to_string(n):
-    return str(n) if n is not None else ''
-
-
-class Metrics:
+def get_metric_info(service):
+    service_name_pattern = re.compile(r'\[(?P<instance>[^\[\]]+)\]', re.X)
+
+    prefix = re.sub(service_name_pattern, '', service)
+
+    instance = None
+
+    m = service_name_pattern.search(service)
+
+    if m is not None:
+        components = m.groupdict()
+
+        instance = components['instance']
+
+    return prefix.lower(), instance
+
+
+def is_number(n):
+    try:
+        float(n)
+    except ValueError:
+        return False
+
+    return True
+
+
+def test_aggregation_match(value, aggregation):
+    if 'threshold' in aggregation.keys():
+        return eval('%d %s %d' % (value, aggregation['threshold']['operator'], int(aggregation['threshold']['value'])))
+
+    return True
+
+
+def test_aggregation_settings(aggregation, job_name):
+    if 'funcs' in aggregation.keys():
+        valid_funcs = [
+            'avg',
+            'count',
+            'min',
+            'max',
+            'sum'
+        ]
+
+        for func in aggregation['funcs']:
+            if func not in valid_funcs:
+                if is_number(func):
+                    if int(func) not in range(0, 101):
+                        raise ValueError("Invalid aggregation percentile '" + func + "' in job '" + job_name + "'.")
+                else:
+                    raise ValueError("Unsupported aggregation function '" + func + "' in job '" + job_name + "'.")
+    else:
+        aggregation['funcs'] = [
+            'count'
+        ]
+
+    if 'threshold' in aggregation.keys():
+        if 'operator' in aggregation['threshold'].keys():
+            valid_operator = [
+                '>',
+                '<',
+                '>=',
+                '<=',
+                '=',
+                '<>',
+                '!='
+            ]
+
+            if aggregation['threshold']['operator'] not in valid_operator:
+                raise ValueError("Unsupported aggregation threshold operator '" + aggregation['threshold']['operator'] + "' in job '" + job_name + "'.")
+        else:
+            raise ValueError("No operator specified for aggregation threshold in job '" + job_name + "'.")
+
+        if 'value' in aggregation['threshold'].keys():
+            if is_number(aggregation['threshold']['value']):
+                aggregation['threshold']['value'] = aggregation['threshold']['value']
+            else:
+                raise ValueError("Invalid value '" + aggregation['threshold']['value'] + "' specified for aggregation threshold in job '" + job_name + "'.")
+        else:
+            raise ValueError("No value specified for aggregation threshold in job '" + job_name + "'.")
+
+    return aggregation
+
+
+def to_string(n):
+    return str(n) if n is not None else ''
+
+
+class Metrics:
+    @staticmethod
+    def _get_appdynamics_metrics(job_name, services, application, metric_path, minutes, static_labels=()):
+        collector_metrics = {}
+
+        pushgateways = aps.app.config.get('PUSHGATEWAYS')
+
+        for service_name in services:
+            prefix, instance = get_metric_info(service_name)
+
+            dal = AppD(aa)
+            dal.init_aa(service_name)
+
+            options = {
+                'time_range_type': 'BEFORE_NOW',
+                'duration_in_mins': int(minutes),
+                'rollup': True
+            }
+
+            result = dal.client.get_metrics(metric_path, application, **options)
+
+            metrics = []
+
+            for metric_obj in result:
+                components = metric_obj.path.split('|')
+
+                if components is not None:
+                    label_dict = OrderedDict()
+
+                    if len(pushgateways):
+                        if instance is not None:
+                            label_dict['instance'] = instance
+
+                    if application == 'Database Monitoring':
+                        label_dict['collector'] = components.pop(1)
+
+                        while len(components) > 3:
+                            node = format_label(components.pop(1))
+
+                            label_dict[node] = components.pop(1)
+                    else:
+                        label_dict['application'] = application
+
+                        if components[0] == 'Backends':
+                            pattern = re.compile(r'^(?P<node>Discovered backend call) - (?P<value>.+)$', re.X)
+
+                            m = pattern.match(components.pop(1))
+
+                            if m is not None:
+                                subcomponents = m.groupdict()
+
+                                node = format_label(subcomponents['node'])
+
+                                label_dict[node] = subcomponents['value']
+                        elif components[0] == 'Service Endpoints':
+                            label_dict['tier'] = components.pop(1)
+                            label_dict['service_endpoint'] = components.pop(1)
+
+                            if components[1] == 'Individual Nodes':
+                                node = format_label(components.pop(1))
+
+                                label_dict[node] = components.pop(1)
+                        elif components[0] == 'Overall Application Performance':
+                            if len(components) > 2:
+                                label_dict['tier'] = components.pop(1)
+
+                                if components[1] == 'Individual Nodes':
+                                    node = format_label(components.pop(1))
+
+                                    label_dict[node] = components.pop(1)
+
+                                while len(components) > 3:
+                                    if components[1] == 'External Calls':
+                                        pattern = re.compile(r'^(?P<node>.+)(?= to Discovered) to Discovered backend call - (?P<value>.+)$', re.X)
+
+                                        m = pattern.match(components.pop(2))
+
+                                        if m is not None:
+                                            subcomponents = m.groupdict()
+
+                                            node = format_label(subcomponents['node'])
+
+                                            label_dict[node] = subcomponents['value']
+                                        break
+                                    else:
+                                        node = format_label(components.pop(1))
+
+                                        label_dict[node] = components.pop(1)
+                        elif components[0] == 'Business Transaction Performance':
+                            if components[1] == 'Business Transaction Groups':
+                                node = format_label(components.pop(1))
+
+                                label_dict[node] = components.pop(1)
+                            elif components[1] == 'Business Transactions':
+                                node = format_label(components.pop(1))
+
+                                label_dict['tier'] = components.pop(1)
+                                label_dict[node] = components.pop(1)
+
+                                if components[1] == 'Individual Nodes':
+                                    node = format_label(components.pop(1))
+
+                                    label_dict[node] = components.pop(1)
+
+                                while len(components) > 3:
+                                    if components[1] == 'External Calls':
+                                        pattern = re.compile(r'^(?P<node>.+)(?= to Discovered) to Discovered backend call - (?P<value>.+)$', re.X)
+
+                                        m = pattern.match(components.pop(2))
+
+                                        if m is not None:
+                                            subcomponents = m.groupdict()
+
+                                            node = format_label(subcomponents['node'])
+
+                                            label_dict[node] = subcomponents['value']
+                                        break
+                                    else:
+                                        node = format_label(components.pop(1))
+
+                                        label_dict[node] = components.pop(1)
+                        elif components[0] == 'Application Infrastructure Performance':
+                            label_dict['tier'] = components.pop(1)
+
+                            if components[1] == 'Individual Nodes':
+                                node = format_label(components.pop(1))
+
+                                label_dict[node] = components.pop(1)
+
+                            while len(components) > 3:
+                                node = format_label(components.pop(1))
+
+                                label_dict[node] = components.pop(1)
+                        else:
+                            break
+
+                    metric_profile = '_'.join([
+                        format_label(component) for component in components
+                    ])
+
+                    if metric_obj.values:
+                        row = metric_obj.values[0].__dict__
+
+                        metric_dict = OrderedDict([
+                            (key, value) for key, value in row.items() if key != 'start_time_ms'
+                        ])
+
+                        if not metrics:
+                            metrics = [
+                                metric for metric, value in metric_dict.items() if is_number(value)
+                            ]
+
+                        label_dict.update(OrderedDict([
+                            (format_label(label), value) for label, value in static_labels if format_label(label) not in label_dict.keys()
+                        ]))
+
+                        timestamp = row['start_time_ms'] / 1000 + (int(minutes) * 60)
+
+                        json_label_data = json.dumps(label_dict)
+
+                        for metric in metrics:
+                            metric_name = prefix + '_' + metric_profile + '_' + metric.lower()
+
+                            if metric_name not in collector_metrics.keys():
+                                collector_metrics[metric_name] = {}
+
+                            if json_label_data not in collector_metrics[metric_name]:
+                                collector_metrics[metric_name][json_label_data] = (int(metric_dict[metric]), timestamp)
+
+        return collector_metrics
+
+    @staticmethod
+    def _get_database_metrics(job_name, services, statement, value_columns, static_labels=(), timestamp_column=None, timezones={}):
+        collector_metrics = {}
+
+        pushgateways = aps.app.config.get('PUSHGATEWAYS')
+
+        for service_name in services:
+            prefix, instance = get_metric_info(service_name)
+
+            dal = DatabaseAccessLayer(db)
+            dal.init_db(service_name)
+
+            result = dal.execute(statement)
+
+            timestamp = datetime.now(timezone.utc).timestamp()
+
+            db_tzinfo = None
+
+            if timestamp_column is not None and service_name in timezones.keys():
+                db_tzinfo = pytz.timezone(timezones[service_name])
+
+            for row in result:
+                label_dict = OrderedDict()
+
+                if len(pushgateways):
+                    if instance is not None:
+                        label_dict['instance'] = instance
+
+                label_dict.update(OrderedDict([
+                    (format_label(column), to_string(row[column])) for column in row.keys() if column not in value_columns and column != timestamp_column
+                ]))
+
+                label_dict.update(OrderedDict([
+                    (format_label(label), value) for label, value in static_labels if format_label(label) not in label_dict.keys()
+                ]))
+
+                if db_tzinfo is not None and timestamp_column in row.keys():
+                    if isinstance(row[timestamp_column], datetime):
+                        if row[timestamp_column].tzinfo is not None and row[timestamp_column].tzinfo.utcoffset(row[timestamp_column]) is not None:
+                            timestamp = row[timestamp_column].timestamp()
+                        else:
+                            timestamp = row[timestamp_column].astimezone(db_tzinfo).timestamp()
+
+                json_label_data = json.dumps(label_dict)
+
+                for column in value_columns:
+                    metric_name = prefix + '_' + column.lower()
+
+                    if metric_name not in collector_metrics.keys():
+                        collector_metrics[metric_name] = {}
+
+                    if json_label_data not in collector_metrics[metric_name]:
+                        collector_metrics[metric_name][json_label_data] = (row[column], timestamp)
+
+        return collector_metrics
+
+    @staticmethod
+    def _get_extrahop_metrics(job_name, services, params, metric, aggregation, minutes, static_labels=()):
+        collector_metrics = {}
+
+        pushgateways = aps.app.config.get('PUSHGATEWAYS')
+
+        for service_name in services:
+            prefix, instance = get_metric_info(service_name)
+
+            aggregation = test_aggregation_settings(aggregation, job_name)
+
+            dal = ExtraHop(aa)
+            dal.init_aa(service_name)
+
+            options = {**params, **{
+                'from': '-%sm' % minutes,
+                'until': 0
+            }}
+
+            result = dal.client.get_metrics(**options)
+
+            timestamp = datetime.now(timezone.utc).timestamp()
+
+            if 'stats' in result.keys():
+                metric_dict = OrderedDict()
+
+                for row in result['stats']:
+                    if row['values']:
+                        if row['values'][0]:
+                            metric_spec_name = row['values'][0][0]['key']['str']
+
+                            if metric_spec_name not in metric_dict.keys():
+                                metric_dict[metric_spec_name] = []
+
+                            value = int(row['values'][0][0]['value'])
+
+                            if test_aggregation_match(value, aggregation):
+                                metric_dict[metric_spec_name].append(value)
+
+                for metric_spec_name, values in metric_dict.items():
+                    label_dict = OrderedDict([
+                        ('metric_spec_name', metric_spec_name.lower())
+                    ])
+
+                    if len(pushgateways):
+                        if instance is not None:
+                            label_dict['instance'] = instance
+
+                    label_dict.update(OrderedDict([
+                        (format_label(label), value) for label, value in static_labels if format_label(label) not in label_dict.keys()
+                    ]))
+
+                    json_label_data = json.dumps(label_dict)
+
+                    for func in aggregation['funcs']:
+                        metric_name = prefix + '_' + metric.lower() + '_' + func.lower()
+
+                        if metric_name not in collector_metrics.keys():
+                            collector_metrics[metric_name] = {}
+
+                        if json_label_data not in collector_metrics[metric_name]:
+                            collector_metrics[metric_name][json_label_data] = (aggregate_values_by_func(func, values), timestamp)
+
+        return collector_metrics
+
+    @staticmethod
+    def generate_metrics(*args):
+        if len(args) >= 2:
+            category = args[0]
+            job_name = args[1]
+
+            func_name = '_get_' + category + '_metrics'
+
+            if hasattr(Metrics, func_name):
+                with aps.app.app_context():
+                    try:
+                        aps.app.logger.info("Job '" + job_name + "' started.")
+
+                        func = getattr(Metrics, func_name)
+                        collector_metrics = func(*args[1:])
+
+                        set_job_collector_metrics(job_name, collector_metrics)
+
+                        aps.app.logger.info("Job '" + job_name + "' completed.")
+                    except Exception as e:
+                        aps.app.logger.warning("Job '" + job_name + "' failed. " + type(e).__name__ + ": " + str(e))
+
+                        if aps.app.config.get('SUSPEND_JOB_ON_FAILURE'):
+                            aps.pause_job(job_name)
+
+                            aps.app.logger.warning("Job '" + job_name + "' suspended.")
+
+                            if job_name in registered_collectors.keys():
+                                unregister_collector(job_name, registered_collectors[job_name])
+
+                                del registered_collectors[job_name]
+
     @staticmethod
     def read_prometheus_metrics(job_name):
         registry = get_registry(job_name)
diff --git a/metREx/app/main/util/apscheduler_helper.py b/metREx/app/main/util/apscheduler_helper.py
index ba63747..8da57ef 100644
--- a/metREx/app/main/util/apscheduler_helper.py
+++ b/metREx/app/main/util/apscheduler_helper.py
@@ -30,7 +30,7 @@ def apply_job_templates(jobs, templates):
 
 def build_job(category, name, bind, seconds, *args):
     return {
-        'func': job_func_root + '.service.metrics_service:generate_metrics',
+        'func': job_func_root + '.service.metrics_service:Metrics.generate_metrics',
         'trigger': 'interval',
         'args': (category, name, bind) + args,
         'id': name,
diff --git a/metREx/app/main/util/sqlalchemy_helper.py b/metREx/app/main/util/sqlalchemy_helper.py
index d9b2fdb..7bcd927 100644
--- a/metREx/app/main/util/sqlalchemy_helper.py
+++ b/metREx/app/main/util/sqlalchemy_helper.py
@@ -1,10 +1,9 @@
 import json
+import pytz
 import re
 
 from urllib import parse
 
-import pytz
-
 from cryptofy import encoding, decrypt
 
 from .misc_helper import str_to_bool
diff --git a/pip/bigquery-requirements.txt b/pip/bigquery-requirements.txt
index 924fa04..7f3d6de 100644
--- a/pip/bigquery-requirements.txt
+++ b/pip/bigquery-requirements.txt
@@ -1 +1,2 @@
+google-cloud-bigquery-storage[pyarrow]
 pybigquery
\ No newline at end of file
diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py
index 5b74a30..ef8dc5b 100644
--- a/tests/unit/test_database.py
+++ b/tests/unit/test_database.py
@@ -7,7 +7,7 @@ from faker import Faker
 
 from ..base import BaseTestCase
 
-from metREx.app.main.service.metrics_service import db, get_database_metrics, get_metric_info
+from metREx.app.main.service.metrics_service import db, Metrics, get_metric_info
 
 
 class Metric(db.Model):
@@ -43,7 +43,7 @@ def test_get_database_metrics(self):
             self.assertIsInstance(value_columns, list)
 
             if category == 'database':
-                database_metrics = get_database_metrics(*job['args'][1:])
+                database_metrics = Metrics._get_database_metrics(*job['args'][1:])
 
                 self.assertIsInstance(database_metrics, dict)
 
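Note on the Pushgateway change in metREx/app/main/__init__.py: delete_job and set_job_collector_metrics now always send a 'job' grouping key, adding 'instance' only when the service name carries one, so each job's metrics form their own group on the gateway and a delete no longer touches other jobs' series. A minimal sketch of that push/delete cycle, assuming the pushgateway objects wrap prometheus_client (the wrapper itself is not shown in this diff; the gateway address and metric below are placeholders):

    # Sketch only: prometheus_client semantics assumed; address and metric are placeholders.
    from prometheus_client import CollectorRegistry, Gauge, delete_from_gateway, push_to_gateway

    registry = CollectorRegistry()
    Gauge('example_rows', 'Example metric', registry=registry).set(42)

    options = {'grouping_key': {'job': 'my_job'}}      # always present after this change
    # options['grouping_key']['instance'] = 'db01'     # only when [instance] is parsed from the service name

    push_to_gateway('localhost:9091', job='my_job', registry=registry, **options)
    delete_from_gateway('localhost:9091', job='my_job', **options)  # removes exactly this group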
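Note on SQLALCHEMY_ENGINE_OPTIONS in config.py: NullPool opens a fresh connection per checkout and closes it on release, so interval jobs hold no idle connections between runs, while TestingConfig overrides it with SingletonThreadPool, which keeps one connection per thread (what an in-memory SQLite database needs to retain state across statements). A sketch of the equivalent plain-SQLAlchemy calls, with placeholder URLs:

    # Sketch only: what Flask-SQLAlchemy forwards to create_engine(); URLs are placeholders.
    from sqlalchemy import create_engine
    from sqlalchemy.pool import NullPool, SingletonThreadPool

    # Default/production: no pooling; every checkout is a real connect/disconnect.
    engine = create_engine('postgresql://user:pass@host/db', poolclass=NullPool)

    # Testing: one connection per thread, so sqlite in-memory state survives between queries.
    test_engine = create_engine('sqlite://', poolclass=SingletonThreadPool)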
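Note on the dispatch change in metrics_service.py and apscheduler_helper.py: scheduled jobs now call Metrics.generate_metrics, which resolves '_get_<category>_metrics' via getattr on the class rather than scanning globals(), so only the intended collector methods are reachable by category name. A reduced, self-contained sketch of that dispatch (the stub body is illustrative, not the real collector):

    # Sketch only: mirrors the getattr-based dispatch; the stub method is a stand-in.
    class Metrics:
        @staticmethod
        def _get_database_metrics(job_name, *rest):
            return {'example_metric': {}}              # stand-in for real collection

        @staticmethod
        def generate_metrics(*args):
            category, job_name = args[0], args[1]
            func_name = '_get_' + category + '_metrics'

            if hasattr(Metrics, func_name):            # unknown categories are skipped
                func = getattr(Metrics, func_name)
                return func(*args[1:])                 # job_name remains the first argument

    print(Metrics.generate_metrics('database', 'my_job'))  # -> {'example_metric': {}}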