Skip to content

Commit

Permalink
Merge pull request #32 from sara-nl/minor-fixes
Browse files Browse the repository at this point in the history
Minor fixes
  • Loading branch information
lnauta authored Nov 22, 2024
2 parents 3cbd308 + f6acc34 commit 849d04d
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 33 deletions.
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ git clone [email protected]:sara-nl/picasclient.git
cd picasclient
poetry install --with test
```
Note that poetry will create a virtual environment if it is not running withing an activated virtual environment already. In that case, you will need to run `poetry run` before your commands to execute them within the poetry virtual environment.
Note that poetry will create a virtual environment if it is not running within an activated virtual environment already. In that case, you will need to run `poetry run` before your commands to execute them within the poetry virtual environment.

If you prefer not to use `poetry`, then you can install with (in a virtual environment):
```
pip install -U .
pip install flake8 pytest
```

To test, run
```
Expand Down
3 changes: 0 additions & 3 deletions examples/process_task.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ if [[ "$?" != "0" ]]; then
exit 1
fi

#Copy output to the grid storage
#globus-url-copy file:///${PWD}/${OUTPUT} gsiftp://gridftp.grid.sara.nl:2811/pnfs/grid.sara.nl/data/lsgrid/homer/${OUTPUT}

echo `date`

exit 0
7 changes: 3 additions & 4 deletions examples/slurm-example.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
# Attach the logs to the token


cd $PWD
# You need to load your environment here
# mamba activate MAMBA-ENV
# source /PATH/TO/VENV/bin/activate
# You may set environmental variables needed in the SLURM job
# For example, when using the LUMI container wrapper:
# export PATH="/path/to/install_dir/bin:$PATH"
python local-example.py
28 changes: 16 additions & 12 deletions picas/actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,28 @@ def _run(self, task, timeout):
with ThreadingTimeout(timeout, swallow_exc=False) as context_manager:
self.process_task(task)
except Exception as ex:
msg = ("Exception {0} occurred during processing: {1}"
.format(type(ex), ex))
msg = f"Exception {type(ex)} occurred during processing: {ex}"
task.error(msg, exception=ex)
log.info(msg)

if context_manager.state == context_manager.TIMED_OUT:
msg = ("Token execution exceeded timeout limit of {0} seconds".format(timeout))
msg = f"Token execution exceeded timeout limit of {timeout} seconds"
log.info(msg)

while True:
try:
self.db.save(task)
break
except ResourceConflict:
# simply overwrite changes - model results are more
# important
new_task = self.db.get(task.id)
task['_rev'] = new_task.rev
try:
self.db.save(task)
except ResourceConflict as ex:
# simply overwrite changes - model results are more important
msg = f"Warning: {type(ex)} occurred while saving task to database: " + \
"Document exists with different revision or was deleted"
log.info(msg)
new_task = self.db.get(task.id)
task['_rev'] = new_task.rev
except Exception as ex:
# re-raise Exception
msg = f"Error: {type(ex)} occurred while saving task to database: {ex}"
log.info(msg)
raise

self.cleanup_run()
self.tasks_processed += 1
Expand Down
11 changes: 8 additions & 3 deletions picas/jobid.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ def add_job_id(doc):
A glite wms system makes underneath use of a cream system which makes use
of PBS. In such a case only the glite wms id instead of all of them.
"""

dirac_jobid = environ.get("DIRACJOBID")
slurm_jobid = environ.get("SLURM_JOB_ID")
wms_jobid = environ.get("GLITE_WMS_JOBID")
cream_jobid = environ.get("CREAM_JOBID")
pbs_jobid = environ.get("PBS_JOBID")
slurm_jobid = environ.get("SLURM_JOB_ID")
slurm_array_jobid = environ.get("SLURM_ARRAY_JOB_ID")
slurm_array_taskid = environ.get("SLURM_ARRAY_TASK_ID")

if slurm_jobid is not None:
doc["slurm_job_id"] = slurm_jobid
if dirac_jobid is not None:
doc["dirac_job_id"] = dirac_jobid
elif wms_jobid is not None:
Expand All @@ -32,6 +33,10 @@ def add_job_id(doc):
doc["cream_job_id"] = cream_jobid
elif pbs_jobid is not None:
doc["pbs_job_id"] = pbs_jobid
elif slurm_array_jobid is not None:
doc["slurm_job_id"] = slurm_array_jobid+"_"+slurm_array_taskid
elif slurm_jobid is not None:
doc["slurm_job_id"] = slurm_jobid


def remove_job_id(doc):
Expand Down
6 changes: 3 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "picas"
version = "0.3.1"
version = "1.0.0"
description = "Python client using CouchDB as a token pool server."
authors = ["Jan Bot, Joris Borgdorff, Lodewijk Nauta, Haili Hu"]
license = "MIT"
Expand Down
23 changes: 23 additions & 0 deletions tests/test_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from picas.documents import Task
from picas.actors import RunActor
from picas.iterators import EndlessViewIterator
from couchdb.http import ResourceConflict


class ExampleRun(RunActor):
Expand Down Expand Up @@ -59,6 +60,28 @@ def test_run(self):
runner.run()
self.assertEqual(self.count, len(MockDB.TASKS))

@patch('test_mock.MockDB.save')
def test_run_resourceconflict(self, mock_save):
"""
Test the _run function, in case the DB throws a ResourceConflict
(when document exists with different revision or was deleted)
the _run function should continue
"""
mock_save.side_effect = ResourceConflict
runner = ExampleRun(self._callback)
runner._run(task=Task({'_id': 'c', 'lock': None, 'done': None}), timeout=None)
self.assertEqual(runner.tasks_processed, 1)

@patch('test_mock.MockDB.save')
def test_run_exception(self, mock_save):
"""
Test the _run function, in case the DB throws a an unexpected Exception
"""
with pytest.raises(ValueError):
mock_save.side_effect = ValueError
runner = ExampleRun(self._callback)
runner._run(task=Task({'_id': 'c', 'lock': None, 'done': None}), timeout=None)

def test_stop_max_tasks(self):
"""
Test to stop after 1 task is performed.
Expand Down
12 changes: 6 additions & 6 deletions tests/test_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@


class MockDB(object):
TASKS = [{'_id': 'a', 'lock': 0, 'scrub_count': 0},
{'_id': 'b', 'lock': 0, 'scrub_count': 0},
{'_id': 'c', 'lock': 0, 'scrub_count': 0}]
TASKS = [{'_id': 'a', '_rev': '1', 'lock': 0, 'scrub_count': 0},
{'_id': 'b', '_rev': '1', 'lock': 0, 'scrub_count': 0},
{'_id': 'c', '_rev': '1', 'lock': 0, 'scrub_count': 0}]
JOBS = [{'_id': 'myjob'}]

def __init__(self):
Expand All @@ -21,11 +21,11 @@ def get_single_from_view(self, view, **view_params):

def get(self, idx):
if idx in self.saved:
return self.saved[idx]
return Document(self.saved[idx])
elif idx in self.tasks:
return self.tasks[idx]
return Document(self.tasks[idx])
elif idx in self.jobs:
return self.jobs[idx]
return Document(self.jobs[idx])
else:
raise KeyError

Expand Down

0 comments on commit 849d04d

Please sign in to comment.