herkömmlichen Garagentorantrieb mit Raspberry und Python "smart" machen

User stellen ihre Haussteuerung vor

Moderator: Co-Administratoren

Antworten
rentier-s
Beiträge: 353
Registriert: 19.06.2017, 09:24
Hat sich bedankt: 20 Mal
Danksagung erhalten: 63 Mal

herkömmlichen Garagentorantrieb mit Raspberry und Python "smart" machen

Beitrag von rentier-s » 30.08.2022, 09:11

Hallo zusammen,

falls jemand Interesse daran hat, in meiner Garage werkelt ein Raspberry Pi, an dessen GPIO ich ein paar Schnittstellen zu meinem Garagentor angeschlossen und diesen ansatzweise "smart" gemacht habe.

Der Antrieb hat nur einen Knopf für hoch/runter/stopp, eine integrierte Lampe, die für zwei Minuten leuchtet sobald der Antrieb fährt, und eine Lichtschranke als Sicherheits-Stopp, wenn während des Schließens etwas den Torbereich blockiert. Normalerweise wird er über die mitgelieferten Funk-Fernbedienungen oder Wand-Taster bedient. Anstatt die Wand-Taster direkt am Antrieb anzuschließen, sind diese nun mit dem Raspberry verbunden und ein Relais drückt stattdessen den Knopf.

Zusätzlich habe ich zwei Reed-Kontakte dran gebaut, einer an der Wand und einer am Ausleger des Antriebs, damit kann der Raspberry die beiden Endlagen erkennen.

Das Programm ist stückweise zusammen gebastelt und könnte mit Sicherheit an einigen Stellen optimiert werden. Für meine Zwecke funktioniert es soweit. Ich habe es ein wenig kommentiert, damit man halbwegs versteht was ich da hin geschustert habe. Gestartet wird es als systemctl daemon und läuft mit den 20 Hz Abtastrate mit kaum merklicher CPU Auslastung, ganz im Gegensatz zu pigpio.

Bislang habe ich ein PHP-Skript, dass die PivCCU3 per HTTP-Request aufrufen kann, um einen Tastendruck zu übergeben. Damit kann ich aus Homematic heraus mittels CUxD-Exec Taster das Tor öffnen, schließen und anhalten, wie es auch mit den Wand-Tastern geht.

Außerdem habe ich einen virtuellen CUxD Rollladenaktor angelegt, an den die Öffnungshöhe des Tors zurück gemeldet wird, damit man in Homematic sehen kann, wo das Tor gerade steht. Wenn ich mal wieder Zeit und Lust habe, möchte ich das Programm erweitern, so dass es Soll-Behanghöhen 0 und 100 des virtuellen Aktors interpretieren und das Tor gezielt öffnen bzw. schließen kann. Beliebige Behanghöhen brauche ich nicht und das wäre mir wahrscheinlich zu komplex.

Code: Alles auswählen

#!/usr/bin/python

inputs = {
8: 'Taster Tor',				# Wand-Taster in der Garage, 1 = Taster losgelassen
9: 'Lichtschranke',				# Lichtschranke auf Kniehöhe, 1 = frei, 0 = blockiert
10: 'Garagentor geschlossen',	# Reed-Kontakt am Antriebsschlitten, 0 = Tor in geöffneter Position
11: 'Beleuchtung Torantrieb',	# Integrierte Beleuchtung des Torantriebs über Optokoppler, um zu erkennen, ob der Antrieb aktiv ist. Licht geht an, sobald der Antrieb fährt, und nach 2 Minuten Inaktivität aus.
25: 'Garagentor offen'			# Reed-Kontakt am Torrahmen, 0 = Tor vollständig geschlossen
}

outputs = {
13: 'Taster Torantrieb'			# Relais am Eingang für externe Taster des Torantriebs
}


import RPi.GPIO as GPIO
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)

import os
import time
import json
import threading

import sys
sys.path.insert(0, '/usr/src/homematic')
from homematic_remote import Homematic_Remote
homematic = Homematic_Remote()						# selbst gebastelte Homematic Remote API Klasse, um die Behanghöhe des virtuellen Aktors zu übergeben


def printlog(string):
	global debug
	global logging
	if debug == True:
		print (string)
	if logging == True:
		logfile = open('/mnt/ramdisk/gpio.log', 'a');
		logfile.write( time.ctime() + ": " + str(string) + "\n" )
		logfile.close()

debug = False
sensorstatus = False
logging = True
args = len(sys.argv)
if args > 0:
	for arg in range(1, args):
		if sys.argv[arg] == "debug":
			debug = True
		if sys.argv[arg] == "log":
			logging = True
		if sys.argv[arg] == "sensor":
			sensorstatus = True


