# vncmqtt/grab.py
#
# 135 lines
# 4.1 KiB
# Python

import base64
import hashlib
import logging
import json
import os
import sys
import time
import imageio
from vncdotool import api
from twisted.internet import reactor
import paho.mqtt.client as paho
# LOGGING
# setup logging to console
class SkipLevelsFilter(logging.Filter):
    """Logging filter that drops records whose level is in a skip list.

    Args:
        skip_levels: Numeric logging levels (e.g. ``logging.ERROR``) to
            reject. ``None`` (the default) rejects nothing.
    """

    def __init__(self, skip_levels=None):
        # Initialise the base Filter so its own attributes (name/nlen) exist.
        super().__init__()
        # A fresh list per instance: a mutable default argument would be
        # shared by every filter created without an explicit list.
        self.skip_levels = [] if skip_levels is None else skip_levels

    def filter(self, rec):
        """Return True when the record should be emitted, False to drop it."""
        return rec.levelno not in self.skip_levels
# NOTE(review): the logger name does not match this module's name -- confirm
# whether it is intentional before renaming; it is kept as-is here.
logger = logging.getLogger('keywords_pipeline')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# stderr receives records at ERROR level and above.
stderr_handler = logging.StreamHandler(sys.stderr)
stderr_handler.setLevel(logging.ERROR)
stderr_handler.setFormatter(formatter)

# stdout receives every record except those at exactly ERROR level.
stdout_handler = logging.StreamHandler(sys.stdout)
stdout_handler.addFilter(SkipLevelsFilter([logging.ERROR]))
stdout_handler.setLevel(logging.DEBUG)
stdout_handler.setFormatter(formatter)

# Attach the handlers and keep the logger at DEBUG while the rest of the
# logging setup runs, so problems during setup are always reported.
logger.addHandler(stdout_handler)
logger.addHandler(stderr_handler)
logger.setLevel(logging.DEBUG)

# Resolve the requested level from the environment, defaulting to INFO.
# DEBUG remains the active level until the final setLevel() call below.
LOGLEVEL = os.getenv('LOGLEVEL') or "INFO"
numeric_level = getattr(logging, LOGLEVEL.upper(), None)
if not isinstance(numeric_level, int):
    logger.critical('Invalid log level: %s', LOGLEVEL)
    # The os module has no exit() function; sys.exit() is what terminates
    # the interpreter with the given status code.
    sys.exit(78)

# Emitted before the level changes, so it is visible even when the
# requested level is higher than DEBUG.
logger.debug(f'setting loglevel to {LOGLEVEL}')
logger.setLevel(numeric_level)
# END LOGGING
# --- Required settings: a missing variable raises KeyError at startup. ---
MQTT_USER = os.environ['MQTT_USER']
MQTT_PASSWORD = os.environ['MQTT_PASSWORD']
MQTT_HOST = os.environ['MQTT_HOST']
VNC_HOST = os.environ['VNC_HOST']
VNC_PASSWORD = os.environ['VNC_PASSWORD']
MQTT_TOPIC = os.environ['MQTT_TOPIC']

# --- Optional settings. ---
DEVICE_NAME = os.getenv('DEVICE_NAME')
# Seconds per capture cycle; defaults to 5 when INTERVAL is unset.
INTERVAL = float(os.getenv('INTERVAL', '5'))

