Merge pull request #44 from spectriclabs/dev
Add tests for current filter types and refactor filter creation
BenShoeSpectric authored Nov 28, 2023
2 parents 8933791 + bf00330 commit becfc4f
Showing 12 changed files with 139 additions and 230 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -2,7 +2,7 @@ name: "Build, Test, and Publish Test Docker Image"

on:
push:
branches: [ master ]
branches: [ master, dev ]

env:
REGISTRY: ghcr.io
2 changes: 1 addition & 1 deletion Dockerfile
@@ -38,7 +38,7 @@ ENTRYPOINT [ "gunicorn", \
"--ciphers","ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384", \
"--chdir", "/opt/elastic_datashader", \
"-c", "/opt/elastic_datashader/gunicorn_config.py", \
"--max-requests", "40", \
"--max-requests", "400", \
"--workers", "30", \
"-k", "uvicorn.workers.UvicornWorker", \
"elastic_datashader:app" \
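Worth noting on the --max-requests bump: gunicorn restarts a worker after it has served that many requests, a common guard against slow memory leaks. With 30 workers, a limit of 40 meant near-constant worker churn under load; 400 keeps the recycling safety net while cutting restart overhead by an order of magnitude.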
11 changes: 11 additions & 0 deletions elastic_datashader/cache.py
@@ -2,6 +2,8 @@
from collections import OrderedDict
from datetime import datetime, timedelta, timezone
from os import scandir
import os
from contextlib import suppress
from pathlib import Path
from shutil import rmtree
from time import time
@@ -162,6 +164,15 @@ def age_off_cache(cache_path: Path, idx_name: str, max_age: timedelta) -> None:
# set missing_ok=True in case another process deleted the same file
file_path.unlink(missing_ok=True)

# clear all empty dirs, and dirs that contain only empty dirs, to prevent build-up of param hash directories
remove_empty_dirs(cache_path/idx_name)

def remove_empty_dirs(path: Path):
for root, dirs, _ in os.walk(path, topdown=False):
for d in dirs:
with suppress(OSError):
os.rmdir(Path(root, d))

def get_idx_names(cache_path: Path) -> Iterable[str]:
for path in cache_path.glob("*"):
if path.is_dir():
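The topdown=False walk is what makes remove_empty_dirs a single-pass cleanup: directories are visited leaves-first, so each child is gone by the time its parent is checked, and suppress(OSError) skips any directory that still has contents. A minimal standalone sketch of the behavior, using a throwaway temp tree rather than the real cache layout:

    import os
    from contextlib import suppress
    from pathlib import Path
    from tempfile import mkdtemp

    def remove_empty_dirs(path: Path):
        # Bottom-up walk: children are removed before their parents are
        # visited, so a chain of nested empty directories collapses in one pass.
        for walk_root, dirs, _ in os.walk(path, topdown=False):
            for d in dirs:
                with suppress(OSError):  # raised when the directory is not empty
                    os.rmdir(Path(walk_root, d))

    root = Path(mkdtemp())
    (root / "idx" / "somehash" / "3" / "1").mkdir(parents=True)

    remove_empty_dirs(root)
    assert not (root / "idx").exists()  # the whole empty chain is gone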
6 changes: 0 additions & 6 deletions elastic_datashader/config.py
@@ -18,7 +18,6 @@ class Config:
cache_cleanup_interval: timedelta
cache_path: Path
cache_timeout: timedelta
csrf_secret_key: str
datashader_headers: Dict[Any, Any]
elastic_hosts: str
ellipse_render_mode: str
@@ -29,8 +28,6 @@ class Config:
max_ellipses_per_tile: int
max_legend_items_per_tile: int
num_ellipse_points: int
proxy_host: Optional[str]
proxy_prefix: str
query_timeout_seconds: int
render_timeout: timedelta
tms_key: Optional[str]
@@ -93,7 +90,6 @@ def config_from_env(env) -> Config:
cache_cleanup_interval=timedelta(seconds=int(env.get("DATASHADER_CACHE_CLEANUP_INTERVAL", 5*60))),
cache_path=Path(env.get("DATASHADER_CACHE_DIRECTORY", "tms-cache")),
cache_timeout=timedelta(seconds=int(env.get("DATASHADER_CACHE_TIMEOUT", 60*60))),
csrf_secret_key=env.get("DATASHADER_CSRF_SECRET_KEY", "CSRFProtectionKey"),
datashader_headers=load_datashader_headers(env.get("DATASHADER_HEADER_FILE", "headers.yaml")),
elastic_hosts=env.get("DATASHADER_ELASTIC", "http://localhost:9200"),
ellipse_render_mode=env.get("DATASHADER_ELLIPSE_RENDER_MODE", "matrix"),
@@ -104,8 +100,6 @@
max_ellipses_per_tile=int(env.get("DATASHADER_MAX_ELLIPSES_PER_TILE", 100_000)),
max_legend_items_per_tile=int(env.get("MAX_LEGEND_ITEMS_PER_TILE", 20)),
num_ellipse_points=int(env.get("DATASHADER_NUM_ELLIPSE_POINTS", 100)),
proxy_host=env.get("DATASHADER_PROXY_HOST", None),
proxy_prefix=env.get("DATASHADER_PROXY_PREFIX", ""),
query_timeout_seconds=int(env.get("DATASHADER_QUERY_TIMEOUT", 0)),
render_timeout=timedelta(seconds=int(env.get("DATASHADER_RENDER_TIMEOUT", 30))),
tms_key=env.get("DATASHADER_TMS_KEY", None),
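The hunks below replace every gethostname() call with config.hostname, but the corresponding Config change isn't among the hunks shown here. Presumably the dataclass gains a hostname field resolved once at startup; a hypothetical sketch (the field name and default are assumptions, not part of this diff):

    from dataclasses import dataclass, field
    from socket import gethostname

    @dataclass
    class Config:
        # Hypothetical excerpt: the real Config carries many more fields.
        # Resolving the hostname once avoids a socket lookup at every call site.
        hostname: str = field(default_factory=gethostname)

    config = Config()
    print(config.hostname)  # same value everywhere for the process lifetime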
154 changes: 13 additions & 141 deletions elastic_datashader/elastic.py
@@ -158,26 +158,6 @@ def convert_nm_to_ellipse_units(distance: float, units: str) -> str:
# NB. assume "majmin_m" if any others
return distance * 1852

def get_field_type(elastic_hosts: str, headers: Optional[str], params: Dict[str, Any], field: str, idx: str) -> str:
user = params.get("user")
x_opaque_id = params.get("x-opaque-id")
es = Elasticsearch(
elastic_hosts.split(","),
verify_certs=False,
timeout=900,
headers=get_es_headers(headers, user, x_opaque_id),
)
if idx.find("*:") != -1:
idx = idx[idx.find("*:")+2:] # when you query for mappings if it is cross cluster you don't get a mapping
mappings = es.indices.get_field_mapping(fields=field, index=idx)
# {'foot_prints': {'mappings': {'foot_print': {'full_name': 'foot_print', 'mapping': {'foot_print': {'type': 'geo_shape'}}}}}}
index = list(mappings.keys())[0] # if index is my_index* it comes back as my_index
field_parts = field.split(".")
try:
return mappings[index]['mappings'][field]['mapping'][field_parts[-1]]['type'] # handles 'geo_center' or a nested object {signal:{geo:{location:{}}}}
except AttributeError:
return mappings[index]['mappings'][field]['mapping'][field]['type'] # handles literal string with periods 'signal.geo.location'

def get_search_base(
elastic_hosts: str,
headers: Optional[str],
@@ -271,21 +251,6 @@ def get_search_base(

return base_s

def handle_range_or_exists_filters(filter_input: Dict[Any, Any]) -> Dict[str, Any]:
"""
`range` and `exists` filters can appear either directly under
`filter[]` or under `filter[].query` depending on the version
of Kibana, the former being the old way, so they need special
handling for backward compatibility.
"""
filter_type = filter_input.get("meta").get("type") # "range" or "exists"

# Handle old query structure for backward compatibility
if filter_input.get(filter_type) is not None:
return {filter_type: filter_input.get(filter_type)}

return filter_input.get("query")

def build_dsl_filter(filter_inputs) -> Optional[Dict[str, Any]]:
"""
@@ -309,78 +274,25 @@ def build_dsl_filter(filter_inputs) -> Optional[Dict[str, Any]]:
f.get("geo_shape") or
f.get("geo_distance")
)

# Handle spatial filters
if is_spatial_filter:
if f.get("geo_polygon"):
geo_polygon_dict = {"geo_polygon": f.get("geo_polygon")}
if f.get("meta").get("negate"):
filter_dict["must_not"].append(geo_polygon_dict)
else:
filter_dict["filter"].append(geo_polygon_dict)
elif f.get("geo_bounding_box"):
geo_bbox_dict = {"geo_bounding_box": f.get("geo_bounding_box")}
if f.get("meta").get("negate"):
filter_dict["must_not"].append(geo_bbox_dict)
else:
filter_dict["filter"].append(geo_bbox_dict)
elif f.get("geo_shape"):
geo_bbox_dict = {"geo_shape": f.get("geo_shape")}
if f.get("meta").get("negate"):
filter_dict["must_not"].append(geo_bbox_dict)
else:
filter_dict["filter"].append(geo_bbox_dict)
elif f.get("geo_distance"):
geo_bbox_dict = {"geo_distance": f.get("geo_distance")}
if f.get("meta").get("negate"):
filter_dict["must_not"].append(geo_bbox_dict)
else:
filter_dict["filter"].append(geo_bbox_dict)
elif f.get("query"):
if f.get("meta").get("negate"):
filter_dict["must_not"].append(f.get("query"))
else:
filter_dict["filter"].append(f.get("query"))
else:
raise ValueError("unsupported spatial_filter {}".format(f)) # pylint: disable=C0209

# Handle phrase matching
elif f.get("meta").get("type") in ("phrase", "phrases", "bool"):
if f.get("query", None):
if f.get("meta").get("negate"):
filter_dict["must_not"].append(f.get("query"))
else:
filter_dict["filter"].append(f.get("query"))

elif f.get("meta").get("type") in ("range", "exists"):
if f.get("meta").get("negate"):
filter_dict["must_not"].append(handle_range_or_exists_filters(f))
else:
filter_dict["filter"].append(handle_range_or_exists_filters(f))

elif f.get("meta", {}).get("type") == "custom" and f.get("meta", {}).get("key") is not None:
filter_key = f.get("meta", {}).get("key")
if f.get("meta", {}).get("negate"):
if filter_key == "query":
filt_index = list(f.get(filter_key))[0]
filter_dict["must_not"].append({filt_index: f.get(filter_key).get(filt_index)})
else:
filter_dict["must_not"].append({filter_key: f.get(filter_key)})
else:
if filter_key == "query":
filt_index = list(f.get(filter_key))[0]
filter_dict["must_not"].append({filt_index: f.get(filter_key).get(filt_index)})
else:
filter_dict["filter"].append({filter_key: f.get(filter_key)})

else:
# Here we handle filters that don't send a type (this happens when controls send filters)
# example filters[{"meta":{"index":"11503c28-7d88-4f9a-946b-2997a5ea64cf","key":"name"},"query":{"match_phrase":{"name":"word_5"}}}]
if f.get("meta", {}).get("negate"):
filter_dict["must_not"].append(f.get("query"))
if not is_spatial_filter:
filt_type = f.get("meta").get("type")
if f.get("meta").get("negate"):
filter_dict["must_not"].append({filt_type: f.get(filt_type)})
else:
filter_dict["filter"].append({filt_type: f.get(filt_type)})
else:
filter_dict["filter"].append(f.get("query"))
# raise ValueError("unsupported filter type {}".format(f.get("meta").get("type"))) # pylint: disable=C0209

for geo_type in ["geo_polygon", "geo_bounding_box", "geo_shape", "geo_distance"]:
if f.get(geo_type, None):
if f.get("meta").get("negate"):
filter_dict["must_not"].append({geo_type: f.get(geo_type)})
else:
filter_dict["filter"].append({geo_type: f.get(geo_type)})
logger.info("Filter output %s", filter_dict)
return filter_dict
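The shape of the refactor shows in the two generic branches above: a non-spatial filter is dispatched by its meta.type key, which doubles as the DSL clause name, and the trailing geo_type loop replaces the four near-identical spatial branches that were deleted. A minimal sketch of the non-spatial dispatch with a hand-built, Kibana-style range filter (simplified initialization; build_dsl_filter in elastic.py is the real entry point):

    # Hand-built filter input in the old Kibana style, where the clause sits
    # directly under the filter object keyed by its type.
    f = {
        "meta": {"type": "range", "negate": False},
        "range": {"@timestamp": {"gte": "now-15m", "lte": "now"}},
    }

    filter_dict = {"filter": [], "must_not": []}

    # Generic dispatch: meta.type names both the branch and the DSL clause.
    filt_type = f["meta"]["type"]
    if f["meta"]["negate"]:
        filter_dict["must_not"].append({filt_type: f[filt_type]})
    else:
        filter_dict["filter"].append({filt_type: f[filt_type]})

    print(filter_dict)
    # {'filter': [{'range': {'@timestamp': {'gte': 'now-15m', 'lte': 'now'}}}],
    #  'must_not': []}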

@@ -449,32 +361,6 @@ def parse_duration_interval(interval):
kwargs[key] = int(interval[0:len(interval)-1])
return relativedelta(**kwargs)

def convert(response, category_formatter=str):
"""
:param response:
:return:
"""
if hasattr(response.aggregations, "categories"):
for category in response.aggregations.categories:
for bucket in category.grids:
x, y = lnglat_to_meters(
bucket.centroid.location.lon, bucket.centroid.location.lat
)
yield {
"lon": bucket.centroid.location.lon,
"lat": bucket.centroid.location.lat,
"x": x,
"y": y,
"c": bucket.centroid.count,
"t": category_formatter(category.key),
}
else:
for bucket in response.aggregations.grids:
lon = bucket.centroid.location.lon
lat = bucket.centroid.location.lat
x, y = lnglat_to_meters(lon, lat)
yield {"lon": lon, "lat": lat, "x": x, "y": y, "c": bucket.centroid.count}

def convert_composite(response, categorical, filter_buckets, histogram_interval, category_type, category_format):
if categorical and filter_buckets is False:
@@ -586,20 +472,6 @@ def get_nested_field_from_hit(hit, field_parts: List[str], default=None):

raise ValueError("field must be provided")

def chunk_iter(iterable, chunk_size):
chunks = [None] * chunk_size
i = -1
for i, v in enumerate(iterable):
idx = i % chunk_size
if idx == 0 and i > 0:
i = -1
yield (True, chunks)
chunks[idx] = v

if i >= 0:
last_written_idx = i % chunk_size
yield (False, chunks[0:last_written_idx+1])

def bucket_noop(bucket, search):
# pylint: disable=unused-argument
return bucket
8 changes: 3 additions & 5 deletions elastic_datashader/parameters.py
@@ -1,7 +1,6 @@
from datetime import datetime, timedelta, timezone
from hashlib import sha256
from json import loads
from socket import gethostname
from time import sleep
from typing import Any, Dict, Optional, Tuple
from urllib.parse import unquote
@@ -351,7 +350,6 @@ def generate_global_params(headers, params, idx):
if category_type == "number":
bounds_s.aggs.metric("field_stats", "stats", field=category_field)

# field_type = get_field_type(config.elastic_hosts, headers, params, geopoint_field, idx)
field_type = params["geofield_type"]  # with CCS you cannot get mappings, so the field type is pushed from the client side
# Execute and process search
if len(list(bounds_s.aggs)) > 0 and field_type != "geo_shape":
@@ -470,7 +468,7 @@ def generate_global_params(headers, params, idx):


def merge_generated_parameters(headers, params, idx, param_hash):
layer_id = f"{param_hash}_{gethostname()}"
layer_id = f"{param_hash}_{config.hostname}"
es = Elasticsearch(
config.elastic_hosts.split(","),
verify_certs=False,
@@ -488,7 +486,7 @@ def merge_generated_parameters(headers, params, idx, param_hash):
try:
doc = Document(
_id=layer_id,
creating_host=gethostname(),
creating_host=config.hostname,
creating_pid=os.getpid(),
creating_timestamp=datetime.now(timezone.utc),
generated_params=None,
@@ -533,7 +531,7 @@ def merge_generated_parameters(headers, params, idx, param_hash):
generated_params = {
"complete": False,
"generation_start_time": datetime.now(timezone.utc),
"generating_host": gethostname(),
"generating_host": config.hostname,
"generating_pid": os.getpid(),
}

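The dropped get_field_type call (and the gethostname import alongside it) ties back to the comment kept in the diff: cross-cluster search responses don't return field mappings, so the server can no longer look up the geo field's type and instead relies on the client-supplied geofield_type parameter.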
11 changes: 7 additions & 4 deletions elastic_datashader/routers/tms.py
@@ -1,6 +1,5 @@
from datetime import datetime, timezone
from os import getpid
from socket import gethostname
from typing import Optional
import time
import uuid
@@ -25,7 +24,7 @@
)
from ..config import config
from ..drawing import generate_x_tile
from ..elastic import get_es_headers
from ..elastic import get_es_headers, get_search_base
from ..logger import logger
from ..parameters import extract_parameters, merge_generated_parameters
from ..tilegen import (
@@ -111,7 +110,7 @@ def create_datashader_tiles_entry(es, **kwargs) -> None:
'''
doc_info = {
**kwargs,
'host': gethostname(),
'host': config.hostname,
'pid': getpid(),
'timestamp': datetime.now(timezone.utc),
}
@@ -157,7 +156,7 @@ def cached_response(es, idx, x, y, z, params, parameter_hash) -> Optional[Respon
except NotFoundError:
logger.warning("Unable to find cached tile entry in .datashader_tiles")

return make_image_response(img, params.get("user") or "", parameter_hash, 60)
return make_image_response(img, params.get("user") or "", parameter_hash, config.cache_timeout.seconds)

logger.debug("Did not find image in cache: %s", tile_name(idx, x, y, z, parameter_hash))
return None
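Switching the hardcoded 60-second max-age to config.cache_timeout.seconds ties the HTTP caching header to the server's own cache TTL. One standard-library caveat: timedelta.seconds is the within-day remainder, not the total duration, so total_seconds() would be the safer accessor if the timeout ever exceeds 24 hours:

    from datetime import timedelta

    print(timedelta(hours=1).seconds)                # 3600  — fine for sub-day TTLs
    print(timedelta(hours=25).seconds)               # 3600  — the day component is dropped
    print(int(timedelta(hours=25).total_seconds()))  # 90000 — the full duration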
@@ -294,6 +293,10 @@ async def fetch_or_render_tile(already_waited: int, idx: str, x: int, y: int, z:
# Get hash and parameters
try:
parameter_hash, params = extract_parameters(request.headers, request.query_params)
# Try to build the DSL object here: bad filters raise exceptions that would otherwise be retried.
# The underlying elasticsearch_dsl library doesn't support the Elasticsearch 8 API yet, so bad requests thrash.
# If the filters are bad or elasticsearch_dsl cannot build the request, it will never complete, so serve the X tile instead.
get_search_base(config.elastic_hosts, request.headers, params, idx)
except Exception as ex: # pylint: disable=W0703
logger.exception("Error while extracting parameters")
params = {"user": request.headers.get("es-security-runas-user", None)}
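The early get_search_base call is a fail-fast check: building the search (and with it the filter DSL) during parameter extraction surfaces bad filters immediately, dropping the request into the except branch so the error tile is served once rather than the render path retrying a request that can never succeed.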
1 change: 0 additions & 1 deletion elastic_datashader/tilegen.py
@@ -1099,7 +1099,6 @@ def generate_tile(idx, x, y, z, headers, params, tile_width_px=256, tile_height_

# the composite needs one bin for 'after_key'
composite_agg_size = int(max_bins / inner_agg_size) - 1
# field_type = get_field_type(config.elastic_hosts, headers, params, geopoint_field, idx)
field_type = params["geofield_type"]  # with CCS you cannot get mappings, so the field type is pushed from the client side
partial_data = False # TODO can we get partial data?
span = None
6 changes: 5 additions & 1 deletion tests/test_cache.py
@@ -75,7 +75,7 @@ def test_clear_hash_cache(tmp_path):


def test_age_off_cache(tmp_path):
xdir = tmp_path / "fooindex/somehash/3/1"
xdir = tmp_path / "fooindex/some_new_hash/3/1"
xdir.mkdir(parents=True)

yfile = xdir / "2.png"
@@ -90,6 +90,10 @@

assert not yfile.exists()
assert yfile_after.exists()
sleep(2)
# clear again should remove all files and empty folders
cache.age_off_cache(tmp_path, "fooindex", timedelta(seconds=1))
assert not xdir.exists()
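The added assertions exercise the new cleanup path end to end: sleep(2) pushes the surviving file past the one-second max age, so the second age_off_cache call deletes it and remove_empty_dirs then collapses the now-empty fooindex/some_new_hash/3/1 chain, which is what the final assert not xdir.exists() verifies.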


def test_build_layer_info(tmp_path):
