Skip to content

Commit

Permalink
Fix sqlalchemy queue component partition_id
Browse files Browse the repository at this point in the history
  • Loading branch information
jpbalarini committed Nov 15, 2018
1 parent e1a4ca9 commit 7af4e1c
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 6 deletions.
9 changes: 9 additions & 0 deletions docs/source/topics/frontera-settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,15 @@ Default: ``5.0``
(in progress + queued requests in that slot) / max allowed concurrent downloads per slot before slot is considered
overused. This affects only Scrapy scheduler."

.. setting:: QUEUE_HOSTNAME_PARTITIONING

QUEUE_HOSTNAME_PARTITIONING
---------------------------

Default: ``False``

Whether to partition the queue by hostname. When ``False`` (the default), the request fingerprint is used as the partition key instead.

.. setting:: REQUEST_MODEL

REQUEST_MODEL
Expand Down
2 changes: 1 addition & 1 deletion frontera/contrib/backends/sqlalchemy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _init_db_worker(self, manager):
self.check_and_create_tables(drop, clear_content, (metadata_m, queue_m))
self._metadata = Metadata(self.session_cls, metadata_m,
settings.get('SQLALCHEMYBACKEND_CACHE_SIZE'))
self._queue = Queue(self.session_cls, queue_m, settings.get('SPIDER_FEED_PARTITIONS'))
self._queue = Queue(self.session_cls, queue_m, settings)

@classmethod
def strategy_worker(cls, manager):
Expand Down
10 changes: 7 additions & 3 deletions frontera/contrib/backends/sqlalchemy/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,14 @@ def flush(self):


class Queue(BaseQueue):
def __init__(self, session_cls, queue_cls, partitions, ordering='default'):
def __init__(self, session_cls, queue_cls, settings, ordering='default'):
partitions = settings.get('SPIDER_FEED_PARTITIONS')
self.session = session_cls()
self.queue_model = queue_cls
self.logger = logging.getLogger("sqlalchemy.queue")
self.partitions = [i for i in range(0, partitions)]
self.partitioner = Crc32NamePartitioner(self.partitions)
self.hostname_partitioning = settings.get('QUEUE_HOSTNAME_PARTITIONING')
self.ordering = ordering

def frontier_stop(self):
Expand Down Expand Up @@ -202,9 +204,11 @@ def schedule(self, batch):
partition_id = self.partitions[0]
host_crc32 = 0
else:
partition_id = self.partitioner.partition(hostname, self.partitions)
fingerprint = to_native_str(fprint)
partition_key = hostname if self.hostname_partitioning else fingerprint
partition_id = self.partitioner.partition(partition_key, self.partitions)
host_crc32 = get_crc32(hostname)
q = self.queue_model(fingerprint=to_native_str(fprint), score=score, url=request.url, meta=request.meta,
q = self.queue_model(fingerprint=fingerprint, score=score, url=request.url, meta=request.meta,
headers=request.headers, cookies=request.cookies, method=to_native_str(request.method),
partition_id=partition_id, host_crc32=host_crc32, created_at=time()*1E+6)
to_save.append(q)
Expand Down
8 changes: 6 additions & 2 deletions tests/contrib/backends/test_backends.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from frontera.core.components import States
from frontera.core.models import Request
from frontera import Settings
from happybase import Connection
from frontera.contrib.backends.hbase import HBaseState, HBaseQueue
from frontera.contrib.backends.sqlalchemy import States as SQLAlchemyStates, Queue as SQLAlchemyQueue
Expand Down Expand Up @@ -88,11 +89,14 @@ def queue(request):
return

if request.param == "sqlalchemy":
settings = Settings()
settings.SPIDER_FEED_PARTITIONS = 2
settings.QUEUE_HOSTNAME_PARTITIONING = True
engine = create_engine('sqlite:///:memory:', echo=False)
session_cls = sessionmaker()
session_cls.configure(bind=engine)
QueueModel.__table__.create(bind=engine)
sqla_queue = SQLAlchemyQueue(session_cls, QueueModel, 2)
sqla_queue = SQLAlchemyQueue(session_cls, QueueModel, settings)
yield sqla_queue
sqla_queue.frontier_stop()
engine.dispose()
Expand All @@ -114,4 +118,4 @@ def test_queue(queue):
assert set([r.url for r in queue.get_next_requests(10, 0, min_requests=3, min_hosts=1,
max_requests_per_host=10)]) == set([r3.url])
assert set([r.url for r in queue.get_next_requests(10, 1, min_requests=3, min_hosts=1,
max_requests_per_host=10)]) == set([r1.url, r2.url])
max_requests_per_host=10)]) == set([r1.url, r2.url])

0 comments on commit 7af4e1c

Please sign in to comment.