Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --nodown Option to Exclude Nodes That Are Down #449

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conf/groups.conf.d/genders.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
map: nodeattr -n $GROUP
all: nodeattr -n ALL
list: nodeattr -l
down: whatsup -n -d || /bin/true

3 changes: 3 additions & 0 deletions doc/man/man1/clush.1
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ exclude nodes from the node list
.B \-a\fP,\fB \-\-all
run command on all nodes
.TP
.B \-D\fP,\fB \-\-nodown
exclude nodes that are down
.TP
.BI \-g \ GROUP\fP,\fB \ \-\-group\fB= GROUP
run command on a group of nodes
.TP
Expand Down
7 changes: 6 additions & 1 deletion doc/man/man5/groups.conf.5
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ Global configuration options. There should be only one Main section.
.B \fIGroup_source\fP
The \fIGroup_source\fP section(s) define the configuration for each node group
source (or namespace). This configuration consists in external commands
definition (map, all, list and reverse).
definition (map, all, list, down, and reverse).
.UNINDENT
.sp
Only \fIGroup_source\fP section(s) are allowed in additional configuration files.
Expand Down Expand Up @@ -123,6 +123,11 @@ Optional external shell command that should return the list of all groups
for this group source (separated by space characters or by carriage
returns).
.TP
.B down
Optional external shell command that should return the list of all hosts
that are down for this group source (separated by space characters or by
carriage returns).
.TP
.B reverse
Optional external shell command used to find the group(s) of a single
node. The variable $NODE is previously replaced. If this upcall is not
Expand Down
1 change: 1 addition & 0 deletions doc/sphinx/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ groups are bound to the source named *genders* by default::
map: nodeattr -n $GROUP
all: nodeattr -n ALL
list: nodeattr -l
down: whatsup -n -d || /bin/true

[slurm]
map: sinfo -h -o "%N" -p $GROUP
Expand Down
1 change: 1 addition & 0 deletions doc/txt/clush.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ Selecting target nodes:
-w NODES nodes where to run the command
-x NODES exclude nodes from the node list
-a, --all run command on all nodes
-D, --nodown exclude nodes that are down
-g GROUP, --group=GROUP
run command on a group of nodes
-X GROUP exclude nodes from this group
Expand Down
5 changes: 5 additions & 0 deletions lib/ClusterShell/CLI/Clush.py
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,11 @@ def main():
msg = "Picked random nodes: %s" % nodeset_base
print(Display.COLOR_RESULT_FMT % msg)

# If we need to remove nodes that are down do it here
if options.nodown:
down = NodeSet.fromdown()
nodeset_base.difference_update(down)

# Set open files limit.
set_fdlimit(config.fd_max, display)

