Commit 7558bf8c authored by sfleisch's avatar sfleisch
Browse files

add ampelschaltung FSM

parent 4166e29a
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import time #sleep, parse timestamp
import threading #threads
import signal # Ctrl+C
import logging #
CL_R = "\033[1;37;41m" #red
CL_Y = "\033[1;37;43m" #yellow
CL_G = "\033[1;37;42m" #green
CL_N = "\033[0m" #normal
# Logging
FORMAT = '%(asctime)s (%(threadName)-10s) %(message)s'
logging.basicConfig(filename="/tmp/ampelschaltung_state_machine.log", level=logging.DEBUG, format=FORMAT)
class Ampelschaltung(threading.Thread):
"""Implementation of a traffic light controller based on a finite state machine.
Major street crossing a minor street. Major street green by default, unless the minor street triggers
a green phase request by setting the flag in_triggered.
Specify the durations in the dictionary in_durations. Units are seconds. Example:
in_durations = {'yellow': 2, 'red_yellow': 1, 'all_red': 3, 'major_green_min': 10, 'minor_green': 5 }
Output signals can be read from the attribute out_signals (dictionary).
"""
def __init__(self, in_name, in_durations):
threading.Thread.__init__(self)
self.name = in_name
self.in_durations = in_durations
self.in_triggered = False
self.out_signals = {}
self.state = "st_major_green_min_time_not_passed"
self.next_state = None
self.terminate = False # set this to True from the outside to tell the thread to terminate
self._suppress_log = False
logging.debug("State machine instance %s created" % self.name)
def debug_output(self, in_T):
s = "major="
if self.out_signals['major_red']==1:
s += "%s %s" % (CL_R, CL_N)
else:
s += " "
if self.out_signals['major_yellow']==1:
s += "%s %s" % (CL_Y, CL_N)
else:
s += " "
if self.out_signals['major_green']==1:
s += "%s %s" % (CL_G, CL_N)
else:
s += " "
s += " minor="
if self.out_signals['minor_red']==1:
s += "%s %s" % (CL_R, CL_N)
else:
s += " "
if self.out_signals['minor_yellow']==1:
s += "%s %s" % (CL_Y, CL_N)
else:
s += " "
if self.out_signals['minor_green']==1:
s += "%s %s" % (CL_G, CL_N)
else:
s += " "
s += " state=%s " % self.state
s += " in_triggered=%s T=%s seconds terminate=%s." % (self.in_triggered, in_T, self.terminate)
logging.debug(s)
def run(self):
while not self.terminate:
if self.state=="st_major_green_min_time_not_passed":
self.out_signals = {'major_red':0, 'major_yellow':0, 'major_green':1, 'minor_red':1, 'minor_yellow':0, 'minor_green':0 }
T = self.in_durations['major_green_min']
self.debug_output(T)
time.sleep(T)
if self.in_triggered:
self.next_state = "st_major_yellow"
else:
self.next_state = "st_major_green_min_time_passed"
elif self.state=="st_major_green_min_time_passed":
self.out_signals = {'major_red':0, 'major_yellow':0, 'major_green':1, 'minor_red':1, 'minor_yellow':0, 'minor_green':0 }
T = .1
if not self._suppress_log:
self.debug_output(T)
self._suppress_log = True
time.sleep(T)
if self.in_triggered:
self.next_state = "st_major_yellow"
else:
self.next_state = "st_major_green_min_time_passed"
elif self.state=="st_major_yellow":
self.out_signals = {'major_red':0, 'major_yellow':1, 'major_green':0, 'minor_red':1, 'minor_yellow':0, 'minor_green':0 }
T = self.in_durations['yellow']
self.debug_output(T)
self.next_state = "st_major_red"
time.sleep(T)
elif self.state=="st_major_red":
self.out_signals = {'major_red':1, 'major_yellow':0, 'major_green':0, 'minor_red':1, 'minor_yellow':0, 'minor_green':0 }
T = self.in_durations['all_red']
self.debug_output(T)
self.next_state = "st_minor_red_yellow"
time.sleep(T)
elif self.state=="st_minor_red_yellow":
self.out_signals = {'major_red':1, 'major_yellow':0, 'major_green':0, 'minor_red':1, 'minor_yellow':1, 'minor_green':0 }
T = self.in_durations['red_yellow']
self.debug_output(T)
self.next_state = "st_minor_green"
time.sleep(T)
elif self.state=="st_minor_green":
self.out_signals = {'major_red':1, 'major_yellow':0, 'major_green':0, 'minor_red':0, 'minor_yellow':0, 'minor_green':1 }
T = self.in_durations['minor_green']
self.debug_output(T)
self.next_state = "st_minor_yellow"
time.sleep(T)
elif self.state=="st_minor_yellow":
self.in_triggered = False
self.out_signals = {'major_red':1, 'major_yellow':0, 'major_green':0, 'minor_red':0, 'minor_yellow':1, 'minor_green':0 }
T = self.in_durations['yellow']
self.debug_output(T)
self.next_state = "st_minor_red"
time.sleep(T)
elif self.state=="st_minor_red":
self.out_signals = {'major_red':1, 'major_yellow':0, 'major_green':0, 'minor_red':1, 'minor_yellow':0, 'minor_green':0 }
T = self.in_durations['all_red']
self.debug_output(T)
self.next_state = "st_major_red_yellow"
time.sleep(T)
elif self.state=="st_major_red_yellow":
self.out_signals = {'major_red':1, 'major_yellow':1, 'major_green':0, 'minor_red':1, 'minor_yellow':0, 'minor_green':0 }
T = self.in_durations['red_yellow']
self.debug_output(T)
self.next_state = "st_major_green_min_time_not_passed"
time.sleep(T)
self.state = self.next_state
logging.debug("Thread terminating.")
threads = []
a1 = Ampelschaltung("a1", {'yellow': 2, 'red_yellow': 1, 'all_red': 3, 'major_green_min': 10, 'minor_green': 5 })
threads.append(a1)
# Start all threads
logging.debug("starting threads")
for x in threads:
logging.debug("starting thread '%s'" % x.name)
x.start()
def receive_signal(signum, stack):
global s_stop
if signum==2: #SIGINT
logging.debug("Received SIGINT, setting the stop flag")
s_stop = True
# Set the terminate flag for all threads
logging.debug("setting the terminate flag for all threads")
for x in threads:
x.terminate = True
elif signum==10: #SIGUSR1
logging.debug("Received SIGUSR1, setting trigger flag")
a1.in_triggered = True
signal.signal(signal.SIGUSR1, receive_signal)
signal.signal(signal.SIGINT, receive_signal)
s_stop = False
# Wait for any signal
logging.debug("waiting for signal")
while not s_stop:
time.sleep(1)
# And join all threads
logging.debug("joining all threads")
for x in threads:
x.join()
logging.debug("exiting")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment