-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhuawei.py
207 lines (157 loc) · 8.8 KB
/
huawei.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# Description: This is a python script to read data from a Huawei Sun2000 inverter via Modbus TCP
# https://github.com/olivergregorius/sun2000_modbus
from sun2000_modbus import inverter
from sun2000_modbus import registers
# https://github.com/victronenergy/venus/wiki/howto-add-a-driver-to-Venus
from vedbus import VeDbusService
from dbus.mainloop.glib import DBusGMainLoop
# Bind the name ``gobject`` to whichever main-loop module is installed.
# Only a missing module should trigger the fallback, so catch ImportError
# rather than every exception.
try:
    import gobject  # Python 2.x
except ImportError:
    from gi.repository import GLib as gobject  # Python 3.x
import dbus
import dbus.service
import sys
import os
import platform
# our own packages
# NOTE(review): the second argument to os.path.join() is an absolute path, so
# on POSIX the os.path.dirname(__file__) component is discarded and the entry
# inserted is just '/opt/victronenergy/dbus-modem'.
# NOTE(review): this statement runs after `from vedbus import VeDbusService`
# above, so it cannot help resolve that import - confirm the intended order.
sys.path.insert(1, os.path.join(os.path.dirname(
__file__), '/opt/victronenergy/dbus-modem'))
# Again, not all of these are needed; this just duplicates the Victron code.
class SystemBus(dbus.bus.BusConnection):
    """Bus connection of type ``TYPE_SYSTEM``.

    Instances are created directly through ``BusConnection.__new__``.
    """

    def __new__(cls):
        connection_base = dbus.bus.BusConnection
        return connection_base.__new__(cls, connection_base.TYPE_SYSTEM)
class SessionBus(dbus.bus.BusConnection):
    """Bus connection of type ``TYPE_SESSION``.

    Instances are created directly through ``BusConnection.__new__``.
    """

    def __new__(cls):
        connection_base = dbus.bus.BusConnection
        return connection_base.__new__(cls, connection_base.TYPE_SESSION)
def dbusconnection():
    """Return a new bus connection.

    A ``SessionBus`` is returned when the ``DBUS_SESSION_BUS_ADDRESS``
    environment variable is set; otherwise a ``SystemBus`` is returned.
    """
    if 'DBUS_SESSION_BUS_ADDRESS' in os.environ:
        return SessionBus()
    return SystemBus()
