facilities tutorial developing a monitor
- What is a monitor
- Monitors anatomy
- Developing a monitor to observe free space left in the HD where the project is placed.
- Using the Monitor
A monitor is a non-GUI class whose main job is to do something periodically and notify any registered "listeners".
The base monitor class looks like this:

    class Monitor():
        def __init__(self, **kwargs):
            # Where to store any data from this monitor
            self.workingDir = kwargs['workingDir']
            self.samplingInterval = kwargs.get('samplingInterval', None)
            self.monitorTime = kwargs.get('monitorTime', None)
            self._notifiers = []
            if kwargs.get('email', None) is not None:
                self._notifiers.append(kwargs['email'])
            if 'stdout' in kwargs:
                self._notifiers.append(PrintNotifier())

        def notify(self, title, message):
            for n in self._notifiers:
                if n:
                    n.notify(title, message)

        def info(self, message):
            self.notify("INFO", message)

        def initLoop(self):
            """ To be defined in subclasses. """
            pass

        def loop(self):
            self.initLoop()
            timeout = time.time() + 60. * self.monitorTime  # interval minutes from now
            while True:
                finished = self.step()
                if (time.time() > timeout) or finished:
                    break
                time.sleep(self.samplingInterval)

        def step(self):
            """ To be defined in subclasses. """
            pass

        def addNotifier(self, notifier):
            self._notifiers.append(notifier)

The most important method of a monitor is step():

    def step(self):
        """ To be defined in subclasses. """
        pass

The base class leaves it unimplemented, so any monitor you develop should implement it. loop() calls step() every samplingInterval seconds until monitorTime minutes have elapsed or step() returns a true value to stop the loop intentionally.
Optionally, you can code an initLoop() method, which is called once before the loop starts and is suitable for initialization.
Additionally, monitors follow an observer pattern where any interested "listener/observer" could register and receive notifications from the monitor.
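To make the loop and the observer pattern concrete, here is a minimal sketch that runs without Scipion. `SketchMonitor` is a simplified stand-in for the base class shown above, and `CountdownMonitor` and `ListNotifier` are hypothetical names invented for this illustration; none of them are part of pyworkflow.

```python
import time


class SketchMonitor(object):
    """Simplified stand-in for the base Monitor shown above (not the Scipion class)."""

    def __init__(self, samplingInterval=0.01, monitorTime=1):
        self.samplingInterval = samplingInterval  # seconds between steps
        self.monitorTime = monitorTime            # minutes before giving up
        self._notifiers = []

    def addNotifier(self, notifier):
        self._notifiers.append(notifier)

    def notify(self, title, message):
        for n in self._notifiers:
            n.notify(title, message)

    def initLoop(self):
        pass

    def step(self):
        pass

    def loop(self):
        self.initLoop()
        timeout = time.time() + 60. * self.monitorTime
        while True:
            finished = self.step()
            if (time.time() > timeout) or finished:
                break
            time.sleep(self.samplingInterval)


class CountdownMonitor(SketchMonitor):
    """Hypothetical monitor: notifies on every step and stops after three."""

    def initLoop(self):
        self.remaining = 3

    def step(self):
        self.remaining -= 1
        self.notify("COUNTDOWN", "%d steps left" % self.remaining)
        return self.remaining == 0  # a true value ends the loop


class ListNotifier(object):
    """Hypothetical listener: just keeps what it receives."""

    def __init__(self):
        self.messages = []

    def notify(self, title, message):
        self.messages.append((title, message))


monitor = CountdownMonitor()
listener = ListNotifier()
monitor.addNotifier(listener)
monitor.loop()
print(listener.messages)
```

The listener only needs a `notify(title, message)` method, which is all the monitor relies on.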
Let's start coding something. Let's create a python file in our tutorial package and create there our new monitor class.
- Add a mymonitor.py file to pyworkflow/em/packages/myfacility/
- Import the Monitor base class, and define your new class based on Monitor

      from pyworkflow.em.protocol.monitors import Monitor

      class SpaceMonitor(Monitor):
          """
          Monitor to monitor free space on the HD where scipion project is placed
          """

- Implement the step method. This method should find the HD where the project is placed.

  Import some modules at the top of the file (import section):

      import os
      import collections

  Add the following to the step method:

      def step(self):
          """ Using the workingdir attribute has to find the HD and then get the
          available free space."""
          usage = disk_usage(self.workingDir)
          print(usage)

      # Module-level helper, defined outside the class.
      # Taken from http://code.activestate.com/recipes/577972-disk-usage/
      def disk_usage(path):
          _ntuple_diskusage = collections.namedtuple('usage', 'total used free')
          st = os.statvfs(path)
          free = st.f_bavail * st.f_frsize
          total = st.f_blocks * st.f_frsize
          used = (st.f_blocks - st.f_bfree) * st.f_frsize
          return _ntuple_diskusage(total, used, free)

This adds a new module-level function, disk_usage, which receives a path and returns the total, used and free space of the disk that holds it. The step method calls it and prints the result (temporarily).
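As a side note, on Python 3.3 or later the standard library offers `shutil.disk_usage`, which returns the same three fields. The sketch below assumes a Python 3 interpreter; the tutorial itself runs on Python 2.7, where this function is not available and the `os.statvfs` recipe above is needed.

```python
import shutil
import tempfile

# Any existing path works; the system temp folder is used here as an example.
path = tempfile.gettempdir()

# Returns a named tuple with total, used and free space in bytes.
usage = shutil.disk_usage(path)
print(usage)

# Convert the free space to GB for easier reading.
free_gb = usage.free / (1024.0 ** 3)
print("Free: %.2f GB" % free_gb)
```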
We are going now to add some tests to check our progress.
- Add a tests folder to the package (pyworkflow/em/packages/myfacility/tests)
- Add an empty __init__.py to make it a module
- Add test_monitor.py to the tests folder
- Add the code below

      from pyworkflow.tests import *
      from pyworkflow.em.packages.myfacility import SpaceMonitor

      # Test monitor functionality
      class TestMonitor(BaseTest):

          def test_monitor(self):
              # Instantiate the monitor
              spaceMonitor = SpaceMonitor(workingDir=os.getcwd())
              spaceMonitor.step()

- Run the test:

      scipion test em.packages.myfacility.tests.test_monitor.TestMonitor

Output should look like this:
Scipion (2018-06-12) ((HEAD detached at june_2018_course) b9d0e37)
>>>>> python scripts/run_tests.py em.packages.myfacility.tests.test_monitor.TestMonitor
Running tests....
usage(total=245527773184, used=127268225024, free=105763827712)
[ RUN OK ] TestMonitor.test_monitor (0.001 secs)
[==========] run 1 tests (0.001 secs)
[ PASSED ] 1 tests
The disk stats are printed on the screen. This is not a proper test yet: it has no checks (assertions), so it can never fail. We will add them soon.
We need to persist/store the values, and the easiest approach here is to write the values in a text file.
Following a TDD approach, we will first modify the test to expect the new behaviour.
Since we are going to generate files, let's tell the monitor to use a temporary folder instead of `os.getcwd()`. For this we first need to import the mkdtemp function at the top of the test file with `from tempfile import mkdtemp` and then use it when creating the monitor:

    def test_monitor(self):
        # Instantiate the monitor
        spaceMonitor = SpaceMonitor(workingDir=mkdtemp())
        spaceMonitor.step()
        # Check storage file exists
        fnStorageFile = spaceMonitor.getStorageFilePath()
        # Check that the file exists
        self.assertTrue(os.path.exists(fnStorageFile),
                        "Storage file %s not created." % fnStorageFile)
        # Check there are 2 lines (headers and first data line)
        num_lines = sum(1 for line in open(fnStorageFile, 'r'))
        # Assert lines are 2
        self.assertEqual(2, num_lines,
                         "First step of the monitor does not "
                         "have the expected lines: %s" % 2)

Additionally, we have added some lines to: get the name of the storage file; assert that it exists; and test that it has 2 lines. Since we haven't modified our Monitor yet this test should fail.
[ FAILED ] TestMonitor.test_monitor
Traceback (most recent call last):
File "/home/pablo/desarrollo/scipion/software/lib/python2.7/unittest/case.py", line 329, in run
testMethod()
File "/home/pablo/desarrollo/scipion/pyworkflow/em/packages/myfacility/tests/test_monitor.py", line 16, in test_monitor
fnStorageFile = spaceMonitor.getStorageFilePath()
AttributeError: SpaceMonitor instance has no attribute 'getStorageFilePath'
[==========] run 1 tests (0.003 secs)
[ FAILED ] 1 tests
[ PASSED ] 0 tests
Our monitor does not have the getStorageFilePath() function.
Let's implement what the test is expecting:

    def step(self):
        """ Using the workingdir attribute has to find the HD and then get the
        available free space."""
        usage = disk_usage(self.workingDir)
        self.storeUsageData(usage)

    def getStorageFilePath(self):
        return os.path.join(self.workingDir, 'space_usage.txt')

    def storeUsageData(self, usageData):
        fnStorageFile = self.getStorageFilePath()
        if not os.path.exists(fnStorageFile):
            fhStorage = open(fnStorageFile, "w")
            fhStorage.write("total\tused\tfree\n")
        else:
            fhStorage = open(fnStorageFile, "a")
        fhStorage.write("%s\t%s\t%s\n" % (usageData.total, usageData.used, usageData.free))
        fhStorage.close()

Note that we have:
- Implemented the getStorageFilePath() method.
- Implemented a storeUsageData() method to store our usage data.
- Called the storage function from step(): self.storeUsageData(usage)
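The storage behaviour that the test expects (a header plus one data line after the first step, one more line after each later step) can be checked outside Scipion with a small sketch. `store_usage` and `count_lines` are hypothetical helper names for this illustration, and the usage values are made up.

```python
import collections
import os
import tempfile

Usage = collections.namedtuple('usage', 'total used free')


def store_usage(storage_path, usage):
    """Append one tab-separated line, writing the header on first use."""
    is_new = not os.path.exists(storage_path)
    with open(storage_path, "a") as fh:
        if is_new:
            fh.write("total\tused\tfree\n")
        fh.write("%s\t%s\t%s\n" % (usage.total, usage.used, usage.free))


def count_lines(path):
    with open(path) as fh:
        return sum(1 for _ in fh)


# Work in a temporary folder, as the test does with mkdtemp().
work_dir = tempfile.mkdtemp()
storage_path = os.path.join(work_dir, 'space_usage.txt')

store_usage(storage_path, Usage(100, 60, 40))   # made-up numbers
lines_after_first = count_lines(storage_path)
store_usage(storage_path, Usage(100, 70, 30))
lines_after_second = count_lines(storage_path)
print(lines_after_first, lines_after_second)
```

Opening the file in append mode covers both cases, since it creates the file when it does not exist yet.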
What we have done so far is just a piece of "behaviour": it calculates the statistics of the HD where a certain folder (workingDir) lives. But how can we start it from a Scipion project?
There is a special protocol (ProtMonitor) that is designed to have monitors running. It can be found at pyworkflow/em/protocol/monitors/protocol_monitor.py. It inherits from EMProtocol and defines several parameters:
- inputProtocols: Protocols to be monitored
- samplingInterval: Time interval between sampling
- Mail params (Optional): All email params needed to send emails
Note that our case, monitoring the HD, does not require any specific inputProtocol to monitor.
Additionally, this special protocol will create a "monitorStep" that any "implementer" has to implement:

    # -------------------------- INSERT steps functions -----------------------
    def _insertAllSteps(self):
        self._insertFunctionStep('monitorStep')

    # -------------------------- STEPS functions ------------------------------
    def monitorStep(self):
        pass

Let's define a new Class for our Space Monitor Protocol at mymonitor.py.
- Import ProtMonitor and PrintNotifier at the top:

      from pyworkflow.em import ProtMonitor, PrintNotifier

- Import last version:

      from pyworkflow import VERSION_1_2

- Add our ProtMonitorSpace:

      class ProtMonitorSpace(ProtMonitor):
          _label = 'monitor of HD space'
          _lastUpdateVersion = VERSION_1_2

          # Overwrite the monitor step function
          def monitorStep(self):
              # Instantiate a Space Monitor
              monitor = SpaceMonitor(workingDir=self._getExtraPath(),
                                     samplingInterval=self.samplingInterval.get(),
                                     monitorTime=100)
              monitor.addNotifier(PrintNotifier())
              monitor.loop()

- Make our monitor notify something. At the end of our SpaceMonitor.step() add:

      self.notify("HD stats", str(usage))

- Expose the new protocol to Scipion. In the package __init__.py add:

      from mymonitor import SpaceMonitor, ProtMonitorSpace

Now Scipion should be able to discover it and you should be able to run it:
So far this is OK, but:
- The output is hardly readable. We should convert bytes into GB (at least) to be human-friendly.
- More importantly, there is only a print notifier, so someone has to actively look at the logs.
- A plot of the HD stats might be useful.
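To illustrate the first point, the conversion from a raw byte count to a human-friendly figure can be sketched as follows. `human_size` is a hypothetical stand-in written for this example, not a pyworkflow function, and the exact formatting of any Scipion helper may differ.

```python
def human_size(num_bytes):
    """Hypothetical helper: format a byte count using binary (1024-based) units."""
    units = ['B', 'KB', 'MB', 'GB', 'TB']
    size = float(num_bytes)
    for unit in units[:-1]:
        if size < 1024.0:
            return "%.2f %s" % (size, unit)
        size /= 1024.0
    # Anything that large is reported in the biggest unit.
    return "%.2f %s" % (size, units[-1])


# The free space value from the test output shown earlier.
print(human_size(105763827712))
```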
- Import prettySize:

      from pyworkflow.utils import prettySize

- In the SpaceMonitor, just before calling notify, convert the values and modify the notify call:

      self.storeUsageData(usage)
      # Stats line readable:
      free = prettySize(usage.free)
      total = prettySize(usage.total)
      used = prettySize(usage.used)
      self.notify("HD stats (%s)" % self.workingDir,
                  "Free: %s, Total: %s, Used: %s" % (free, total, used))

#### Add an Email notifier... and remove the inputProtocols
Let's tweak the parameters of the protocol:
Import params from em, like so: `from pyworkflow.em import ProtMonitor, PrintNotifier, params`
```python
class ProtMonitorSpace(ProtMonitor):
    _label = 'monitor of HD space'
    _lastUpdateVersion = VERSION_1_2

    def _defineParams(self, form):
        """ Overwrite the standard define params """
        # This should define the inputProtocols and the sampling interval
        ProtMonitor._defineParams(self, form)
        # Remove the inputProtocols
        section = form.getSection('Input')
        section._paramList.remove('inputProtocols')
        # Add a threshold for the email notifier
        form.addParam('minimumFreeSpace', params.IntParam, default=500,
                      label="Minimum free space (GB)",
                      help="Notify by email or console when HD free space"
                           " drops below the minimum")
        self._sendMailParams(form)

    # Overwrite the monitor step function
    def monitorStep(self):
        # Instantiate a Space Monitor.
        # .get() unwraps the plain value, as done for samplingInterval.
        monitor = SpaceMonitor(self.minimumFreeSpace.get(),
                               workingDir=self._getExtraPath(),
                               samplingInterval=self.samplingInterval.get(),
                               monitorTime=100)
        monitor.addNotifier(PrintNotifier())
        # Create the email notifier
        email = self.createEmailNotifier()
        # If email option active
        if email is not None:
            monitor.addNotifier(email)
        monitor.loop()

We have added the _defineParams(self, form) method. There we call ProtMonitor._defineParams(self, form) to get the default parent params. The next step is to delete 'inputProtocols'. Our API does not offer a way to do this, but Python is flexible enough to make it happen:

    # Remove the inputProtocols
    section = form.getSection('Input')
    section._paramList.remove('inputProtocols')

Next we add a minimumFreeSpace parameter, the threshold below which a notification is sent, and the email notification parameters with self._sendMailParams(form).
Secondly, we have also changed the monitorStep() method. We now pass the minimumFreeSpace value to the SpaceMonitor:

    # Instantiate a Space Monitor
    monitor = SpaceMonitor(self.minimumFreeSpace.get(),
                           workingDir=self._getExtraPath(),
                           samplingInterval=self.samplingInterval.get(),
                           monitorTime=100)

Finally, at the end we request an email notifier and add it to the monitor if one is configured.
Our protocol looks OK, but our monitor is not yet aware of the new 'minimumFreeSpace' parameter and notifies on every loop.
- Make the space monitor understand and react to 'minimumFreeSpace':

    def __init__(self, minimumFreeSpace, **kwargs):
        # Pass self explicitly when calling the parent constructor
        Monitor.__init__(self, **kwargs)
        self.minimumFreeSpace = minimumFreeSpace

- And modify the step() method to take the threshold into account:

    def step(self):
        """ Using the workingdir attribute has to find the HD and then get the
        available free space."""
        usage = disk_usage(self.workingDir)
        self.storeUsageData(usage)
        # Stats line readable:
        free = prettySize(usage.free)
        total = prettySize(usage.total)
        used = prettySize(usage.used)
        # Free space in GB
        freeGB = usage.free/(1024.0**3.0)
        # Notify only if free space is below the threshold in GB
        if freeGB < self.minimumFreeSpace:
            self.notify("WARNING: There is only %s left for %s" %
                        (free, self.workingDir),
                        "Free: %s, Total: %s, Used: %s, Threshold: %s" %
                        (free, total, used, self.minimumFreeSpace))

We haven't touched the monitor test and more important, we are not testing the protocol.
Our test should be failing because we are not passing the minimumFreeSpace. Let's do that:
Replace (for simplicity) all content in test_monitor.py with:

    import math
    from pyworkflow.em.packages.myfacility.mymonitor import disk_usage
    from pyworkflow.tests import *
    from pyworkflow.em.packages.myfacility import SpaceMonitor
    from tempfile import mkdtemp

    # Test monitor functionality
    class TestMonitor(BaseTest):

        def test_monitor(self):
            # Get a tmp folder
            tmpFolder = mkdtemp()
            # Get free space
            diskUsage = disk_usage(tmpFolder)
            freeSpaceInGB = diskUsage.free/(1024.0**3)
            # Round it down to the "floor"
            freeSpaceInGB = math.floor(freeSpaceInGB)
            # Instantiate the monitor
            spaceMonitor = SpaceMonitor(freeSpaceInGB, workingDir=tmpFolder, stdout=True)
            testNotifier = TestNotifier()
            spaceMonitor.addNotifier(testNotifier)
            spaceMonitor.step()
            # Check storage file exists
            fnStorageFile = spaceMonitor.getStorageFilePath()
            # Check that the file exists
            self.assertTrue(os.path.exists(fnStorageFile),
                            "Storage file %s not created." % fnStorageFile)
            # Check there are 2 lines (headers and first data line)
            num_lines = sum(1 for line in open(fnStorageFile, 'r'))
            # Assert lines are 2
            self.assertEqual(2, num_lines,
                             "First step of the monitor does not "
                             "have the expected lines: %s" % 2)
            # Check notifications are empty
            self.assertEqual(0, len(testNotifier.getNotifications()), "Notifications are not empty!")
            # Move the threshold to trigger notifications
            spaceMonitor.minimumFreeSpace = freeSpaceInGB + 1
            spaceMonitor.step()
            # Check there is a notification
            self.assertEqual(1, len(testNotifier.getNotifications()), "There isn't a notification")


    class TestNotifier():

        def __init__(self):
            self._notifications = []

        def getNotifications(self):
            return self._notifications

        def notify(self, title, message):
            # Just store the message
            self._notifications.append(message)

We have used the disk_usage() function to get the free space of the HD that holds the temporary folder. We round it down and use that as the threshold for the SpaceMonitor, so after the first step there should not be any notification. After the first assertions, we add another one to check that there are no notifications. Then we increase the threshold by one and call step() a second time. This time, there should be a notification.
Note that we had to write our own TestNotifier in order to check the notifications. This is something Scipion should provide, but it was not available at the time of writing.
We also have to test the protocol. You will need to import ProtMonitorSpace and a wait() function:

    import math
    from pyworkflow.em.packages.myfacility.mymonitor import disk_usage, \
        ProtMonitorSpace
    from pyworkflow.tests import *
    from pyworkflow.em.packages.myfacility import SpaceMonitor
    from tempfile import mkdtemp

Now add the code below right after the last assertion of test_monitor (the one with the message "There isn't a notifiaction") and right above our custom class TestNotifier:

    @classmethod
    def setUpClass(cls):
        setupTestProject(cls)

    def test_spacemonitor_protocol(self):
        prot = self.newProtocol(ProtMonitorSpace,
                                objLabel='HD free Space monitor',
                                samplingInterval=10)
        self.proj.launchProtocol(prot, wait=False)
        # Test that the spaceMonitor txt file is where expected
        spaceMon = SpaceMonitor(10, workingDir=prot._getExtraPath())
        txtPath = spaceMon.getStorageFilePath()
        # Wait up to 15 seconds, or until the file exists
        wait(lambda: not os.path.exists(txtPath), timeout=15)
        self.assertTrue(os.path.exists(txtPath), "Space monitor txt file not "
                                                 "found at %s" % txtPath)
        # Stop the protocol. Do not wait for its timeout
        self.proj.stopProtocol(prot)

With setUpClass() we create an empty Scipion project. In test_spacemonitor_protocol we create one ProtMonitorSpace and launch it.
Since we want to test the existence of the txt file, we use a temporary SpaceMonitor to get the exact path of the txt file in the extra folder of our protocol. We must give the monitor some time (15 secs) before checking that the file exists, hence wait(lambda: not os.path.exists(txtPath), timeout=15). Then we make the assertion and stop the protocol, since otherwise its only stop mechanism is the default timeout (100 minutes).
Run the test:

    scipion test em.packages.myfacility.tests.test_monitor.TestMonitor

You should get something like:
Scipion (2018-06-12) ((HEAD detached at june_2018_course) b9d0e37)
>>>>> python scripts/run_tests.py em.packages.myfacility.tests.test_monitor.TestMonitor
Running tests....
('Creating project at: ', '/home/pablo/ScipionUserData/projects/TestMonitor/project.sqlite')
WARNING: There is only 9.40 GB left for /tmp/tmpRRlsHF Free: 9.40 GB, Total: 36.54 GB, Used: 25.26 GB, Threshold: 10.0
[ RUN OK ] TestMonitor.test_monitor (0.004 secs)
** Running command: 'python /home/pablo/desarrollo/scipion/scipion runprotocol pw_protocol_run.py "/home/pablo/ScipionUserData/projects/TestMonitor" "Runs/000002_ProtMonitorSpace/logs/run.db" 2'
Scipion (2018-06-12) ((HEAD detached at june_2018_course) b9d0e37)
>>>>> python /home/pablo/desarrollo/scipion/pyworkflow/apps/pw_protocol_run.py "/home/pablo/ScipionUserData/projects/TestMonitor" "Runs/000002_ProtMonitorSpace/logs/run.db" "2"
[ RUN OK ] TestMonitor.test_spacemonitor_protocol (1.093 secs)
[==========] run 2 tests (1.605 secs)
[ PASSED ] 2 tests
```
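The threshold logic exercised by test_monitor can also be tried in isolation, without Scipion or a real disk. The following is a minimal sketch under stated assumptions: `RecordingNotifier` and `check_free_space` are hypothetical names for this illustration, and the 10 GB of free space is a made-up figure.

```python
class RecordingNotifier(object):
    """Hypothetical listener that stores every notification it receives."""

    def __init__(self):
        self.notifications = []

    def notify(self, title, message):
        self.notifications.append((title, message))


def check_free_space(free_bytes, minimum_free_gb, notifier):
    """Notify only when the free space (in GB) is below the threshold."""
    free_gb = free_bytes / (1024.0 ** 3)
    if free_gb < minimum_free_gb:
        notifier.notify("WARNING: low disk space",
                        "Free: %.2f GB, Threshold: %s GB" % (free_gb, minimum_free_gb))
        return True
    return False


notifier = RecordingNotifier()
free_bytes = 10 * 1024 ** 3  # pretend exactly 10 GB are free

# Threshold equal to the free space: nothing is sent.
first = check_free_space(free_bytes, 10, notifier)
# Threshold raised by one GB: a notification is sent.
second = check_free_space(free_bytes, 11, notifier)
print(first, second, len(notifier.notifications))
```

This mirrors the two phases of the test: no notification while the threshold is at or below the free space, and exactly one after the threshold is raised.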