Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backends/scanner: add handle_early_stop hooks #1146

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions bleak/backends/bluezdbus/advertisement_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""

import logging
from typing import Iterable, NamedTuple, Tuple, Union, no_type_check
from typing import Callable, Iterable, NamedTuple, Tuple, Union, no_type_check

from dbus_fast.service import ServiceInterface, dbus_property, method, PropertyAccess

Expand All @@ -34,6 +34,9 @@ class OrPattern(NamedTuple):
OrPatternLike = Union[OrPattern, Tuple[int, AdvertisementDataType, bytes]]


ReleasedCallback = Callable[[], None]


class AdvertisementMonitor(ServiceInterface):
"""
Implementation of the org.bluez.AdvertisementMonitor1 D-Bus interface.
Expand All @@ -49,21 +52,24 @@ class AdvertisementMonitor(ServiceInterface):
"""

def __init__(
self,
or_patterns: Iterable[OrPatternLike],
self, or_patterns: Iterable[OrPatternLike], released_callback: ReleasedCallback
):
"""
Args:
or_patterns:
List of or patterns that will be returned by the ``Patterns`` property.
released_callback:
A callback that is called when the D-bus "Release" method is called.
"""
super().__init__(defs.ADVERTISEMENT_MONITOR_INTERFACE)
# dbus_fast marshaling requires list instead of tuple
self._or_patterns = [list(p) for p in or_patterns]
self._released_callback = released_callback

@method()
def Release(self):
logger.debug("Release")
self._released_callback()

@method()
def Activate(self):
Expand Down
35 changes: 33 additions & 2 deletions bleak/backends/bluezdbus/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,14 @@ class DeviceRemovedCallbackAndState(NamedTuple):
"""


DiscoveryStoppedCallback = Callable[[], None]


class DiscoveryStoppedCallbackAndState(NamedTuple):
callback: DiscoveryStoppedCallback
adapter_path: str


DeviceConnectedChangedCallback = Callable[[bool], None]
"""
A callback that is called when a device's "Connected" property changes.
Expand Down Expand Up @@ -167,6 +175,7 @@ def __init__(self):

