From 512696309f254a03310a6246579d89bc5b6e39d7 Mon Sep 17 00:00:00 2001 From: Ryan Goltry Date: Mon, 3 Oct 2022 09:59:26 -0400 Subject: [PATCH] Msg filter (#1144) Message filter support for nebula_osquery --- .gitignore | 4 + .pre-commit-config.yaml | 2 +- hubblestack/config.py | 4 +- hubblestack/daemon.py | 795 +++++++++++------- hubblestack/files/filter_chain.yaml | 9 + hubblestack/filter/__init__.py | 0 hubblestack/filter/base.py | 19 + hubblestack/filter/filter_chain.py | 96 +++ hubblestack/filter/hubble_version.py | 24 + hubblestack/filter/seq_id.py | 42 + hubblestack/filter/static.py | 27 + hubblestack/loader.py | 13 +- hubblestack/returners/splunk_nebula_return.py | 161 ++-- .../returners/splunk_osqueryd_return.py | 5 +- pkg/hubble.service | 6 + pytest.ini | 2 - requirements.txt | 4 +- test-requirements.txt | 2 +- tests/unittests/resources/filter_chain.yaml | 6 + .../resources/filter_chain_ce_01.yaml | 4 + .../resources/filter_chain_ce_02.yaml | 2 + .../resources/filter_chain_load.yaml | 5 + tests/unittests/test_filter_chain.py | 89 ++ tests/unittests/test_nebula_osquery.py | 7 + tests/unittests/test_pulsar.py | 13 + 25 files changed, 956 insertions(+), 385 deletions(-) create mode 100644 hubblestack/files/filter_chain.yaml create mode 100644 hubblestack/filter/__init__.py create mode 100644 hubblestack/filter/base.py create mode 100644 hubblestack/filter/filter_chain.py create mode 100644 hubblestack/filter/hubble_version.py create mode 100644 hubblestack/filter/seq_id.py create mode 100644 hubblestack/filter/static.py create mode 100644 tests/unittests/resources/filter_chain.yaml create mode 100644 tests/unittests/resources/filter_chain_ce_01.yaml create mode 100644 tests/unittests/resources/filter_chain_ce_02.yaml create mode 100644 tests/unittests/resources/filter_chain_load.yaml create mode 100644 tests/unittests/test_filter_chain.py diff --git a/.gitignore b/.gitignore index 73591e54e..aa601f079 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,10 @@ var/ .installed.cfg *.egg +# vim +*.swp +*.swo + # PyInstaller # Usually these files are written by a python script from a template # before PyInstaller builds the exe, so as to inject date/other infos into it. 
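A minimal sketch of the feature this patch adds, using only names that appear in the files below. Running it standalone is an assumption: FilterChain resolves its YAML through the daemon-injected __mods__["cp.cache_file"] loader dict, so outside the daemon this is illustrative only.

from hubblestack.filter.filter_chain import FilterChain

# Returners ask for a chain keyed by their module name; the chain is built
# once from salt://filter_chain.yaml and cached in the module-level `chains`.
chain = FilterChain.get_chain("splunk_nebula_return")

event = {"query": "running_procs", "minion_id": "minion-1"}
chain.filter(event)  # decorates the event dict in place
# `event` now also carries the labels configured under filter:default:,
# e.g. "seq" (from seq_id) and "hubble_version".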
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c9c3ee598..20ffd5009 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -10,7 +10,7 @@ repos: - id: check-merge-conflict - id: check-ast - repo: https://github.com/psf/black - rev: 22.3.0 + rev: 22.6.0 hooks: - id: black args: ['--line-length', '119'] diff --git a/hubblestack/config.py b/hubblestack/config.py index 541b4031d..a04b5d76a 100644 --- a/hubblestack/config.py +++ b/hubblestack/config.py @@ -1145,7 +1145,7 @@ def _validate_file_roots(file_roots): """ if not isinstance(file_roots, dict): log.warning( - "The file_roots parameter is not properly formatted," " using defaults" + "The file_roots parameter is not properly formatted, using defaults" ) return {"base": _expand_glob_path([hubblestack.syspaths.BASE_FILE_ROOTS_DIR])} return _normalize_roots(file_roots) @@ -1323,7 +1323,7 @@ def _absolute_path(path, relative_to=None): _abspath = os.path.join(relative_to, path) if os.path.isfile(_abspath): log.debug( - "Relative path '%s' converted to existing absolute path " "'%s'", + "Relative path '%s' converted to existing absolute path '%s'", path, _abspath, ) diff --git a/hubblestack/daemon.py b/hubblestack/daemon.py index 3d627152e..5b71efd38 100644 --- a/hubblestack/daemon.py +++ b/hubblestack/daemon.py @@ -3,7 +3,6 @@ Main entry point for the hubble daemon """ -# import lockfile import argparse import copy import json @@ -33,6 +32,8 @@ import hubblestack.loader import hubblestack.utils.signing +import hubblestack.filter +import hubblestack.filter.filter_chain import hubblestack.log import hubblestack.log.splunk import hubblestack.hec.opt @@ -47,7 +48,7 @@ import hubblestack.module_runner.fdg_runner log = logging.getLogger(__name__) -HSS = hubblestack.status.HubbleStatus(__name__, 'schedule', 'refresh_grains') +HSS = hubblestack.status.HubbleStatus(__name__, "schedule", "refresh_grains") # Importing syslog fails on windows if not hubblestack.utils.platform.is_windows(): @@ -57,6 +58,7 @@ # This should work fine until we go to multiprocessing SESSION_UUID = str(uuid.uuid4()) + def run(): """ Set up program, daemonize if needed @@ -64,11 +66,11 @@ def run(): try: load_config() except Exception as exc: - print('An Error occurred while loading the config: %s', exc) + print("An Error occurred while loading the config: %s", exc) raise # Create cache directory if not present - if not os.path.isdir(__opts__['cachedir']): - os.makedirs(__opts__['cachedir']) + if not os.path.isdir(__opts__["cachedir"]): + os.makedirs(__opts__["cachedir"]) try: main() except KeyboardInterrupt: @@ -78,56 +80,75 @@ def run(): def _clear_gitfs_locks(): - """ Clear old locks and log the changes """ + """Clear old locks and log the changes""" # Clear old locks - if 'gitfs' in __opts__['fileserver_backend'] or 'git' in __opts__['fileserver_backend']: + if ( + "gitfs" in __opts__["fileserver_backend"] + or "git" in __opts__["fileserver_backend"] + ): git_objects = [ hubblestack.utils.gitfs.GitFS( __opts__, - __opts__['gitfs_remotes'], + __opts__["gitfs_remotes"], per_remote_overrides=hubblestack.fileserver.gitfs.PER_REMOTE_OVERRIDES, - per_remote_only=hubblestack.fileserver.gitfs.PER_REMOTE_ONLY)] + per_remote_only=hubblestack.fileserver.gitfs.PER_REMOTE_ONLY, + ) + ] ret = {} for obj in git_objects: - lock_type = 'update' - cleared, errors = hubblestack.fileserver.clear_lock(obj.clear_lock, 'gitfs', remote=None, - lock_type=lock_type) + lock_type = "update" + cleared, errors = hubblestack.fileserver.clear_lock( + obj.clear_lock, 
"gitfs", remote=None, lock_type=lock_type + ) if cleared: - ret.setdefault('cleared', []).extend(cleared) + ret.setdefault("cleared", []).extend(cleared) if errors: - ret.setdefault('errors', []).extend(errors) + ret.setdefault("errors", []).extend(errors) if ret: - log.info('One or more gitfs locks were removed: %s', ret) + log.info("One or more gitfs locks were removed: %s", ret) def _emit_and_refresh_grains(): - """ When the grains refresh frequency has expired, refresh grains and emit to syslog """ - log.info('Refreshing grains') + """When the grains refresh frequency has expired, refresh grains and emit to syslog""" + log.info("Refreshing grains") refresh_grains() last_grains_refresh = time.time() # Emit syslog at grains refresh frequency - if not (hubblestack.utils.platform.is_windows()) and \ - __opts__.get('emit_grains_to_syslog', True): - default_grains_to_emit = ['system_uuid', 'hubble_uuid', 'session_uuid', - 'machine_id', 'splunkindex', 'cloud_details', - 'hubble_version', 'localhost', 'fqdn'] + if not (hubblestack.utils.platform.is_windows()) and __opts__.get( + "emit_grains_to_syslog", True + ): + default_grains_to_emit = [ + "system_uuid", + "hubble_uuid", + "session_uuid", + "machine_id", + "splunkindex", + "cloud_details", + "hubble_version", + "localhost", + "fqdn", + ] grains_to_emit = [] grains_to_emit.extend( - __opts__.get('emit_grains_to_syslog_list', default_grains_to_emit)) + __opts__.get("emit_grains_to_syslog_list", default_grains_to_emit) + ) emit_to_syslog(grains_to_emit) return last_grains_refresh def _update_fileserver(file_client): - """ Update the filserver and the last_fc_update time """ + """Update the filserver and the last_fc_update time""" try: file_client.channel.fs.update() last_fc_update = time.time() except Exception: - retry = __opts__.get('fileserver_retry_rate', 900) + retry = __opts__.get("fileserver_retry_rate", 900) last_fc_update += retry - log.exception('Exception thrown trying to update fileclient. ' - 'Trying again in %s seconds.', retry) + log.exception( + "Exception thrown trying to update fileclient. " + "Trying again in %s seconds.", + retry, + ) return last_fc_update @@ -138,9 +159,9 @@ def main(): # Initial fileclient setup _clear_gitfs_locks() # Setup fileclient - log.info('Setting up the fileclient/fileserver') - retry_count = __opts__.get('fileserver_retry_count_on_startup', None) - retry_time = __opts__.get('fileserver_retry_rate_on_startup', 30) + log.info("Setting up the fileclient/fileserver") + retry_count = __opts__.get("fileserver_retry_count_on_startup", None) + retry_time = __opts__.get("fileserver_retry_rate_on_startup", 30) count = 0 while True: try: @@ -149,42 +170,47 @@ def main(): last_fc_update = time.time() break except Exception: - if (retry_count is None or count < retry_count) and not __opts__['function']: - log.exception('Exception thrown trying to setup fileclient. ' - 'Trying again in %s seconds.', retry_time) + if (retry_count is None or count < retry_count) and not __opts__[ + "function" + ]: + log.exception( + "Exception thrown trying to setup fileclient. " + "Trying again in %s seconds.", + retry_time, + ) count += 1 time.sleep(retry_time) continue else: - log.exception('Exception thrown trying to setup fileclient. Exiting.') + log.exception("Exception thrown trying to setup fileclient. 
Exiting.") sys.exit(1) # Check for single function run - if __opts__['function']: + if __opts__["function"]: run_function() sys.exit(0) - last_grains_refresh = time.time() - __opts__['grains_refresh_frequency'] - log.info('Starting main loop') + last_grains_refresh = time.time() - __opts__["grains_refresh_frequency"] + log.info("Starting main loop") pidfile_count = 0 # pidfile_refresh in seconds, our scheduler deals in half-seconds - pidfile_refresh = int(__opts__.get('pidfile_refresh', 60)) * 2 + pidfile_refresh = int(__opts__.get("pidfile_refresh", 60)) * 2 while True: # Check if fileserver needs update - if time.time() - last_fc_update >= __opts__['fileserver_update_frequency']: + if time.time() - last_fc_update >= __opts__["fileserver_update_frequency"]: last_fc_update = _update_fileserver(file_client) pidfile_count += 1 - if __opts__['daemonize'] and pidfile_count > pidfile_refresh: + if __opts__["daemonize"] and pidfile_count > pidfile_refresh: pidfile_count = 0 create_pidfile() - if time.time() - last_grains_refresh >= __opts__['grains_refresh_frequency']: + if time.time() - last_grains_refresh >= __opts__["grains_refresh_frequency"]: last_grains_refresh = _emit_and_refresh_grains() try: - log.debug('Executing schedule') + log.debug("Executing schedule") sf_count = schedule() except Exception as exc: - log.exception('Error executing schedule: %s', exc) + log.exception("Error executing schedule: %s", exc) if isinstance(exc, KeyboardInterrupt): raise exc - time.sleep(__opts__.get('scheduler_sleep_frequency', 0.5)) + time.sleep(__opts__.get("scheduler_sleep_frequency", 0.5)) def getsecondsbycronexpression(base, cron_exp): @@ -221,11 +247,15 @@ def getlastrunbybuckets(buckets, seconds): """ buckets = int(buckets) if int(buckets) != 0 else 256 host_ip = socket.gethostbyname(socket.gethostname()) - ips = host_ip.split('.') - bucket_sum = (int(ips[0]) * 256 * 256 * 256) + (int(ips[1]) * 256 * 256) + \ - (int(ips[2]) * 256) + int(ips[3]) + ips = host_ip.split(".") + bucket_sum = ( + (int(ips[0]) * 256 * 256 * 256) + + (int(ips[1]) * 256 * 256) + + (int(ips[2]) * 256) + + int(ips[3]) + ) bucket = bucket_sum % buckets - log.debug('bucket number is %d out of %d', bucket, buckets) + log.debug("bucket number is %d out of %d", bucket, buckets) current_time = time.time() base_time = seconds * (math.floor(current_time / seconds)) splay = seconds / buckets @@ -305,38 +335,54 @@ def schedule(): """ sf_count = 0 base = datetime(2018, 1, 1, 0, 0) - schedule_config = __opts__.get('schedule', {}) - if 'user_schedule' in __opts__ and isinstance(__opts__['user_schedule'], dict): - schedule_config.update(__opts__['user_schedule']) + schedule_config = __opts__.get("schedule", {}) + if "user_schedule" in __opts__ and isinstance(__opts__["user_schedule"], dict): + schedule_config.update(__opts__["user_schedule"]) for jobname, jobdata in schedule_config.items(): try: # Error handling galore if not jobdata or not isinstance(jobdata, dict): - log.error('Scheduled job %s does not have valid data', jobname) + log.error("Scheduled job %s does not have valid data", jobname) continue - if 'function' not in jobdata or 'seconds' not in jobdata: - log.error('Scheduled job %s is missing a ``function`` or ``seconds`` argument', jobname) + if "function" not in jobdata or "seconds" not in jobdata: + log.error( + "Scheduled job %s is missing a ``function`` or ``seconds`` argument", + jobname, + ) continue - func = jobdata['function'] + func = jobdata["function"] if func not in __mods__: - log.error('Scheduled job %s has a 
function %s which could not be found.', jobname, func) + log.error( + "Scheduled job %s has a function %s which could not be found.", + jobname, + func, + ) continue try: - if 'cron' in jobdata: - seconds = getsecondsbycronexpression(base, jobdata['cron']) + if "cron" in jobdata: + seconds = getsecondsbycronexpression(base, jobdata["cron"]) else: - seconds = int(jobdata['seconds']) - splay = int(jobdata.get('splay', 0)) - min_splay = int(jobdata.get('min_splay', 0)) + seconds = int(jobdata["seconds"]) + splay = int(jobdata.get("splay", 0)) + min_splay = int(jobdata.get("min_splay", 0)) except ValueError: - log.error('Scheduled job %s has an invalid value for seconds or splay.', jobname) - args = jobdata.get('args', []) + log.error( + "Scheduled job %s has an invalid value for seconds or splay.", + jobname, + ) + args = jobdata.get("args", []) if not isinstance(args, list): - log.error('Scheduled job %s has args not formed as a list: %s', jobname, args) - kwargs = jobdata.get('kwargs', {}) + log.error( + "Scheduled job %s has args not formed as a list: %s", jobname, args + ) + kwargs = jobdata.get("kwargs", {}) if not isinstance(kwargs, dict): - log.error('Scheduled job %s has kwargs not formed as a dict: %s', jobname, kwargs) - returners = jobdata.get('returner', []) + log.error( + "Scheduled job %s has kwargs not formed as a dict: %s", + jobname, + kwargs, + ) + returners = jobdata.get("returner", []) if not isinstance(returners, list): returners = [returners] # Actually process the job @@ -345,59 +391,70 @@ def schedule(): _execute_function(jobdata, func, returners, args, kwargs) sf_count += 1 except: - log.error("Exception in running job: %s; continuing with next job...", jobname, exc_info=True) + log.error( + "Exception in running job: %s; continuing with next job...", + jobname, + exc_info=True, + ) return sf_count def _execute_function(jobdata, func, returners, args, kwargs): - """ Run the scheduled function """ - log.debug('Executing scheduled function %s', func) - jobdata['last_run'] = time.time() + """Run the scheduled function""" + log.debug("Executing scheduled function %s", func) + jobdata["last_run"] = time.time() + + # Actually run the function ret = __mods__[func](*args, **kwargs) - if __opts__['log_level'] == 'debug': - log.debug('Job returned:\n%s', ret) + + if __opts__["log_level"] == "debug": + log.debug("Job returned:\n%s", ret) for returner in returners: - returner = '{0}.returner'.format(returner) + returner = "{0}.returner".format(returner) if returner not in __returners__: - log.error('Could not find %s returner.', returner) + log.error("Could not find %s returner.", returner) continue - log.debug('Returning job data to %s', returner) - returner_ret = {'id': __grains__['id'], - 'jid': hubblestack.utils.jid.gen_jid(__opts__), - 'fun': func, - 'fun_args': args + ([kwargs] if kwargs else []), - 'return': ret} + log.debug("Returning job data to %s", returner) + returner_ret = { + "id": __grains__["id"], + "jid": hubblestack.utils.jid.gen_jid(__opts__), + "fun": func, + "fun_args": args + ([kwargs] if kwargs else []), + "return": ret, + } __returners__[returner](returner_ret) def _process_job(jobdata, splay, seconds, min_splay, base): run = False - if 'last_run' not in jobdata: - if jobdata.get('run_on_start', False): + if "last_run" not in jobdata: + if jobdata.get("run_on_start", False): if splay: # Run `splay` seconds in the future, by telling the scheduler we last ran it # `seconds - splay` seconds ago. 
- jobdata['last_run'] = time.time() - (seconds - random.randint(min_splay, splay)) + jobdata["last_run"] = time.time() - ( + seconds - random.randint(min_splay, splay) + ) else: # Run now run = True - jobdata['last_run'] = time.time() + jobdata["last_run"] = time.time() else: if splay: # Run `seconds + splay` seconds in the future by telling the scheduler we last # ran it at now + `splay` seconds. - jobdata['last_run'] = time.time() + random.randint(min_splay, splay) - elif 'buckets' in jobdata: + jobdata["last_run"] = time.time() + random.randint(min_splay, splay) + elif "buckets" in jobdata: # Place the host in a bucket and fix the execution time. - jobdata['last_run'] = getlastrunbybuckets(jobdata['buckets'], seconds) - log.debug('last_run according to bucket is %s', jobdata['last_run']) - elif 'cron' in jobdata: + jobdata["last_run"] = getlastrunbybuckets(jobdata["buckets"], seconds) + log.debug("last_run according to bucket is %s", jobdata["last_run"]) + elif "cron" in jobdata: # execute the hubble process based on cron expression - jobdata['last_run'] = getlastrunbycron(base, seconds) + jobdata["last_run"] = getlastrunbycron(base, seconds) else: # Run in `seconds` seconds. - jobdata['last_run'] = time.time() - if jobdata['last_run'] < time.time() - seconds: + jobdata["last_run"] = time.time() + if jobdata["last_run"] < time.time() - seconds: run = True return run @@ -410,37 +467,39 @@ def run_function(): # Parse the args args = [] kwargs = {} - for arg in __opts__['args']: - if '=' in arg: - kwarg, _, value = arg.partition('=') + for arg in __opts__["args"]: + if "=" in arg: + kwarg, _, value = arg.partition("=") kwargs[kwarg] = value else: args.append(arg) - log.debug('Parsed args: %s | Parsed kwargs: %s', args, kwargs) - log.info('Executing user-requested function %s', __opts__['function']) + log.debug("Parsed args: %s | Parsed kwargs: %s", args, kwargs) + log.info("Executing user-requested function %s", __opts__["function"]) - mod_fun = __mods__.get(__opts__['function']) + mod_fun = __mods__.get(__opts__["function"]) if not mod_fun or not callable(mod_fun): - log.error('Function %s is not available, or not valid.', __opts__['function']) + log.error("Function %s is not available, or not valid.", __opts__["function"]) sys.exit(1) ret = mod_fun(*args, **kwargs) - if __opts__['return']: - returner = '{0}.returner'.format(__opts__['return']) + if __opts__["return"]: + returner = "{0}.returner".format(__opts__["return"]) if returner not in __returners__: - log.error('Could not find %s returner.', returner) + log.error("Could not find %s returner.", returner) else: - log.info('Returning job data to %s', returner) - returner_ret = {'id': __grains__['id'], - 'jid': hubblestack.utils.jid.gen_jid(__opts__), - 'fun': __opts__['function'], - 'fun_args': args + ([kwargs] if kwargs else []), - 'return': ret} + log.info("Returning job data to %s", returner) + returner_ret = { + "id": __grains__["id"], + "jid": hubblestack.utils.jid.gen_jid(__opts__), + "fun": __opts__["function"], + "fun_args": args + ([kwargs] if kwargs else []), + "return": ret, + } __returners__[returner](returner_ret) # TODO instantiate the salt outputter system? 
- if __opts__['json_print']: + if __opts__["json_print"]: print(json.dumps(ret)) else: - if not __opts__['no_pprint']: + if not __opts__["no_pprint"]: pprint.pprint(ret) else: print(ret) @@ -458,49 +517,54 @@ def load_config(args=None): # NOTE: if configfile isn't specified and None is passed to hubblestack.config.get_config # it will default to a platform specific file (see get_config() and DEFAULT_OPTS in hs.config) - __opts__ = hubblestack.config.get_config(parsed_args.get('configfile')) - + __opts__ = hubblestack.config.get_config(parsed_args.get("configfile")) # Loading default included config options and updating them in the main __opts__ default_include_config_options = hubblestack.config.include_config( - __opts__.get('default_include'), __opts__.get('conf_file'), verbose=False + __opts__.get("default_include"), __opts__.get("conf_file"), verbose=False ) __opts__.update(default_include_config_options) # we seem to have mixed feelings about whether to use __opts__ or parsed_args and mixed feelings # about whether it's spelled 'configfile' or 'conf_file'; so we just make them all work - __opts__['configfile'] = parsed_args['configfile'] = __opts__['conf_file'] + __opts__["configfile"] = parsed_args["configfile"] = __opts__["conf_file"] __opts__.update(parsed_args) - __opts__['install_dir'] = hubblestack.syspaths.INSTALL_DIR - __opts__['extension_modules'] = os.path.join(hubblestack.syspaths.CACHE_DIR, 'extmods') + __opts__["install_dir"] = hubblestack.syspaths.INSTALL_DIR + __opts__["extension_modules"] = os.path.join( + hubblestack.syspaths.CACHE_DIR, "extmods" + ) - if __opts__['version']: + if __opts__["version"]: print(__version__) clean_up_process(None, None) sys.exit(0) - if __opts__['buildinfo']: + if __opts__["buildinfo"]: try: from hubblestack import __buildinfo__ except ImportError: - __buildinfo__ = 'NOT SET' + __buildinfo__ = "NOT SET" print(__buildinfo__) clean_up_process(None, None) sys.exit(0) - scan_proc = __opts__.get('scan_proc', False) - if __opts__['daemonize']: + scan_proc = __opts__.get("scan_proc", False) + if __opts__["daemonize"]: # before becoming a daemon, check for other procs and possibly send # them a signal 15 (otherwise refuse to run) - if not __opts__.get('ignore_running', False): + if not __opts__.get("ignore_running", False): check_pidfile(kill_other=True, scan_proc=scan_proc) hubblestack.utils.daemonize() create_pidfile() - elif not __opts__['function'] and not __opts__['version'] and not __opts__['buildinfo']: + elif ( + not __opts__["function"] + and not __opts__["version"] + and not __opts__["buildinfo"] + ): # check the pidfile and possibly refuse to run (assuming this isn't a single function call) - if not __opts__.get('ignore_running', False): + if not __opts__.get("ignore_running", False): check_pidfile(kill_other=False, scan_proc=scan_proc) # Optional sleep to wait for network - time.sleep(int(__opts__.get('startup_sleep', 0))) + time.sleep(int(__opts__.get("startup_sleep", 0))) _setup_signaling() # setup dirs for grains/returner/module _setup_dirs() @@ -508,12 +572,12 @@ def load_config(args=None): _setup_logging(parsed_args) _setup_cached_uuid() refresh_grains(initial=True) - if __mods__['config.get']('splunklogging', False): + if __mods__["config.get"]("splunklogging", False): hubblestack.log.setup_splunk_logger() - hubblestack.log.emit_to_splunk(__grains__, 'INFO', 'hubblestack.grains_report') - __mods__['conf_publisher.publish']() + hubblestack.log.emit_to_splunk(__grains__, "INFO", "hubblestack.grains_report") + 
__mods__["conf_publisher.publish"]() - return __opts__ # this is also a global, but the return is handy in tests/unittests + return __opts__ # this is also a global, but the return is handy in tests/unittests def _setup_signaling(): @@ -530,42 +594,75 @@ def _setup_signaling(): signal.signal(signal.SIGHUP, clean_up_process) signal.signal(signal.SIGQUIT, clean_up_process) + def _disable_boto_modules(): - """ Disable the unneeded boto modules because they cause issues with the loader """ + """Disable the unneeded boto modules because they cause issues with the loader""" # Disable all of salt's boto modules, they give nothing but trouble to the loader - disable_modules = __opts__.get('disable_modules', []) - disable_modules.extend(['boto3_elasticache', 'boto3_route53', 'boto3_sns', 'boto_apigateway', - 'boto_asg', 'boto_cfn', 'boto_cloudfront', 'boto_cloudtrail', - 'boto_cloudwatch_event', 'boto_cloudwatch', 'boto_cognitoidentity', - 'boto_datapipeline', 'boto_dynamodb', 'boto_ec2', 'boto_efs', - 'boto_elasticache', 'boto_elasticsearch_domain', 'boto_elb', - 'boto_elbv2', 'boto_iam', 'boto_iot', 'boto_kinesis', 'boto_kms', - 'boto_lambda', 'boto_rds', 'boto_route53', 'boto_s3_bucket', 'boto_s3', - 'boto_secgroup', 'boto_sns', 'boto_sqs', 'boto_ssm', 'boto_vpc', ]) - __opts__['disable_modules'] = disable_modules + disable_modules = __opts__.get("disable_modules", []) + disable_modules.extend( + [ + "boto3_elasticache", + "boto3_route53", + "boto3_sns", + "boto_apigateway", + "boto_asg", + "boto_cfn", + "boto_cloudfront", + "boto_cloudtrail", + "boto_cloudwatch_event", + "boto_cloudwatch", + "boto_cognitoidentity", + "boto_datapipeline", + "boto_dynamodb", + "boto_ec2", + "boto_efs", + "boto_elasticache", + "boto_elasticsearch_domain", + "boto_elb", + "boto_elbv2", + "boto_iam", + "boto_iot", + "boto_kinesis", + "boto_kms", + "boto_lambda", + "boto_rds", + "boto_route53", + "boto_s3_bucket", + "boto_s3", + "boto_secgroup", + "boto_sns", + "boto_sqs", + "boto_ssm", + "boto_vpc", + ] + ) + __opts__["disable_modules"] = disable_modules def _setup_cached_uuid(): - """ Get the cached uuid and cached system uui path, read the files - and remove the cached uuid """ + """Get the cached uuid and cached system uui path, read the files + and remove the cached uuid""" # Check for a cloned system with existing hubble_uuid def _get_uuid_from_system(): query = '"SELECT uuid AS system_uuid FROM osquery_info;" --header=false --csv' # Prefer our /opt/osquery/osqueryi if present - osqueryipaths = ('/opt/osquery/osqueryi', 'osqueryi', '/usr/bin/osqueryi') + osqueryipaths = ("/opt/osquery/osqueryi", "osqueryi", "/usr/bin/osqueryi") for path in osqueryipaths: if hubblestack.utils.path.which(path): - live_uuid = hubblestack.modules.cmdmod.run_stdout('{0} {1}'.format(path, query), - output_loglevel='quiet') + live_uuid = hubblestack.modules.cmdmod.run_stdout( + "{0} {1}".format(path, query), output_loglevel="quiet" + ) live_uuid = str(live_uuid).upper() if len(live_uuid) == 36: return live_uuid return None # If osquery isn't available, attempt to get uuid from /sys path (linux only) try: - with open('/sys/devices/virtual/dmi/id/product_uuid', 'r') as product_uuid_file: + with open( + "/sys/devices/virtual/dmi/id/product_uuid", "r" + ) as product_uuid_file: file_uuid = product_uuid_file.read() file_uuid = str(file_uuid).upper() if len(file_uuid) == 36: @@ -574,25 +671,33 @@ def _get_uuid_from_system(): except Exception: return None - cached_uuid_path = os.path.join(os.path.dirname(__opts__['configfile']), 
'hubble_cached_uuid') - cached_system_uuid_path = os.path.join(os.path.dirname(__opts__['configfile']), - 'hubble_cached_system_uuid') + cached_uuid_path = os.path.join( + os.path.dirname(__opts__["configfile"]), "hubble_cached_uuid" + ) + cached_system_uuid_path = os.path.join( + os.path.dirname(__opts__["configfile"]), "hubble_cached_system_uuid" + ) try: if os.path.isfile(cached_uuid_path) and os.path.isfile(cached_system_uuid_path): - with open(cached_uuid_path, 'r') as cached_uuid_file, \ - open(cached_system_uuid_path, 'r') as cached_system_uuid_file: + with open(cached_uuid_path, "r") as cached_uuid_file, open( + cached_system_uuid_path, "r" + ) as cached_system_uuid_file: cached_uuid = cached_uuid_file.read() cached_system_uuid = cached_system_uuid_file.read() if cached_uuid != cached_system_uuid: live_uuid = _get_uuid_from_system() if live_uuid != cached_system_uuid: - log.error("potentially cloned system detected: System_uuid grain " - "previously saved on disk doesn't match live system value.\n" - "Resettig cached hubble_uuid value.") + log.error( + "potentially cloned system detected: System_uuid grain " + "previously saved on disk doesn't match live system value.\n" + "Resetting cached hubble_uuid value." + ) os.remove(cached_uuid_path) except Exception: - log.exception("Problem opening cache files while checking for previously cloned system") + log.exception( + "Problem opening cache files while checking for previously cloned system" + ) def _setup_logging(parsed_args): @@ -600,46 +705,50 @@ Get logging options and setup logging """ # Convert -vvv to log level - if __opts__['log_level'] is None: + if __opts__["log_level"] is None: # Default to 'error' - __opts__['log_level'] = 'error' + __opts__["log_level"] = "error" # Default to more verbose if we're daemonizing - if __opts__['daemonize']: - __opts__['log_level'] = 'info' + if __opts__["daemonize"]: + __opts__["log_level"] = "info" # Handle the explicit -vvv settings - if __opts__['verbose']: - if __opts__['verbose'] == 1: - __opts__['log_level'] = 'warning' - elif __opts__['verbose'] == 2: - __opts__['log_level'] = 'info' - elif __opts__['verbose'] >= 3: - __opts__['log_level'] = 'debug' + if __opts__["verbose"]: + if __opts__["verbose"] == 1: + __opts__["log_level"] = "warning" + elif __opts__["verbose"] == 2: + __opts__["log_level"] = "info" + elif __opts__["verbose"] >= 3: + __opts__["log_level"] = "debug" # Console logging is probably the same, but can be different console_logging_opts = { - 'log_level': __opts__.get('console_log_level', __opts__['log_level']), - 'log_format': __opts__.get('console_log_format', - '%(asctime)s [%(levelname)-5s] %(message)s'), - 'date_format': __opts__.get('console_log_date_format', '%H:%M:%S'), + "log_level": __opts__.get("console_log_level", __opts__["log_level"]), + "log_format": __opts__.get( + "console_log_format", "%(asctime)s [%(levelname)-5s] %(message)s" + ), + "date_format": __opts__.get("console_log_date_format", "%H:%M:%S"), } file_logging_opts = { - 'log_file': __opts__.get('log_file', '/var/log/hubble'), - 'log_level': __opts__['log_level'], - 'log_format': __opts__.get('log_format', '%(asctime)s,%(msecs)03d [%(levelname)-5s]' ' [%(name)s:%(lineno)d] %(message)s'), - 'date_format': __opts__.get('log_date_format', '%Y-%m-%d %H:%M:%S'), - 'max_bytes': __opts__.get('logfile_maxbytes', 100000000), - 'backup_count': __opts__.get('logfile_backups', 1), + "log_file": __opts__.get("log_file", "/var/log/hubble"), + "log_level": 
__opts__["log_level"], + "log_format": __opts__.get( + "log_format", + "%(asctime)s,%(msecs)03d [%(levelname)-5s]" + " [%(name)s:%(lineno)d] %(message)s", + ), + "date_format": __opts__.get("log_date_format", "%Y-%m-%d %H:%M:%S"), + "max_bytes": __opts__.get("logfile_maxbytes", 100000000), + "backup_count": __opts__.get("logfile_backups", 1), } # Setup logging hubblestack.log.setup_console_logger(**console_logging_opts) - if not parsed_args['skip_file_logger']: + if not parsed_args["skip_file_logger"]: hubblestack.log.setup_file_logger(**file_logging_opts) - with open(__opts__['log_file'], 'a') as _logfile: + with open(__opts__["log_file"], "a") as _logfile: pass # ensure the file exists before we set perms on it - os.chmod(__opts__['log_file'], 0o600) + os.chmod(__opts__["log_file"], 0o600) - configfile = parsed_args.get('configfile') + configfile = parsed_args.get("configfile") if configfile and os.path.isfile(configfile): os.chmod(configfile, 0o600) @@ -654,20 +763,22 @@ def _setup_dirs(): # we have to uber-override and make sure our files dir is in root # and that root file systems are enabled - this_root_files = os.path.join(this_dir, 'files') + this_root_files = os.path.join(this_dir, "files") - if 'file_roots' not in __opts__: - __opts__['file_roots'] = dict(base=list()) + if "file_roots" not in __opts__: + __opts__["file_roots"] = dict(base=list()) - elif 'base' not in __opts__['file_roots']: - __opts__['file_roots']['base'] = [ this_root_files ] + elif "base" not in __opts__["file_roots"]: + __opts__["file_roots"]["base"] = [this_root_files] else: - __opts__['file_roots']['base'] = [this_root_files] \ - + [x for x in __opts__['file_roots']['base'] if x != this_root_files ] + __opts__["file_roots"]["base"] = [this_root_files] + [ + x for x in __opts__["file_roots"]["base"] if x != this_root_files + ] + + if "roots" not in __opts__["fileserver_backend"]: + __opts__["fileserver_backend"].append("roots") - if 'roots' not in __opts__['fileserver_backend']: - __opts__['fileserver_backend'].append('roots') # 600s is a long time to get stuck loading grains and *not* be doing things # like nova/pulsar. 
The SIGALRM will get caught by hubblestack.loader.raw_mod as an @@ -683,7 +794,7 @@ def _setup_dirs(): # # tag='hubble:rg' will appear in the logs to differentiate this from other # hangtime_wrapper timers (if any) -@hangtime_wrapper(timeout=600, repeats=True, tag='hubble:rg') +@hangtime_wrapper(timeout=600, repeats=True, tag="hubble:rg") @HSS.watch def refresh_grains(initial=False): """ @@ -698,35 +809,40 @@ def refresh_grains(initial=False): global __context__ # 'POP' is for tracking persistent opts protection - if os.environ.get('NOISY_POP_DEBUG'): - log.error('POP refreshing grains (id=%d)', id(__opts__)) + if os.environ.get("NOISY_POP_DEBUG"): + log.error("POP refreshing grains (id=%d)", id(__opts__)) persist, old_grains = {}, {} if initial: - if not os.environ.get('NO_PRESERVE_OPTS'): - if os.environ.get('NOISY_POP_DEBUG'): - log.error('POP setting __opts__ to preservable (id=%d)', id(__opts__)) + if not os.environ.get("NO_PRESERVE_OPTS"): + if os.environ.get("NOISY_POP_DEBUG"): + log.error("POP setting __opts__ to preservable (id=%d)", id(__opts__)) hubblestack.loader.set_preservable_opts(__opts__) - elif os.environ.get('NOISY_POP_DEBUG'): - log.error('POP we are not attemting to protect __opts__ from lazyloader reloads') + elif os.environ.get("NOISY_POP_DEBUG"): + log.error( + "POP we are not attempting to protect __opts__ from lazyloader reloads" + ) else: old_grains = copy.deepcopy(__grains__) - for grain in __opts__.get('grains_persist', []): + for grain in __opts__.get("grains_persist", []): if grain in __grains__: persist[grain] = __grains__[grain] # Hardcode these core grains as persisting - persist = {grain: __grains__[grain] for grain in ['hubble_version', 'buildinfo'] - if grain in __grains__} + persist = { + grain: __grains__[grain] + for grain in ["hubble_version", "buildinfo"] + if grain in __grains__ + } if initial: __context__ = {} - if 'grains' in __opts__: - __opts__.pop('grains') - if 'pillar' in __opts__: - __opts__.pop('pillar') + if "grains" in __opts__: + __opts__.pop("grains") + if "pillar" in __opts__: + __opts__.pop("pillar") __grains__ = hubblestack.loader.grains(__opts__) __grains__.update(persist) - __grains__['session_uuid'] = SESSION_UUID + __grains__["session_uuid"] = SESSION_UUID # This was a weird one. In older versions of hubble the version and # buildinfo were not persisted automatically which means that if you @@ -734,23 +850,28 @@ def refresh_grains(initial=False): # cause that old daemon to report grains as if it were the new version. # Now if this hubble_marker_3 grain is present you know you can trust the # hubble_version and buildinfo. 
- __grains__['hubble_marker_3'] = True + __grains__["hubble_marker_3"] = True old_grains.update(__grains__) __grains__ = old_grains # Check for default gateway and fall back if necessary - if __grains__.get('ip_gw', None) is False and 'fallback_fileserver_backend' in __opts__: - log.info('No default gateway detected; using fallback_fileserver_backend.') - __opts__['fileserver_backend'] = __opts__['fallback_fileserver_backend'] - - __opts__['hubble_uuid'] = __grains__.get('hubble_uuid', None) - __opts__['system_uuid'] = __grains__.get('system_uuid', None) + if ( + __grains__.get("ip_gw", None) is False + and "fallback_fileserver_backend" in __opts__ + ): + log.info("No default gateway detected; using fallback_fileserver_backend.") + __opts__["fileserver_backend"] = __opts__["fallback_fileserver_backend"] + + __opts__["hubble_uuid"] = __grains__.get("hubble_uuid", None) + __opts__["system_uuid"] = __grains__.get("system_uuid", None) __pillar__ = {} - __opts__['grains'] = __grains__ - __opts__['pillar'] = __pillar__ + __opts__["grains"] = __grains__ + __opts__["pillar"] = __pillar__ __utils__ = hubblestack.loader.utils(__opts__) - __mods__ = hubblestack.loader.modules(__opts__, utils=__utils__, context=__context__) + __mods__ = hubblestack.loader.modules( + __opts__, utils=__utils__, context=__context__ + ) __returners__ = hubblestack.loader.returners(__opts__, __mods__) # the only things that turn up in here (and that get preserved) @@ -764,6 +885,9 @@ def refresh_grains(initial=False): hubblestack.hec.opt.__mods__ = __mods__ hubblestack.hec.opt.__opts__ = __opts__ + hubblestack.filter.filter_chain.__mods__ = __mods__ + hubblestack.filter.filter_chain.__opts__ = __opts__ + hubblestack.log.splunk.__grains__ = __grains__ hubblestack.log.splunk.__mods__ = __mods__ hubblestack.log.splunk.__opts__ = __opts__ @@ -793,8 +917,8 @@ def refresh_grains(initial=False): hubblestack.log.refresh_handler_std_info() clear_selective_context() - if not initial and __mods__['config.get']('splunklogging', False): - hubblestack.log.emit_to_splunk(__grains__, 'INFO', 'hubblestack.grains_report') + if not initial and __mods__["config.get"]("splunklogging", False): + hubblestack.log.emit_to_splunk(__grains__, "INFO", "hubblestack.grains_report") def emit_to_syslog(grains_to_emit): @@ -804,20 +928,21 @@ def emit_to_syslog(grains_to_emit): try: # Avoid a syslog line to be longer than 1024 characters # Build syslog message - syslog_list = ['hubble_syslog_message:'] + syslog_list = ["hubble_syslog_message:"] for grain in grains_to_emit: if grain in __grains__: if bool(__grains__[grain]) and isinstance(__grains__[grain], dict): for key, value in __grains__[grain].items(): - syslog_list.append('{0}={1}'.format(key, value)) + syslog_list.append("{0}={1}".format(key, value)) else: - syslog_list.append('{0}={1}'.format(grain, __grains__[grain])) - syslog_message = ' '.join(syslog_list) - log.info('Emitting some grains to syslog') + syslog_list.append("{0}={1}".format(grain, __grains__[grain])) + syslog_message = " ".join(syslog_list) + log.info("Emitting some grains to syslog") syslog.openlog(logoption=syslog.LOG_PID) syslog.syslog(syslog_message) except Exception as exc: - log.exception('An exception occurred on emitting a message to syslog: %s', exc) + log.exception("An exception occurred on emitting a message to syslog: %s", exc) + def clear_selective_context(): """ @@ -829,39 +954,79 @@ def clear_selective_context(): # Fixing bug: Package list is not refreshed # clear the package list so that pkg module can fetch it as 
fresh in next cycle - __context__.pop('pkg.list_pkgs', None) + __context__.pop("pkg.list_pkgs", None) + def parse_args(args=None): """ Parse command line arguments """ parser = argparse.ArgumentParser() - parser.add_argument('-d', '--daemonize', action='store_true', - help='Whether to daemonize and background the process') parser.add_argument( - '-c', '--configfile', default=None, - help='Pass in an alternative configuration file. Default: /etc/hubble/hubble') - parser.add_argument('-p', '--no-pprint', help='Turn off pprint for single-function output', - action='store_true') - parser.add_argument('--skip-file-logger', + "-d", + "--daemonize", + action="store_true", + help="Whether to daemonize and background the process", + ) + parser.add_argument( + "-c", + "--configfile", + default=None, + help="Pass in an alternative configuration file. Default: /etc/hubble/hubble", + ) + parser.add_argument( + "-p", + "--no-pprint", + help="Turn off pprint for single-function output", + action="store_true", + ) + parser.add_argument( + "--skip-file-logger", help="Prevent logger from writing to /var/log/hubble.log", - action='store_true') - parser.add_argument('-v', '--verbose', action='count', - help=('Verbosity level. Use -v or -vv or -vvv for ' - 'varying levels of verbosity. Note that -vv ' - 'will be used by default in daemon mode.')) - parser.add_argument('-r', '--return', default=None, - help='Pass in a returner for single-function runs') - parser.add_argument('--version', action='store_true', help='Show version information') - parser.add_argument('--buildinfo', action='store_true', help='Show build information') - parser.add_argument('function', nargs='?', default=None, - help='Optional argument for the single function to be run') - parser.add_argument('args', nargs='*', help='Any arguments necessary for a single function run') + action="store_true", + ) + parser.add_argument( + "-v", + "--verbose", + action="count", + help=( + "Verbosity level. Use -v or -vv or -vvv for " + "varying levels of verbosity. Note that -vv " + "will be used by default in daemon mode." + ), + ) + parser.add_argument( + "-r", + "--return", + default=None, + help="Pass in a returner for single-function runs", + ) + parser.add_argument( + "--version", action="store_true", help="Show version information" + ) parser.add_argument( - '-j', '--json-print', action='store_true', - help='Optional argument to print the output of single run function in json format') - parser.add_argument('--ignore_running', action='store_true', - help='Ignore any running hubble processes. This disables the pidfile.') + "--buildinfo", action="store_true", help="Show build information" + ) + parser.add_argument( + "function", + nargs="?", + default=None, + help="Optional argument for the single function to be run", + ) + parser.add_argument( + "args", nargs="*", help="Any arguments necessary for a single function run" + ) + parser.add_argument( + "-j", + "--json-print", + action="store_true", + help="Optional argument to print the output of single run function in json format", + ) + parser.add_argument( + "--ignore_running", + action="store_true", + help="Ignore any running hubble processes. This disables the pidfile.", + ) return vars(parser.parse_args(args=args)) @@ -875,9 +1040,9 @@ def check_pidfile(kill_other=False, scan_proc=True): otherwise exit with an error. 
""" - pidfile_path = __opts__['pidfile'] + pidfile_path = __opts__["pidfile"] if os.path.isfile(pidfile_path): - with open(pidfile_path, 'r') as pidfile: + with open(pidfile_path, "r") as pidfile: xpid = pidfile.readline().strip() try: xpid = int(xpid) @@ -885,35 +1050,36 @@ def check_pidfile(kill_other=False, scan_proc=True): xpid = 0 log.warn('unable to parse pid="%d" in pidfile=%s', xpid, pidfile_path) if xpid: - log.warn('pidfile=%s exists and contains pid=%d', pidfile_path, xpid) + log.warn("pidfile=%s exists and contains pid=%d", pidfile_path, xpid) kill_other_or_sys_exit(xpid, kill_other=kill_other) if scan_proc: scan_proc_for_hubbles(kill_other=kill_other) -def kill_other_or_sys_exit(xpid, hname=r'hubble', ksig=signal.SIGTERM, kill_other=True, - no_pgrp=True): - """ Attempt to locate other hubbles using a cmdline regular expression and kill them when found. - If killing the other processes fails (or kill_other is False), sys.exit instead. +def kill_other_or_sys_exit( + xpid, hname=r"hubble", ksig=signal.SIGTERM, kill_other=True, no_pgrp=True +): + """Attempt to locate other hubbles using a cmdline regular expression and kill them when found. + If killing the other processes fails (or kill_other is False), sys.exit instead. - params: - hname :- the regular expression pattern to use to locate hubble (default: hubble) - ksig :- the signal to use to kill the other processes (default: signal.SIGTERM=15) - kill_other :- (default: True); when false, don't attempt to kill, - just locate and exit (if found) - no_pgrp :- Avoid killing processes in this pgrp (avoid suicide). When no_pgrp is True, - invoke os.getprgp() to populate the actual value. + params: + hname :- the regular expression pattern to use to locate hubble (default: hubble) + ksig :- the signal to use to kill the other processes (default: signal.SIGTERM=15) + kill_other :- (default: True); when false, don't attempt to kill, + just locate and exit (if found) + no_pgrp :- Avoid killing processes in this pgrp (avoid suicide). When no_pgrp is True, + invoke os.getprgp() to populate the actual value. - caveats: - There are some detailed notes on the process scanning in the function as comments. + caveats: + There are some detailed notes on the process scanning in the function as comments. - The most important caveat is that the hname regular expressions must match expecting - that /proc/$$/cmdline text is null separated, not space separated. + The most important caveat is that the hname regular expressions must match expecting + that /proc/$$/cmdline text is null separated, not space separated. - The other main caveat is that we can't actually examine the /proc/$$/exe file (that's - always just a python). We have to scan the invocation text the kernel stored at launch. - That text is not immutable and should not (normally) be relied upon for any purpose - -- and this method does rely on it. + The other main caveat is that we can't actually examine the /proc/$$/exe file (that's + always just a python). We have to scan the invocation text the kernel stored at launch. + That text is not immutable and should not (normally) be relied upon for any purpose + -- and this method does rely on it. 
""" if no_pgrp is True: @@ -925,31 +1091,39 @@ def kill_other_or_sys_exit(xpid, hname=r'hubble', ksig=signal.SIGTERM, kill_othe # any good the /opt/whatever/bin/hubble is normally a text file with a # shebang; which the kernel picks up and uses to execute the real binary # with the "bin" file as an argument; so we'll have to live with cmdline - pfile = '/proc/{pid}/cmdline'.format(pid=xpid) - log.debug('searching %s for hubble procs matching %s', pfile, hname) - with open(pfile, 'r') as pidfile: + pfile = "/proc/{pid}/cmdline".format(pid=xpid) + log.debug("searching %s for hubble procs matching %s", pfile, hname) + with open(pfile, "r") as pidfile: # NOTE: cmdline is actually null separated, not space separated # that shouldn't matter much for most hname regular expressions, but one never knows. - cmdline = pidfile.readline().replace('\x00', ' ').strip() + cmdline = pidfile.readline().replace("\x00", " ").strip() if re.search(hname, cmdline): if no_pgrp: - pstatfile = '/proc/{pid}/stat'.format(pid=xpid) - with open(pstatfile, 'r') as fh2: + pstatfile = "/proc/{pid}/stat".format(pid=xpid) + with open(pstatfile, "r") as fh2: # NOTE: man proc(5) ยง /proc/[pid]/stat # (pid, comm, state, ppid, pgrp, session, tty_nr, tpgid, flags, ...) pgrp = fh2.readline().split()[4] if pgrp == no_pgrp: - log.debug("process (%s) exists and seems to be a hubble, " - "but matches our process group (%s), ignoring", xpid, pgrp) + log.debug( + "process (%s) exists and seems to be a hubble, " + "but matches our process group (%s), ignoring", + xpid, + pgrp, + ) return False if kill_other: log.warn( "process seems to still be alive and seems to be hubble," - " attempting to shutdown") + " attempting to shutdown" + ) os.kill(int(xpid), ksig) time.sleep(1) if os.path.isdir("/proc/{pid}".format(pid=xpid)): - log.error("fatal error: failed to shutdown process (pid=%s) successfully", xpid) + log.error( + "fatal error: failed to shutdown process (pid=%s) successfully", + xpid, + ) sys.exit(1) else: return True @@ -959,21 +1133,31 @@ def kill_other_or_sys_exit(xpid, hname=r'hubble', ksig=signal.SIGTERM, kill_othe else: # pidfile present, but nothing at that pid. Did we receive a sigterm? log.warning( - 'Pidfile found on startup, but no process at that pid. Did hubble receive a SIGTERM?') + "Pidfile found on startup, but no process at that pid. Did hubble receive a SIGTERM?" 
+ ) return False -def scan_proc_for_hubbles(_proc_path='/proc', hname=r'^/\S+python.*?/opt/.*?hubble', - kill_other=True, ksig=signal.SIGTERM): - """ look for other hubble processes and kill them or sys.exit()""" +def scan_proc_for_hubbles( + _proc_path="/proc", + hname=r"^/\S+python.*?/opt/.*?hubble", + kill_other=True, + ksig=signal.SIGTERM, +): + """look for other hubble processes and kill them or sys.exit()""" no_pgrp = str(os.getpgrp()) - rpid = re.compile(r'\d+') - if os.path.isdir('/proc'): - for dirname, dirs, _files in os.walk('/proc'): - if dirname == '/proc': + rpid = re.compile(r"\d+") + if os.path.isdir("/proc"): + for dirname, dirs, _files in os.walk("/proc"): + if dirname == "/proc": for pid in [i for i in dirs if rpid.match(i)]: - kill_other_or_sys_exit(pid, hname=hname, kill_other=kill_other, ksig=ksig, - no_pgrp=no_pgrp) + kill_other_or_sys_exit( + pid, + hname=hname, + kill_other=kill_other, + ksig=ksig, + no_pgrp=no_pgrp, + ) break @@ -981,9 +1165,9 @@ def create_pidfile(): """ Create a pidfile after daemonizing """ - if not __opts__.get('ignore_running', False): + if not __opts__.get("ignore_running", False): pid = os.getpid() - with open(__opts__['pidfile'], 'w') as pidfile: + with open(__opts__["pidfile"], "w") as pidfile: pidfile.write(str(pid)) @@ -993,19 +1177,22 @@ def clean_up_process(received_signal, frame): pidfile and anything else that needs to be cleaned up. """ if received_signal is None and frame is None: - if not __opts__.get('ignore_running', False): - if __opts__['daemonize']: - if os.path.isfile(__opts__['pidfile']): - os.remove(__opts__['pidfile']) + if not __opts__.get("ignore_running", False) and \ + __opts__["daemonize"] and \ + os.path.isfile(__opts__["pidfile"]): + os.remove(__opts__["pidfile"]) sys.exit(0) try: - if __mods__['config.get']('splunklogging', False): - hubblestack.log.emit_to_splunk('Signal {0} detected'.format(received_signal), - 'INFO', 'hubblestack.signals') + if __mods__["config.get"]("splunklogging", False): + hubblestack.log.emit_to_splunk( + "Signal {0} detected".format(received_signal), + "INFO", + "hubblestack.signals", + ) finally: if received_signal == signal.SIGINT or received_signal == signal.SIGTERM: - if not __opts__.get('ignore_running', False): - if __opts__['daemonize']: - if os.path.isfile(__opts__['pidfile']): - os.remove(__opts__['pidfile']) + if not __opts__.get("ignore_running", False): + if __opts__["daemonize"]: + if os.path.isfile(__opts__["pidfile"]): + os.remove(__opts__["pidfile"]) sys.exit(0) diff --git a/hubblestack/files/filter_chain.yaml b/hubblestack/files/filter_chain.yaml new file mode 100644 index 000000000..1860870ae --- /dev/null +++ b/hubblestack/files/filter_chain.yaml @@ -0,0 +1,9 @@ +filter: + default: + sequence_id: + label: "seq" + type: "hubblestack.filter.seq_id" + padding: 0 + hubble_version: + label: "hubble_version" + type: "hubblestack.filter.hubble_version" diff --git a/hubblestack/filter/__init__.py b/hubblestack/filter/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/hubblestack/filter/base.py b/hubblestack/filter/base.py new file mode 100644 index 000000000..6812dc6cf --- /dev/null +++ b/hubblestack/filter/base.py @@ -0,0 +1,19 @@ +class BaseFilter: + """ + Base class for filtering messages before being emitted to logging systems + """ + + def __init__(self, name, default_label, config=None): + self.name = name + self.default_label = default_label + self.config = {} if not config else config.copy() + + def _process_config(self, config=None): + if config 
is None: + return + + def get_label(self): + return self.config.get("label", self.default_label) + + def get_subclass_name(self): + return self.__class__.__name__ diff --git a/hubblestack/filter/filter_chain.py b/hubblestack/filter/filter_chain.py new file mode 100644 index 000000000..52d18fea8 --- /dev/null +++ b/hubblestack/filter/filter_chain.py @@ -0,0 +1,96 @@ +## + +import importlib +import logging +import yaml + +import hubblestack.filter.seq_id as seq_id +from hubblestack.exceptions import CommandExecutionError +import hubblestack.modules.cp + +log = logging.getLogger(__name__) + +chains = {} + +class FilterChain: + """ + FilterChain loads the filter config from the hubble profile filter_chain.yaml + This configuration file will have a default configuration, as well as options for + overriding with returner specific filter order + example: + + default: + sequence: # Label + type: "hubblestack.filter.seq_id" # filter type + label: "seq" # optional: what field name to add + padding: 10 # seq: 0000000001, 0000000002 + """ + + def __init__(self, config_path, config_label="default"): + """ + config_path - path to yaml file defining the configuration for the filter chain + config_label - the label in the yaml file under which the filters and their configurations are located + """ + self.config_path = config_path + self.config_label = config_label + + # force the loading of the config. # anti-pattern + self._config = self.config + self._chain = None + + @property + def config(self): + self.cached_path = __mods__["cp.cache_file"](self.config_path) + + try: + with open(self.cached_path, 'r') as handle: + yaml_config = yaml.safe_load(handle) + except Exception as e: + yaml_config = {"default": {"filter": { "default": { + "sequence_id": { "label": "seq", "type": "hubblestack.filter.seq_id" }, + "hubble_version": { "label": "hubble_version", "type": "hubblestack.filter.hubble_version"}, + "filter_error": { "label": "load_error", "type": "hubblestack.filter.static", "value": "true"}}}}} + raise CommandExecutionError(f"Could not load filter config: {e}") + + if not isinstance(yaml_config, dict) or \ + "filter" not in yaml_config or \ + not(isinstance(yaml_config["filter"], dict)) or \ + self.config_label not in yaml_config["filter"].keys() or \ + not(isinstance(yaml_config["filter"][self.config_label], dict)): + raise CommandExecutionError("FilterChain config not formatted correctly") + + yaml_config = yaml_config['filter'][self.config_label] + + return yaml_config + + @property + def chain(self): + if self._chain is None or len(self._chain) == 0: + _chain = [] + for filter_name in self.config: + new_fltr = self._get_filter_class(self.config[filter_name]["type"])(filter_name, self.config[filter_name]) + _chain.append(new_fltr) + self._chain = _chain + return self._chain + + + def filter(self, msg=None): + for fltr in self.chain: + try: + fltr.filter(msg) + except Exception as e: + log.error(f"Error processing filters: {e}") + log.debug(msg) + + def _get_filter_class(self, filter_tag): + module = importlib.import_module(filter_tag) + return getattr(module, "Filter") + + @staticmethod + def get_chain(chain_name, filter_path="salt://filter_chain.yaml"): + if chain_name not in chains.keys(): + log.info(f"REBUILDING CHAINS for {chain_name}") + chains[chain_name] = FilterChain(filter_path) + log.info(f"GOT CHAIN {chain_name}") + + return chains[chain_name]
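A note on the plug-in contract implied by _get_filter_class() above: the "type" value from the YAML is imported as a module that must expose a class named Filter taking (name, config) and providing filter(msg). A hypothetical hubblestack/filter/hostname.py (illustration only, not part of this patch) would look like:

import socket

from hubblestack.filter.base import BaseFilter


class Filter(BaseFilter):
    """Stamp each message with the local hostname (illustrative example)."""

    DEFAULT_LABEL = "host"

    def __init__(self, name, config=None):
        super().__init__(name, self.DEFAULT_LABEL, config)

    def filter(self, msg):
        # leave the field alone if an earlier filter in the chain already set it
        if self.get_label() not in msg:
            msg[self.get_label()] = socket.gethostname()
        return msg

It would then be enabled from filter_chain.yaml with type: "hubblestack.filter.hostname".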
diff --git a/hubblestack/filter/hubble_version.py b/hubblestack/filter/hubble_version.py new file mode 100644 index 000000000..ebe0ec8db --- /dev/null +++ b/hubblestack/filter/hubble_version.py @@ -0,0 +1,24 @@ +import threading + +from hubblestack.filter.base import BaseFilter +import hubblestack.version + +class Filter(BaseFilter): + """ + A Filter for adding the hubble version to each message + """ + + DEFAULT_LABEL = "hubble_version" + + def __init__(self, name, config=None): + super().__init__(name, self.DEFAULT_LABEL, config) + + def filter(self, msg): + """ + add the hubble version if the msg does not have one already + """ + if self.get_label() not in msg.keys(): + msg[self.get_label()] = hubblestack.version.__version__ + return msg + + diff --git a/hubblestack/filter/seq_id.py b/hubblestack/filter/seq_id.py new file mode 100644 index 000000000..7238d0579 --- /dev/null +++ b/hubblestack/filter/seq_id.py @@ -0,0 +1,42 @@ +import logging +import threading + +from hubblestack.filter.base import BaseFilter + +log = logging.getLogger(__name__) + +class Filter(BaseFilter): + """ + A Filter for adding a sequence number 'seq' to each message + """ + + DEFAULT_LABEL = "seq" + current_seq = 0 + + def __init__(self, name, config=None): + super().__init__(name, self.DEFAULT_LABEL, config) + self.semaphore = threading.Semaphore(1) + + + def filter(self, msg): + """ + add a sequence number if the msg does not have one already + """ + if self.get_label() not in msg.keys(): + msg[self.get_label()] = self.get_next_value() + log.info(msg) + return msg + + def get_next_value(self): + my_seq = None + with self.semaphore: + self.current_seq = self.current_seq + 1 + my_seq = self.current_seq + + value = str(my_seq).rjust(self.get_padding(), "0") + return value + + def get_padding(self): + if "padding" in self.config: + return int(self.config["padding"]) + return 0
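For illustration (not part of the patch): the padding option right-justifies the per-process counter with zeros, so a filter configured with padding: 5 stamps successive events as 00001, 00002, and so on, while the default padding of 0 emits the counter unpadded.

from hubblestack.filter.seq_id import Filter

fltr = Filter("sequence_id", {"label": "seq", "padding": 5})
print(fltr.filter({"query": "a"})["seq"])  # -> 00001
print(fltr.filter({"query": "b"})["seq"])  # -> 00002
# an event that already carries a "seq" key passes through unchanged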
tag="fileserver", whitelist=backends, pack={"__utils__": utils(opts)} ) - def grain_funcs(opts): """ Returns the grain functions diff --git a/hubblestack/returners/splunk_nebula_return.py b/hubblestack/returners/splunk_nebula_return.py index 492843f9e..ad5ff8841 100644 --- a/hubblestack/returners/splunk_nebula_return.py +++ b/hubblestack/returners/splunk_nebula_return.py @@ -45,6 +45,7 @@ import time from datetime import datetime from hubblestack.hec import http_event_collector, get_splunk_options, make_hec_args +from hubblestack.filter.filter_chain import FilterChain _MAX_CONTENT_BYTES = 100000 @@ -58,24 +59,28 @@ def returner(ret): Get nebula data and post it to Splunk """ # st = 'salt:hubble:nova' - if not ret['return']: + if not ret["return"]: return host_args = _build_args(ret) # Get cloud details - cloud_details = __grains__.get('cloud_details', {}) + cloud_details = __grains__.get("cloud_details", {}) try: - opts_list = get_splunk_options(sourcetype='hubble_osquery', - add_query_to_sourcetype=True, - _nick={'sourcetype_nebula': 'sourcetype'}) + opts_list = get_splunk_options( + sourcetype="hubble_osquery", + add_query_to_sourcetype=True, + _nick={"sourcetype_nebula": "sourcetype"}, + ) for opts in opts_list: - logging.debug('Options: %s', json.dumps(opts)) + logging.debug("Options: %s", json.dumps(opts)) # Set up the fields to be extracted at index time. The field values must be strings. # Note that these fields will also still be available in the event data index_extracted_fields = [] try: - index_extracted_fields.extend(__opts__.get('splunk_index_extracted_fields', [])) + index_extracted_fields.extend( + __opts__.get("splunk_index_extracted_fields", []) + ) except TypeError: pass @@ -83,22 +88,26 @@ def returner(ret): args, kwargs = make_hec_args(opts) hec = http_event_collector(*args, **kwargs) - for query in ret['return']: + for query in ret["return"]: for query_name, query_results in query.items(): - if 'data' not in query_results: - query_results['data'] = [{'error': 'result missing'}] - for query_result in query_results['data']: - payload = _generate_payload(host_args=host_args, opts=opts, - query_data={'query_name': query_name, - 'query_result': query_result}, - index_extracted_fields=index_extracted_fields, - cloud_details=cloud_details) + if "data" not in query_results: + query_results["data"] = [{"error": "result missing"}] + for query_result in query_results["data"]: + payload = _generate_payload( + host_args=host_args, + opts=opts, + query_data={ + "query_name": query_name, + "query_result": query_result, + }, + index_extracted_fields=index_extracted_fields, + cloud_details=cloud_details, + ) event_time = _check_time(query_result) hec.batchEvent(payload, eventtime=event_time) hec.flushBatch() - except Exception: - log.exception('Error ocurred in splunk_nebula_return') - return + except Exception as e: + log.exception(f"Error ocurred in splunk_nebula_return: {e}") def _build_args(ret): @@ -107,35 +116,37 @@ def _build_args(ret): processing the variables we care about """ # Sometimes fqdn is blank. 
-    fqdn = __grains__['fqdn'] if __grains__['fqdn'] else ret['id']
+    fqdn = __grains__["fqdn"] if __grains__["fqdn"] else ret["id"]
+
+    # TODO: create give_me_a_reasonable_ip_address utility function
     try:
-        fqdn_ip4 = __grains__.get('local_ip4')
-        if not fqdn_ip4:
-            fqdn_ip4 = __grains__['fqdn_ip4'][0]
+        fqdn_ip4 = __grains__.get("local_ip4") or __grains__["fqdn_ip4"][0]
     except IndexError:
         try:
-            fqdn_ip4 = __grains__['ipv4'][0]
+            fqdn_ip4 = __grains__["ipv4"][0]
         except IndexError:
-            raise Exception('No ipv4 grains found. Is net-tools installed?')
-    if fqdn_ip4.startswith('127.'):
-        for ip4_addr in __grains__['ipv4']:
-            if ip4_addr and not ip4_addr.startswith('127.'):
+            raise Exception("No ipv4 grains found. Is net-tools installed?")
+    if fqdn_ip4.startswith("127."):
+        for ip4_addr in __grains__["ipv4"]:
+            if ip4_addr and not ip4_addr.startswith("127."):
                 fqdn_ip4 = ip4_addr
                 break
-    local_fqdn = __grains__.get('local_fqdn', __grains__['fqdn'])
-
-    args = {'minion_id': ret['id'],
-            'job_id': ret['jid'],
-            'fqdn': fqdn,
-            'fqdn_ip4': fqdn_ip4,
-            'local_fqdn': local_fqdn}
+    local_fqdn = __grains__.get("local_fqdn", __grains__["fqdn"])
+
+    args = {
+        "minion_id": ret["id"],
+        "job_id": ret["jid"],
+        "fqdn": fqdn,
+        "fqdn_ip4": fqdn_ip4,
+        "local_fqdn": local_fqdn,
+    }
 
     # Sometimes fqdn reports a value of localhost. If that happens, try another method.
-    bad_fqdns = ['localhost', 'localhost.localdomain', 'localhost6.localdomain6']
+    bad_fqdns = ["localhost", "localhost.localdomain", "localhost6.localdomain6"]
     if fqdn in bad_fqdns:
         new_fqdn = socket.gethostname()
-        if '.' not in new_fqdn or new_fqdn in bad_fqdns:
+        if "." not in new_fqdn or new_fqdn in bad_fqdns:
             new_fqdn = fqdn_ip4
-        args['fqdn'] = new_fqdn
+        args["fqdn"] = new_fqdn
 
     return args
@@ -146,58 +157,66 @@ def _generate_event(host_args, query_result, query_name, custom_fields, cloud_de
     """
     event = {}
     event.update(query_result)
-    event.update({'query': query_name,
-                  'job_id': host_args['job_id'],
-                  'minion_id': host_args['minion_id'],
-                  'dest_host': host_args['fqdn'],
-                  'dest_ip': host_args['fqdn_ip4'],
-                  'dest_fqdn': host_args['local_fqdn'],
-                  'system_uuid': __grains__.get('system_uuid')})
+    event.update(
+        {
+            "query": query_name,
+            "job_id": host_args["job_id"],
+            "minion_id": host_args["minion_id"],
+            "dest_host": host_args["fqdn"],
+            "dest_ip": host_args["fqdn_ip4"],
+            "dest_fqdn": host_args["local_fqdn"],
+            "system_uuid": __grains__.get("system_uuid"),
+        }
+    )
     event.update(cloud_details)
 
+    FilterChain.get_chain(__name__).filter(event)
+    log.info(event)
+
     for custom_field in custom_fields:
-        custom_field_name = 'custom_' + custom_field
-        custom_field_value = __mods__['config.get'](custom_field, '')
+        custom_field_name = "custom_" + custom_field
+        custom_field_value = __mods__["config.get"](custom_field, "")
         if isinstance(custom_field_value, str):
             event.update({custom_field_name: custom_field_value})
         elif isinstance(custom_field_value, list):
-            custom_field_value = ','.join(custom_field_value)
+            custom_field_value = ",".join(custom_field_value)
             event.update({custom_field_name: custom_field_value})
 
     # Remove any empty fields from the event payload
-    remove_keys = [k for k,v in event.items() if v is None or v == ""]
+    remove_keys = [k for k, v in event.items() if v is None or v == ""]
     for k in remove_keys:
         del event[k]
 
     return event
 
 
-def _generate_payload(host_args, opts, query_data, cloud_details, index_extracted_fields):
+def _generate_payload(
+    host_args, opts, query_data, cloud_details, index_extracted_fields
+):
     """
     Build the payload that will be published to Splunk
     """
-    query_name = query_data['query_name']
-    query_result = query_data['query_result']
-    payload = {'host': host_args['fqdn'],
-               'index': opts['index']}
-    if opts['add_query_to_sourcetype']:
-        payload.update(
-            {'sourcetype': "%s_%s" % (opts['sourcetype'], query_name)})
+    query_name = query_data["query_name"]
+    query_result = query_data["query_result"]
+    payload = {"host": host_args["fqdn"], "index": opts["index"]}
+    if opts["add_query_to_sourcetype"]:
+        payload.update({"sourcetype": f"{opts['sourcetype']}_{query_name}"})
     else:
-        payload.update({'sourcetype': opts['sourcetype']})
+        payload.update({"sourcetype": opts["sourcetype"]})
 
-    event = _generate_event(host_args, query_result, query_name,
-                            opts['custom_fields'], cloud_details)
-    payload['event'] = event
+    event = _generate_event(
+        host_args, query_result, query_name, opts["custom_fields"], cloud_details
+    )
+    payload["event"] = event
 
     # Potentially add metadata fields:
     fields = {}
     for item in index_extracted_fields:
-        if item in payload['event'] and not isinstance(payload['event'][item],
-                                                       (list, dict, tuple)):
-            fields["meta_%s" % item] = str(payload['event'][item])
+        if item in payload["event"] and not isinstance(
+            payload["event"][item], (list, dict, tuple)
+        ):
+            fields["meta_%s" % item] = str(payload["event"][item])
     if fields:
-        payload.update({'fields': fields})
+        payload.update({"fields": fields})
 
     return payload
@@ -207,12 +226,14 @@ def _check_time(query_result):
     If the osquery query includes a field called 'time' it will be checked.
     If it's within the last year, it will be used as the eventtime.
     """
-    event_time = query_result.get('time', '')
+    event_time = query_result.get("time", "")
     try:
-        if (datetime.fromtimestamp(time.time()) - datetime.fromtimestamp(
-                float(event_time))).days > 365:
-            event_time = ''
+        if (
+            datetime.fromtimestamp(time.time())
+            - datetime.fromtimestamp(float(event_time))
+        ).days > 365:
+            event_time = ""
     except Exception:
-        event_time = ''
+        event_time = ""
     return event_time
diff --git a/hubblestack/returners/splunk_osqueryd_return.py b/hubblestack/returners/splunk_osqueryd_return.py
index 7882e9e8b..c924b8596 100644
--- a/hubblestack/returners/splunk_osqueryd_return.py
+++ b/hubblestack/returners/splunk_osqueryd_return.py
@@ -46,17 +46,18 @@ import copy
 from datetime import datetime
 
 from hubblestack.hec import http_event_collector, get_splunk_options, make_hec_args
+from hubblestack.filter.filter_chain import FilterChain
 
 _MAX_CONTENT_BYTES = 100000
 HTTP_EVENT_COLLECTOR_DEBUG = False
 
 log = logging.getLogger(__name__)
 
-
 def returner(ret):
     """
     Get osqueryd data and post it to Splunk
     """
+
     data = ret['return']
     if not data:
         return
@@ -121,6 +122,8 @@ def _generate_and_send_payload(hec, host_args, opts, event, query_results):
         except TypeError:
             pass
 
+    FilterChain.get_chain(__name__).filter(event)
+
     payload = {'host': host_args['fqdn'],
                'index': opts['index'],
                'sourcetype': sourcetype,
diff --git a/pkg/hubble.service b/pkg/hubble.service
index fee99beeb..8d19f7111 100644
--- a/pkg/hubble.service
+++ b/pkg/hubble.service
@@ -10,6 +10,12 @@ ExecStart=/opt/hubble/hubble -d
 Restart=always
 RestartSec=300
 MemoryAccounting=true
+CPUQuota=5%
+CPUWeight=10
+IOWeight=10
+MemorySwapMax=0
+
+
 
 [Install]
 WantedBy=multi-user.target
diff --git a/pytest.ini b/pytest.ini
index e0e49c747..83c057be0 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -6,7 +6,5 @@ log_cli_level = CRITICAL
 log_cli_format = %(asctime)s %(name)17s %(levelname)5s %(message)s
 log_date_format = %H:%M:%S
 
-pythonpath = .
-
 filterwarnings =
     ignore::urllib3.exceptions.InsecureRequestWarning
diff --git a/requirements.txt b/requirements.txt
index b1e6904ba..51d2eb6d8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -27,12 +27,12 @@ patch==1.*
 psutil
 pycryptodome
 pygit2
-pyinotify
+pyinotify; platform_system == "Linux"
 pyinstaller
 pylint
 pyopenssl
 pyparsing
-pystemd
+pystemd; platform_system == "Linux"
 pytest
 pytest-cov
 pytest-html
diff --git a/test-requirements.txt b/test-requirements.txt
index 7139b8c3e..8264d7ae9 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -12,7 +12,7 @@ mock
 msgpack
 ntplib
 psutil
-pyinotify
+pyinotify; platform_system == "Linux"
 pytest
 pytest-cov
 pytest-html
diff --git a/tests/unittests/resources/filter_chain.yaml b/tests/unittests/resources/filter_chain.yaml
new file mode 100644
index 000000000..2ef19c0f5
--- /dev/null
+++ b/tests/unittests/resources/filter_chain.yaml
@@ -0,0 +1,6 @@
+filter:
+  default:
+    sequence_id:
+      label: "seq"
+      type: "hubblestack.filter.seq_id"
+      padding: 5
\ No newline at end of file
diff --git a/tests/unittests/resources/filter_chain_ce_01.yaml b/tests/unittests/resources/filter_chain_ce_01.yaml
new file mode 100644
index 000000000..030ec1e7b
--- /dev/null
+++ b/tests/unittests/resources/filter_chain_ce_01.yaml
@@ -0,0 +1,4 @@
+bob:
+  seq:
+    label: seq
+    type: seq
\ No newline at end of file
diff --git a/tests/unittests/resources/filter_chain_ce_02.yaml b/tests/unittests/resources/filter_chain_ce_02.yaml
new file mode 100644
index 000000000..7cf42698e
--- /dev/null
+++ b/tests/unittests/resources/filter_chain_ce_02.yaml
@@ -0,0 +1,2 @@
+filter:
+  bob:
\ No newline at end of file
diff --git a/tests/unittests/resources/filter_chain_load.yaml b/tests/unittests/resources/filter_chain_load.yaml
new file mode 100644
index 000000000..8aabd65ce
--- /dev/null
+++ b/tests/unittests/resources/filter_chain_load.yaml
@@ -0,0 +1,5 @@
+filter:
+  default:
+    sequence_id:
+      label: "seq"
+      type: "hubblestack.filter.seq_id"
\ No newline at end of file
diff --git a/tests/unittests/test_filter_chain.py b/tests/unittests/test_filter_chain.py
new file mode 100644
index 000000000..4cf6af0ab
--- /dev/null
+++ b/tests/unittests/test_filter_chain.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python
+# coding: utf-8
+
+import os
+from hubblestack.exceptions import CommandExecutionError
+import pytest
+from unittest.mock import patch, mock_open
+
+
+import hubblestack.filter.filter_chain as filter_chain
+import hubblestack.modules.cp
+
+
+filter_chain.__opts__ = {}
+filter_chain.__context__ = {}
+
+
+def test_load():
+
+    topfile = 'tests/unittests/resources/filter_chain_load.yaml'
+
+    def cp_cache_file(_):
+        ''' pretend salt[cp.cache_file] '''
+        return 'tests/unittests/resources/filter_chain_load.yaml'
+
+    filter_chain.__mods__ = {'cp.cache_file': cp_cache_file}
+
+    fc = filter_chain.FilterChain("bob", "default")
+    assert fc.config["sequence_id"] is not None
+    assert fc.chain[0].name == "sequence_id"
+    assert fc.chain[0].get_label() == "seq"
+
+    msg = fc.chain[0].filter({"bob": "alice"})
+    assert msg["seq"] == "1"
+
+
+def test_command_exception_one():
+    def cp_cache_file(_):
+        ''' pretend salt[cp.cache_file] '''
+        return 'tests/unittests/resources/filter_chain_ce_01.yaml'
+
+    filter_chain.__mods__ = {'cp.cache_file': cp_cache_file}
+    try:
+        fc = filter_chain.FilterChain("bob", "default")
+    except CommandExecutionError:
+        pass  # expected: config is missing the 'filter' top-level key
+    else:
+        pytest.fail("FilterChain should raise CommandExecutionError")
+
+def test_command_exception_two():
+    def cp_cache_file(_):
+        ''' pretend salt[cp.cache_file] '''
+        return 'tests/unittests/resources/filter_chain_ce_02.yaml'
+
+    filter_chain.__mods__ = {'cp.cache_file': cp_cache_file}
+    try:
+        fc = filter_chain.FilterChain("bob", "default")
+    except CommandExecutionError:
+        pass  # expected: the configured chain is empty
+    else:
+        pytest.fail("FilterChain should raise CommandExecutionError")
+
+
+def test_pad():
+    def cp_cache_file(_):
+        ''' pretend salt[cp.cache_file] '''
+        return 'tests/unittests/resources/filter_chain.yaml'
+
+    filter_chain.__mods__ = {'cp.cache_file': cp_cache_file}
+    fc = filter_chain.FilterChain("bob", "default")
+    assert fc.config["sequence_id"] is not None
+    assert fc.chain[0].name == "sequence_id"
+    assert fc.chain[0].get_label() == "seq"
+
+    msg = fc.chain[0].filter({"bob": "alice"})
+    assert msg["seq"] == "00001"
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/unittests/test_nebula_osquery.py b/tests/unittests/test_nebula_osquery.py
index 294284344..6781cbaba 100644
--- a/tests/unittests/test_nebula_osquery.py
+++ b/tests/unittests/test_nebula_osquery.py
@@ -2,6 +2,9 @@
 import json
 import pytest
 
+from tests.support.unit import TestCase, skipIf
+import hubblestack.utils.platform
+
 __mods__ = None
 
 def dump_var_file(var, name='var', dumpster='tests/unittests/output'):
@@ -26,6 +29,7 @@ def test_hubble_versions(self, __mods__):
         var = __mods__['nebula.hubble_versions']()
         assert ((var.get('hubble_versions')).get('result')) is True
 
+    @skipIf(hubblestack.utils.platform.is_darwin(), "Not supported on MacOS")
     def test_queries(self, __mods__, __grains__):
         query_group = 'day'
         query_file = 'salt://hubblestack_nebula_v2/hubblestack_nebula_queries.yaml'
@@ -37,6 +41,7 @@ def test_queries(self, __mods__, __grains__):
         assert 'os_info' in os_info[0]
         assert 'data' in os_info[0]['os_info']
         assert 'version' in os_info[0]['os_info']['data'][0]
+        ## TODO: Fix this for MacOs
         assert __grains__['os'] in os_info[0]['os_info']['data'][0]['name']
 
     def test_queries_for_report_version_with_day(self, __mods__):
@@ -54,6 +59,7 @@
         for m in 'pulsar nebula nova quasar'.split():
             assert m in hubble_versions['data'][0]
 
+    @skipIf(hubblestack.utils.platform.is_darwin(), "Not supported on MacOS")
     def test_top(self, __mods__, __grains__):
         query_group = 'day'
         topfile = 'salt://top.nebula'
@@ -67,4 +73,5 @@
         assert 'os_info' in os_info[0]
         assert 'data' in os_info[0]['os_info']
         assert 'version' in os_info[0]['os_info']['data'][0]
+        ## TODO: Fix this for MacOs
         assert __grains__['os'] in os_info[0]['os_info']['data'][0]['name']
diff --git a/tests/unittests/test_pulsar.py b/tests/unittests/test_pulsar.py
index 402c456c9..4cf7cf27f 100644
--- a/tests/unittests/test_pulsar.py
+++ b/tests/unittests/test_pulsar.py
@@ -5,9 +5,11 @@
 import os
 import shutil
 import logging
+from tests.support.unit import TestCase, skipIf
 
 from hubblestack.exceptions import CommandExecutionError
 import hubblestack.modules.pulsar as pulsar
+import hubblestack.utils.platform
 
 log = logging.getLogger(__name__)
 
@@ -23,6 +25,7 @@ def test_enqueue(self):
         var = pulsar._enqueue
         assert var != 0
 
+    @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux")
     def test_get_notifier(self):
         pulsar.__context__ = {}
         var = pulsar._get_notifier
@@ -252,6 +255,7 @@ def boo(x):
         assert_len_listify_is(oogly_list, 4)
         assert_str_listify_is(oogly_list, [1,2,5,'one'])
 
+    @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux")
    def test_add_watch(self, modality='add-watch'):
         options = {}
         kwargs = { self.atdir: options }
@@ -304,6 +308,7 @@ def test_watch(self):
     def test_watch_new_files(self):
         self.test_add_watch(modality='watch_new_files')
 
+    @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux")
     def test_recurse_without_watch_files(self):
         config1 = {self.atdir: { 'recurse': False }}
         config2 = {self.atdir: { 'recurse': True }}
@@ -359,12 +364,14 @@ def config_make_files_watch_process_reconfig(self, config, reconfig=None, mk_fil
         set2 = set(self.watch_manager.watch_db)
         return set0, set1, set2
 
+    @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux")
     def test_pruning_watch_files_false(self):
         set0, set1, set2 = self.config_make_files_watch_process_reconfig({self.atdir:{}}, None, mk_files=2)
         assert set0 == set([self.atdir])
         assert set1 == set([self.atdir])
         assert set2 == set()
 
+    @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux")
     def test_pruning_watch_new_files_then_false(self):
         config1 = {self.atdir: { 'watch_new_files': True }}
         config2 = {self.atdir: { 'watch_new_files': False }}
@@ -375,6 +382,7 @@ def test_pruning_watch_new_files_then_false(self):
         assert set1 == set([self.atdir, fname1, fname2])
         assert set2 == set([self.atdir])
 
+    @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux")
     def test_pruning_watch_files_then_false(self):
         config1 = {self.atdir: { 'watch_files': True }}
         config2 = {self.atdir: { 'watch_files': False }}
@@ -385,6 +393,7 @@ def test_pruning_watch_files_then_false(self):
         assert set1 == set([self.atdir, self.atfile, fname1, fname2])
         assert set2 == set([self.atdir])
 
+    @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux")
     def test_pruning_watch_new_files_then_nothing(self):
         config1 = {self.atdir: { 'watch_new_files': True }}
         set0, set1, set2 = self.config_make_files_watch_process_reconfig(config1, None, mk_files=2)
@@ -394,6 +403,7 @@ def test_pruning_watch_new_files_then_nothing(self):
         assert set1 == set([self.atdir, fname1, fname2])
         assert set2 == set()
 
+    @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux")
     def test_pruning_watch_files_then_nothing(self):
         config1 = {self.atdir: { 'watch_files': True }}
         set0, set1, set2 = self.config_make_files_watch_process_reconfig(config1, None, mk_files=2)
@@ -403,6 +413,7 @@ def test_pruning_watch_files_then_nothing(self):
         assert set1 == set([self.atdir, fname1, fname2, self.atfile])
         assert set2 == set()
 
+    @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux")
     def test_watch_files_events(self):
         config = {self.atdir: { 'watch_files': True }}
         self.reset(**config)
@@ -448,6 +459,7 @@ def test_watch_files_events(self):
         assert set_ == set1
         assert events_ == ['IN_MODIFY({})'.format(self.atfile)]
 
+    @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux")
     def test_single_file_events(self):
         config = {self.atfile: dict()}
         self.reset(**config)
@@ -493,6 +505,7 @@ def test_single_file_events(self):
         assert set_ == set1
         assert events_ == ['IN_MODIFY({})'.format(self.atfile)]
 
+    @skipIf(not hubblestack.utils.platform.is_linux(), "System is not Linux")
     def test_fim_single_file(self):
         config = {self.atfile: {}}
         self.reset(**config)
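---

Reviewer note: the filter plumbing in this patch is easiest to see end to end with a small example. The sketch below is illustrative only and is not part of the patch. It assumes the BaseFilter API used above (__init__(name, label, config) and get_label()) and the filter_chain.yaml layout from the new test resources; the host_name module path and the "hostname" label are hypothetical names chosen for the example.

# hypothetical hubblestack/filter/host_name.py, modeled on the shipped
# seq_id.py and static.py filters above
import socket

from hubblestack.filter.base import BaseFilter


class Filter(BaseFilter):
    """
    A Filter for adding the local hostname to each message
    """

    DEFAULT_LABEL = "hostname"

    def __init__(self, name, config=None):
        super().__init__(name, self.DEFAULT_LABEL, config)

    def filter(self, msg):
        # stamp the hostname only if the message does not carry one already,
        # mirroring the guard used by seq_id.py and static.py
        if self.get_label() not in msg.keys():
            msg[self.get_label()] = socket.gethostname()
        return msg

Wired into a chain, it would sit alongside the shipped filters; this mirrors tests/unittests/resources/filter_chain.yaml, with the host_name entry being the hypothetical addition:

filter:
  default:
    sequence_id:
      label: "seq"
      type: "hubblestack.filter.seq_id"
      padding: 5
    host_name:
      label: "hostname"
      type: "hubblestack.filter.host_name"

A returner would then pick the chain up exactly as splunk_nebula_return.py and splunk_osqueryd_return.py do above, via FilterChain.get_chain(__name__).filter(event), so every event emitted to Splunk passes through the configured filters before batching.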