-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpx2_shell.py
executable file
·102 lines (87 loc) · 2.99 KB
/
px2_shell.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/python3
# SPDX-License-Identifier: BSD-3-Clause
#
# Copyright 2012 Raritan Inc. All rights reserved.
import time, datetime, sys, os, atexit, code, getpass
import rlcompleter
try:
import readline
except:
# no readline support, error message will be generated when used below
pass
sys.path.append("pdu-python-api")
import raritan.rpc
from raritan.rpc import Agent, idl
from raritan.rpc import net, firmware, cfg, session, event
from raritan.rpc import pdumodel, tfw, test, sensors, usermgmt
def parse_url(url):
    """Split a device URL into its connection parameters.

    The accepted form is ``[scheme://][user[:password]@]host``. Components
    that are missing from *url* fall back to the defaults ``https``,
    ``admin`` and ``raritan``.

    Args:
        url: The target given on the command line.

    Returns:
        A ``(scheme, hostname, username, password)`` tuple of strings.
    """
    scheme = 'https'
    username = 'admin'
    password = 'raritan'
    hostname = url

    # Only the first '://' separates the scheme; splitting on every
    # occurrence used to break the two-value unpacking.
    head, sep, rest = hostname.partition('://')
    if sep:
        scheme, hostname = head, rest

    # Split on the LAST '@' so that a password containing '@' stays intact.
    userinfo, sep, host = hostname.rpartition('@')
    if sep:
        username, hostname = userinfo, host
        # Split on the FIRST ':' so that a password containing ':' stays
        # intact; without a ':' the default password is kept.
        user, sep, secret = username.partition(':')
        if sep:
            username, password = user, secret

    return (scheme, hostname, username, password)
# Take the target device from the first command-line argument. A missing
# argument (IndexError) or a URL that cannot be split (ValueError) prints the
# usage text; anything else is a real error and is allowed to propagate.
try:
    (scheme, hostname, username, password) = parse_url(sys.argv[1])
except (IndexError, ValueError):
    print("Usage: px2_shell.py [scheme://][user[:password]@]host")
    sys.exit(1)
# Best-effort line editing: enable tab completion and a persistent history
# file. Any failure here (including readline not having been imported) only
# disables these conveniences; the shell itself still starts.
try:
    readline.parse_and_bind('tab: complete')
    # expanduser() honours $HOME when set but also works when it is not,
    # where os.environ["HOME"] would raise KeyError.
    history = os.path.join(os.path.expanduser("~"), ".px2_shell_history")
    try:
        readline.read_history_file(history)
    except OSError:
        # No readable history file yet (e.g. first run); start empty.
        pass
    atexit.register(readline.write_history_file, history)
except Exception:
    # Deliberately broad, but unlike a bare except it lets
    # KeyboardInterrupt and SystemExit through.
    print("Sorry, no readline support!")
# NOTE(review): certificate verification is switched off for every connection;
# presumably because devices use self-signed certificates -- confirm intended.
agent = Agent(scheme, hostname, username, password, disable_certificate_verification = True)

# Issue one request up front. An "HTTP Error 451" response is treated as the
# device demanding a new password for 'admin' before anything else works.
try:
    pdumodel.Pdu("/model/pdu/0", agent).getMetaData()
except raritan.rpc.HttpException as e:
    if "HTTP Error 451" in str(e):
        print("Changing the default password for user 'admin' is required.")
        admin = usermgmt.User("/auth/user/admin", agent)
        password = getpass.getpass()
        admin.setAccountPassword(password)
        # Keep using the same agent, now with the new credentials.
        agent.passwd = password
    else:
        # Other HTTP errors used to be discarded silently. Report them, but
        # keep going so the startup sequence is otherwise unchanged.
        print("Warning: initial request to the device failed: %s" % e, file=sys.stderr)
# selected sysrpc proxies
# Each proxy is constructed from a resource path and the shared agent. These
# module-level names are what the interactive console at the end of the file
# exposes, so they are kept short and stable.
net_proxy = net.Net("/net", agent)
firmware_proxy = firmware.Firmware("/firmware", agent)
cfg_proxy = cfg.Cfg("/cfg", agent)
session_manager = session.SessionManager("/session", agent)
event_engine = event.Engine("/event_engine", agent)
# event_service is also used by poll_events() below to create its channel.
event_service = event.Service("/eventservice", agent)
# selected modelrpc proxies
core = tfw.CoreCtrl("/model/core", agent)
unit = pdumodel.Unit("/model/unit", agent)
pdu = pdumodel.Pdu("/model/pdu/0", agent)
# Queried once at startup from the PDU proxy above; presumably collections of
# inlet / over-current-protector / outlet proxies -- TODO confirm return types.
inlets = pdu.getInlets()
ocps = pdu.getOverCurrentProtectors()
outlets = pdu.getOutlets()
def poll_events(types = None):
    """Print events from the device until interrupted with Ctrl-C.

    A channel is created on the module-level ``event_service``, the requested
    event types are demanded on it, and the channel is then polled in a loop.
    Every received event is printed with a ``[HH:MM:SS]`` local-time prefix
    taken once per poll result.

    Args:
        types: A single event type, a list or tuple of event types, or
            ``None`` to request ``raritan.rpc.idl.Event``.

    Returns:
        None. The function returns after a ``KeyboardInterrupt``.
    """
    if types is None:
        types = [raritan.rpc.idl.Event]
    elif isinstance(types, tuple):
        # A tuple is a sequence of types, not one type to be wrapped.
        types = list(types)
    elif not isinstance(types, list):
        types = [types]

    channel = event_service.createChannel()
    channel.demandEventTypes(types)

    print("Polling for events, Ctrl-C to stop ...")
    try:
        while True:
            _, events = channel.pollEvents()
            ts = datetime.datetime.now().strftime("[%H:%M:%S]")
            # Named 'evt' so the imported 'event' module is not shadowed.
            for evt in events:
                print("%s %s" % (ts, evt))
    except KeyboardInterrupt:
        # Finish the line the terminal echoed for Ctrl-C.
        print()
# Hand control to an interactive Python console whose namespace is this
# module's globals, so the agent, the proxies and poll_events() defined above
# can be used directly at the prompt.
code.interact(local=globals())