forked from Linuxfabrik/lib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathshell.py
165 lines (145 loc) · 6.19 KB
/
shell.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
#! /usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author: Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
# https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.
# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.rst
"""Communicates with the Shell.
"""
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2023112901'
import os
import re
import shlex
import subprocess
from . import txt
# Human-readable messages for the non-zero return codes of `sshpass`,
# keyed by the numeric return code.
RETC_SSHPASS = {
    1: 'Invalid command line argument',
    2: 'Conflicting arguments given',
    3: 'General runtime error',
    4: 'Unrecognized response from ssh (parse error)',
    5: 'Invalid/incorrect password',
    6: 'Host public key is unknown. sshpass exits without confirming the new key.',
    7: 'IP public key changed. sshpass exits without confirming the new key.',
}
def get_command_output(cmd, regex=None):
    r"""Run a command and return its output as a stripped string.

    The command is executed via `shell_exec()` using that function's
    default arguments. If the command wrote nothing to stdout but
    something to stderr, stderr is used instead, because some tools print
    their version information to stderr.

    Parameters
    ----------
    cmd : str
        Command to run.
    regex : str, optional
        If given, the pattern is searched for in the output and only the
        first capture group of the first match is returned.

    Returns
    -------
    str
        The stripped output, or the stripped first capture group if
        `regex` is given. An empty string is returned if `shell_exec()`
        reports a failure (for example because the command was not
        found), if `regex` cannot be used as a pattern, if it does not
        match, or if it has no first group that took part in the match.

    >>> print(get_command_output('nano --version'))
    GNU nano, version 5.3
    (C) 1999-2011, 2013-2020 Free Software Foundation, Inc.
    (C) 2014-2020 the contributors to nano
    Compiled options: --enable-utf8
    >>> get_command_output('nano --version', regex=r'version (.*)\n')
    '5.3'
    """
    success, result = shell_exec(cmd)
    if not success:
        return ''

    stdout, stderr, _ = result
    if stdout == '' and stderr != '':
        # https://stackoverflow.com/questions/26028416/why-does-python-print-version-info-to-stderr
        # https://stackoverflow.com/questions/13483443/why-does-java-version-go-to-stderr
        stdout = stderr
    stdout = stdout.strip()

    if not regex:
        return stdout

    # extract something special from output; every failure mode below
    # deliberately yields '' instead of raising, callers rely on that
    try:
        match = re.search(regex, stdout)
    except (re.error, TypeError):
        # invalid pattern, or `regex` is not something `re` can search with
        return ''
    if match is None:
        return ''
    try:
        group = match.group(1)
    except IndexError:
        # the pattern does not define a capture group
        return ''
    if group is None:
        # the group exists but did not participate in the match
        return ''
    return group.strip()
def shell_exec(cmd, env=None, shell=False, stdin='', cwd=None, timeout=None):
    """Execute an external command and return its complete output
    (stdout, stderr) and the program exit code (retc).

    Parameters
    ----------
    cmd : str
        Command to spawn the child process. If neither `shell` nor `stdin`
        is set, the string is split at every `|` character and the parts
        are chained as a pipeline without involving a shell. The split is
        purely textual, so a `|` inside quotes is split as well; shell
        features other than pipes (`&&`, variable expansion, globbing,
        builtins) are not available in this mode.
    env : None or dict
        Additional environment variables, merged on top of the current
        OS environment. Example: env={'PATH': '/usr/bin'}.
        `LC_ALL` is always forced to `C`.
    shell : bool
        If True, `cmd` is executed through the system shell (`/bin/sh` on
        POSIX, according to the subprocess documentation). This allows
        you to expand environment variables and file globs according to
        the shell's usual mechanism, which can be a security hazard.
        Generally speaking, avoid invocations via the shell. It is very
        seldom needed to set this to True.
    stdin : str
        If set, use this as input into `cmd`. Note that a non-empty
        `stdin` also causes `cmd` to be run through the shell, regardless
        of the value of `shell`.
    cwd : str
        Current Working Directory
    timeout : int or float
        If the command does not terminate after `timeout` seconds, it is
        killed and a failure result is returned.

    Returns
    -------
    result : tuple
        result[0] = the functions return code (bool)
            False: result[1] contains the error message (str)
            True:  result[1] contains the result of the called `cmd`
                   as a tuple (stdout, stderr, retc)

    https://docs.python.org/3/library/subprocess.html
    """
    # merge the OS environment variables with the ones set by the env parameter
    env = {**os.environ, **(env or {})}
    # set cmd output to English, no matter what the user has chosen
    env['LC_ALL'] = 'C'

    # subprocess.PIPE: Special value that can be used as the stdin,
    # stdout or stderr argument to Popen and indicates that a pipe to
    # the standard stream should be opened.
    if shell or stdin:
        # Shell wanted, or we have some input for our cmd - then we
        # run the command through the shell, too.
        # Pipes '|' are handled by the shell itself.
        try:
            p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE, env=env, shell=True, cwd=cwd)
        except OSError as e:
            return (False, 'OS Error "{} {}" calling command "{}"'.format(e.errno, e.strerror, cmd))
        except ValueError as e:
            return (False, 'Value Error "{}" calling command "{}"'.format(e, cmd))
        except Exception as e:
            return (False, 'Unknown error "{}" while calling command "{}"'.format(e, cmd))

        # provide stdin as input for the cmd, if there is any
        input_data = txt.to_bytes(stdin) if stdin else None
        try:
            stdout, stderr = p.communicate(input=input_data, timeout=timeout)
        except subprocess.TimeoutExpired:
            # NOTE: this kills the process started by Popen (the shell);
            # processes spawned by that shell are not signalled here.
            p.kill()
            p.communicate()
            return (False, 'Timeout after {} seconds.'.format(timeout))
        return (True, (txt.to_text(stdout), txt.to_text(stderr), p.returncode))

    # No shell wanted, but then we have to do pipe handling on our own.
    # Example:
    # * `cat /var/log/messages | grep DENY | grep Rule`
    procs = []
    for part in cmd.split('|'):
        previous = procs[-1] if procs else None
        try:
            args = shlex.split(part.strip())
            # use the output of the previous stage as input for this stage,
            # if there is any
            p = subprocess.Popen(
                args,
                stdin=previous.stdout if previous is not None else subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env,
                shell=False,
                cwd=cwd,
            )
        except OSError as e:
            return (False, 'OS Error "{} {}" calling command "{}"'.format(e.errno, e.strerror, part))
        except ValueError as e:
            return (False, 'Value Error "{}" calling command "{}"'.format(e, part))
        except Exception as e:
            return (False, 'Unknown error "{}" while calling command "{}"'.format(e, part))
        if previous is not None:
            # The child owns its own copy of the pipe now. Close ours so that
            # the previous stage can receive SIGPIPE if this stage exits early.
            previous.stdout.close()
        procs.append(p)

    # `str.split()` always returns at least one part, so `procs` is not empty
    p = procs[-1]
    try:
        stdout, stderr = p.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        # stop the whole chain, not just the last stage
        for proc in procs:
            proc.kill()
        for proc in procs[:-1]:
            proc.wait()
        p.communicate()
        return (False, 'Timeout after {} seconds.'.format(timeout))
    return (True, (txt.to_text(stdout), txt.to_text(stderr), p.returncode))