Skip to content

Commit

Permalink
replace inline in a cleverder way
Browse files Browse the repository at this point in the history
  • Loading branch information
andete committed Feb 25, 2014
1 parent 7038bce commit 27add30
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 24 deletions.
65 changes: 54 additions & 11 deletions ble.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,15 @@

class BLE(QtCore.QObject):
scan_response = QtCore.Signal(dict)
connection_status = QtCore.Signal(int, str, int)
timeout = QtCore.Signal()

CONNECTED = 0

uuid_primary = [0x28, 0x00] # 0x2800
uuid_secundary = [0x28, 0x01] # 0x2801
uuid_client_characteristic_configuration = [0x29, 0x02] # 0x2902

def __init__(self, port, baud_rate, packet_mode = False):
super(BLE, self).__init__()
self.led = False
Expand All @@ -23,25 +30,33 @@ def __init__(self, port, baud_rate, packet_mode = False):
self.ser.flushOutput()
ser = self.ser
# disconnect if we are connected already
ble.send_command(ser, ble.ble_cmd_connection_disconnect(0))
self.send_command(ble.ble_cmd_connection_disconnect(0))
ble.check_activity(ser, 1)
# stop advertising if we are advertising already
ble.send_command(ser, ble.ble_cmd_gap_set_mode(0, 0))
self.send_command(ble.ble_cmd_gap_set_mode(0, 0))
ble.check_activity(ser, 1)
# stop scanning if we are scanning already
ble.send_command(ser, ble.ble_cmd_gap_end_procedure())
self.send_command(ble.ble_cmd_gap_end_procedure())
ble.check_activity(ser, 1)
# set scan parameters
ble.send_command(ser, ble.ble_cmd_gap_set_scan_parameters(0xC8, 0xC8, 1))
self.send_command(ble.ble_cmd_gap_set_scan_parameters(0xC8, 0xC8, 1))
ble.check_activity(ser, 1)
# start scanning now
ble.send_command(ser, ble.ble_cmd_gap_discover(1))
self.send_command(ble.ble_cmd_gap_discover(1))
ble.check_activity(ser, 1)
# IO port stuff for LED; doesn't work currently
ble.ble_cmd_hardware_io_port_config_pull(0, 0, 0)
ble.ble_cmd_hardware_io_port_config_direction(0, 1)
ble.ble_cmd_hardware_io_port_config_function(0, 0)
ble.ble_cmd_hardware_io_port_write(0, 1, 0)
self.send_command(ble.ble_cmd_hardware_io_port_config_pull(0, 0, 0))
self.send_command(ble.ble_cmd_hardware_io_port_config_direction(0, 1))
self.send_command(ble.ble_cmd_hardware_io_port_config_function(0, 0))
self.send_command(ble.ble_cmd_hardware_io_port_write(0, 1, 0))
# handle connections
self.ble.ble_evt_connection_status += self.handle_connection_status
# handle service info
self.ble.ble_evt_attclient_group_found += self.handle_attclient_group_found


def send_command(self, cmd):
return self.ble.send_command(self.ser, cmd)

def timeout(self, sender, args):
# might want to try the following lines to reset, though it probably
Expand All @@ -53,12 +68,40 @@ def timeout(self, sender, args):

def handle_scan_response(self, sender, args):
if self.led == False:
self.ble.ble_cmd_hardware_io_port_write(0, 1, 1)
self.send_command(self.ble.ble_cmd_hardware_io_port_write(0, 1, 1))
self.led = True
else:
self.ble.ble_cmd_hardware_io_port_write(0, 1, 0)
self.send_command(self.ble.ble_cmd_hardware_io_port_write(0, 1, 0))
self.led = False
self.scan_response.emit(args)

def handle_connection_status(self, sender, args):
print sender, args
if (args['flags'] & 0x05) == 0x05:
f = ':'.join(['%02X' % b for b in args['address'][::-1]])
h = args['connection']
print "Connected to %d %s" % (h, f)
self.connection_status.emit(h, f, self.CONNECTED)

def handle_attclient_group_found(self, sender, args):
uuid = reversed(args['uuid'])
print uuid

def check_activity(self):
return self.ble.check_activity(self.ser)

def connect_direct(self, target):
print "connecting to", target
address = target
addr_type = 0 # public
timeout = 10 # 1 sec
slave_latency = 0 # disabled
conn_interval_min = 100/1.25 # in ms
conn_interval_max = 1000/1.25 # in ms
self.send_command(self.ble.ble_cmd_gap_connect_direct(
address, addr_type, conn_interval_min,
conn_interval_max, timeout, slave_latency))

def primary_service_discovery(self, handle):
self.send_command(self.ble.ble_cmd_attclient_read_by_group_type(handle, 0x0001, 0xFFFF, list(reversed(BLE.uuid_service))))

40 changes: 27 additions & 13 deletions gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,42 +75,56 @@ def _add(text, slot = None):
self.collect_view.setSelectionModel(self.selection_model)
self.selection_model.currentRowChanged.connect(self.row_changed)
self.selected_device = None

return self.collect_view

def make_device_widget(self, handle, mac):
return QtGui.QLabel("%d %s" % (handle, mac))

def tab_changed(self, i):
print "tab changed", i
pass

def row_changed(self, current, previous):
self.selected_device = self.collect_model.item(current.row(), 1).data(Qt.DisplayRole)
self.selected_device_raw = self.collect_model.item(current.row(), 1).data()

def run_collection(self):
self.collect_thread = CollectThread(self.ble)
self.ble.scan_response.connect(self.scan_response)
self.ble.connection_status.connect(self.connection_status)
self.collect_thread.start()

def scan_response(self, args):
s = QtGui.QStandardItem
time_ = time.strftime("%H:%M:%S %d/%m/%Y", time.localtime())
ftime = s(time_)
sender = ''.join(['%02X' % b for b in args["sender"][::-1]])
sender = ':'.join(['%02X' % b for b in args["sender"][::-1]])
name, data = parse_data(args['data'])
ident = "%s_%s_%s" % (sender, name, data)
ftime.setData(ident)
# TODO: only keep latest x? from one mac
# or possibly just replace identical doubles
self.collect_model.insertRow(0, [ftime, s(sender), s(name), s(data)])
self.collect_view.resizeColumnToContents(0)
self.collect_view.resizeColumnToContents(1)
self.collect_view.resizeColumnToContents(2)
self.collect_view.resizeColumnToContents(3)
for x in range(1, self.collect_model.rowCount()):
fsender = s(sender)
fsender.setData(args["sender"])
replaced = False
for x in range(0, self.collect_model.rowCount()):
if self.collect_model.item(x, 0).data() == ident:
self.collect_model.takeRow(x)
self.collect_model.item(x, 0).setData(time_, Qt.DisplayRole)
replaced = True
break # only one identical to remove normally
if not replaced:
self.collect_model.insertRow(0, [ftime, fsender, s(name), s(data)])
self.collect_view.resizeColumnToContents(0)
self.collect_view.resizeColumnToContents(1)
self.collect_view.resizeColumnToContents(2)
self.collect_view.resizeColumnToContents(3)

def connection_status(self, handle, mac, status):
print "connection_status called", status
if status == BLE.CONNECTED:
self.qtab.setCurrentIndex(self.qtab.addTab(self.make_device_widget(handle, mac), mac))
self.ble.primary_service_discovery(handle)

def connect(self):
print self.selected_device
if not self.selected_device is None:
self.ble.connect_direct(self.selected_device_raw)

def main():
QtCore.QCoreApplication.setOrganizationName("productize")
Expand Down

0 comments on commit 27add30

Please sign in to comment.