diff --git a/src/servicex_did_finder_lib/communication.py b/src/servicex_did_finder_lib/communication.py index 5f1cc8a..a0200d0 100644 --- a/src/servicex_did_finder_lib/communication.py +++ b/src/servicex_did_finder_lib/communication.py @@ -57,7 +57,8 @@ async def run_file_fetch_loop(did: str, servicex: ServiceXAdapter, info: Dict[st servicex.post_status_update(f'Completed load of file in {elapsed_time} seconds') -def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, properties, body): +def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, properties, body, + file_prefix=None): '''rabbit_mq_callback Respond to RabbitMQ Message When a request to resolve a DID comes into the DID finder, we @@ -70,6 +71,7 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie method ([type]): Delivery method properties ([type]): Properties of the message body ([type]): The body (json for us) of the message + file_prefix([str]): Prefix to put in front of file paths to enable use of Cache service ''' request_id = None # set this in case we get an exception while loading request try: @@ -78,7 +80,7 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie did = did_request['did'] request_id = did_request['request_id'] __logging.info(f'Received DID request {did_request}', extra={'requestId': request_id}) - servicex = ServiceXAdapter(did_request['service-endpoint']) + servicex = ServiceXAdapter(did_request['service-endpoint'], file_prefix) servicex.post_status_update("DID Request received") info = { @@ -105,7 +107,9 @@ def rabbit_mq_callback(user_callback: UserDIDHandler, channel, method, propertie def init_rabbit_mq(user_callback: UserDIDHandler, - rabbitmq_url: str, queue_name: str, retries: int, retry_interval: float): + rabbitmq_url: str, queue_name: str, retries: int, + retry_interval: float, + file_prefix: str = None): rabbitmq = None retry_count = 0 @@ -120,7 +124,7 @@ def init_rabbit_mq(user_callback: UserDIDHandler, _channel.basic_consume(queue=queue_name, auto_ack=False, on_message_callback=lambda c, m, p, b: - rabbit_mq_callback(user_callback, c, m, p, b)) + rabbit_mq_callback(user_callback, c, m, p, b, file_prefix)) _channel.start_consuming() except pika.exceptions.AMQPConnectionError: # type: ignore diff --git a/src/servicex_did_finder_lib/servicex_adaptor.py b/src/servicex_did_finder_lib/servicex_adaptor.py index aaaeaab..cb6f769 100644 --- a/src/servicex_did_finder_lib/servicex_adaptor.py +++ b/src/servicex_did_finder_lib/servicex_adaptor.py @@ -35,8 +35,9 @@ class ServiceXAdapter: - def __init__(self, endpoint): + def __init__(self, endpoint, file_prefix=None): self.endpoint = endpoint + self.file_prefix = file_prefix self.logger = logging.getLogger(__name__) self.logger.addHandler(logging.NullHandler()) @@ -61,6 +62,9 @@ def post_status_update(self, status_msg, severity="info"): self.logger.error(f'After {attempts} tries, failed to send ServiceX App a status ' f'message: {str(status_msg)} - Ignoring error.') + def _prefix_file(self, file_path): + return file_path if not self.file_prefix else self.file_prefix+file_path + def put_file_add(self, file_info): success = False attempts = 0 @@ -68,7 +72,7 @@ def put_file_add(self, file_info): try: mesg = { "timestamp": datetime.now().isoformat(), - "file_path": file_info['file_path'], + "file_path": self._prefix_file(file_info['file_path']), 'adler32': file_info['adler32'], 'file_size': file_info['file_size'], 'file_events': file_info['file_events'] diff --git a/tests/servicex_did_finder_lib/test_servicex_did.py b/tests/servicex_did_finder_lib/test_servicex_did.py index 222ccf2..4f01528 100644 --- a/tests/servicex_did_finder_lib/test_servicex_did.py +++ b/tests/servicex_did_finder_lib/test_servicex_did.py @@ -1,4 +1,8 @@ +import json + +import responses from servicex_did_finder_lib import __version__ +from servicex_did_finder_lib.servicex_adaptor import ServiceXAdapter def test_version(): @@ -23,3 +27,21 @@ def test_put_file_add(): assert submitted['file_events'] == 3141 assert submitted['file_size'] == 1024 + +@responses.activate +def test_put_file_add_with_prefix(): + responses.add(responses.PUT, 'http://servicex.org/files', status=206) + sx = ServiceXAdapter("http://servicex.org", "xcache123:") + sx.put_file_add({ + 'file_path': 'root://foo.bar.ROOT', + 'adler32': '32', + 'file_size': 1024, + 'file_events': 3141 + }) + + assert len(responses.calls) == 1 + submitted = json.loads(responses.calls[0].request.body) + assert submitted['file_path'] == 'xcache123:root://foo.bar.ROOT' + assert submitted['adler32'] == '32' + assert submitted['file_events'] == 3141 + assert submitted['file_size'] == 1024