# --- Derived values. ---
# The VNC host with dots replaced by underscores is the fallback for both
# the display name and the device id.
SANITIZED_HOST = '_'.join(VNC_HOST.split('.'))
NAME = DEVICE_NAME if DEVICE_NAME else SANITIZED_HOST
DEVICE_ID = os.getenv('DEVICE_ID') or SANITIZED_HOST
# Base32 text of the SHA-1 digest of the display name.
OBJECT_ID = base64.b32encode(
    hashlib.sha1(NAME.encode('utf-8')).digest()
).decode('utf-8')
def blackwhite(imgf):
    """Count the all-zero and all-one bytes in the first frame of an image.

    The counts are per byte of the decoded frame data, not per pixel.

    Args:
        imgf: Image to open with imageio (the caller passes a file path).

    Returns:
        A tuple ``(blackcount, whitecount, count)``: the number of ``0x00``
        bytes, the number of ``0xff`` bytes, and the total number of bytes
        in the decoded frame.
    """
    # Close the reader once the frame has been decoded; this function is
    # called on every capture cycle, so an unclosed reader would leak a
    # file handle per call.
    with imageio.read(imgf) as reader:
        # tobytes() already serialises in C order, so no flatten() copy
        # is needed first.
        raw = reader.get_data(0).tobytes()
    return raw.count(b'\x00'), raw.count(b'\xff'), len(raw)
# Connect to the broker and start paho's network loop.
mqtt = paho.Client()
mqtt.username_pw_set(MQTT_USER, MQTT_PASSWORD)
mqtt.connect(MQTT_HOST)
mqtt.loop_start()

# Publish a retained camera configuration message under the
# homeassistant/ topic tree; it points at the topic the images go to.
_discovery_topic = f"homeassistant/camera/{DEVICE_ID}/config"
_discovery_device = {
    "identifiers": DEVICE_ID,
    "manufacturer": "VNC-MQTT",
    "model": "v0.1",
}
_discovery_payload = {
    "name": NAME,
    "topic": MQTT_TOPIC + "/image",
    "unique_id": DEVICE_ID+"-T",
    "device_class": "camera",
    "device": _discovery_device,
}
mqtt.publish(_discovery_topic, json.dumps(_discovery_payload), retain=True)
def _capture_and_publish():
    """Grab one VNC screenshot and publish it to MQTT if it looks valid.

    The screenshot is written to ``capture.png``. A capture in which more
    than half of the bytes are 0x00, or more than half are 0xff, is treated
    as a failed grab and is not published. On success ``capture.time`` is
    touched and the PNG bytes are published to ``MQTT_TOPIC + "/image"``.

    Returns:
        True when the image was published, False when it was rejected.
    """
    # Hold the VNC connection only for as long as the capture needs it.
    with api.connect(VNC_HOST, VNC_PASSWORD) as vnc:
        logger.debug("refreshing for capture")
        vnc.refreshScreen()
        logger.debug("capturing")
        vnc.captureScreen('capture.png')

    logger.debug("publishing to mqtt")
    with open('capture.png', 'rb') as f:
        capture = f.read()

    black, white, total = blackwhite('capture.png')
    if black > (total / 2):  # if it's more than half black, it's a failure
        logger.warning("screen is more than half black. considering as failure.")
        return False
    if white > (total / 2):  # if it's more than half white, it's a failure
        logger.warning("screen is more than half white. considering as failure.")
        return False

    # Create/truncate the marker file so its mtime records the last
    # successful capture.
    with open('capture.time', 'wb'):
        pass
    mqtt.publish(topic=MQTT_TOPIC + "/image", payload=capture)
    return True


try:
    while True:
        cycle_start = time.time()
        _capture_and_publish()
        # Pace every cycle, including rejected captures: retrying a bad
        # capture immediately would reconnect to the VNC host in a tight
        # loop for as long as the screen stays black or white.
        time_left = INTERVAL - (time.time() - cycle_start)
        if time_left > 0:
            logger.debug(f"sleeping for {time_left}s")
            time.sleep(time_left)
        else:
            logger.warning(f"interval missed ({0-time_left}s overdue)")
finally:
    # Runs when the loop is interrupted (Ctrl-C, unhandled error); the
    # exception still propagates after cleanup.
    mqtt.loop_stop()
    # This code does not run in the reactor thread, so the stop request
    # is handed over with callFromThread instead of calling stop() directly.
    reactor.callFromThread(reactor.stop)