Skip to content
This repository has been archived by the owner on Aug 5, 2024. It is now read-only.

Commit

Permalink
Manage collection metadata update
Browse files Browse the repository at this point in the history
  • Loading branch information
leplatrem committed May 26, 2017
1 parent cdc6acf commit 1f70534
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 15 deletions.
2 changes: 2 additions & 0 deletions kinto_elasticsearch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def includeme(config):
config.add_subscriber(listener.on_server_flushed, ServerFlushed)
config.add_subscriber(listener.on_collection_created, AfterResourceChanged,
for_resources=("collection",), for_actions=("create",))
config.add_subscriber(listener.on_collection_updated, AfterResourceChanged,
for_resources=("collection",), for_actions=("update",))
config.add_subscriber(listener.on_collection_deleted, AfterResourceChanged,
for_resources=("collection",), for_actions=("delete",))
config.add_subscriber(listener.on_bucket_deleted, AfterResourceChanged,
Expand Down
8 changes: 8 additions & 0 deletions kinto_elasticsearch/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ def create_index(self, bucket_id, collection_id, schema=None):
body = None
self.client.indices.create(index=indexname, body=body)

def update_index(self, bucket_id, collection_id, schema=None):
indexname = self.indexname(bucket_id, collection_id)
if schema is None:
schema = {"properties": {}}
self.client.indices.put_mapping(index=indexname,
doc_type=indexname,
body=schema)

def delete_index(self, bucket_id, collection_id=None):
if collection_id is None:
collection_id = "*"
Expand Down
14 changes: 14 additions & 0 deletions kinto_elasticsearch/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ def on_collection_created(event):
indexer.create_index(bucket_id, collection_id, schema=schema)


def on_collection_updated(event):
indexer = event.request.registry.indexer
bucket_id = event.payload["bucket_id"]
for updated in event.impacted_records:
collection_id = updated["new"]["id"]
old_schema = updated["old"].get("index:schema")
new_schema = updated["new"].get("index:schema")
# Create if there was no index before.
if old_schema is None and new_schema is not None:
indexer.create_index(bucket_id, collection_id, schema=new_schema)
elif old_schema != new_schema:
indexer.update_index(bucket_id, collection_id, schema=new_schema)


def on_collection_deleted(event):
indexer = event.request.registry.indexer
bucket_id = event.payload["bucket_id"]
Expand Down
36 changes: 21 additions & 15 deletions tests/test_elasticsearch.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import copy
import mock
import time
import unittest
Expand Down Expand Up @@ -198,18 +199,26 @@ def get_mapping(self, bucket_id, collection_id):

def test_index_has_mapping_if_collection_has_schema(self):
mapping = self.get_mapping("bid", "cid")
assert mapping == {
"properties": {
"id": {"type": "keyword"},
"last_modified": {"type": "long"},
"build": {
"properties": {
"date": {"type": "date", "format": "strict_date"},
"id": {"type": "keyword"}
}
}
}
}
assert sorted(mapping["properties"].keys()) == ["build", "id", "last_modified"]

def test_mapping_is_updated_on_collection_update(self):
new_schema = copy.deepcopy(self.schema)
new_schema["properties"]["build"]["properties"]["id"]["ignore_above"] = 12

self.app.patch_json("/buckets/bid/collections/cid",
{"data": {"index:schema": new_schema}},
headers=self.headers)

mapping = self.get_mapping("bid", "cid")
assert mapping["properties"]["build"]["properties"]["id"]["ignore_above"] == 12

def test_mapping_is_preserved_when_index_metadata_is_removed(self):
self.app.put_json("/buckets/bid/collections/cid",
{"data": {}},
headers=self.headers)

mapping = self.get_mapping("bid", "cid")
assert "build" in mapping["properties"]

def test_can_search_for_subproperties(self):
body = {
Expand Down Expand Up @@ -245,6 +254,3 @@ def test_can_aggregate_values(self):
{"key": "abc", "doc_count": 1},
{"key": "efg", "doc_count": 1},
]

def test_mapping_is_updated_on_collection_update(self):
pass

0 comments on commit 1f70534

Please sign in to comment.