Skip to content
This repository has been archived by the owner on Aug 5, 2024. It is now read-only.

Commit

Permalink
Merge pull request #37 from Kinto/8-collection-indexschema
Browse files Browse the repository at this point in the history
  • Loading branch information
leplatrem committed May 26, 2017
2 parents 1eb4eb8 + 7963ed2 commit d529ea7
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Changelog
- Delete indices when buckets and collections are deleted (fixes #21)
- Support quick search from querystring (fixes #34)
- Return details about invalid queries in request body (fixes #23)
- Support defining mapping from the ``index:schema`` property in the collection metadata (ref #8)

**Bug fixes**

Expand Down
32 changes: 32 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,38 @@ Or an advanced search using request body:
}
Custom index mapping
--------------------

By default, ElasticSearch infers the data types from the indexed records.

It is also possible to define the index mapping (i.e. the schema) explicitly in the collection metadata,
using the ``index:schema`` property:

.. code-block:: bash
$ echo '{
"data": {
"index:schema": {
"properties": {
"id": {"type": "keyword"},
"last_modified": {"type": "long"},
"build": {
"properties": {
"date": {"type": "date", "format": "strict_date"},
"id": {"type": "keyword"}
}
}
}
}
}
}' | http PATCH "http://localhost:8888/v1/buckets/blog/collections/builds" --auth token:admin-token --verbose
Refer to the official ElasticSearch documentation for more information about mappings.

See also `domapping <https://github.com/inveniosoftware/domapping/>`_, a CLI tool that converts JSON schemas to ElasticSearch mappings.


Running the tests
=================

Expand Down
2 changes: 2 additions & 0 deletions kinto_elasticsearch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def includeme(config):
config.add_subscriber(listener.on_server_flushed, ServerFlushed)
config.add_subscriber(listener.on_collection_created, AfterResourceChanged,
for_resources=("collection",), for_actions=("create",))
config.add_subscriber(listener.on_collection_updated, AfterResourceChanged,
for_resources=("collection",), for_actions=("update",))
config.add_subscriber(listener.on_collection_deleted, AfterResourceChanged,
for_resources=("collection",), for_actions=("delete",))
config.add_subscriber(listener.on_bucket_deleted, AfterResourceChanged,
Expand Down
18 changes: 16 additions & 2 deletions kinto_elasticsearch/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,25 @@ def __init__(self, hosts, prefix="kinto", force_refresh=False):
def indexname(self, bucket_id, collection_id):
    """Return the name of the index that backs the given collection.

    The name is made of the configured prefix, the bucket id and the
    collection id, joined with dashes.
    """
    parts = (self.prefix, bucket_id, collection_id)
    return "{}-{}-{}".format(*parts)

def create_index(self, bucket_id, collection_id, schema=None):
    """Make sure the index of the given collection exists.

    When the index is missing it is created; a non-empty ``schema`` is
    passed as the mapping of the document type named after the index.
    When the index is already there, the call is delegated to
    ``update_index()`` with the same arguments.
    """
    name = self.indexname(bucket_id, collection_id)

    # Already there: only the mapping may need to be refreshed.
    if self.client.indices.exists(index=name):
        self.update_index(bucket_id, collection_id, schema)
        return

    # An empty or missing schema means "create without an explicit mapping".
    settings = {"mappings": {name: schema}} if schema else None
    self.client.indices.create(index=name, body=settings)

def update_index(self, bucket_id, collection_id, schema=None):
    """Put ``schema`` as the mapping of the index of the given collection.

    The mapping is put on the document type named after the index. When
    ``schema`` is ``None``, a mapping with no properties is sent instead.
    """
    mapping = {"properties": {}} if schema is None else schema
    name = self.indexname(bucket_id, collection_id)
    self.client.indices.put_mapping(index=name, doc_type=name, body=mapping)

def delete_index(self, bucket_id, collection_id=None):
if collection_id is None:
Expand Down
17 changes: 16 additions & 1 deletion kinto_elasticsearch/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,22 @@ def on_collection_created(event):
bucket_id = event.payload["bucket_id"]
for created in event.impacted_records:
collection_id = created["new"]["id"]
indexer.create_index(bucket_id, collection_id)
schema = created["new"].get("index:schema")
indexer.create_index(bucket_id, collection_id, schema=schema)


def on_collection_updated(event):
    """Propagate ``index:schema`` changes of updated collections to the indexer.

    For each impacted collection, the ``index:schema`` values of the old and
    new metadata are compared: a schema that appears triggers
    ``create_index()``, any other difference triggers ``update_index()``,
    and identical values trigger nothing.
    """
    indexer = event.request.registry.indexer
    bucket_id = event.payload["bucket_id"]

    for change in event.impacted_records:
        collection_id = change["new"]["id"]
        before = change["old"].get("index:schema")
        after = change["new"].get("index:schema")

        schema_added = before is None and after is not None
        if schema_added:
            # No schema until now: go through creation, which also covers
            # the case where the index is missing.
            indexer.create_index(bucket_id, collection_id, schema=after)
            continue

        if before != after:
            indexer.update_index(bucket_id, collection_id, schema=after)


def on_collection_deleted(event):
Expand Down
105 changes: 105 additions & 0 deletions tests/test_elasticsearch.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import mock
import time
import unittest
Expand Down Expand Up @@ -197,3 +198,107 @@ def test_search_is_not_allowed_if_only_read_on_certain_records(self):
headers=headers)

self.app.post("/buckets/bid/collections/cid/search", status=403, headers=headers)


class SchemaSupport(BaseWebTest, unittest.TestCase):
    """Check that the ``index:schema`` collection property drives the index mapping."""

    # Mapping stored in the metadata of the ``bid/cid`` collection by setUp().
    schema = {
        "properties": {
            "id": {"type": "keyword"},
            "last_modified": {"type": "long"},
            "build": {
                "properties": {
                    "date": {"type": "date", "format": "strict_date"},
                    "id": {"type": "keyword"},
                },
            },
        },
    }

    def setUp(self):
        """Create ``bid/cid`` with the schema and post two build records."""
        self.app.put("/buckets/bid", headers=self.headers)
        metadata = {"data": {"index:schema": self.schema}}
        self.app.put_json("/buckets/bid/collections/cid", metadata, headers=self.headers)

        builds = (
            {"id": "abc", "date": "2017-05-24"},
            {"id": "efg", "date": "2017-02-01"},
        )
        for build in builds:
            self.app.post_json("/buckets/bid/collections/cid/records",
                               {"data": {"build": build}},
                               headers=self.headers)

    def get_mapping(self, bucket_id, collection_id):
        """Return the mapping of the doc type named after the index, or ``{}``."""
        indexer = self.app.app.registry.indexer
        name = indexer.indexname(bucket_id, collection_id)
        response = indexer.client.indices.get_mapping(name)
        return response[name]["mappings"].get(name, {})

    def test_index_has_mapping_if_collection_has_schema(self):
        properties = self.get_mapping("bid", "cid")["properties"]
        assert sorted(properties) == ["build", "id", "last_modified"]

    def test_mapping_is_updated_on_collection_update(self):
        changed = copy.deepcopy(self.schema)
        build_id_field = changed["properties"]["build"]["properties"]["id"]
        build_id_field["ignore_above"] = 12

        self.app.patch_json("/buckets/bid/collections/cid",
                            {"data": {"index:schema": changed}},
                            headers=self.headers)

        properties = self.get_mapping("bid", "cid")["properties"]
        assert properties["build"]["properties"]["id"]["ignore_above"] == 12

    def test_mapping_is_created_when_index_metadata_is_added(self):
        # A collection created without schema has no "build" property yet.
        self.app.put("/buckets/bid/collections/cid2", headers=self.headers)
        before = self.get_mapping("bid", "cid2")
        assert "build" not in before.get("properties", {})

        self.app.patch_json("/buckets/bid/collections/cid2",
                            {"data": {"index:schema": self.schema}},
                            headers=self.headers)

        after = self.get_mapping("bid", "cid2")
        assert "build" in after["properties"]

    def test_mapping_is_preserved_when_index_metadata_is_removed(self):
        # Replacing the metadata drops ``index:schema`` from the collection.
        self.app.put_json("/buckets/bid/collections/cid",
                          {"data": {}},
                          headers=self.headers)

        properties = self.get_mapping("bid", "cid")["properties"]
        assert "build" in properties

    def test_can_search_for_subproperties(self):
        query = {"query": {"bool": {"must": {"term": {"build.id": "abc"}}}}}

        resp = self.app.post_json("/buckets/bid/collections/cid/search", query,
                                  headers=self.headers)

        hits = resp.json["hits"]["hits"]
        assert len(hits) == 1
        assert hits[0]["_source"]["build"]["id"] == "abc"

    def test_can_aggregate_values(self):
        terms = {"field": "build.id", "size": 1000}
        query = {"aggs": {"build_dates": {"terms": terms}}}

        resp = self.app.post_json("/buckets/bid/collections/cid/search", query,
                                  headers=self.headers)

        buckets = resp.json["aggregations"]["build_dates"]["buckets"]
        assert buckets == [
            {"key": "abc", "doc_count": 1},
            {"key": "efg", "doc_count": 1},
        ]

0 comments on commit d529ea7

Please sign in to comment.