"""Example script: switch KNX group address 0/3/3 on and off.

Two variants are defined:

* ``direct()`` drives a ``Light`` device through a started ``XKNX`` instance.
* ``tunnel()`` sends raw telegrams through an explicitly configured ``Tunnel``.

Only ``tunnel()`` is executed by the module-level code at the bottom of this
file; ``direct()`` is defined but not called here.
"""
import asyncio

from xknx import XKNX
from xknx.devices import Light
from xknx.io import GatewayScanner, Tunnel
from xknx.knx import DPTBinary, GroupAddress, PhysicalAddress, Telegram


def direct():
    """Switch the light on group address 0/3/3 on, wait one second, switch it off.

    Builds an ``XKNX`` instance with default arguments, starts it, and drives a
    ``Light`` device.  The coroutine is run to completion on the loop returned
    by ``asyncio.get_event_loop()``, and that loop is closed afterwards.
    """
    async def main():
        xknx = XKNX()
        await xknx.start()
        try:
            light = Light(
                xknx,
                name='GangLicht',
                group_address_switch='0/3/3',
            )
            await light.set_on()
            await asyncio.sleep(1)
            await light.set_off()
        finally:
            # Stop XKNX even when one of the light operations raises, so the
            # started instance is not left running.
            await xknx.stop()

    # pylint: disable=invalid-name
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        # Close the loop on the error path as well.
        loop.close()


async def tunnel():
    """Toggle group address 0/3/3 through a manually configured KNX tunnel.

    Connects a ``Tunnel`` using the hard-coded local/gateway addresses below,
    sends a binary ``1`` telegram, waits two seconds, sends a binary ``0``
    telegram, waits two more seconds, issues a connection-state request and
    finally disconnects.
    """
    xknx = XKNX()
    # Named ``knx_tunnel`` so the local does not shadow this coroutine's name.
    knx_tunnel = Tunnel(
        xknx,
        PhysicalAddress("3.7.249"),
        local_ip='192.168.178.74',
        gateway_ip='192.168.178.80',
        gateway_port=3671,
    )
    target = GroupAddress('0/3/3')

    await knx_tunnel.connect_udp()
    await knx_tunnel.connect()
    try:
        await knx_tunnel.send_telegram(Telegram(target, payload=DPTBinary(1)))
        await asyncio.sleep(2)
        await knx_tunnel.send_telegram(Telegram(target, payload=DPTBinary(0)))
        await asyncio.sleep(2)

        await knx_tunnel.connectionstate()
    finally:
        # Always release the tunnel once connect() has succeeded, even if
        # sending a telegram or the state request fails.
        await knx_tunnel.disconnect()


# NOTE: this runs on import as well as when executed as a script.
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(tunnel())
finally:
    loop.close()