From a8b1205ce648463f68421756f255b2208a3be155 Mon Sep 17 00:00:00 2001 From: daniel-leicht Date: Wed, 25 Sep 2019 18:30:17 +0300 Subject: [PATCH] Added support for GetAttributeSingle identity. Raises exception on LinkToSelf error. --- README.md | 2 +- pylogix/__init__.py | 2 +- pylogix/eip.py | 238 +++++++++++++++++++++++++++++++++----------- setup.py | 6 +- 4 files changed, 187 insertions(+), 61 deletions(-) diff --git a/README.md b/README.md index c40783d..df6bcb8 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ There are currently no dependencies so you can get going quickly without having You can clone the repo with the following: ``` -git clone https://github.com/dmroeder/pylogix.git +git clone https://github.com/daniel-leicht/pylogix.git cd pylogix python setup.py install --user ``` diff --git a/pylogix/__init__.py b/pylogix/__init__.py index df46f5b..d6d217f 100644 --- a/pylogix/__init__.py +++ b/pylogix/__init__.py @@ -1,4 +1,4 @@ from .lgxDevice import LGXDevice -from .eip import PLC +from .eip import PLC, RouteAddressToSelfError, CIPConnectionError __version_info__ = (0, 5, 1) __version__ = '.'.join(str(x) for x in __version_info__) diff --git a/pylogix/eip.py b/pylogix/eip.py index 52f68a4..969c97c 100644 --- a/pylogix/eip.py +++ b/pylogix/eip.py @@ -26,8 +26,16 @@ from random import randrange from struct import pack, unpack_from -class PLC: +class RouteAddressToSelfError(IOError): + pass + + +class CIPConnectionError(IOError): + pass + + +class PLC: def __init__(self): ''' Initialize our parameters @@ -161,11 +169,36 @@ def Discover(self): ''' return self._discover() - def GetModuleProperties(self, slot=None, custom_routing_path=None): - ''' - Get the properties of module in specified slot - ''' - return self._getModuleProperties(slot=slot, custom_routing_path=custom_routing_path) + def GetModuleProperties(self, slot=None, custom_routing_path=None, basic=False): + ''' + Get the properties of module in specified slot + If CIP routing path is known it can be specified in custom_routing_path as a list of tuples: (port, slot) + By default GetModuleProperties used "GetAttributeAll" to retrive the identity, some modules + (such as DeviceNet modules) will only answer to "GetAttributeSingle", use basic=True to query the basic + attibutes using "GetAttributeSingle". + ''' + if basic: + IDENTITY_ATTRIBUTES = { + "VendorID": (0x01, "word"), + "DeviceID": (0x02, "word"), + "ProductCode": (0x03, "word"), + "Revision": (0x04, "revision"), + "Status": (0x05, "word"), + "SerialNumber": (0x06, "dword"), + "ProductName": (0x07, "string"), + "State": (0x08, "word"), + } + module_object = LGXDevice() + value_list = list() + for attribute_name, attribute_info in IDENTITY_ATTRIBUTES.items(): + attr_value = self._getModuleProperty(attribute_info, slot=slot, custom_routing_path=custom_routing_path) + if (attribute_name == "VendorID") and (attr_value == None): + return Response(None, LGXDevice(), 1) + value_list.append(attr_value) + setattr(module_object, attribute_name, attr_value) + return Response(None, module_object, 0) + else: + return self._getModuleProperties(slot=slot, custom_routing_path=custom_routing_path) def Close(self): ''' @@ -178,7 +211,7 @@ def _readTag(self, tag, elements, dt): processes the read request ''' self.Offset = 0 - + if not self._connect(): return None t,b,i = _parseTagName(tag, 0) @@ -208,7 +241,7 @@ def _readTag(self, tag, elements, dt): # everything else tagData = self._buildTagIOI(tag, isBoolArray=False) readRequest = self._addReadIOI(tagData, elements) - + eipHeader = self._buildEIPHeader(readRequest) status, retData = self._getBytes(eipHeader) @@ -216,7 +249,7 @@ def _readTag(self, tag, elements, dt): return_value = self._parseReply(tag, elements, retData) return Response(tag, return_value, status) else: - return Response(tag, None, status) + return Response(tag, None, status) def _writeTag(self, tag, value, dt): ''' @@ -248,7 +281,7 @@ def _writeTag(self, tag, value, dt): writeData.append(self._makeString(v)) else: writeData.append(int(v)) - + # write a bit of a word, boolean array or everything else if BitofWord(tag): tagData = self._buildTagIOI(tag, isBoolArray=False) @@ -259,12 +292,12 @@ def _writeTag(self, tag, value, dt): else: tagData = self._buildTagIOI(tag, isBoolArray=False) writeRequest = self._addWriteIOI(tagData, writeData, dataType) - + eipHeader = self._buildEIPHeader(writeRequest) status, retData = self._getBytes(eipHeader) return Response(tag, value, status) - + def _multiRead(self, tags): ''' Processes the multiple read request @@ -296,7 +329,7 @@ def _multiRead(self, tags): header = self._buildMultiServiceHeader() segmentCount = pack(' 2: temp += (tagCount-2)*2 @@ -306,7 +339,7 @@ def _multiRead(self, tags): for i in range(tagCount): segments += serviceSegments[i] - for i in range(tagCount-1): + for i in range(tagCount-1): temp += len(serviceSegments[i]) offsets += pack(' it should be a list with tuples containing pairs of port number & slot address. + """ + custom_route_path_size = len(custom_routing_path) + route_path = pack('<2B', + custom_route_path_size, + Reserved) + + for routing_pair in custom_routing_path: + assert len(routing_pair) == 2, "Routing path should be list of 2-tuples." + port_segment = 0b00001111 & routing_pair[0] + link_address = routing_pair[1] + route_path += pack("<2B", + port_segment, + link_address) + + AttributePacket += route_path + + frame = self._buildCIPUnconnectedSend(ServiceSize=12) + AttributePacket + eipHeader = self._buildEIPSendRRDataHeader(len(frame)) + frame + pad = pack('