# Have a mainloop, so we can send/receive asynchronous calls to and from dbus.
# set_as_default=True installs this main loop as the default one.
DBusGMainLoop(set_as_default=True)
class modbusQuerry:
    """Read a Huawei SUN2000 inverter over Modbus TCP and publish it on D-Bus.

    ``_readData`` stores the latest register values on the instance and
    ``_update`` copies them to the ``pvinverter.pv0`` D-Bus service.  Both
    always return ``True`` so that, when scheduled with
    ``gobject.timeout_add``, they stay scheduled.
    """

    # (attribute name, register name on registers.InverterEquipmentRegister),
    # listed in the order the registers are read.
    _REGISTERS = (
        ('ac_c1', 'PhaseACurrent'),
        ('ac_c2', 'PhaseBCurrent'),
        ('ac_c3', 'PhaseCCurrent'),
        ('ac_v1', 'PhaseAVoltage'),
        ('ac_v2', 'PhaseBVoltage'),
        ('ac_v3', 'PhaseCVoltage'),
        ('pf', 'PowerFactor'),
        ('power', 'InputPower'),
        ('energy', 'AccumulatedEnergyYield'),
    )

    # Class-level defaults so that _update() can run before the first
    # successful _readData() without raising AttributeError.  None means
    # "no reading yet" and is skipped when publishing.
    ac_c1 = ac_c2 = ac_c3 = None
    ac_v1 = ac_v2 = ac_v3 = None
    ac_p1 = ac_p2 = ac_p3 = None
    pf = power = energy = None

    def __init__(self, host='192.168.169.38', port=502, unit=1):
        """Create the inverter client and try to connect.

        Args:
            host: Address of the inverter (defaults to the previously
                hard-coded value).
            port: Modbus TCP port.
            unit: Modbus unit id passed to ``inverter.Sun2000``.
        """
        self.thisInverter = inverter.Sun2000(host=host, port=port, unit=unit)
        self.thisInverter.connect()
        print("intialising")

    def _readData(self):
        """Read every register in ``_REGISTERS`` into its attribute.

        ``connect()`` is called again before reading; when the inverter
        still reports not connected, nothing is read and the previous
        values are kept.

        Returns:
            Always ``True``.
        """
        self.thisInverter.connect()
        if not self.thisInverter.connected:
            return True
        register_set = registers.InverterEquipmentRegister
        for attribute, register_name in self._REGISTERS:
            register = getattr(register_set, register_name)
            setattr(self, attribute, self.thisInverter.read(register))
        print("power: " + str(self.power))
        return True

    @staticmethod
    def _phase_power(voltage, current, power_factor, total_power):
        """Return voltage * current * power_factor, capped at total_power / 3.

        Returns ``None`` when voltage, current or power factor is missing.
        The cap is skipped when ``total_power`` is ``None`` instead of
        raising ``TypeError`` on ``None / 3``.
        """
        if voltage is None or current is None or power_factor is None:
            return None
        phase_power = voltage * current * power_factor
        if total_power is not None and phase_power > total_power / 3:
            phase_power = total_power / 3
        return phase_power

    def _update(self, service=None):
        """Publish the most recent readings on the D-Bus service.

        Args:
            service: Mapping-like object that accepts ``service[path] = value``.
                Defaults to the module-level ``dbusservice['pvinverter.pv0']``.

        When the inverter is not connected, the total and per-phase power
        paths are set to 0 and nothing else is touched.  Otherwise every
        reading that is not ``None`` is written to its path.

        Returns:
            Always ``True``.
        """
        if service is None:
            service = dbusservice['pvinverter.pv0']

        phases = ('L1', 'L2', 'L3')

        if not self.thisInverter.connected:
            for phase in phases:
                service['/Ac/%s/Power' % phase] = 0
            service['/Ac/Power'] = 0
            print("not connected")
            return True

        # pv inverter
        currents = (self.ac_c1, self.ac_c2, self.ac_c3)
        voltages = (self.ac_v1, self.ac_v2, self.ac_v3)

        for phase, current in zip(phases, currents):
            if current is not None:
                service['/Ac/%s/Current' % phase] = current
        for phase, voltage in zip(phases, voltages):
            if voltage is not None:
                service['/Ac/%s/Voltage' % phase] = voltage

        if self.power is not None:
            service['/Ac/Power'] = self.power

        per_phase = zip(phases, voltages, currents)
        for number, (phase, voltage, current) in enumerate(per_phase, start=1):
            phase_power = self._phase_power(
                voltage, current, self.pf, self.power)
            if phase_power is not None:
                # Keep the ac_p1..ac_p3 attributes the original code exposed.
                setattr(self, 'ac_p%d' % number, phase_power)
                service['/Ac/%s/Power' % phase] = phase_power

        if self.energy is not None:
            service['/Ac/Energy/Forward'] = self.energy
            # The total is split evenly over the three phases.
            energy_per_phase = round(self.energy / 3.0, 2)
            for phase in phases:
                service['/Ac/%s/Energy/Forward' % phase] = energy_per_phase
        return True
