Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: split stashes #22910

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions src/olympia/blocklist/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):

base_filters: dict[BlockType, MLBF | None] = {key: None for key in BlockType}
base_filters_to_update: List[BlockType] = []
create_stash = False
stashes_to_update: List[BlockType] = []

# Determine which base filters need to be re-uploaded
# and whether a new stash needs to be created.
Expand Down Expand Up @@ -103,9 +103,9 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):
# Only update the stash if we should AND if we aren't already
# re-uploading the filter for this block type.
elif mlbf.should_upload_stash(block_type, previous_filter or base_filter):
create_stash = True
stashes_to_update.append(block_type)

skip_update = len(base_filters_to_update) == 0 and not create_stash
skip_update = len(base_filters_to_update) == 0 and len(stashes_to_update) == 0
if skip_update:
log.info('No new/modified/deleted Blocks in database; skipping MLBF generation')
# Delete the locally generated MLBF directory and files as they are not needed.
Expand All @@ -125,11 +125,12 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):
len(mlbf.data.not_blocked_items),
)

if create_stash:
# We generate unified stashes, which means they can contain data
# for both soft and hard blocks. We need the base filters of each
# block type to determine what goes in a stash.
for block_type in stashes_to_update:
# We generate stashes for each block type, with a unified schema.
# That means every stash contains lists for each block type,
# but only contains data for the block type specified in the stash key.
mlbf.generate_and_write_stash(
block_type=block_type,
previous_mlbf=previous_filter,
blocked_base_filter=base_filters[BlockType.BLOCKED],
soft_blocked_base_filter=base_filters[BlockType.SOFT_BLOCKED],
Expand All @@ -141,7 +142,7 @@ def _upload_mlbf_to_remote_settings(*, force_base=False):
upload_filter.delay(
mlbf.created_at,
filter_list=[key.name for key in base_filters_to_update],
create_stash=create_stash,
stash_list=[key.name for key in stashes_to_update],
)


Expand Down
20 changes: 14 additions & 6 deletions src/olympia/blocklist/mlbf.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ class MLBF:
FILTER_FILE = 'filter'
STASH_FILE = 'stash'
KEY_FORMAT = '{guid}:{version}'
STASH_KEYS = {
BlockType.BLOCKED: 'blocked',
BlockType.SOFT_BLOCKED: 'softblocked',
}

def __init__(
self,
Expand Down Expand Up @@ -222,9 +226,8 @@ def filter_path(self, block_type: BlockType, compat: bool = False):
return self.storage.path('filter')
return self.storage.path(f'filter-{BaseMLBFLoader.data_type_key(block_type)}')

@property
def stash_path(self):
return self.storage.path('stash.json')
def stash_path(self, block_type: BlockType):
return self.storage.path(f'stash-{BaseMLBFLoader.data_type_key(block_type)}.json')

def delete(self):
if self.storage.exists(self.storage.base_location):
Expand Down Expand Up @@ -292,6 +295,7 @@ def generate_diffs(

def generate_and_write_stash(
self,
block_type: BlockType = BlockType.BLOCKED,
previous_mlbf: 'MLBF' = None,
blocked_base_filter: 'MLBF' = None,
soft_blocked_base_filter: 'MLBF' = None,
Expand Down Expand Up @@ -333,11 +337,15 @@ def generate_and_write_stash(
blocked_added, blocked_removed, _ = diffs[BlockType.BLOCKED]
added_items = set(blocked_added)

if not self.should_upload_filter(BlockType.BLOCKED, blocked_base_filter):
if block_type == BlockType.BLOCKED and not self.should_upload_filter(
BlockType.BLOCKED, blocked_base_filter
):
stash_json[STASH_KEYS[BlockType.BLOCKED]] = blocked_added
stash_json[UNBLOCKED_STASH_KEY] = blocked_removed

if waffle.switch_is_active('enable-soft-blocking'):
if block_type == BlockType.SOFT_BLOCKED and waffle.switch_is_active(
'enable-soft-blocking'
):
soft_blocked_added, soft_blocked_removed, _ = diffs[BlockType.SOFT_BLOCKED]
added_items.update(soft_blocked_added)
if not self.should_upload_filter(
Expand All @@ -352,7 +360,7 @@ def generate_and_write_stash(
]

# write stash
stash_path = self.stash_path
stash_path = self.stash_path(block_type)
with self.storage.open(stash_path, 'w') as json_file:
log.info(f'Writing to file {stash_path}')
json.dump(stash_json, json_file)
Expand Down
45 changes: 33 additions & 12 deletions src/olympia/blocklist/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ def monitor_remote_settings():


@task
def upload_filter(generation_time, filter_list=None, create_stash=False):
def upload_filter(generation_time, filter_list=None, stash_list=None):
filters_to_upload: List[BlockType] = []
stashes_to_upload: List[BlockType] = []
base_filter_ids = dict()
bucket = settings.REMOTE_SETTINGS_WRITER_BUCKET
server = RemoteSettings(
Expand All @@ -131,6 +132,10 @@ def upload_filter(generation_time, filter_list=None, create_stash=False):
if filter_list and block_type.name in filter_list:
filters_to_upload.append(block_type)

# Only upload stashes that are in the stash_list arg.
if stash_list and block_type.name in stash_list:
stashes_to_upload.append(block_type)

base_filter_id = get_config(
MLBF_BASE_ID_CONFIG_KEY(block_type, compat=True),
json_value=True,
Expand All @@ -141,6 +146,7 @@ def upload_filter(generation_time, filter_list=None, create_stash=False):
if base_filter_id is not None:
base_filter_ids[block_type] = base_filter_id

# It is possible to upload multiple filters in the same task.
for block_type in filters_to_upload:
attachment_type = BLOCKLIST_RECORD_MLBF_BASE(block_type)
data = {
Expand All @@ -161,9 +167,9 @@ def upload_filter(generation_time, filter_list=None, create_stash=False):
# so we can delete stashes older than this new filter.
base_filter_ids[block_type] = generation_time

# It is possible to upload a stash and a filter in the same task.
if create_stash:
with mlbf.storage.open(mlbf.stash_path, 'r') as stash_file:
# It is possible to upload multiple stashes in the same task.
for block_type in stashes_to_upload:
with mlbf.storage.open(mlbf.stash_path(block_type), 'r') as stash_file:
stash_data = json.load(stash_file)
# If we have a stash, write that
stash_upload_data = {
Expand All @@ -174,10 +180,6 @@ def upload_filter(generation_time, filter_list=None, create_stash=False):
server.publish_record(stash_upload_data)
statsd.incr('blocklist.tasks.upload_filter.upload_stash')

# Get the oldest base filter id so we can delete only stashes
# that are definitely not needed anymore.
oldest_base_filter_id = min(base_filter_ids.values()) if base_filter_ids else None

for record in old_records:
# Delete attachment records that match the
# attachment types of filters we just uploaded.
Expand All @@ -191,10 +193,29 @@ def upload_filter(generation_time, filter_list=None, create_stash=False):
# Delete stash records that are older than the oldest
# pre-existing filter attachment records. These records
# cannot apply to any existing filter since we uploaded.
elif 'stash' in record and oldest_base_filter_id is not None:
record_time = record['stash_time']
if record_time < oldest_base_filter_id:
server.delete_record(record['id'])
# Note: stashes only contain one type of block.
# So we should delete stashes older than the oldest
# base filter for the corresponding block type.
elif 'stash' in record:
stash_time = record['stash_time']
stash_data = record['stash']

for block_type in BlockType:
stash_key = MLBF.STASH_KEYS[block_type]
base_filter_id = get_config(
MLBF_BASE_ID_CONFIG_KEY(block_type, compat=True),
json_value=True,
)
# Delete the record if...
if (
# this stash contains a non-empty list of this block type
stash_key in stash_data
and len(stash_data[stash_key]) > 0
# there is a base filter id that is more recent than this stash
and base_filter_id is not None
and stash_time < base_filter_id
):
server.delete_record(record['id'])

# Commit the changes to remote settings for review + signing.
# Only after any changes to records (attachments and stashes)
Expand Down
Loading