import signal
run = True

def handler_stop_signals(signum, frame):
	global run
	run = False

signal.signal(signal.SIGINT, handler_stop_signals)
signal.signal(signal.SIGTERM, handler_stop_signals)


def gpio_button(pin, press_time):
	GPIO.output(pin, False)
	time.sleep(press_time)
	GPIO.output(pin, True)


def time_since(start):
	return round((time.time() - start), 2)

def revert(value, bol_result=None):
	if bol_result != None:
		return True if (value == 0 or value == False) else False
	else:
		return 1 if (value == 0 or value == False) else 0

def key_by_val(dic, val):
	return (list(dic.keys())[list(dic.values()).index(val)])


for pin in outputs:
	GPIO.setup(pin, GPIO.OUT)
	GPIO.output(pin, True)

val = {}
lastchange = {}
timers = {}
for pin in inputs:
	GPIO.setup(pin, GPIO.IN, pull_up_down = GPIO.PUD_UP)
	val[pin] = GPIO.input(pin)
	lastchange[pin] = {0: time.time(), 1: time.time()}


tor_laufzeit = 21			# Laufzeit des Tors in Sekunden

def write_tor_status():		# Status des Tors für externe Skripte (z.B. autoschliessen.php)
	global tor_hoehe
	global tor_bewegung
	global tor_letzterichtung
	dic = { 'hoehe': round(tor_hoehe, 2), 'bewegung': tor_bewegung, 'richtung': tor_letzterichtung }
	json.dump(dic, open('/mnt/ramdisk/gate.status', 'w'))

def toggle_tor_status():	# Zustand des Tors ermitteln
	
	global tor_hoehe
	global tor_laufzeit
	global tor_bewegung
	global tor_letzterichtung
	global homematic
	
	if tor_bewegung == 0:									# Tor hat sich vorher nicht bewegt, d.h. fährt jetzt los
		tor_letzterichtung = revert(tor_letzterichtung)	
		tor_bewegung = time.time()							# Startzeit der Fahrt
		timers['stop_bewegung'] = time.time() + tor_laufzeit + 2		# Timer auf Laufzeit des Tors setzen, falls die Fahrt aus irgendeinem Grund unterbrochen wird, den das Skript sonst nicht mitbekommt
		printlog (str(tor_bewegung) + " fährt von " + str(tor_hoehe) + " " + ( 'hoch' if (tor_letzterichtung == 1) else 'runter' ))

	else:													# Tor hat sich bewegt und hat jetzt gestoppt
		lauf = ((time.time() - tor_bewegung) / tor_laufzeit) * (-1 + (2 * tor_letzterichtung))
		printlog (str(time.time() - tor_bewegung) + " Sekunden = " + str(lauf) + ", Start bei " + str(tor_hoehe))
		tor_hoehe = tor_hoehe + lauf
		printlog ("angehalten bei " + str(tor_hoehe))
		if (tor_hoehe > 1):
			tor_hoehe = 1
		elif (tor_hoehe < 0):
			tor_hoehe = 0
		homematic.setValueByDevname('Garagentor Ch1', round(tor_hoehe, 2), 'SET_STATE')
		tor_bewegung = 0
		timers['stop_bewegung'] = 0

	write_tor_status()

# Startzustände ermitteln
if GPIO.input(key_by_val(inputs, 'Garagentor offen')) == 0:
	tor_hoehe = 1
	tor_letzterichtung = 1
	printlog ("Tor ist offen")
elif GPIO.input(key_by_val(inputs, 'Garagentor geschlossen')) == 0:
	tor_hoehe = 0
	tor_letzterichtung = 0
	printlog ("Tor ist geschlossen")
else:
	tor_hoehe = 0.5
	tor_letzterichtung = 0
	printlog ("Tor in unbekannter Startposition")
tor_bewegung = 0
homematic.setValueByDevname('Garagentor Ch1', round(tor_hoehe, 2), 'SET_STATE')
write_tor_status()


read_path = "/mnt/ramdisk/gpio_in.pipe"
if os.path.exists(read_path):
	os.remove(read_path)
try:
	os.mkfifo(read_path, 0o666)
except OSError as e:
	print ("Fehler beim Erstellen der Pipe ", e)
	printlog ("Fehler beim Erstellen der Pipe ", e)
os.chmod(read_path, 0o666)
input_pipe = os.open(read_path, os.O_RDONLY | os.O_NONBLOCK)


