From f89ffd3e5cd1d9cd35909311b63d12e078e22c6c Mon Sep 17 00:00:00 2001 From: Joost Yervante Damad Date: Mon, 3 Mar 2014 14:56:41 +0100 Subject: [PATCH] re-organize code --- ble.py | 79 ++++++++++++++++++++++++++++++++-------------- collect.py | 13 ++++---- data/__init__.py | 20 ++++++++++++ data/productize.py | 22 +++++++++++++ gui.py | 3 +- productize.py | 46 --------------------------- 6 files changed, 104 insertions(+), 79 deletions(-) create mode 100644 data/__init__.py create mode 100644 data/productize.py diff --git a/ble.py b/ble.py index 8a39bea..33b860d 100644 --- a/ble.py +++ b/ble.py @@ -3,33 +3,15 @@ import bglib from PySide import QtCore - +# https://www.bluetooth.org/en-us/specification/assigned-numbers/generic-access-profile +GAP_AD_TYPE_FLAGS = 0x01 +GAP_AD_TYPE_LOCALNAME_COMPLETE = 0x09 +GAP_AD_TYPE_TX_POWER = 0x0A +GAP_AD_TYPE_SLAVE_CON_INTERVAL_RANGE = 0x12 +GAP_AD_TYPE_VENDOR = 0xFF UUID = dict( - generic = ([0x18, 0x00], "Generic Access"), - generic_attr = ([0x18, 0x01], "Generic Attribute"), - immediate_alert = ([0x18, 0x02], "Immediate Alert"), - link_loss = ([0x18, 0x03], "Link Loss"), - tx_power = ([0x18, 0x04], "Tx Power"), - time = ([0x18, 0x05], "Current Time"), - ref_time = ([0x18, 0x06], "Reference Time Update"), - next_dst = ([0x18, 0x07], "Next DST Change Service"), - glucose = ([0x18, 0x08], "Glucose"), - thermo = ([0x18, 0x09], "Health Thermometer"), - device_info = ([0x18, 0x0A], "Device Information"), - heart_rate = ([0x18, 0x0D], "Heart Rate"), - phone_alert = ([0x18, 0x0E], "Phone Alert Status"), - battery = ([0x18, 0x0F], "Battery"), - blood = ([0x18, 0x10], "Blood Pressure"), - alert = ([0x18, 0x11], "Alert Notification"), - human_interface = ([0x18, 0x12], "Human Interface"), - scan_para = ([0x18, 0x13], "Scan Parameters"), - run_speed = ([0x18, 0x14], "Running Speed and Cadence"), - cycle_speed = ([0x18, 0x16], "Cycling Speed and Cadence"), - cycle_pow = ([0x18, 0x18], "Cycling Power"), - loc_nav = ([0x18, 0x19], "Location and Navigation"), - primary = ([0x28, 0x00], "Primary" ), secundary = ([0x28, 0x01], "Secundary"), include = ([0x28, 0x02], "Include"), @@ -89,12 +71,14 @@ class BLE(QtCore.QObject): CONNECTED = 0 def __init__(self, baud_rate, packet_mode = False): + import data super(BLE, self).__init__() self.led = False self.port = None self.baud_rate = baud_rate self.packet_mode = packet_mode self.address = None + self.uuid = data.UUID() def address_response(self, sender, args): self.address = ':'.join(['%02X' % b for b in args['address'][::-1]]) @@ -239,3 +223,50 @@ def primary_service_discovery(self, handle): def find_information(self, handle, start, end): self.send_command(self.ble.ble_cmd_attclient_find_information(handle, start, end)) + + def flags_to_string(self, val): + flags = [] + if val & (2**0) > 0: + flags.append('LE_lim') + if val & (2**1) > 0: + flags.append('LE_gen') + if val & (2**2) > 0: + flags.append('no_BR/EDR') + if val & (2**3) > 0: + flags.append('sim1') + if val & (2**4) > 0: + flags.append('sim2') + return '|'.join(flags) + + def data_to_string(self, data): + pos = 0 + dl = [] + name = '' + while pos < len(data): + field_len = data[pos] + field_type = data[pos+1] + field_data = data[pos+2:(pos+2+field_len-1)] + if field_type == GAP_AD_TYPE_FLAGS: + dl.append("flags:%s" % self.flags_to_string(field_data[0])) + elif field_type == GAP_AD_TYPE_LOCALNAME_COMPLETE: + name = ''.join(['%c' % b for b in field_data]) + # dl.append("name:%s" % name) + elif field_type == GAP_AD_TYPE_TX_POWER: + dl.append("tx:%ddB" % (field_data[0])) + elif field_type == GAP_AD_TYPE_SLAVE_CON_INTERVAL_RANGE: + x1 = (field_data[0] + 256*field_data[1])*1.25 + x2 = (field_data[2] + 256*field_data[3])*1.25 + dl.append('slv_con_int_ran:%d-%dms' % (x1, x2)) + elif field_type == GAP_AD_TYPE_VENDOR: + dl.append(self.uuid.vendor_to_string(field_data)) + else: + dl.append("unknown_field:0x%x" % field_type) + pos += field_len + 1 + return (name, ' '.join(dl)) + +# https://www.bluetooth.org/en-us/specification/assigned-numbers/generic-access-profile +GAP_AD_TYPE_FLAGS = 0x01 +GAP_AD_TYPE_LOCALNAME_COMPLETE = 0x09 +GAP_AD_TYPE_TX_POWER = 0x0A +GAP_AD_TYPE_SLAVE_CON_INTERVAL_RANGE = 0x12 +GAP_AD_TYPE_VENDOR = 0xFF diff --git a/collect.py b/collect.py index 8f791eb..46dad59 100755 --- a/collect.py +++ b/collect.py @@ -9,10 +9,10 @@ from PySide import QtGui, QtCore from PySide.QtCore import Qt -from productize import parse_data +from data import UUID from ble import BLE, ActivityThread -def print_scan_response(args): +def print_scan_response(ble, args): print "gap_scan_response", t = datetime.datetime.now() disp_list = [] @@ -22,7 +22,7 @@ def print_scan_response(args): disp_list.append("from: %s" % ''.join(['%02X' % b for b in args["sender"][::-1]])) disp_list.append("adt: %d" % args["address_type"]) disp_list.append("bond: %d" % args["bond"]) - disp_list.append("name: %s data: %s" % parse_data(args['data'])) + disp_list.append("name: %s data: %s" % ble.data_to_string(args['data'])) print ' '.join(disp_list) def run(): @@ -35,12 +35,11 @@ def run(): signal.signal(signal.SIGINT, signal.SIG_DFL) - port_name = "/dev/ttyACM0" baud_rate = 115200 - - ble = BLE(port_name, baud_rate) + ble = BLE(baud_rate) + ble.start() ct = ActivityThread(ble) - ble.scan_response.connect(print_scan_response) + ble.scan_response.connect(lambda x: print_scan_response(ble, x)) ct.start() return app.exec_() diff --git a/data/__init__.py b/data/__init__.py new file mode 100644 index 0000000..a11a431 --- /dev/null +++ b/data/__init__.py @@ -0,0 +1,20 @@ +import services, productize + +class UUID: + + def __init__(self): + self.service = {} + self.attr = {} + self.vendor = [] + sources = [services, productize] + for source in sources: + self.service.update(source.service) + self.attr.update(source.attr) + self.vendor.append(source.Vendor()) + + def vendor_to_string(self, data): + vendor_id = data[0] + 256*data[1] + for v in self.vendor: + if vendor_id in v.ids: + return v.to_string(data[2:]) + return "vendor:%04X" % vendor_id diff --git a/data/productize.py b/data/productize.py new file mode 100644 index 0000000..2a15d16 --- /dev/null +++ b/data/productize.py @@ -0,0 +1,22 @@ +service = dict() + +attr = dict() + +class Vendor: + + ids = [0xFFFF] + + def _parse_product(self, product_id, data): + if product_id == 1: + raw_lux = data[0] + 256*data[1] + luxf = raw_lux*1.24/2048 + luxf = luxf / (10E-6*27.4E3) + luxf = 10 **luxf + return "lux:%d" % int(luxf) + return "unknown_product" + + def to_string(self, data): + product_id = data[0] + 256*data[1] + return self._parse_product(product_id, data[2:]) + + diff --git a/gui.py b/gui.py index aece7df..c22069f 100755 --- a/gui.py +++ b/gui.py @@ -7,7 +7,6 @@ from PySide import QtGui, QtCore from PySide.QtCore import Qt -from productize import parse_data import ble from ble import BLE @@ -194,7 +193,7 @@ def s(x): time_ = time.strftime("%H:%M:%S %d/%m/%Y", time.localtime()) ftime = s(time_) sender = ':'.join(['%02X' % b for b in args["sender"][::-1]]) - name, data = parse_data(args['data']) + name, data = ble.parse_data(args['data']) ident = "%s_%s_%s" % (sender, name, data) ftime.setData(ident) fsender = s(sender) diff --git a/productize.py b/productize.py index 6f63a0c..86072af 100644 --- a/productize.py +++ b/productize.py @@ -1,10 +1,3 @@ -# https://www.bluetooth.org/en-us/specification/assigned-numbers/generic-access-profile - -GAP_AD_TYPE_FLAGS = 0x01 -GAP_AD_TYPE_LOCALNAME_COMPLETE = 0x09 -GAP_AD_TYPE_TX_POWER = 0x0A -GAP_AD_TYPE_SLAVE_CON_INTERVAL_RANGE = 0x12 -GAP_AD_TYPE_VENDOR = 0xFF def parse_product(product_id, data): if product_id == 1: @@ -23,42 +16,3 @@ def parse_vendor(data): else: return "vendor:%04X" % vendor -def parse_flags(val): - flags = [] - if val & (2**0) > 0: - flags.append('LE_lim') - if val & (2**1) > 0: - flags.append('LE_gen') - if val & (2**2) > 0: - flags.append('no_BR/EDR') - if val & (2**3) > 0: - flags.append('sim1') - if val & (2**4) > 0: - flags.append('sim2') - return '|'.join(flags) - -def parse_data(data): - pos = 0 - dl = [] - name = '' - while pos < len(data): - field_len = data[pos] - field_type = data[pos+1] - field_data = data[pos+2:(pos+2+field_len-1)] - if field_type == GAP_AD_TYPE_FLAGS: - dl.append("flags:%s" % parse_flags(field_data[0])) - elif field_type == GAP_AD_TYPE_LOCALNAME_COMPLETE: - name = ''.join(['%c' % b for b in field_data]) - # dl.append("name:%s" % name) - elif field_type == GAP_AD_TYPE_TX_POWER: - dl.append("tx:%ddB" % (field_data[0])) - elif field_type == GAP_AD_TYPE_SLAVE_CON_INTERVAL_RANGE: - x1 = (field_data[0] + 256*field_data[1])*1.25 - x2 = (field_data[2] + 256*field_data[3])*1.25 - dl.append('slv_con_int_ran:%d-%dms' % (x1, x2)) - elif field_type == GAP_AD_TYPE_VENDOR: - dl.append(parse_vendor(field_data)) - else: - dl.append("unknown_field:0x%x" % field_type) - pos += field_len + 1 - return (name, ' '.join(dl))