#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Poll a DHT22 sensor and publish its readings to Home Assistant.

Settings are read from the JSON file at ``CONFIG_PATH``. Keys used by this
script: ``token``, ``ha_url``, ``ha_temp_sensor_name``,
``ha_temp_friendly_name``, ``ha_humidity_sensor_name``,
``ha_humidity_friendly_name``, ``dht_pin``, ``polling_sleep_time_seconds``
and, optionally, ``request_timeout_seconds``.
"""
import json
import multiprocessing
import time

import requests

CONFIG_PATH = "/etc/dht22_sensing.json"

# Used when the configuration has no 'request_timeout_seconds' entry.
DEFAULT_REQUEST_TIMEOUT_SECONDS = 10.0


def _load_config(path=CONFIG_PATH):
    """Return the parsed JSON configuration stored at *path*."""
    # The context manager closes the file handle as soon as parsing is done.
    with open(path, encoding="utf-8") as config_file:
        return json.load(config_file)


# Loaded at import time on purpose: with the 'spawn' start method each child
# process re-imports this module and so gets its own copy of the settings.
config = _load_config()


def _sensor_payload(state, device_class, friendly_name, unit):
    """Build the JSON body for one Home Assistant sensor state update."""
    return {
        "state": str(state),
        "attributes": {
            "device_class": device_class,
            "friendly_name": friendly_name,
            "unit_of_measurement": unit,
        },
    }


def _post_state(sensor_name, payload, headers):
    """POST *payload* as the state of ``sensor.<sensor_name>``.

    Failures are reported on stdout instead of being raised, so that one
    failed update does not prevent the next one from being attempted.
    """
    url = "{}/api/states/sensor.{}".format(config['ha_url'], sensor_name)
    timeout = float(
        config.get('request_timeout_seconds', DEFAULT_REQUEST_TIMEOUT_SECONDS))
    try:
        # Without a timeout a stalled server would block this process, and
        # therefore the parent's join(), indefinitely.
        response = requests.post(
            url, json=payload, headers=headers, timeout=timeout)
    except requests.RequestException as error:
        print("Request to {} failed: {}".format(url, error))
        return
    if not response.ok:
        print("Request to {} returned HTTP {}".format(
            url, response.status_code))


def send_to_home_assistant(temperature, humidity):
    """Publish one temperature and one humidity reading to Home Assistant.

    Args:
        temperature: Reading sent as the temperature sensor state, in °C.
        humidity: Reading sent as the humidity sensor state, in %.
    """
    # print(f"Sending temperature {temperature}, humidity {humidity}")
    headers = {
        'x-ha-access': config['token'],
        'Authorization': "Bearer {}".format(config['token']),
    }
    _post_state(
        config['ha_temp_sensor_name'],
        _sensor_payload(
            temperature, "temperature",
            config['ha_temp_friendly_name'], "°C"),
        headers,
    )
    _post_state(
        config['ha_humidity_sensor_name'],
        _sensor_payload(
            humidity, "humidity",
            config['ha_humidity_friendly_name'], "%"),
        headers,
    )


def sense():
    """Read the DHT22 once and forward the reading to Home Assistant.

    A failed sensor read is printed and skipped; the caller simply polls
    again later.
    """
    # Imported here so that only the worker process needs the hardware
    # libraries.
    import adafruit_dht
    import board

    dht_device = adafruit_dht.DHT22(getattr(board, config['dht_pin']))
    try:
        temperature = dht_device.temperature
        humidity = dht_device.humidity
    except RuntimeError as error:
        # Errors happen fairly often, DHT's are hard to read, just keep going
        print(error.args[0] if error.args else error)
        return
    finally:
        # Release the sensor before doing any network I/O.
        dht_device.exit()

    # NOTE(review): presumably the driver can report None when no valid
    # measurement is available - confirm. Skipping avoids publishing the
    # literal state "None".
    if temperature is None or humidity is None:
        print("Sensor returned no data, skipping this reading")
        return

    send_to_home_assistant(temperature, humidity)


def main():
    """Run one sensing process per polling interval, forever."""
    multiprocessing.set_start_method('spawn')
    interval = float(config['polling_sleep_time_seconds'])
    while True:
        # Each reading runs in a fresh child process; the parent only waits
        # for it to finish and then sleeps until the next poll.
        worker = multiprocessing.Process(target=sense)
        worker.start()
        worker.join()
        time.sleep(interval)


if __name__ == "__main__":
    main()