while run:
	for pin in inputs:
		if GPIO.input(pin) != val[pin]:
			time.sleep(0.02)					# entprellen
			new_val = GPIO.input(pin)
			if new_val != val[pin]:
				if sensorstatus == True:
					printlog (str(pin) + " '" + inputs[pin] + "': neuer Wert " + str(new_val) + " nach " + str(time_since(lastchange[pin][revert(new_val)])) + " Sekunden")
				pin_name = inputs[pin]
				
				if pin_name == 'Garagentor geschlossen':
					if new_val == 0:
						tor_letzterichtung = 0
						tor_bewegung = 0
						tor_hoehe = 0
						timers['stop_bewegung'] = 0
						homematic.setValueByDevname('Garagentor Ch1', 0, 'SET_STATE')
						printlog ("Sensor: Tor ist geschlossen")
					else:
						printlog ("Sensor: Tor ist nicht mehr geschlossen")
						if tor_bewegung == 0:									# Tor fährt hoch, ohne dass das Skript dies selbst ausgelöst hat, z.B. durch Funk-Fernbedienung
							printlog ("Sensor: Tor fährt hoch")
							remove_lock()
							tor_bewegung = time.time()
							timers['stop_bewegung'] = time.time() + tor_laufzeit + 2
						tor_letzterichtung = 1
						homematic.setValueByDevname('Garagentor Ch1', 0.5, 'SET_STATE')
					write_tor_status()
				
				if pin_name == 'Garagentor offen':
					if new_val == 0:
						tor_letzterichtung = 1
						tor_bewegung = 0
						tor_hoehe = 1
						timers['stop_bewegung'] = 0
						homematic.setValueByDevname('Garagentor Ch1', 1, 'SET_STATE')
						printlog ("Sensor: Tor ist offen")
					else:
						printlog ("Sensor: Tor ist nicht mehr offen")
						if tor_bewegung == 0:									# Tor fährt runter, ohne dass das Skript dies selbst ausgelöst hat, z.B. durch Funk-Fernbedienung
							printlog ("Sensor: Tor fährt runter")
							remove_lock()
							tor_bewegung = time.time()
							timers['stop_bewegung'] = time.time() + tor_laufzeit + 2
						tor_letzterichtung = 0
						homematic.setValueByDevname('Garagentor Ch1', 0.5, 'SET_STATE')
					write_tor_status()
				
				if pin_name == 'Beleuchtung Torantrieb':
					homematic.setValueByDevname('CUxD_GPIO_07_Beleuchtung-Torantrieb', revert(new_val), 'SET_STATE')
					if new_val == 0:
						if tor_bewegung == 0:									# Der Antrieb wurde aktiv, d.h. Tor fährt los, ohne dass das Skript dies selbst ausgelöst hat, z.B. durch Funk-Fernbedienung
							printlog ("Sensor: Beleuchtung Torantrieb an")
							remove_lock()
							toggle_tor_status()
				
				if pin_name == 'Lichtschranke':
					if new_val == 0:
						if ((tor_bewegung > 0) and (tor_letzterichtung == 0)):		# Lichtschranke während des nach unten fahrens blockiert, Antrieb stoppt (Sicherheitsfunktion)
							toggle_tor_status()
				
				if pin_name == 'Taster Tor':
					if new_val == 1:
						if (time_since(lastchange[pin][0]) > 1.5):		# Ende langer Tastendruck
							if ( (GPIO.input(key_by_val(inputs, 'Garagentor offen')) == 0) or (GPIO.input(key_by_val(inputs, 'Garagentor geschlossen')) == 0) or ((tor_bewegung == 1) and (tor_letzterichtung == 1)) ):
								os.system('/usr/src/gpio/autoschliessen.php >/dev/null 2>&1 &')		# aktiviert im Hintergrund ein Skript, dass das Tor nach dem Durchfahren der Lichtschranke automatisch schließt
								printlog("Auto-Schließen")
						else:											# Ende kurzer Tastendruck
							toggle_tor_status()
							button = threading.Thread(target=gpio_button(key_by_val(outputs, 'Taster Torantrieb'), 0.3))	# "Taste" am Antrieb drücken
							button.start
				
				lastchange[pin][new_val] = time.time()
				val[pin] = new_val
	
	
	for timer in timers:
		if (0 < timers[timer] <= time.time()):
			timers[timer] = 0
			
			printlog ("Timer: " + timer)
			
			if timer == "stop_bewegung":
				if tor_bewegung > 0:
					printlog ("Bewegung zu lange")
					tor_bewegung = 0
					write_tor_status()
	
	
	data = os.read(input_pipe, 1024)
	if len(data) != 0:
		data = str(data)
		printlog ("Pipe input: " + str(data))
		
		if ('taster tor' in data):		# externe Skripts können den Antrieb betätigen, indem sie 'taster tor' an die Pipe schreiben
			toggle_tor_status()
			button = threading.Thread(target=gpio_button(key_by_val(outputs, 'Taster Torantrieb'), 0.3))
			button.start
	
	
	time.sleep(0.05)	# 50ms pro Durchlauf warten => 20 Hz Abtastrate