self._advertisement_callbacks: List[CallbackAndState] = []
self._device_removed_callbacks: List[DeviceRemovedCallbackAndState] = []
self._discovery_stopped_callbacks: List[DiscoveryStoppedCallbackAndState] = []
self._device_watchers: Set[DeviceWatcher] = set()
self._condition_callbacks: Set[Callable] = set()
self._services_cache: Dict[str, BleakGATTServiceCollection] = {}
Expand Down Expand Up @@ -312,6 +321,7 @@ async def active_scan(
filters: Dict[str, Variant],
advertisement_callback: AdvertisementCallback,
device_removed_callback: DeviceRemovedCallback,
discovery_stopped_callback: DiscoveryStoppedCallback,
) -> Callable[[], Coroutine]:
"""
Configures the advertisement data filters and starts scanning.
Expand All @@ -323,6 +333,9 @@ async def active_scan(
A callable that will be called when new advertisement data is received.
device_removed_callback:
A callable that will be called when a device is removed from BlueZ.
discovery_stopped_callback:
A callable that will be called if discovery is stopped early
(before stop was requested by calling the return value).
Returns:
An async function that is used to stop scanning and remove the filters.
Expand All @@ -342,6 +355,13 @@ async def active_scan(
)
self._device_removed_callbacks.append(device_removed_callback_and_state)

discovery_stopped_callback_and_state = DiscoveryStoppedCallbackAndState(
discovery_stopped_callback, adapter_path
)
self._discovery_stopped_callbacks.append(
discovery_stopped_callback_and_state
)

try:
# Apply the filters
reply = await self._bus.call(
Expand Down Expand Up @@ -375,6 +395,9 @@ async def stop() -> None:
self._device_removed_callbacks.remove(
device_removed_callback_and_state
)
self._discovery_stopped_callbacks.remove(
discovery_stopped_callback_and_state
)

async with self._bus_lock:
reply = await self._bus.call(
Expand Down Expand Up @@ -413,6 +436,7 @@ async def passive_scan(
filters: List[OrPatternLike],
advertisement_callback: AdvertisementCallback,
device_removed_callback: DeviceRemovedCallback,
discovery_stopped_callback: DiscoveryStoppedCallback,
) -> Callable[[], Coroutine]:
"""
Configures the advertisement data filters and starts scanning.
Expand Down Expand Up @@ -444,7 +468,7 @@ async def passive_scan(
self._device_removed_callbacks.append(device_removed_callback_and_state)

try:
monitor = AdvertisementMonitor(filters)
monitor = AdvertisementMonitor(filters, discovery_stopped_callback)

# this should be a unique path to allow multiple python interpreters
# running bleak and multiple scanners within a single interpreter
Expand Down Expand Up @@ -828,7 +852,14 @@ def _parse_msg(self, message: Message):
# then call any callbacks so they will be called with the
# updated state

if interface == defs.DEVICE_INTERFACE:
if interface == defs.ADAPTER_INTERFACE:
if "Discovering" in changed and not self_interface["Discovering"]:
for (
discovery_stopped_callback,
_,
) in self._discovery_stopped_callbacks:
discovery_stopped_callback()
elif interface == defs.DEVICE_INTERFACE:
# handle advertisement watchers

self._run_advertisement_callbacks(
Expand Down
2 changes: 2 additions & 0 deletions bleak/backends/bluezdbus/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,15 @@ async def start(self):
self._or_patterns,
self._handle_advertising_data,
self._handle_device_removed,
self.handle_early_stop,
)
else:
self._stop = await manager.active_scan(
adapter_path,
self._filters,
self._handle_advertising_data,
self._handle_device_removed,
self.handle_early_stop,
)

async def stop(self):
Expand Down
18 changes: 15 additions & 3 deletions bleak/backends/corebluetooth/CentralManagerDelegate.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import logging
import sys
import threading
from typing import Any, Callable, Dict, Optional
from typing import Any, Callable, Dict, Iterable, Optional

if sys.version_info < (3, 11):
from async_timeout import timeout as async_timeout
Expand Down Expand Up @@ -49,6 +49,7 @@


DisconnectCallback = Callable[[], None]
ScanningStoppedCallback = Callable[[], None]


class CentralManagerDelegate(NSObject):
Expand Down Expand Up @@ -93,6 +94,8 @@ def init(self) -> Optional["CentralManagerDelegate"]:
if self.central_manager.state() != CBManagerStatePoweredOn:
raise BleakError("Bluetooth device is turned off")

self._scanning_stopped_callback: Optional[ScanningStoppedCallback]

# isScanning property was added in 10.13
if objc.macos_available(10, 13):
self.central_manager.addObserver_forKeyPath_options_context_(
Expand All @@ -110,7 +113,11 @@ def __del__(self):
# User defined functions

@objc.python_method
async def start_scan(self, service_uuids) -> None:
async def start_scan(
self,
service_uuids: Iterable[str],
scanning_stopped_callback: ScanningStoppedCallback,
) -> None:
service_uuids = (
NSArray.alloc().initWithArray_(
list(map(CBUUID.UUIDWithString_, service_uuids))
Expand All @@ -133,8 +140,11 @@ async def start_scan(self, service_uuids) -> None:
else:
await asyncio.sleep(0.1)

self._scanning_stopped_callback = scanning_stopped_callback

@objc.python_method
async def stop_scan(self) -> None:
self._scanning_stopped_callback = None
self.central_manager.stopScan()

# The `isScanning` property was added in macOS 10.13, so before that
Expand Down Expand Up @@ -199,11 +209,13 @@ def _changed_is_scanning(self, is_scanning: bool) -> None:
else:
if self._did_stop_scanning_event:
self._did_stop_scanning_event.set()
if self._scanning_stopped_callback:
self._scanning_stopped_callback()

def observeValueForKeyPath_ofObject_change_context_(
self, keyPath: NSString, object: Any, change: NSDictionary, context: int
) -> None:
logger.debug("'%s' changed", keyPath)
logger.debug("'%s' changed: %r", keyPath, change)

if keyPath != "isScanning":
return
Expand Down
2 changes: 1 addition & 1 deletion bleak/backends/corebluetooth/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def callback(p: CBPeripheral, a: Dict[str, Any], r: int) -> None:
self._callback(device, advertisement_data)

self._manager.callbacks[id(self)] = callback
await self._manager.start_scan(self._service_uuids)
await self._manager.start_scan(self._service_uuids, self.handle_early_stop)

async def stop(self):
await self._manager.stop_scan()
Expand Down
4 changes: 4 additions & 0 deletions bleak/backends/p4android/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ def handle_permissions(permissions, grantResults):
)
self.__javascanner.flushPendingScanResults(self.__callback.java)

# REVISIT: we shouldn't wait and check for error here, instead we should
# just allow the stopped early callback to handle this and let the user
# decide what to do.
try:
async with async_timeout(0.2):
await scanfuture
Expand Down Expand Up @@ -292,6 +295,7 @@ def result_state(self, status_str, name, *data):

@java_method("(I)V")
def onScanFailed(self, errorCode):
self._loop.call_soon_threadsafe(self._scanner.handle_early_stop)
self.result_state(defs.ScanFailed(errorCode).name, "onScan")

@java_method("(Landroid/bluetooth/le/ScanResult;)V")
Expand Down
3 changes: 3 additions & 0 deletions bleak/backends/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ def create_or_update_device(

return device

def handle_early_stop(self) -> None:
...

@abc.abstractmethod
async def start(self):
"""Start scanning for devices"""
Expand Down
30 changes: 25 additions & 5 deletions bleak/backends/winrt/scanner.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import asyncio
import functools
import logging
import sys
from typing import Dict, List, NamedTuple, Optional
from uuid import UUID

from bleak_winrt.windows.devices.bluetooth.advertisement import (
BluetoothLEScanningMode,
BluetoothLEAdvertisementWatcher,
BluetoothLEAdvertisementReceivedEventArgs,
BluetoothLEAdvertisementType,
BluetoothLEAdvertisementWatcher,
BluetoothLEAdvertisementWatcherStatus,
BluetoothLEAdvertisementWatcherStoppedEventArgs,
BluetoothLEScanningMode,
)

if sys.version_info[:2] < (3, 8):
Expand Down Expand Up @@ -222,12 +225,29 @@ def _received_handler(

self._callback(device, advertisement_data)

def _stopped_handler(self, sender, e):
def _handle_stopped_threadsafe(
self,
loop: asyncio.AbstractEventLoop,
sender: BluetoothLEAdvertisementWatcher,
e: BluetoothLEAdvertisementWatcherStoppedEventArgs,
) -> None:
logger.debug("watcher status: %s, error: %s", sender.status.name, e.error.name)

loop.call_soon_threadsafe(
self._handle_stopped,
sender.status == BluetoothLEAdvertisementWatcherStatus.ABORTED,
)

def _handle_stopped(self, from_error: bool) -> None:
logger.debug(
"{0} devices found. Watcher status: {1}.".format(
len(self.seen_devices), self.watcher.status
len(self.seen_devices), self.watcher.status.name
)
)

if from_error:
self.handle_early_stop()

self._stopped_event.set()

async def start(self):
Expand All @@ -245,7 +265,7 @@ async def start(self):
lambda s, e: event_loop.call_soon_threadsafe(self._received_handler, s, e)
)
self._stopped_token = self.watcher.add_stopped(
lambda s, e: event_loop.call_soon_threadsafe(self._stopped_handler, s, e)
functools.partial(self._handle_stopped_threadsafe, event_loop)
)

if self._signal_strength_filter is not None:
Expand Down