Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue fetcher updates #266

Merged
merged 6 commits into from
Mar 20, 2024
Merged

Queue fetcher updates #266

merged 6 commits into from
Mar 20, 2024

Conversation

philbudne
Copy link
Contributor

Take another page from scrapy scheduling internals

Override _on_input_message, which runs in the Pika thread when a new
message is received from RabbitMQ, and rather than just queuing the
message to the internal work queue (for consumption by worker
threads), decode it, and see when it could next be started (using the
Slot.issue_interval calculated from avg_seconds for request completion
to keep "next_issue" time).  If the delay is less than the fast delay
queue time (set with --busy-delay-minutes), use the Pika connection
"call_later" method to delay putting the message on the work queue
until it's ripe to be issued.  If the delay is longer than
busy-delay-minutes, requeue the message to the -fast queue.

This GREATLY reduces use of the -fast queue (lower CPU load) AND means
that requests can be started as soon as possible, without waiting for
the message to come around through RabbitMQ (better thruput).

Also: default worker count to the number of available CPU cores.

Phil Budne added 3 commits March 13, 2024 22:13
Override _on_input_message, which runs in the Pika thread when a new
message is received from RabbitMQ, and rather than just queuing the
message to the internal work queue (for consumption by worker
threads), decode it, and see when it could next be started (using the
Slot.issue_interval calculated from avg_seconds for request completion
to keep "next_issue" time).  If the delay is less than the fast delay
queue time (set with --busy-delay-minutes), use the Pika connection
"call_later" method to delay putting the message on the work queue
until it's ripe to be issued.  If the delay is longer than
busy-delay-minutes, requeue the message to the -fast queue.

This GREATLY reduces use of the -fast queue (lower CPU load) AND means
that requests can be started as soon as possible, without waiting for
the message to come around through RabbitMQ (better thruput).

Also: default worker count to the number of available CPU cores.
xml.sax parser cannot ignore non-conforming XML (control characters in titles)

* added lxml-stubs package
* ran make upgrade
@philbudne philbudne requested review from pgulley and kilemensi March 15, 2024 04:49
@@ -565,8 +563,17 @@ def _on_message(
"""
im = InputMessage(chan, method, properties, body, time.monotonic())
msglogger.debug("on_message tag #%s", method.delivery_tag)
self._on_input_message(im)

def _on_input_message(self, im: InputMessage) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's designed to be override-able, shouldn't it be named on_input_message (without _ prefix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's still an interface that MOST applications should never use/override, and perhaps with time I'll come up with a clean way to abstract away the nastiness!

Comment on lines +55 to +58
self.link: str | None = ""
self.domain: str | None = ""
self.pub_date: str | None = ""
self.title: str | None = ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason it's preferred to initialise these to "" instead of None like the rest of fields?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No ready answer! The ones with None values are ones that were added recently, and therefore REALLY optional. I didn't originally didn't have the ones above as "optional" (see line 61 in red) but the change in XML parser might have forced my hand, and I didn't think too hard about how it looked....

Comment on lines +134 to +136
# mypy reval_type(rss) in "with s.rss_entry() as rss" gives Any!!
rss = s.rss_entry()
with rss:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, weird edge cases... not sure if it works, but PEP 526 says we can (should?) annotate the variable before using it.

            rss: RSSEntry
            with s.rss_entry() as rss:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't been pre-declaring variables that don't have an initial value, unless the initial assignment doesn't paint a complete picture like:

    var: int | str
    if condition:
        var = 1
    else:
        var = None

mypy does a good job inferring the types, and I'd rather not have to add clutter (unless I'm overruled)!

One thing in PEP 526 that caught my eye was

PEP 484 explicitly states that type comments are intended to help with type inference in complex cases, and this PEP does not change this intention.

which aligns with my attitude that declaring variable type is there for when it's needed

Copy link
Contributor

@kilemensi kilemensi Mar 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-reading my original comment, I see it can be misunderstood.

not sure if it works, but PEP 526 says we can (should?) annotate the variable before using it if inference fails*.

Added the "if inference fails" part. I assumed this could be inferred (pun intended) from the context i.e. the # mypy reval_type(rss) in "with s.rss_entry() as rss" gives Any!! comment line

Otherwise yes, there shouldn't be any need for explicit type hinting.

@@ -10,6 +10,7 @@ dependencies = [
"boto3 ~= 1.28.44",
"docker ~= 6.1.0",
"elasticsearch ~= 8.12.0",
# lxml installed by some other package?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 I think both mediacloud-metadata (via trafilatura) and scrapy requires lxml.

target_concurrency=self.args.target_concurrency,
max_delay_seconds=self.busy_delay_seconds,
conn_retry_seconds=self.args.conn_retry_minutes * 60,
min_interval_seconds=MIN_INTERVAL_SECONDS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we use value passed in via --min-interval-seconds arg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!!! Thanks!!!!

@philbudne
Copy link
Contributor Author

philbudne commented Mar 20, 2024 via email

@philbudne philbudne merged commit a7a8a8e into mediacloud:main Mar 20, 2024
@philbudne philbudne deleted the qfetch-later branch March 20, 2024 20:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants