Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NATS integration and implement PdmV use case #617

Merged
merged 6 commits into from
Jan 16, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 56 additions & 1 deletion Server/Python/src/dbs/web/DBSWriterModel.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import traceback

# CMSMonitoring modules
from CMSMonitoring.NATS import NATSManager


class DBSWriterModel(DBSReaderModel):
"""
Expand All @@ -36,6 +39,17 @@ def __init__(self, config):
if isinstance(urls, type({})):
config.database.connectUrl = urls['writer']

# initialize NATS if requested
self.nats = None
if hasattr(config, 'use_nats') and config.use_nats:
topic = 'cms.dbs'
topics = config.nats_topics
if not topics:
topics = ['%s.topic' % topic]
self.nats = NATSManager(config.nats_server, topics=topics, default_topic=topic)
msg = "DBS NATS: %s" % self.nats
self.logger.info(msg)

DBSReaderModel.__init__(self, config)

self.sequenceManagerDAO = self.daofactory(classname="SequenceManager")
Expand Down Expand Up @@ -240,6 +254,16 @@ def insertDataset(self):

# need proper validation
self.dbsDataset.insertDataset(indata)
# send message to NATS if it is configured
if self.nats:
try:
dataset = indata.get('dataset')
dataset_access_type = indata.get('dataset_access_type')
doc = {'dataset':dataset, 'dataset_type': dataset_access_type}
self.nats.publish(doc)
except Exception as exp:
err = 'insertDataset NATS error, %s, trace:\n%s' % (str(exp), traceback.format_exc())
self.logger.warning(err)
except cjson.DecodeError as dc:
dbsExceptionHandler("dbsException-invalid-input2", "Wrong format/data from insert dataset input", self.logger.exception, str(dc))
except dbsException as de:
Expand Down Expand Up @@ -267,6 +291,18 @@ def insertBulkBlock(self):
self.logger.exception, "insertBulkBlock: datset and file parentages cannot be in the input at the same time.")
indata = validateJSONInputNoCopy("blockBulk", indata)
self.dbsBlockInsert.putBlock(indata)
# send message to NATS if it is configured
if self.nats:
try:
ddata = indata.get('dataset')
if isinstance(ddata, dict) and 'dataset' in ddata:
dataset = ddata.get('dataset')
dataset_access_type = ddata.get('dataset_access_type')
doc = {'dataset':dataset, 'dataset_type': dataset_access_type}
self.nats.publish(doc)
except Exception as exp:
err = 'insertDataset NATS error, %s, trace:\n%s' % (str(exp), traceback.format_exc())
self.logger.warning(err)
except cjson.DecodeError as dc:
dbsExceptionHandler("dbsException-invalid-input2", "Wrong format/data from insert BulkBlock input", self.logger.exception, str(dc))
except dbsException as de:
Expand Down Expand Up @@ -375,6 +411,8 @@ def insertFile(self, qInserts=False):
if isinstance(indata, dict):
indata = [indata]
indata = validateJSONInputNoCopy("files", indata)
tot_size = 0
tot_evts = 0
for f in indata:
f.update({
#"dataset":f["dataset"],
Expand All @@ -387,7 +425,17 @@ def insertFile(self, qInserts=False):
"file_assoc_list":f.get("assoc_list", []),
"file_output_config_list":f.get("file_output_config_list", [])})
businput.append(f)
tot_evts += f.get('event_count', 0)
tot_size += f.get('file_size', 0)
self.dbsFile.insertFile(businput, qInserts)
# send message to NATS if it is configured
if self.nats and tot_evts and tot_size:
try:
doc = {'dataset':f['dataset'], 'evts': tot_evts, 'size': tot_size}
self.nats.publish(doc)
except Exception as exp:
err = 'insertFile NATS error, %s, trace:\n%s' % (str(exp), traceback.format_exc())
self.logger.warning(err)
except cjson.DecodeError as dc:
dbsExceptionHandler("dbsException-invalid-input2", "Wrong format/data from insert File input", self.logger.exception, str(dc))
except dbsException as de:
Expand Down Expand Up @@ -450,9 +498,16 @@ def updateDataset(self, dataset="", is_dataset_valid=-1, dataset_access_type="")
try:
if dataset_access_type != "":
self.dbsDataset.updateType(dataset, dataset_access_type)
# send message to NATS if it is configured
if self.nats:
try:
doc = {'dataset':dataset, 'dataset_type': dataset_access_type}
self.nats.publish(doc)
except Exception as exp:
err = 'updateDataset NATS error, %s, trace:\n%s' % (str(exp), traceback.format_exc())
self.logger.warning(err)
else:
dbsExceptionHandler("dbsException-invalid-input", "DBSWriterModel/updateDataset. dataset_access_type is required.")

except dbsException as de:
dbsExceptionHandler(de.eCode, de.message, self.logger.exception, de.message)
except HTTPError as he:
Expand Down