Skip to content

Commit

Permalink
Add support for --prefix of returned files
Browse files Browse the repository at this point in the history
  • Loading branch information
BenGalewsky committed Oct 19, 2021
1 parent 7010224 commit be6d665
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 6 deletions.
12 changes: 8 additions & 4 deletions src/servicex_did_finder_lib/communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[st
servicex.post_status_update(f'Completed load of file in {elapsed_time} seconds')


def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, properties, body):
def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, properties, body,
file_prefix=None):
'''rabbit_mq_callback Respond to RabbitMQ Message
When a request to resolve a DID comes into the DID finder, we
Expand All @@ -70,6 +71,7 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie
method ([type]): Delivery method
properties ([type]): Properties of the message
body ([type]): The body (json for us) of the message
file_prefix([str]): Prefix to put in front of file paths to enable use of Cache service
'''
request_id = None # set this in case we get an exception while loading request
try:
Expand All @@ -78,7 +80,7 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie
did = did_request['did']
request_id = did_request['request_id']
__logging.info(f'Received DID request {did_request}', extra={'requestId': request_id})
servicex = ServiceXAdapter(did_request['service-endpoint'])
servicex = ServiceXAdapter(did_request['service-endpoint'], file_prefix)
servicex.post_status_update("DID Request received")

info = {
Expand All @@ -105,7 +107,9 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie


def init_rabbit_mq(user_callback: UserDIDHandler,
rabbitmq_url: str, queue_name: str, retries: int, retry_interval: float):
rabbitmq_url: str, queue_name: str, retries: int,
retry_interval: float,
file_prefix: str = None):
rabbitmq = None
retry_count = 0

Expand All @@ -120,7 +124,7 @@ def init_rabbit_mq(user_callback: UserDIDHandler,
_channel.basic_consume(queue=queue_name,
auto_ack=False,
on_message_callback=lambda c, m, p, b:
rabbit_mq_callback(user_callback, c, m, p, b))
rabbit_mq_callback(user_callback, c, m, p, b, file_prefix))
_channel.start_consuming()

except pika.exceptions.AMQPConnectionError: # type: ignore
Expand Down
8 changes: 6 additions & 2 deletions src/servicex_did_finder_lib/servicex_adaptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@


class ServiceXAdapter:
def __init__(self, endpoint):
def __init__(self, endpoint, file_prefix=None):
self.endpoint = endpoint
self.file_prefix = file_prefix

self.logger = logging.getLogger(__name__)
self.logger.addHandler(logging.NullHandler())
Expand All @@ -61,14 +62,17 @@ def post_status_update(self, status_msg, severity="info"):
self.logger.error(f'After {attempts} tries, failed to send ServiceX App a status '
f'message: {str(status_msg)} - Ignoring error.')

def _prefix_file(self, file_path):
return file_path if not self.file_prefix else self.file_prefix+file_path

def put_file_add(self, file_info):
success = False
attempts = 0
while not success and attempts < MAX_RETRIES:
try:
mesg = {
"timestamp": datetime.now().isoformat(),
"file_path": file_info['file_path'],
"file_path": self._prefix_file(file_info['file_path']),
'adler32': file_info['adler32'],
'file_size': file_info['file_size'],
'file_events': file_info['file_events']
Expand Down
22 changes: 22 additions & 0 deletions tests/servicex_did_finder_lib/test_servicex_did.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import json

import responses
from servicex_did_finder_lib import __version__
from servicex_did_finder_lib.servicex_adaptor import ServiceXAdapter


def test_version():
Expand All @@ -23,3 +27,21 @@ def test_put_file_add():
assert submitted['file_events'] == 3141
assert submitted['file_size'] == 1024


@responses.activate
def test_put_file_add_with_prefix():
responses.add(responses.PUT, 'http://servicex.org/files', status=206)
sx = ServiceXAdapter("http://servicex.org", "xcache123:")
sx.put_file_add({
'file_path': 'root://foo.bar.ROOT',
'adler32': '32',
'file_size': 1024,
'file_events': 3141
})

assert len(responses.calls) == 1
submitted = json.loads(responses.calls[0].request.body)
assert submitted['file_path'] == 'xcache123:root://foo.bar.ROOT'
assert submitted['adler32'] == '32'
assert submitted['file_events'] == 3141
assert submitted['file_size'] == 1024

0 comments on commit be6d665

Please sign in to comment.