Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Mongo db backend updated #17

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions crawlfrontier/contrib/backends/memory/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ def request_error(self, request, error):
def _get_or_create_request(self, request):
fingerprint = request.meta['fingerprint']
if fingerprint not in self.requests:
new_request = request.copy()
new_request.meta['created_at'] = datetime.datetime.utcnow()
new_request.meta['depth'] = 0
new_request = self._create_request(request)
self.requests[fingerprint] = new_request
self.manager.logger.backend.debug('Creating request %s' % new_request)
return new_request, True
Expand All @@ -59,6 +57,12 @@ def _get_or_create_request(self, request):
self.manager.logger.backend.debug('Request exists %s' % request)
return page, False

def _create_request(self, request):
new_request = request.copy()
new_request.meta['created_at'] = datetime.datetime.utcnow()
new_request.meta['depth'] = 0
return new_request

def _compare_pages(self, first, second):
    """Compare two stored pages; concrete subclasses must override this."""
    raise NotImplementedError

Expand Down
184 changes: 184 additions & 0 deletions crawlfrontier/contrib/backends/mongodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
from datetime import datetime
from pymongo import MongoClient, DESCENDING

from crawlfrontier import Backend, Request, Response
from crawlfrontier.exceptions import NotConfigured


class MongodbBackend(Backend):
    """crawlfrontier backend that persists the frontier in a MongoDB collection.

    Each frontier request is stored as one document, uniquely keyed by
    'meta.fingerprint', with a 'state' field tracking its crawl lifecycle
    (see the nested State class).
    """
    name = 'Mongodb Backend'

    class State:
        # Lifecycle values stored in each document's 'state' field.
        NOT_CRAWLED = 'NOT CRAWLED'
        QUEUED = 'QUEUED'
        CRAWLED = 'CRAWLED'
        ERROR = 'ERROR'

    def __init__(self, manager):
        """Connect to MongoDB as described by the manager's settings.

        Raises:
            NotConfigured: if any of the four BACKEND_MONGO_* settings
                is missing.
        """
        settings = manager.settings
        mongo_hostname = settings.get('BACKEND_MONGO_HOSTNAME')
        mongo_port = settings.get('BACKEND_MONGO_PORT')
        mongo_db = settings.get('BACKEND_MONGO_DB_NAME')
        mongo_collection = settings.get('BACKEND_MONGO_COLLECTION_NAME')
        if mongo_hostname is None or mongo_port is None or mongo_db is None or mongo_collection is None:
            raise NotConfigured

        self.client = MongoClient(mongo_hostname, mongo_port)
        self.db = self.client[mongo_db]
        self.collection = self.db[mongo_collection]
        # One document per unique request fingerprint.
        self.collection.ensure_index("meta.fingerprint", unique=True, drop_dups=True)
        # FIX: was ensure_index("score"). Documents never carry a top-level
        # 'score' field (_to_mongo_dict nests request data under 'meta') and
        # MongodbScoreBackend sorts on 'meta.score', so the old index could
        # never serve that query.
        self.collection.ensure_index("meta.score")
        self.collection.ensure_index("meta.created_at")
        self.collection.ensure_index("meta.depth")
        self.manager = manager

    @classmethod
    def from_manager(cls, manager):
        """Standard crawlfrontier backend factory hook."""
        return cls(manager)

    def add_seeds(self, seeds):
        """Insert any seed requests that are not already stored."""
        # Log
        self.manager.logger.backend.debug('ADD_SEEDS n_links=%s' % len(seeds))

        for seed in seeds:
            # Get or create page from link
            request, _ = self._get_or_create_request(seed)

    def page_crawled(self, response, links):
        """Mark the page behind *response* as CRAWLED and register *links*."""
        # Log
        self.manager.logger.backend.debug('PAGE_CRAWLED page=%s status=%s links=%s' %
                                          (response, response.status_code, len(links)))

        # process page crawled
        backend_page = self._page_crawled(response)

        # Update crawled fields
        backend_page.state = self.State.CRAWLED
        self.collection.update(self._get_mongo_spec(backend_page), {
            "$set": self._to_mongo_dict(backend_page)}, upsert=False)

        # Create links
        for link in links:
            self.manager.logger.backend.debug('ADD_LINK link=%s' % link)
            link_page, link_created = self._get_or_create_request(link)
            if link_created:
                # FIX: was link_page._meta — use the public 'meta' attribute,
                # consistent with every other access in this class.
                link_page.meta['depth'] = response.meta['depth'] + 1
                self.collection.update(self._get_mongo_spec(link_page), {
                    "$set": self._to_mongo_dict(link_page)}, upsert=False)

    def _page_crawled(self, response):
        """Fetch (or create) the stored request for *response* and stamp it."""
        # Get timestamp
        now = datetime.utcnow()

        # Get or create page from incoming page
        backend_page, created = self._get_or_create_request(response)

        # Update creation fields
        if created:
            backend_page.created_at = now

        # Update fields
        # NOTE(review): 'created_at', 'last_update' and 'status' are plain
        # attributes that _to_mongo_dict never serializes, so these stamps do
        # not reach MongoDB — confirm whether they should live under 'meta'.
        backend_page.last_update = now
        backend_page.status = response.status_code
        return backend_page

    def request_error(self, request, error):
        """Record a crawl error: mark the stored request as ERROR."""
        self.manager.logger.backend.debug('PAGE_CRAWLED_ERROR page=%s error=%s' % (request, error))
        now = datetime.utcnow()

        backend_page, created = self._get_or_create_request(request)

        if created:
            backend_page.created_at = now
        backend_page.last_update = now

        backend_page.state = self.State.ERROR
        self.collection.update(self._get_mongo_spec(backend_page),
                               {"$set": self._to_mongo_dict(backend_page)}, upsert=False)
        return backend_page

    def get_next_requests(self, max_next_pages, downloader_info=None):
        """Return up to *max_next_pages* pending requests, marking them QUEUED."""
        # Log
        self.manager.logger.backend.debug('GET_NEXT_PAGES max_next_pages=%s' % max_next_pages)
        now = datetime.utcnow()
        mongo_pages = self._get_sorted_pages(max_next_pages)
        requests = []
        for p in mongo_pages:
            req = self._request_from_mongo_dict(p)
            requests.append(req)

        # Defensive cap in case a subclass's _get_sorted_pages did not limit.
        if max_next_pages:
            requests = requests[0:max_next_pages]
        for req in requests:
            req.state = self.State.QUEUED
            req.last_update = now
            self.collection.update(self._get_mongo_spec(req), {
                "$set": self._to_mongo_dict(req)}, upsert=False)
        return requests

    def _get_mongo_spec(self, obj):
        """Query filter selecting the document for *obj*'s fingerprint."""
        return {'meta.fingerprint': obj.meta['fingerprint']}

    def _request_from_mongo_dict(self, o):
        """Rebuild a frontier Request (plus its 'state') from a stored document."""
        request = Request(o['url'], o['method'], o['headers'], o['cookies'], o['meta'])
        request.state = o['state']
        return request

    def _to_mongo_dict(self, obj):
        """Serialize a Request or Response into its MongoDB document form.

        Raises:
            TypeError: if *obj* is neither a Request nor a Response.
        """
        def _request_to_dict(req):
            return {
                'url': req.url,
                'method': req.method,
                'headers': req.headers,
                'cookies': req.cookies,
                'meta': req.meta,
                'state': req.state
            }

        if isinstance(obj, Request):
            return _request_to_dict(obj)

        if isinstance(obj, Response):
            return {
                'url': obj.url,
                'status_code': obj.status_code,
                'headers': obj.headers,
                'body': obj.body,
                'meta': obj.request.meta,
                'method': obj.request.method,
                'cookies': obj.request.cookies,
                'state': obj.state
            }

        raise TypeError("Type of object %s isn't known." % obj)

    def _get_or_create_request(self, obj):
        """Return (request, created) for *obj*, inserting a new document if unseen.

        NOTE(review): when *obj* is a Response (via _page_crawled) the created
        branch calls obj.copy() — confirm Response supports copy(); in practice
        responses should already exist in the collection as seeds or links.
        """
        existing_request = self.collection.find_one(self._get_mongo_spec(obj))
        if existing_request is None:
            new_request = obj.copy()
            new_request.meta['created_at'] = datetime.utcnow()
            new_request.meta['depth'] = 0
            new_request.state = self.State.NOT_CRAWLED
            self.collection.insert(self._to_mongo_dict(new_request))
            self.manager.logger.backend.debug('Creating request %s' % new_request)
            return new_request, True
        else:
            obj = self._request_from_mongo_dict(existing_request)
            self.manager.logger.backend.debug('Request exists %s' % obj)
            return obj, False

    def _get_sorted_pages(self, max_pages):
        """Return up to *max_pages* documents to crawl next; subclasses define the order."""
        raise NotImplementedError

    def frontier_start(self):
        pass

    def frontier_stop(self):
        # Release the MongoDB connection when the frontier shuts down.
        self.client.close()