printlog ("Programmende")
print ("\nProgrammende\n")
GPIO.cleanup()
os.close(input_pipe)
if os.path.exists(read_path):
	os.remove(read_path)

rentier-s
Beiträge: 353
Registriert: 19.06.2017, 09:24
Hat sich bedankt: 20 Mal
Danksagung erhalten: 63 Mal

Re: herkömmlichen Garagentorantrieb mit Raspberry und Python "smart" machen

Beitrag von rentier-s » 16.01.2023, 14:10

Das inzwischen fertige Skript kann nun per HM das Tor aus jeder Lage auf, zu, und an jede beliebige Position dazwischen fahren. Wenn es während der Fahrt angehalten wird, durch Tastendruck oder Blockieren der Lichtschranke, meldet das Skript die aktuelle Höhe an HM zurück. Auch eine Stopp Funktion ist dazu gekommen, so dass sich das Tor jetzt wirklich nahezu wie ein Rollladenaktor verhält.

Ist ein ziemliches Monster geworden, außerdem mein erstes eigenes Programm in Python, aber ich dachte mir, ich teile es trotzdem mal.

Code: Alles auswählen

#!/usr/bin/python

inputs = {
10: 'Garagentor geschlossen',
9: 'Lichtschranke',
11: 'Beleuchtung Torantrieb',
25: 'Garagentor offen',
8: 'Taster Tor'
}

outputs = {
13: 'Taster Torantrieb'
}

tor_laufzeit_auf = 20
tor_laufzeit_zu = 17
tor_anlaufzeit = 2

######################################################

import time

import sys

debug = False
sensorstatus = False
logging = False
args = len(sys.argv)
if args > 0:
	for arg in range(1, args):
		if sys.argv[arg] == "debug":
			debug = True
		if sys.argv[arg] == "log":
			logging = True
		if sys.argv[arg] == "sensor":
			sensorstatus = True

def printlog(string):
	global debug
	global logging
	if debug == True:
		print (time.ctime() + ": " + string)
	if logging == True:
		logfile = open('/var/run/gpio.log', 'a');
		logfile.write( time.ctime() + ": " + str(string) + "\n" )
		logfile.close()

timers = {}
timers['stop_tor_bewegung'] = 0

import threading
stop_button_mehrfach = False

def stop_threads():
	global stop_button_mehrfach
	global timers
	stop_button_mehrfach = True
	timers['tor_stop'] = 0

import RPi.GPIO as GPIO
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)

def gpio_button(pin, press_time):
	GPIO.output(pin, False)
	time.sleep(press_time)
	GPIO.output(pin, True)

from homematic_remoteapi import Homematic_Remote
homematic = Homematic_Remote()

for pin in outputs:
	GPIO.setup(pin, GPIO.OUT)
	GPIO.output(pin, True)

val = {}
lastchange = {}
for pin in inputs:
	GPIO.setup(pin, GPIO.IN, pull_up_down = GPIO.PUD_UP)
	val[pin] = GPIO.input(pin)
	lastchange[pin] = {0: time.time(), 1: time.time()}

def time_since(start):
	return round((time.time() - start), 2)

def revert(value, bol_result=None):
	if bol_result != None:
		return True if (value == 0 or value == False) else False
	else:
		return 1 if (value == 0 or value == False) else 0

def key_by_val(dic, val):
	return (list(dic.keys())[list(dic.values()).index(val)])

import json

def write_tor_status():
	global tor_hoehe
	global tor_bewegung
	global tor_letzterichtung
	dic = { 'hoehe': round(tor_hoehe, 2), 'bewegung': tor_bewegung, 'richtung': tor_letzterichtung }
	json.dump(dic, open('/var/run/gate.status', 'w'))