# -----------------------------
# Here is the bit you need to create multiple new services - try as much as possible to implement the Victron D-Bus API requirements.
def new_service(base, type, physical, id, instance):
    """Build a VeDbusService and register its D-Bus paths.

    The service name is ``"{base}.{type}.{physical}_id{id:02d}"`` and the
    service is created on the connection returned by ``dbusconnection()``.
    The management and generic paths are always added; the PV-inverter
    paths (including ``/DeviceInstance``, set to ``instance``) are added
    only when ``physical == 'pvinverter'``.

    Returns:
        The populated service object.
    """
    service_name = "{}.{}.{}_id{:02d}".format(base, type, physical, id)
    service = VeDbusService(service_name, dbusconnection())

    def with_unit(unit):
        """Return a gettextcallback rendering the value followed by ``unit``."""
        def render(path, value):
            return str(value) + unit
        return render

    # Create the management objects, as specified in the ccgx dbus-api document
    # Supported paths:
    # https://github.com/victronenergy/venus/wiki/dbus#generic-paths
    python_note = ('Unknown version, and running on Python '
                   + platform.python_version())
    generic_paths = (
        ('/Mgmt/ProcessName', __file__),
        ('/Mgmt/ProcessVersion', python_note),
        ('/Connected', 1),
        ('/HardwareVersion', 0),
    )
    for path, value in generic_paths:
        service.add_path(path, value)

    if physical != 'pvinverter':
        return service

    # Supported paths:
    # https://github.com/victronenergy/venus/wiki/dbus#pv-inverters
    identity_paths = (
        ('/DeviceInstance', instance),
        ('/FirmwareVersion', "Unknown"),
        # value used in self.ac_sensor_bridge.cpp of dbus-cgwacs
        ('/ProductId', "Unknown"),
        ('/ProductName', "Huawei SUN2000"),
    )
    for path, value in identity_paths:
        service.add_path(path, value)

    kwh = with_unit('kWh')
    amps = with_unit('A')
    watts = with_unit('W')
    volts = with_unit('V')

    # Measurement paths start out empty (None) and carry a unit formatter.
    measurement_paths = (
        ('/Ac/Energy/Forward', kwh),
        ('/Ac/L1/Energy/Forward', kwh),
        ('/Ac/L2/Energy/Forward', kwh),
        ('/Ac/L3/Energy/Forward', kwh),
        ('/Ac/Power', watts),
        ('/Ac/L1/Current', amps),
        ('/Ac/L2/Current', amps),
        ('/Ac/L3/Current', amps),
        ('/Ac/L1/Power', watts),
        ('/Ac/L2/Power', watts),
        ('/Ac/L3/Power', watts),
        ('/Ac/L1/Voltage', volts),
        ('/Ac/L2/Voltage', volts),
        ('/Ac/L3/Voltage', volts),
        ('/Ac/MaxPower', watts),
    )
    for path, formatter in measurement_paths:
        service.add_path(path, None, gettextcallback=formatter)

    status_paths = (
        ('/ErrorCode', None),
        ('/Position', 0),
        ('/StatusCode', None),
    )
    for path, value in status_paths:
        service.add_path(path, value)

    return service
dbusservice = {}  # Dictionary to hold the multiple services
Querry = modbusQuerry()

# service defined by (base*, type*, id*, instance):
# * items are included in the service name
# Create all the dbus-services we want
dbusservice['pvinverter.pv0'] = new_service(
    'com.victronenergy', 'pvinverter.pv0', 'pvinverter', 0, 20)


def _poll():
    """Read the inverter, then publish the readings on D-Bus.

    Exceptions are reported and swallowed so that the function always
    returns ``True`` and therefore stays scheduled as a timeout callback.
    """
    try:
        Querry._readData()
        Querry._update()
    except Exception as ex:
        print("Issues querying Inverter -ERROR :", ex)
    return True


if __name__ == "__main__":
    # Previously the only _readData() call sat after mainloop.run(), which
    # blocks, so no data was ever read.  Read once up front and then on every
    # timer tick.
    _poll()
    # Everything done, so set a timer to refresh the data values every
    # 10 seconds (the interval is given in milliseconds).
    gobject.timeout_add(10000, _poll)
    print("Connected to dbus, and switching over to gobject.MainLoop() (= event based)")
    mainloop = gobject.MainLoop()
    mainloop.run()