# https://github.com/jholtmann/ip_discovery """Find other gateways and serve the udp socket""" import os import FindMyIP from socket import * from helpers import get_unix_time from logger import get_logger DEBUG = True if os.environ.get('DEBUG') is not None else False DISCOVERY_ACK = 'IP_DISCOVERY_ACK'.encode() # ACK for broadcast DISCOVERY_OK = 'IP_DISCOVERY_OK'.encode() # Not used yet DISCOVERY_RSP_GTW = 'IP_DISCOVERY_RSP_GTW'.encode() # RSP for gateway DISCOVERY_RSP_MSH = 'IP_DISCOVERY_RSP_MSH'.encode() # RSP for mesh DISCOVERY_TIMEOUT = 0.5 SOCKET_TIMEOUT = 0.2 PORT_SERVER = 9434 PORT_CLIENT = 9435 def create_udp_socket(): sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP) sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) sock.setsockopt(SOL_SOCKET, SO_BROADCAST, 1) return sock def start_discovery_server(): """Serves the UDP socket for UDP broadcast discovery""" logger = get_logger(__name__) sock = create_udp_socket() server_address = ('', PORT_SERVER) local_ip = FindMyIP.internal() try: sock.bind(server_address) logger.info("Started discovery socket") while True: data, addr = sock.recvfrom(4096) logger.debug(f"Received a packet from {addr}") logger.debug(f"{addr[0]} | {local_ip}") logger.debug(data) if str(addr[0]) == str(local_ip): continue logger.debug("IP accepted") if data == DISCOVERY_ACK: logger.debug("ACK accepted") sock.sendto(DISCOVERY_RSP_GTW, (addr[0], PORT_CLIENT)) logger.debug(f"Send ACK to {addr}") except Exception as err: sock.close() logger.error(err) def start_discovery_client(): print("Started discovery client") sock = create_udp_socket() sock.settimeout(SOCKET_TIMEOUT) server_address = ('255.255.255.255', PORT_SERVER) start_time_stamp = get_unix_time() delta = round(get_unix_time() - start_time_stamp, 2) discovered_devices = [] try: sock.bind(('', PORT_CLIENT)) while delta <= DISCOVERY_TIMEOUT: delta = round(get_unix_time() - start_time_stamp, 2) sock.sendto(DISCOVERY_ACK, server_address) data, addr = sock.recvfrom(4096) if str(addr[0]) in discovered_devices: continue if data == DISCOVERY_RSP_GTW or data == DISCOVERY_RSP_MSH: discovered_devices.append(str(addr[0])) except Exception as err: print(err) finally: sock.close() return discovered_devices devices = start_discovery_client() print(f"Devices: {devices}")