class MongodbScoreBackend(MongodbBackend):
    """Mongodb backend that hands out pending pages by descending score."""

    name = 'Score Mongodb Backend'

    def _get_sorted_pages(self, max_pages):
        """Return up to *max_pages* NOT_CRAWLED documents, highest 'meta.score' first."""
        pending = self.collection.find({'state': self.State.NOT_CRAWLED})
        return pending.sort('meta.score', DESCENDING).limit(max_pages)
36 changes: 24 additions & 12 deletions crawlfrontier/contrib/backends/sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ def frontier_stop(self):

def add_seeds(self, seeds):
for seed in seeds:
db_page, _ = self._get_or_create_db_page(url=seed.url, fingerprint=seed.meta['fingerprint'])
db_page, _ = self._get_or_create_db_page(url=seed.url, fingerprint=seed.meta['fingerprint'],
request_or_response=seed)
self.session.commit()

def get_next_requests(self, max_next_requests):
Expand All @@ -134,35 +135,34 @@ def get_next_requests(self, max_next_requests):
next_pages = []
for db_page in query:
db_page.state = Page.State.QUEUED
request = self.manager.request_model(url=db_page.url)
request = self._create_request(db_page) # FIXME: we loose all the Request metadata here: methods, meta...
next_pages.append(request)
self.session.commit()
return next_pages

