diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 4d85a30..d231b84 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,7 @@ Changelog - Delete indices when buckets and collections are deleted (fixes #21) - Support quick search from querystring (fixes #34) - Return details about invalid queries in request body (fixes #23) +- Support defining mapping from the ``index:schema`` property in the collection metadata (ref #8) **Bug fixes** diff --git a/README.rst b/README.rst index c3909f9..591ac98 100644 --- a/README.rst +++ b/README.rst @@ -126,6 +126,38 @@ Or an advanced search using request body: } +Custom index mapping +-------------------- + +By default, ElasticSearch infers the data types from the indexed records. + +But it's possible to define the index mappings (ie. schema) from the collection metadata, +in the ``index:schema`` property: + +.. code-block:: bash + + $ echo '{ + "data": { + "index:schema": { + "properties": { + "id": {"type": "keyword"}, + "last_modified": {"type": "long"}, + "build": { + "properties": { + "date": {"type": "date", "format": "strict_date"}, + "id": {"type": "keyword"} + } + } + } + } + } + }' | http PATCH "http://localhost:8888/v1/buckets/blog/collections/builds" --auth token:admin-token --verbose + +Refer to ElasticSearch official documentation for more information about mappings. + +See also, `domapping `_ a CLI tool to convert JSON schemas to ElasticSearch mappings. + + Running the tests ================= diff --git a/kinto_elasticsearch/__init__.py b/kinto_elasticsearch/__init__.py index 2eda70f..f5c9f2f 100644 --- a/kinto_elasticsearch/__init__.py +++ b/kinto_elasticsearch/__init__.py @@ -26,6 +26,8 @@ def includeme(config): config.add_subscriber(listener.on_server_flushed, ServerFlushed) config.add_subscriber(listener.on_collection_created, AfterResourceChanged, for_resources=("collection",), for_actions=("create",)) + config.add_subscriber(listener.on_collection_updated, AfterResourceChanged, + for_resources=("collection",), for_actions=("update",)) config.add_subscriber(listener.on_collection_deleted, AfterResourceChanged, for_resources=("collection",), for_actions=("delete",)) config.add_subscriber(listener.on_bucket_deleted, AfterResourceChanged, diff --git a/kinto_elasticsearch/indexer.py b/kinto_elasticsearch/indexer.py index 646be7c..1291579 100644 --- a/kinto_elasticsearch/indexer.py +++ b/kinto_elasticsearch/indexer.py @@ -18,11 +18,25 @@ def __init__(self, hosts, prefix="kinto", force_refresh=False): def indexname(self, bucket_id, collection_id): return "{}-{}-{}".format(self.prefix, bucket_id, collection_id) - def create_index(self, bucket_id, collection_id): + def create_index(self, bucket_id, collection_id, schema=None): indexname = self.indexname(bucket_id, collection_id) # Only if necessary. if not self.client.indices.exists(index=indexname): - self.client.indices.create(index=indexname) + if schema: + body = {"mappings": {indexname: schema}} + else: + body = None + self.client.indices.create(index=indexname, body=body) + else: + self.update_index(bucket_id, collection_id, schema) + + def update_index(self, bucket_id, collection_id, schema=None): + indexname = self.indexname(bucket_id, collection_id) + if schema is None: + schema = {"properties": {}} + self.client.indices.put_mapping(index=indexname, + doc_type=indexname, + body=schema) def delete_index(self, bucket_id, collection_id=None): if collection_id is None: diff --git a/kinto_elasticsearch/listener.py b/kinto_elasticsearch/listener.py index 06929e3..e9b8265 100644 --- a/kinto_elasticsearch/listener.py +++ b/kinto_elasticsearch/listener.py @@ -12,7 +12,22 @@ def on_collection_created(event): bucket_id = event.payload["bucket_id"] for created in event.impacted_records: collection_id = created["new"]["id"] - indexer.create_index(bucket_id, collection_id) + schema = created["new"].get("index:schema") + indexer.create_index(bucket_id, collection_id, schema=schema) + + +def on_collection_updated(event): + indexer = event.request.registry.indexer + bucket_id = event.payload["bucket_id"] + for updated in event.impacted_records: + collection_id = updated["new"]["id"] + old_schema = updated["old"].get("index:schema") + new_schema = updated["new"].get("index:schema") + # Create if there was no index before. + if old_schema is None and new_schema is not None: + indexer.create_index(bucket_id, collection_id, schema=new_schema) + elif old_schema != new_schema: + indexer.update_index(bucket_id, collection_id, schema=new_schema) def on_collection_deleted(event): diff --git a/tests/test_elasticsearch.py b/tests/test_elasticsearch.py index 2139dd6..90ad8d4 100644 --- a/tests/test_elasticsearch.py +++ b/tests/test_elasticsearch.py @@ -1,3 +1,4 @@ +import copy import mock import time import unittest @@ -197,3 +198,107 @@ def test_search_is_not_allowed_if_only_read_on_certain_records(self): headers=headers) self.app.post("/buckets/bid/collections/cid/search", status=403, headers=headers) + + +class SchemaSupport(BaseWebTest, unittest.TestCase): + + schema = { + "properties": { + "id": {"type": "keyword"}, + "last_modified": {"type": "long"}, + "build": { + "properties": { + "date": {"type": "date", "format": "strict_date"}, + "id": {"type": "keyword"} + } + } + } + } + + def setUp(self): + self.app.put("/buckets/bid", headers=self.headers) + body = {"data": {"index:schema": self.schema}} + self.app.put_json("/buckets/bid/collections/cid", body, headers=self.headers) + self.app.post_json("/buckets/bid/collections/cid/records", + {"data": {"build": {"id": "abc", "date": "2017-05-24"}}}, + headers=self.headers) + self.app.post_json("/buckets/bid/collections/cid/records", + {"data": {"build": {"id": "efg", "date": "2017-02-01"}}}, + headers=self.headers) + + def get_mapping(self, bucket_id, collection_id): + indexer = self.app.app.registry.indexer + indexname = indexer.indexname(bucket_id, collection_id) + index_mapping = indexer.client.indices.get_mapping(indexname) + mappings = index_mapping[indexname]["mappings"] + return mappings.get(indexname, {}) + + def test_index_has_mapping_if_collection_has_schema(self): + mapping = self.get_mapping("bid", "cid") + assert sorted(mapping["properties"].keys()) == ["build", "id", "last_modified"] + + def test_mapping_is_updated_on_collection_update(self): + new_schema = copy.deepcopy(self.schema) + new_schema["properties"]["build"]["properties"]["id"]["ignore_above"] = 12 + + self.app.patch_json("/buckets/bid/collections/cid", + {"data": {"index:schema": new_schema}}, + headers=self.headers) + + mapping = self.get_mapping("bid", "cid") + assert mapping["properties"]["build"]["properties"]["id"]["ignore_above"] == 12 + + def test_mapping_is_created_when_index_metadata_is_added(self): + self.app.put("/buckets/bid/collections/cid2", headers=self.headers) + mapping = self.get_mapping("bid", "cid2") + assert "build" not in mapping.get("properties", {}) + + self.app.patch_json("/buckets/bid/collections/cid2", + {"data": {"index:schema": self.schema}}, + headers=self.headers) + + mapping = self.get_mapping("bid", "cid2") + assert "build" in mapping["properties"] + + def test_mapping_is_preserved_when_index_metadata_is_removed(self): + self.app.put_json("/buckets/bid/collections/cid", + {"data": {}}, + headers=self.headers) + + mapping = self.get_mapping("bid", "cid") + assert "build" in mapping["properties"] + + def test_can_search_for_subproperties(self): + body = { + "query": { + "bool" : { + "must" : { + "term" : { "build.id" : "abc" } + } + } + } + } + resp = self.app.post_json("/buckets/bid/collections/cid/search", body, + headers=self.headers) + result = resp.json + assert len(result["hits"]["hits"]) == 1 + assert result["hits"]["hits"][0]["_source"]["build"]["id"] == "abc" + + def test_can_aggregate_values(self): + body = { + "aggs" : { + "build_dates" : { + "terms": { + "field" : "build.id", + "size" : 1000 + } + } + } + } + resp = self.app.post_json("/buckets/bid/collections/cid/search", body, + headers=self.headers) + result = resp.json + assert result["aggregations"]["build_dates"]["buckets"] == [ + {"key": "abc", "doc_count": 1}, + {"key": "efg", "doc_count": 1}, + ]