def toggle_tor_status():
	
	global tor_laufzeit_auf
	global tor_laufzeit_zu
	global tor_anlaufzeit
	global tor_hoehe
	global tor_bewegung
	global tor_letzterichtung
	global homematic
	
	if tor_bewegung == 0:
		tor_letzterichtung = revert(tor_letzterichtung)
		tor_bewegung = time.time()
		if tor_letzterichtung == 0:
			restlaufzeit = round(((tor_hoehe - 0.05) * tor_laufzeit_zu), 2) + tor_anlaufzeit
		else:
			restlaufzeit = round(((1 - tor_hoehe + 0.05) * tor_laufzeit_auf), 2) + tor_anlaufzeit
		timers['stop_tor_bewegung'] = time.time() + restlaufzeit + 3
		printlog ("fährt von " + str(tor_hoehe) + " " + ( 'hoch' if (tor_letzterichtung == 1) else 'runter' ) + ", Restlaufzeit " + str(restlaufzeit))
	else:
		laufzeit = max(0, time.time() - tor_bewegung - tor_anlaufzeit)
		if tor_letzterichtung == 0:
			lauf = -1 * round((laufzeit / tor_laufzeit_zu) + 0.05, 2)
		else:
			lauf = round((laufzeit / tor_laufzeit_auf) + 0.05, 2)
		printlog (str(time.time() - tor_bewegung) + " Sekunden = " + str(lauf) + ", Start bei " + str(tor_hoehe))
		tor_hoehe = max(0, min(1, round(tor_hoehe + lauf, 3)))
		printlog ("angehalten bei " + str(tor_hoehe))
		tor_bewegung = 0
		timers['stop_tor_bewegung'] = 0
		homematic.setValueByDevname('Garagentor Ch1', round(tor_hoehe, 2), 'SET_STATE')

	write_tor_status()

if GPIO.input(key_by_val(inputs, 'Garagentor offen')) == 0:
	tor_hoehe = 1
	tor_letzterichtung = 1
	printlog ("Tor ist offen")
elif GPIO.input(key_by_val(inputs, 'Garagentor geschlossen')) == 0:
	tor_hoehe = 0
	tor_letzterichtung = 0
	printlog ("Tor ist geschlossen")
else:
	tor_hoehe = 0.5
	tor_letzterichtung = 0
	printlog ("Tor in unbekannter Startposition")

tor_bewegung = 0
homematic.setValueByDevname('Garagentor Ch1', round(tor_hoehe, 2), 'SET_STATE')
write_tor_status()

def tor_taster(anzahl = 1):
	global stop_button_mehrfach
	global outputs
	anzahl = max(1, min(3, anzahl))
	if anzahl == 1:
		stop_button_mehrfach = True
		toggle_tor_status()
		button = threading.Thread(target=gpio_button(key_by_val(outputs, 'Taster Torantrieb'), 0.3))
		button.start
	else:
		stop_button_mehrfach = False
		for taste in range(anzahl):
			if stop_button_mehrfach == True:
				break
			if (0 < taste < anzahl - 1):
				if (((tor_hoehe == 0) or (tor_hoehe == 1)) and (tor_bewegung == 0)):
					continue
			toggle_tor_status()
			gpio_button(key_by_val(outputs, 'Taster Torantrieb'), 0.3)
			if (taste < anzahl -1):
				for w in range(10):
					if stop_button_mehrfach == True:
						break
					time.sleep(0.1)

def tor_oeffnen():
	
	global tor_hoehe
	global tor_bewegung
	global tor_letzterichtung
	
	stop_threads()
	
	if tor_bewegung == 0:
		if tor_hoehe == 0:
			tor_taster()
		elif tor_hoehe < 1:
			if tor_letzterichtung == 0:
				tor_taster()
			else:
				button_mehrfach = threading.Thread(target=tor_taster(3))
				button_mehrfach.start
	else:
		if tor_letzterichtung == 0:
			button_mehrfach = threading.Thread(target=tor_taster(2))
			button_mehrfach.start

def tor_schliessen():
	
	global tor_hoehe
	global tor_bewegung
	global tor_letzterichtung
	global inputs
	
	stop_threads()
	
	if GPIO.input(key_by_val(inputs, 'Lichtschranke')) == 0:
		homematic.setValueByDevname('Garagentor Ch1', round(tor_hoehe, 2), 'SET_STATE')
		return
	
	if tor_bewegung == 0:
		if tor_hoehe == 1:
			tor_taster()
		elif tor_hoehe > 0:
			if tor_letzterichtung == 1:
				tor_taster()
			else:
				button_mehrfach = threading.Thread(target=tor_taster(3))
				button_mehrfach.start
	else:
		if tor_letzterichtung == 1:
			button_mehrfach = threading.Thread(target=tor_taster(2))
			button_mehrfach.start

import os
read_path = "/var/run/gpio_in.pipe"
if os.path.exists(read_path):
	os.remove(read_path)
