Initial commit
This commit is contained in:
24
playground/asyncio_test.py
Normal file
24
playground/asyncio_test.py
Normal file
@@ -0,0 +1,24 @@
|
||||
import asyncio
|
||||
|
||||
|
||||
async def tcp_echo_client(message):
|
||||
reader, writer = await asyncio.open_connection('server', 7072)
|
||||
print("Connected")
|
||||
|
||||
lines = [
|
||||
"displayattr .*\n".encode(),
|
||||
"inform on\n".encode(),
|
||||
]
|
||||
writer.writelines(lines)
|
||||
|
||||
while True:
|
||||
data = await reader.readline()
|
||||
data = data.decode()
|
||||
print("Received line", data)
|
||||
|
||||
|
||||
#asyncio.run(tcp_echo_client('Hello World!'))
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(tcp_echo_client("bla"))
|
||||
loop.close()
|
||||
50
playground/xknxtest.py
Normal file
50
playground/xknxtest.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import asyncio
|
||||
from xknx import XKNX
|
||||
from xknx.devices import Light
|
||||
from xknx.io import GatewayScanner, Tunnel
|
||||
from xknx.knx import DPTBinary, GroupAddress, PhysicalAddress, Telegram
|
||||
|
||||
|
||||
def direct():
|
||||
async def main():
|
||||
xknx = XKNX()
|
||||
await xknx.start()
|
||||
light = Light(xknx,
|
||||
name='GangLicht',
|
||||
group_address_switch='0/3/3')
|
||||
await light.set_on()
|
||||
await asyncio.sleep(1)
|
||||
await light.set_off()
|
||||
await xknx.stop()
|
||||
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
loop.close()
|
||||
|
||||
|
||||
async def tunnel():
|
||||
xknx = XKNX()
|
||||
tunnel = Tunnel(
|
||||
xknx,
|
||||
PhysicalAddress("3.7.249"),
|
||||
local_ip='192.168.178.74',
|
||||
gateway_ip='192.168.178.80',
|
||||
gateway_port=3671,
|
||||
)
|
||||
await tunnel.connect_udp()
|
||||
await tunnel.connect()
|
||||
|
||||
await tunnel.send_telegram(Telegram(GroupAddress('0/3/3'), payload=DPTBinary(1)))
|
||||
await asyncio.sleep(2)
|
||||
await tunnel.send_telegram(Telegram(GroupAddress('0/3/3'), payload=DPTBinary(0)))
|
||||
await asyncio.sleep(2)
|
||||
|
||||
await tunnel.connectionstate()
|
||||
await tunnel.disconnect()
|
||||
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(tunnel())
|
||||
loop.close()
|
||||
Reference in New Issue
Block a user