def page_crawled(self, response, links):
db_page, _ = self._get_or_create_db_page(url=response.url, fingerprint=response.meta['fingerprint'])
db_page, _ = self._get_or_create_db_page(url=response.url, fingerprint=response.meta['fingerprint'],
request_or_response=response)
db_page.state = Page.State.CRAWLED
db_page.status_code = response.status_code
# TODO: a performance bottle-neck on big volumes, operations should be batched here
for link in links:
db_page_from_link, created = self._get_or_create_db_page(url=link.url, fingerprint=link.meta['fingerprint'])
db_page_from_link, created = self._get_or_create_db_page(url=link.url, fingerprint=link.meta['fingerprint'],
request_or_response=link)
if created:
db_page_from_link.depth = db_page.depth+1
self.session.commit()

def request_error(self, request, error):
db_page, _ = self._get_or_create_db_page(url=request.url, fingerprint=request.meta['fingerprint'])
db_page, _ = self._get_or_create_db_page(url=request.url, fingerprint=request.meta['fingerprint'],
request_or_response=request)
db_page.state = Page.State.ERROR
db_page.error = error
self.session.commit()

def _get_or_create_db_page(self, url, fingerprint):
def _get_or_create_db_page(self, url, fingerprint, request_or_response):
if not self._request_exists(fingerprint):
db_request = self.page_model()
db_request.fingerprint = fingerprint
db_request.state = Page.State.NOT_CRAWLED
db_request.url = url
db_request.depth = 0
db_request.created_at = datetime.datetime.utcnow()
db_request = self._create_page(url, fingerprint, request_or_response)
self.session.add(db_request)
self.manager.logger.backend.debug('Creating request %s' % db_request)
return db_request, True
Expand All @@ -171,6 +171,18 @@ def _get_or_create_db_page(self, url, fingerprint):
self.manager.logger.backend.debug('Request exists %s' % db_request)
return db_request, False

def _create_page(self, url, fingerprint, request_or_response):
    """Build a fresh NOT_CRAWLED page record for *url*.

    *request_or_response* is accepted for subclass overrides; this base
    implementation does not read it.
    """
    record = self.page_model()
    record.url = url
    record.fingerprint = fingerprint
    record.depth = 0
    record.state = Page.State.NOT_CRAWLED
    record.created_at = datetime.datetime.utcnow()
    return record

def _create_request(self, db_page):
return self.manager.request_model(url=db_page.url)

def _request_exists(self, fingerprint):
    """Return True if a page row with *fingerprint* is already stored."""
    # EXISTS subquery — checks presence without loading the row itself.
    q = self.page_model.query(self.session).filter_by(fingerprint=fingerprint)
    return self.session.query(q.exists()).scalar()
Expand Down