try:
	os.mkfifo(read_path, 0o666)
except OSError as e:
	print ("Fehler beim Erstellen der Pipe ", e)
	printlog ("Fehler beim Erstellen der Pipe ", e)
os.chmod(read_path, 0o666)
input_pipe = os.open(read_path, os.O_RDONLY | os.O_NONBLOCK)

import signal
run = True

def handler_stop_signals(signum, frame):
	global run
	run = False

signal.signal(signal.SIGINT, handler_stop_signals)
signal.signal(signal.SIGTERM, handler_stop_signals)

while run:
	for pin in inputs:
		if GPIO.input(pin) != val[pin]:
			time.sleep(0.05)
			new_val = GPIO.input(pin)
			if new_val != val[pin]:
				if sensorstatus == True:
					printlog (str(pin) + " '" + inputs[pin] + "': neuer Wert " + str(new_val) + " nach " + str(time_since(lastchange[pin][revert(new_val)])) + " Sekunden")
				pin_name = inputs[pin]
				
				if pin_name == 'Garagentor geschlossen':
					if new_val == 0:
						if tor_hoehe == 1:
							tor_laufzeit_zu = round(time_since(lastchange[key_by_val(inputs, 'Garagentor offen')][1]), 2)
							printlog("Laufzeit Zu wurde ermittelt " + str(tor_laufzeit_zu))
						tor_letzterichtung = 0
						tor_bewegung = 0
						tor_hoehe = 0
						timers['stop_tor_bewegung'] = 0
						timers['auto_schliessen'] = 0
						homematic.setValueByDevname('Garagentor Ch1', 0, 'SET_STATE')
						printlog ("Sensor: Tor ist geschlossen")
					else:
						printlog ("Sensor: Tor ist nicht mehr geschlossen")
						if tor_bewegung == 0:
							printlog ("Sensor: Tor fährt hoch")
							stop_threads()
							tor_bewegung = time.time()
							timers['stop_tor_bewegung'] = time.time() + tor_laufzeit_auf + 3
						tor_letzterichtung = 1
						homematic.setValueByDevname('Garagentor Ch1', 0.5, 'SET_STATE')
					write_tor_status()
				
				if pin_name == 'Garagentor offen':
					if new_val == 0:
						if tor_hoehe == 0:
							tor_laufzeit_auf = round(time_since(lastchange[key_by_val(inputs, 'Garagentor geschlossen')][1]), 2)
							printlog("Laufzeit Auf wurde ermittelt " + str(tor_laufzeit_auf))
						tor_letzterichtung = 1
						tor_bewegung = 0
						tor_hoehe = 1
						timers['stop_tor_bewegung'] = 0
						homematic.setValueByDevname('Garagentor Ch1', 1, 'SET_STATE')
						printlog ("Sensor: Tor ist offen")
					else:
						printlog ("Sensor: Tor ist nicht mehr offen")
						if tor_bewegung == 0:
							printlog ("Sensor: Tor fährt runter")
							stop_threads()
							tor_bewegung = time.time()
							timers['stop_tor_bewegung'] = time.time() + tor_laufzeit_zu + 3
						tor_letzterichtung = 0
						homematic.setValueByDevname('Garagentor Ch1', 0.5, 'SET_STATE')
					write_tor_status()
				
				if pin_name == 'Beleuchtung Torantrieb':
					if new_val == 0:
						if tor_bewegung == 0:
							printlog ("Sensor: Beleuchtung Torantrieb an")
							stop_threads()
							toggle_tor_status()
				
				
				if pin_name == 'Lichtschranke':
					if new_val == 0:
						if ((tor_bewegung > 0) and (tor_letzterichtung == 0)):
							toggle_tor_status()
							stop_threads()
							homematic.setValueByDevname('Garagentor Ch1', round(tor_hoehe, 2), 'SET_STATE')
				
				if pin_name == 'Taster Tor':
					if new_val == 0:
						stop_threads()
					else:
						if ((GPIO.input(key_by_val(inputs, 'Lichtschranke')) == 0) and (tor_bewegung == 0) and (tor_letzterichtung == 1)):
							homematic.setValueByDevname('Garagentor Ch1', round(tor_hoehe, 2), 'SET_STATE')
						else:
							tor_taster()
				
				lastchange[pin][new_val] = time.time()
				val[pin] = new_val
	
	
	data = os.read(input_pipe, 1024)
	if len(data) != 0:
		data = str(data)
		printlog ("Pipe input: " + str(data))
		
		if ('taster tor' in data):
			stop_threads()
			if ((GPIO.input(key_by_val(inputs, 'Lichtschranke')) == 0) and (tor_bewegung == 0) and (tor_letzterichtung == 1)):
				homematic.setValueByDevname('Garagentor Ch1', round(tor_hoehe, 2), 'SET_STATE')
			else:
				tor_taster()
		
		if ('tor stop' in data):
			stop_threads()
			if tor_bewegung > 0:
				tor_taster()
				homematic.setValueByDevname('Garagentor Ch1', round(tor_hoehe, 2), 'SET_STATE')
		
		if ('tor oeffnen' in data):
			tor_oeffnen()
		
		if ('tor schliessen' in data):
			tor_schliessen()
		
		if ('tor level' in data):
			stop_threads()
			ziel_hoehe = data[12:-1]
			printlog('Zielhoehe ' + ziel_hoehe)
			if ziel_hoehe.isnumeric():
				ziel_hoehe = float(ziel_hoehe)
				if ziel_hoehe > 1:
					ziel_hoehe = ziel_hoehe / 100
				if ziel_hoehe >= 0.9:
					tor_oeffnen()
				elif ziel_hoehe <= 0.1:
					tor_schliessen()
				else:
					if tor_bewegung > 0:
						laufzeit = max(0, time.time() - tor_bewegung - tor_anlaufzeit)
						if tor_letzterichtung == 0:
							lauf = -1 * ((laufzeit / tor_laufzeit_zu) + 0.05)
						else:
							lauf = (laufzeit / tor_laufzeit_auf) + 0.05
						tor_zwischenhoehe = max(0, min(1, round(tor_hoehe + lauf, 3)))
						if (ziel_hoehe - 0.1 < tor_zwischenhoehe < ziel_hoehe + 0.1):
							tor_taster()
							homematic.setValueByDevname('Garagentor Ch1', round(tor_hoehe, 2), 'SET_STATE')
						else:
							laufzeit = 0
							if tor_letzterichtung == 1:
								if ziel_hoehe > tor_zwischenhoehe:
									laufzeit = round((ziel_hoehe - tor_zwischenhoehe) * tor_laufzeit_auf, 2)
								elif ziel_hoehe < tor_zwischenhoehe:
									if GPIO.input(key_by_val(inputs, 'Lichtschranke')) == 0:
										tor_taster()
										homematic.setValueByDevname('Garagentor Ch1', round(tor_hoehe, 2), 'SET_STATE')
									else:
										laufzeit = -1 * round(((tor_zwischenhoehe - ziel_hoehe) * tor_laufzeit_zu) + 3, 2)
							else:
								if ziel_hoehe < tor_zwischenhoehe:
									laufzeit = round((tor_zwischenhoehe - ziel_hoehe) * tor_laufzeit_zu, 2)
								elif ziel_hoehe > tor_zwischenhoehe + 0.05:
									laufzeit = -1 * round(((ziel_hoehe - tor_zwischenhoehe) * tor_laufzeit_auf) + 3, 2)
							
							printlog('Berechnete Laufzeit ' + str(tor_zwischenhoehe) + ' -> ' + str(ziel_hoehe) + ' = ' + str(laufzeit) + ' Sekunden')
							if laufzeit > 0:
								timers['tor_stop'] = time.time() + laufzeit
							elif laufzeit < 0:
								timers['tor_stop'] = time.time() - laufzeit
								button_mehrfach = threading.Thread(target=tor_taster(2))
								button_mehrfach.start
					
					else:
						laufzeit = 0
						if tor_letzterichtung == 0:
							if ziel_hoehe > tor_hoehe + 0.05:
								laufzeit = round(((ziel_hoehe - tor_hoehe - 0.05) * tor_laufzeit_auf) + tor_anlaufzeit, 2)
							elif ziel_hoehe < tor_hoehe - 0.05:
								laufzeit = -1 * round(((tor_hoehe - ziel_hoehe) * tor_laufzeit_zu) + 3, 2)
						else:
							if GPIO.input(key_by_val(inputs, 'Lichtschranke')) == 0:
								homematic.setValueByDevname('Garagentor Ch1', round(tor_hoehe, 2), 'SET_STATE')
							else:
								if ziel_hoehe < tor_hoehe - 0.05:
									laufzeit = round(((tor_hoehe - ziel_hoehe - 0.05) * tor_laufzeit_zu) + tor_anlaufzeit, 2)
								elif ziel_hoehe > tor_hoehe + 0.05:
									laufzeit = -1 * round(((ziel_hoehe - tor_hoehe) * tor_laufzeit_auf) + 3, 2)
						
						printlog('Berechnete Laufzeit ' + str(tor_hoehe) + ' -> ' + str(ziel_hoehe) + ' = ' + str(laufzeit) + ' Sekunden')
						if laufzeit > 2:
							timers['tor_stop'] = time.time() + laufzeit
							tor_taster()
						elif laufzeit < -4:
							timers['tor_stop'] = time.time() - laufzeit
							button_mehrfach = threading.Thread(target=tor_taster(3))
							button_mehrfach.start
						else:
							homematic.setValueByDevname('Garagentor Ch1', round(tor_hoehe, 2), 'SET_STATE')
	
	
	for timer in timers:
		if (0 < timers[timer] <= time.time()):
			timers[timer] = 0
			
			printlog ("Timer: " + timer)
			
			if timer == "stop_tor_bewegung":
				if tor_bewegung > 0:
					printlog ("Bewegung dauert zu lange")
					tor_bewegung = 0
					write_tor_status()
			
			if timer == "tor_stop":
				stop_threads()
				if tor_bewegung > 0:
					tor_taster()
					homematic.setValueByDevname('Garagentor Ch1', round(tor_hoehe, 2), 'SET_STATE')
			
	
	time.sleep(0.05)