Expand Down
2 changes: 2 additions & 0 deletions lib/ClusterShell/CLI/OptionParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ def install_nodes_options(self):
optgrp.add_option("-x", action="append", type="safestring",
dest="exclude", metavar="NODES",
help="exclude nodes from the node list")
optgrp.add_option("-D", "--nodown", action="store_true", dest="nodown",
help="exclude down nodes from the node list")
optgrp.add_option("-a", "--all", action="store_true", dest="nodes_all",
help="run command on all nodes")
optgrp.add_option("-g", "--group", action="append", type="safestring",
Expand Down
18 changes: 18 additions & 0 deletions lib/ClusterShell/NodeSet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,24 @@ def fromall(cls, groupsource=None, autostep=None, resolver=None):
raise NodeSetExternalError(errmsg)
return inst

@classmethod
def fromdown(cls, groupsource=None, autostep=None, resolver=None):
"""Class method that returns a new NodeSet with all nodes from optional
groupsource."""
inst = NodeSet(autostep=autostep, resolver=resolver)
try:
if not inst._resolver:
raise NodeSetExternalError("Group resolver is not defined")
else:
# fill this nodeset with all nodes found by resolver
down_nodes = inst._parser.group_resolver.down_nodes(groupsource)
inst = NodeSet.fromlist(down_nodes)
except NodeUtils.GroupResolverError as exc:
errmsg = "Group source error (%s: %s)" % (exc.__class__.__name__,
exc)
raise NodeSetExternalError(errmsg)
return inst

def __getstate__(self):
"""Called when pickling: remove references to group resolver."""
odict = self.__dict__.copy()
Expand Down
41 changes: 31 additions & 10 deletions lib/ClusterShell/NodeUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ class UpcallGroupSource(GroupSource):
"""

def __init__(self, name, map_upcall, all_upcall=None,
list_upcall=None, reverse_upcall=None, cfgdir=None,
cache_time=None):
list_upcall=None, down_upcall=None, reverse_upcall=None,
cfgdir=None, cache_time=None):
GroupSource.__init__(self, name)
self.verbosity = 0 # deprecated
self.cfgdir = cfgdir
Expand All @@ -174,6 +174,8 @@ def __init__(self, name, map_upcall, all_upcall=None,
self.upcalls['all'] = all_upcall
if list_upcall:
self.upcalls['list'] = list_upcall
if down_upcall:
self.upcalls['down'] = down_upcall
if reverse_upcall:
self.upcalls['reverse'] = reverse_upcall
self.has_reverse = True
Expand All @@ -192,6 +194,7 @@ def clear_cache(self):
"""
self._cache = {
'map': {},
'down': {},
'reverse': {}
}

Expand Down Expand Up @@ -224,12 +227,12 @@ def _upcall_cache(self, upcall, cache, key, **args):
raise GroupSourceNoUpcall(upcall, self)

# Purge expired data from cache
if key in cache and cache[key][1] < time.time():
if key in cache and cache[key]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That doesn't look right, you're purging the whole cache instead of a time-based expiration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. Needs to be fixed. Was seeing a problem where cache[key][1] was not defined and was blowing up. Did this temporarily and forgot to go back and fix. Thanks!

self.logger.debug("PURGE EXPIRED (%d)'%s'", cache[key][1], key)
del cache[key]

# Fetch the data if unknown of just purged
if key not in cache:
if key not in cache or not cache[key]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That one could make sense but could use its own commit at least, possibly its own PR.
You're basically saying not to cache empty groups and I'm not sure I agree e.g. if your "down" set is currently empty then it won't be cached and getting the list of down nodes is possibly expensive.

cache_expiry = time.time() + self.cache_time
# $CFGDIR and $SOURCE always replaced
args['CFGDIR'] = self.cfgdir
Expand All @@ -252,6 +255,12 @@ def resolv_list(self):
"""
return self._upcall_cache('list', self._cache, 'list')

def resolv_down(self):
"""
Return a list of all nodes that are down in this group.
"""
return self._upcall_cache('down', self._cache, 'down')

def resolv_all(self):
"""
Return the content of special group ALL, using the cached value
Expand Down Expand Up @@ -496,6 +505,13 @@ def all_nodes(self, namespace=None):
source = self._source(namespace)
return self._list_nodes(source, 'all')

def down_nodes(self, namespace=None):
"""
Find all nodes. You may specify an optional namespace.
"""
source = self._source(namespace)
return self._list_nodes(source, 'down')

def grouplist(self, namespace=None):
"""
Get full group list. You may specify an optional
Expand Down Expand Up @@ -653,23 +669,28 @@ def _sources_from_cfg(self, cfg, cfgdir):
if srcname != self.SECTION_MAIN:
# only map is a mandatory upcall
map_upcall = cfg.get(section, 'map', raw=True)
all_upcall = list_upcall = reverse_upcall = ctime = None
all_upcall = list_upcall = down_upcall = reverse_upcall = ctime = None
if cfg.has_option(section, 'all'):
all_upcall = cfg.get(section, 'all', raw=True)
if cfg.has_option(section, 'list'):
list_upcall = cfg.get(section, 'list', raw=True)
if cfg.has_option(section, 'down'):
down_upcall = cfg.get(section, 'down', raw=True)
if cfg.has_option(section, 'reverse'):
reverse_upcall = cfg.get(section, 'reverse',
raw=True)
if cfg.has_option(section, 'cache_time'):
ctime = float(cfg.get(section, 'cache_time',
raw=True))
# add new group source
self.add_source(UpcallGroupSource(srcname, map_upcall,
all_upcall,
list_upcall,
reverse_upcall,
cfgdir, ctime))
self.add_source(UpcallGroupSource(srcname,
map_upcall,
all_upcall=all_upcall,
list_upcall=list_upcall,
down_upcall=down_upcall,
reverse_upcall=reverse_upcall,
cfgdir=cfgdir,
cache_time=ctime))
except (NoSectionError, NoOptionError, ValueError) as exc:
raise GroupResolverConfigError(str(exc))

Expand Down
6 changes: 4 additions & 2 deletions tests/NodeSetGroupTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1346,8 +1346,10 @@ def __init__(self, name, data):
reverse_upcall = None
if 'reverse' in data:
reverse_upcall = 'fake_reverse'
UpcallGroupSource.__init__(self, name, "fake_map", all_upcall,
list_upcall, reverse_upcall)
UpcallGroupSource.__init__(self, name, "fake_map",
all_upcall=all_upcall,
list_upcall=list_upcall,
reverse_upcall=reverse_upcall)
self._data = data

def _upcall_read(self, cmdtpl, args=dict()):
Expand Down