63 lines
1.7 KiB
Python
63 lines
1.7 KiB
Python
from bluepy.btle import DefaultDelegate
|
|
from bluepy.btle import Scanner
|
|
|
|
from data_class import Data
|
|
from devices import get_device
|
|
from logger import get_logger
|
|
logger = get_logger(__name__)
|
|
|
|
# This is the list, where the responses will be stored from the `handleDiscovery`
|
|
devices = []
|
|
|
|
|
|
class ScanDelegate(DefaultDelegate):
|
|
def __init__(self):
|
|
DefaultDelegate.__init__(self)
|
|
|
|
def handleDiscovery(self, dev, isNewDev, isNewData):
|
|
global devices
|
|
|
|
for (sdid, desc, val) in dev.getScanData():
|
|
if self.is_temperature(sdid, val):
|
|
data_obj = Data(val)
|
|
|
|
if self.is_atc_device(dev, data_obj):
|
|
device_from_config = get_device(dev)
|
|
devices.append([dev, data_obj, device_from_config])
|
|
|
|
@staticmethod
|
|
def is_temperature(sdid, val):
|
|
if sdid != 22: return False
|
|
if len(val) != 30: return False
|
|
return True
|
|
|
|
@staticmethod
|
|
def is_atc_device(dev, data_obj):
|
|
global devices
|
|
if 'A4:C1:38' not in dev.addr.upper(): return False
|
|
for device in devices:
|
|
if str(device[0].addr) == str(dev.addr): return False
|
|
|
|
device_from_config = get_device(dev)
|
|
|
|
try:logger.info(f"Device: {dev.addr.upper()} ({dev.addrType}), RSSI: {dev.rssi}dB, Room: {device_from_config.room}")
|
|
except:logger.info(f"Device: {dev.addr.upper()} ({dev.addrType}), RSSI: {dev.rssi}dB, Room: ?")
|
|
logger.info(f'\tTemp: {data_obj.temperature}°C, Humid: {data_obj.humidity}%, Batt: {data_obj.battery_percent}%')
|
|
return True
|
|
|
|
|
|
def cleanup():
|
|
global devices
|
|
devices = []
|
|
|
|
|
|
def start_discovery(timeout=20):
|
|
cleanup()
|
|
global devices
|
|
logger.info(f'Start discovery with timout {timeout}s...')
|
|
|
|
scanner = Scanner().withDelegate(ScanDelegate())
|
|
scanner.scan(timeout=timeout, passive=False)
|
|
|
|
return devices
|