printlog ("Programmende")
print ("\nProgrammende\n")
GPIO.cleanup()
os.close(input_pipe)
if os.path.exists(read_path):
	os.remove(read_path)

Auf dem Pi läuft ein kleines PHP, um die Befehle von CUxD entgegen zu nehmen, und gibt sie per Pipe an das Skript weiter.

Code: Alles auswählen

<?php

function write_gpio_pipe($str) {
	$pipepath = '/var/run/gpio_in.pipe';
	if (!file_exists($pipepath)) die('Pipe nicht gefunden', 1, 1);
	$pipe = fopen($pipepath, 'a');
	fwrite($pipe, trim($str));
	fclose($pipe);
}

if (preg_match('|level=([\d\.]+)|', $_SERVER['QUERY_STRING'], $level)) write_gpio_pipe('tor level '.$level[1]);
elseif ($_SERVER['QUERY_STRING'] == 'stop') write_gpio_pipe('tor stop');

?>

Die zugehörigen Befehle für den CUxD System(28) Typ Jalousie

Code: Alles auswählen

CMD_SHORT: wget --no-check-certificate -q -O - 'https://raspberry/garagentor.php?stop')
CMD_LONG: wget --no-check-certificate -q -O - 'https://raspberry/garagentor.php?level=$VALUE$')

