Skip to content

Commit

Permalink
GetTagList gets UDT information, tag.DataType will now return the nam…
Browse files Browse the repository at this point in the history
…e of the data type
  • Loading branch information
dmroeder committed Jan 31, 2019
1 parent db7629e commit 49df931
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 32 deletions.
3 changes: 3 additions & 0 deletions changelog
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
01/31/19
-GetTagList gets UDT names so the tag list can contain the name of the data types

01/07/19
-Another fix for dealing with bool arrays

Expand Down
221 changes: 189 additions & 32 deletions eip.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@

from datetime import datetime, timedelta
from lgxDevice import *
import math
from random import randrange
import socket
from struct import *
import sys
import time

programNames = []
taglist = []


class PLC:

Expand All @@ -52,6 +51,7 @@ def __init__(self):
self.SequenceCounter = 1
self.Offset = 0
self.KnownTags = {}
self.TagList = []
self.StructIdentifier = 0x0fCE
self.Version = '0.1.0'
self.CIPTypes = {160:(88 ,"STRUCT", 'B'),
Expand Down Expand Up @@ -127,8 +127,10 @@ def GetTagList(self, allTags = True):
_getAllProgramsTags(self)
else:
_getTagList(self)

_getUDT(self)

return taglist
return self.TagList

def GetProgramTagList(self, programName):
'''
Expand All @@ -142,7 +144,9 @@ def GetProgramTagList(self, programName):

# Get a single program tags if progragName exists
if programName in programNames:
return _getProgramTagList(self, programName)
program_tags = _getProgramTagList(self, programName)
_getUDT(self)
return self.TagList
if programName not in programNames:
print("Program not found, please check name!")
return None
Expand Down Expand Up @@ -176,12 +180,17 @@ def Close(self):
'''
return _closeConnection(self)

class LGXTag():
class LgxTag:

def __init__(self):
self.TagName = ""
self.Offset = 0
self.DataType = ""
self.TagName = ''
self.InstanceID = 0x00
self.SymbolType = 0x00
self.DataTypeValue = 0x00
self.DataType = ''
self.Array = 0x00
self.Struct = 0x00
self.Size = 0x00

def _readTag(self, tag, elements, dt):
'''
Expand Down Expand Up @@ -419,7 +428,7 @@ def _getTagList(self):

self.Offset = 0
del programNames[:]
del taglist[:]
del self.TagList[:]

request = _buildTagListRequest(self, programName=None)
eipHeader = _buildEIPHeader(self, request)
Expand All @@ -432,9 +441,8 @@ def _getTagList(self):
eipHeader = _buildEIPHeader(self, request)
status, retData = _getBytes(self, eipHeader)
extractTagPacket(self, retData, programName=None)
time.sleep(0.25)

return taglist
return

def _getAllProgramsTags(self):
'''
Expand All @@ -459,9 +467,8 @@ def _getAllProgramsTags(self):
eipHeader = _buildEIPHeader(self, request)
status, retData = _getBytes(self, eipHeader)
extractTagPacket(self, retData, programName)
time.sleep(0.25)

return taglist
return

def _getProgramTagList(self, programName):
'''
Expand All @@ -470,7 +477,7 @@ def _getProgramTagList(self, programName):
if not _connect(self): return None

self.Offset = 0
del taglist[:]
del self.TagList[:]

request = _buildTagListRequest(self, programName)
eipHeader = _buildEIPHeader(self, request)
Expand All @@ -483,10 +490,139 @@ def _getProgramTagList(self, programName):
eipHeader = _buildEIPHeader(self, request)
status, retData = _getBytes(self, eipHeader)
extractTagPacket(self, retData, programName)
time.sleep(0.25)

return taglist
return

def _getUDT(self):

# get only tags that are a struct
struct_tags = [x for x in self.TagList if x.Struct == 1]
# reduce our struct tag list to only unique instances
seen = set()
unique = [obj for obj in struct_tags if obj.DataTypeValue not in seen and not seen.add(obj.DataTypeValue)]

template = {}
for u in unique:
y = _getTemplateAttribute(self, u.DataTypeValue)

val = unpack_from('<I', y[46:], 10)[0]
x = (val * 4) - 23
size = int(math.ceil(x / 4.0)) * 4
member_count = int(unpack_from('<H', y[46:], 24)[0])

template[u.DataTypeValue] = [size, '', member_count]

for key,value in template.items():
t = _getTemplate(self, key, value[0])
size = value[2] * 8
p = t[50:]
memberBytes = p[size:]
split_char = pack('<b', 0x00)
members = memberBytes.split(split_char)
split_char = pack('<b', 0x3b)
name = members[0].split(split_char)[0]
template[key][1] = str(name.decode('utf-8'))

for tag in self.TagList:
if tag.DataTypeValue in template:
tag.DataType = template[tag.DataTypeValue][1]
elif tag.SymbolType in self.CIPTypes:
tag.DataType = self.CIPTypes[tag.SymbolType][1]
return

def _getTemplateAttribute(self, instance):
'''
Get the attributes of a UDT
'''

if not _connect(self): return None

readRequest = _buildTemplateAttributes(instance)
eipHeader = _buildEIPHeader(self, readRequest)
status, retData = _getBytes(self, eipHeader)
return retData

def _getTemplate(self, instance, dataLen):
'''
Get the members of a UDT so we can get it
'''

if not _connect(self): return None

readRequest = _readTemplateService(instance, dataLen)
eipHeader = _buildEIPHeader(self, readRequest)
status, retData = _getBytes(self, eipHeader)
return retData

def _buildTemplateAttributes(instance):

TemplateService = 0x03
TemplateLength = 0x03
TemplateClassType = 0x20
TemplateClass = 0x6c
TemplateInstanceType = 0x25
TemplateInstance = instance
AttribCount = 0x04
Attrib4 = 0x04
Attrib3 = 0x03
Attrib2 = 0x02
Attrib1 = 0x01

return pack('<BBBBHHHHHHH',
TemplateService,
TemplateLength,
TemplateClassType,
TemplateClass,
TemplateInstanceType,
TemplateInstance,
AttribCount,
Attrib4,
Attrib3,
Attrib2,
Attrib1)

def _buildTemplateService(instance, dataLen):

TemplateService = 0x4c
TemplateLength = 0x03
TemplateClassType = 0x20
TemplateClass = 0x6c
TemplateInstanceType = 0x25
TemplateInstance = instance
TemplateOffset = 0x00
DataLength = dataLen

return pack('<BBBBHHIH',
TemplateService,
TemplateLength,
TemplateClassType,
TemplateClass,
TemplateInstanceType,
TemplateInstance,
TemplateOffset,
DataLength)

def _readTemplateService(instance, dataLen):

TemplateService = 0x4c
TemplateLength = 0x03
TemplateClassType = 0x20
TemplateClass = 0x6c
TemplateInstanceType = 0x25
TemplateInstance = instance
TemplateOffset = 0x00
DataLength = dataLen

return pack('<BBBBHHIH',
TemplateService,
TemplateLength,
TemplateClassType,
TemplateClass,
TemplateInstanceType,
TemplateInstance,
TemplateOffset,
DataLength)

def _discover():
devices = []
request = _buildListIdentity()
Expand Down Expand Up @@ -1127,9 +1263,9 @@ def _buildTagListRequest(self, programName):
PathSegmentLen = int(len(PathSegment)/2)
AttributeCount = 0x03
SymbolType = 0x02
ByteCount = 0x07
ByteCount = 0x08
SymbolName = 0x01
Attributes = pack('<HHHH', AttributeCount, SymbolType, ByteCount, SymbolName)
Attributes = pack('<HHHH', AttributeCount, SymbolName, SymbolType, ByteCount)
TagListRequest = pack('<BB', Service, PathSegmentLen)
TagListRequest += PathSegment + Attributes

Expand Down Expand Up @@ -1489,33 +1625,54 @@ def extractTagPacket(self, data, programName):

while packetStart < len(data):
# get the length of the tag name
tagLen = unpack_from('<H', data, packetStart+8)[0]
tagLen = unpack_from('<H', data, packetStart+4)[0]
# get a single tag from the packet
packet = data[packetStart:packetStart+tagLen+10]
packet = data[packetStart:packetStart+tagLen+20]
# extract the offset
self.Offset = unpack_from('<H', packet, 0)[0]
# add the tag to our tag list
tag = parseLgxTag(packet, programName)
tag = parseLgxTag(self, packet, programName)

# filter out garbage
if "__DEFVAL_" and "Routine:" not in tag.TagName:
taglist.append(tag)
if '__DEFVAL_' in tag.TagName:
pass
elif 'Routine:' in tag.TagName:
pass
elif 'Map:' in tag.TagName:
pass
elif 'Task:' in tag.TagName:
pass
else:
self.TagList.append(tag)
if not programName:
if 'Program:' in tag.TagName:
programNames.append(tag.TagName)
# increment ot the next tag in the packet
packetStart = packetStart+tagLen+10
packetStart = packetStart+tagLen+20

def parseLgxTag(packet, programName):
tag = LGXTag()
length = unpack_from('<H', packet, 8)[0]
def parseLgxTag(self, packet, programName):

t = LgxTag()
length = unpack_from('<H', packet, 4)[0]
name = packet[6:length+6].decode('utf-8')
if programName:
tag.TagName = str(programName + '.' + packet[10:length+10].decode('utf-8'))
t.TagName = str(programName + '.' + name)
else:
tag.TagName = str(packet[10:length+10].decode('utf-8'))
tag.Offset = unpack_from('<H', packet, 0)[0]
tag.DataType = unpack_from('<B', packet, 4)[0]
t.TagName = str(name)
t.InstanceID = unpack_from('<H', packet, 0)[0]

val = unpack_from('<H', packet, length+6)[0]

return tag
t.SymbolType = val & 0xff
t.DataTypeValue = val & 0xfff
t.Array = (val & 0x6000) >> 13
t.Struct = (val & 0x8000) >> 15

if t.Array:
t.Size = unpack_from('<H', packet, length+8)[0]
else:
t.Size = 0
return t

cipErrorCodes = {0x00: 'Success',
0x01: 'Connection failure',
Expand Down

0 comments on commit 49df931

Please sign in to comment.