Skip to content
This repository has been archived by the owner on Aug 5, 2024. It is now read-only.

Define index mappings from collection metadata #37

Merged
merged 5 commits into from
May 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Changelog
- Add heartbeat (fixes #3)
- Delete indices when buckets and collections are deleted (fixes #21)
- Support quick search from querystring (fixes #34)
- Support defining mapping from the ``index:schema`` property in the collection metadata (ref #8)

**Bug fixes**

Expand Down
32 changes: 32 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,38 @@ Or an advanced search using request body:
}


Custom index mapping
--------------------

By default, ElasticSearch infers the data types from the indexed records.

But it's possible to define the index mappings (ie. schema) from the collection metadata,
in the ``index:schema`` property:

.. code-block:: bash

$ echo '{
"data": {
"index:schema": {
"properties": {
"id": {"type": "keyword"},
"last_modified": {"type": "long"},
"build": {
"properties": {
"date": {"type": "date", "format": "strict_date"},
"id": {"type": "keyword"}
}
}
}
}
}
}' | http PATCH "http://localhost:8888/v1/buckets/blog/collections/builds" --auth token:admin-token --verbose

Refer to ElasticSearch official documentation for more information about mappings.

See also, `domapping <https://github.com/inveniosoftware/domapping/>`_ a CLI tool to convert JSON schemas to ElasticSearch mappings.


Running the tests
=================

Expand Down
2 changes: 2 additions & 0 deletions kinto_elasticsearch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def includeme(config):
config.add_subscriber(listener.on_server_flushed, ServerFlushed)
config.add_subscriber(listener.on_collection_created, AfterResourceChanged,
for_resources=("collection",), for_actions=("create",))
config.add_subscriber(listener.on_collection_updated, AfterResourceChanged,
for_resources=("collection",), for_actions=("update",))
config.add_subscriber(listener.on_collection_deleted, AfterResourceChanged,
for_resources=("collection",), for_actions=("delete",))
config.add_subscriber(listener.on_bucket_deleted, AfterResourceChanged,
Expand Down
18 changes: 16 additions & 2 deletions kinto_elasticsearch/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,25 @@ def __init__(self, hosts, prefix="kinto", force_refresh=False):
def indexname(self, bucket_id, collection_id):
return "{}-{}-{}".format(self.prefix, bucket_id, collection_id)

def create_index(self, bucket_id, collection_id):
def create_index(self, bucket_id, collection_id, schema=None):
indexname = self.indexname(bucket_id, collection_id)
# Only if necessary.
if not self.client.indices.exists(index=indexname):
self.client.indices.create(index=indexname)
if schema:
body = {"mappings": {indexname: schema}}
else:
body = None
self.client.indices.create(index=indexname, body=body)
else:
self.update_index(bucket_id, collection_id, schema)

def update_index(self, bucket_id, collection_id, schema=None):
indexname = self.indexname(bucket_id, collection_id)
if schema is None:
schema = {"properties": {}}
self.client.indices.put_mapping(index=indexname,
doc_type=indexname,
body=schema)

def delete_index(self, bucket_id, collection_id=None):
if collection_id is None:
Expand Down
17 changes: 16 additions & 1 deletion kinto_elasticsearch/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,22 @@ def on_collection_created(event):
bucket_id = event.payload["bucket_id"]
for created in event.impacted_records:
collection_id = created["new"]["id"]
indexer.create_index(bucket_id, collection_id)
schema = created["new"].get("index:schema")
indexer.create_index(bucket_id, collection_id, schema=schema)


def on_collection_updated(event):
indexer = event.request.registry.indexer
bucket_id = event.payload["bucket_id"]
for updated in event.impacted_records:
collection_id = updated["new"]["id"]
old_schema = updated["old"].get("index:schema")
new_schema = updated["new"].get("index:schema")
# Create if there was no index before.
if old_schema is None and new_schema is not None:
indexer.create_index(bucket_id, collection_id, schema=new_schema)
elif old_schema != new_schema:
indexer.update_index(bucket_id, collection_id, schema=new_schema)


def on_collection_deleted(event):
Expand Down
105 changes: 105 additions & 0 deletions tests/test_elasticsearch.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import mock
import time
import unittest
Expand Down Expand Up @@ -183,3 +184,107 @@ def test_search_is_not_allowed_if_only_read_on_certain_records(self):
headers=headers)

self.app.post("/buckets/bid/collections/cid/search", status=403, headers=headers)


class SchemaSupport(BaseWebTest, unittest.TestCase):

schema = {
"properties": {
"id": {"type": "keyword"},
"last_modified": {"type": "long"},
"build": {
"properties": {
"date": {"type": "date", "format": "strict_date"},
"id": {"type": "keyword"}
}
}
}
}

def setUp(self):
self.app.put("/buckets/bid", headers=self.headers)
body = {"data": {"index:schema": self.schema}}
self.app.put_json("/buckets/bid/collections/cid", body, headers=self.headers)
self.app.post_json("/buckets/bid/collections/cid/records",
{"data": {"build": {"id": "abc", "date": "2017-05-24"}}},
headers=self.headers)
self.app.post_json("/buckets/bid/collections/cid/records",
{"data": {"build": {"id": "efg", "date": "2017-02-01"}}},
headers=self.headers)

def get_mapping(self, bucket_id, collection_id):
indexer = self.app.app.registry.indexer
indexname = indexer.indexname(bucket_id, collection_id)
index_mapping = indexer.client.indices.get_mapping(indexname)
mappings = index_mapping[indexname]["mappings"]
return mappings.get(indexname, {})

def test_index_has_mapping_if_collection_has_schema(self):
mapping = self.get_mapping("bid", "cid")
assert sorted(mapping["properties"].keys()) == ["build", "id", "last_modified"]

def test_mapping_is_updated_on_collection_update(self):
new_schema = copy.deepcopy(self.schema)
new_schema["properties"]["build"]["properties"]["id"]["ignore_above"] = 12

self.app.patch_json("/buckets/bid/collections/cid",
{"data": {"index:schema": new_schema}},
headers=self.headers)

mapping = self.get_mapping("bid", "cid")
assert mapping["properties"]["build"]["properties"]["id"]["ignore_above"] == 12

def test_mapping_is_created_when_index_metadata_is_added(self):
self.app.put("/buckets/bid/collections/cid2", headers=self.headers)
mapping = self.get_mapping("bid", "cid2")
assert "build" not in mapping.get("properties", {})

self.app.patch_json("/buckets/bid/collections/cid2",
{"data": {"index:schema": self.schema}},
headers=self.headers)

mapping = self.get_mapping("bid", "cid2")
assert "build" in mapping["properties"]

def test_mapping_is_preserved_when_index_metadata_is_removed(self):
self.app.put_json("/buckets/bid/collections/cid",
{"data": {}},
headers=self.headers)

mapping = self.get_mapping("bid", "cid")
assert "build" in mapping["properties"]

def test_can_search_for_subproperties(self):
body = {
"query": {
"bool" : {
"must" : {
"term" : { "build.id" : "abc" }
}
}
}
}
resp = self.app.post_json("/buckets/bid/collections/cid/search", body,
headers=self.headers)
result = resp.json
assert len(result["hits"]["hits"]) == 1
assert result["hits"]["hits"][0]["_source"]["build"]["id"] == "abc"

def test_can_aggregate_values(self):
body = {
"aggs" : {
"build_dates" : {
"terms": {
"field" : "build.id",
"size" : 1000
}
}
}
}
resp = self.app.post_json("/buckets/bid/collections/cid/search", body,
headers=self.headers)
result = resp.json
assert result["aggregations"]["build_dates"]["buckets"] == [
{"key": "abc", "doc_count": 1},
{"key": "efg", "doc_count": 1},
]