Die zugehörige homematic_remoteapi.py Klasse

Code: Alles auswählen

import requests
from requests.auth import HTTPBasicAuth
import urllib3
urllib3.disable_warnings()
import xmltodict
import threading

class Homematic_Remote:
	
	def __init__(self, host = None):
		if host == None:
			self.setHost(gethostbyname('ccu3'))
		else:
			self.setHost(host)
	
	def setHost(self, host):
		self.host = host
	
	def send_request(self, query):
		url = "https://" + self.host + ":48181/remote.exe"
		#print('result='+query)
		try:
			ret = requests.post(url, data='result='+query, verify=False, auth=('RemoteAPI-User','RemoteAPI-Pass'))
			dic = xmltodict.parse(ret.text)
			xml = dic['xml']
			if 'result' in xml:
				return xml['result']
			else:
				return False
		except requests.exceptions.ConnectionError:
			return False
		except requests.exceptions.Timeout:
			return False
		except requests.exceptions.RequestException as e:
			return False
		except Exception:
			return False
	
	def send_request_background(self, query):
		hmreq = threading.Thread(target=self.send_request(query))
		hmreq.start
	
	def getValueByDevname(self, device, type=None):
		req_type = type if (type != None and type != '') else 'STATE'
		ret = self.send_request('dom.GetObject("' + device + '").DPByHssDP("' + req_type + '").Value();')
		if ret.lower() in ['true', 'false']:
			ret = True if (ret.lower() == 'true') else False
		#print(ret)
		return ret
	
	def setValueByDevname(self, device, value, type=None):
		req_type = type if (type != None and type != '') else 'STATE'
		if value in ['true', 'false']:
			pass
		elif isinstance(value, (int, float)) == True:
			pass
		elif value in [True, False]:
			value = str(value).lower()
		else:
			value = '"' + value + '"'
		self.send_request_background('dom.GetObject("' + device + '").DPByHssDP("' + req_type + '").State(' + str(value) + ');')


Antworten

Zurück zu „Projektvorstellungen“