import tasko
from pycubed import cubesat
from lib.state_machine_utils import validate_config
[docs]class StateMachine:
"""Singleton State Machine Class"""
def __init__(self):
pass
[docs] def start(self, start_state: str):
"""Starts the state machine
Args:
:param start_state: The state to start the state machine in
:type start_state: str
"""
from StateMachineConfig import config, TaskMap, TransitionFunctionMap
self.config = config
self.transition_function_map = TransitionFunctionMap
validate_config(config, TaskMap, TransitionFunctionMap)
self.states = list(config.keys())
self.states.sort()
# init task objects
self.tasks = {key: task() for key, task in TaskMap.items()}
# set scheduled tasks to none
self.scheduled_tasks = {}
self.state = start_state
self.switch_to(start_state, force=True)
tasko.run()
[docs] def stop_all(self):
"""Stops all running tasko processes"""
for _, task in self.scheduled_tasks.items():
task.stop()
[docs] def switch_to(self, state_name: str, force=False):
"""Switches the state of the cubesat to the new state
Args:
:param state_name: The name of the state to switch to
:type state_name: str
"""
# prevent (or allow forced) illegal transitions
if not (state_name in self.config[self.state]['StepsTo'] or force):
raise ValueError(
f'You cannot transition from {self.state} to {state_name}')
self.previous_state = self.state
# execute transition functions
if self.state != state_name:
for fn in self.config[self.state]['ExitFunctions']:
fn = self.transition_function_map[fn]
fn(self.state, state_name, cubesat)
for fn in self.config[state_name]['EnterFunctions']:
fn = self.transition_function_map[fn]
fn(self.state, state_name, cubesat)
# reschedule tasks
self.stop_all()
self.scheduled_tasks = {}
self.state = state_name
state_config = self.config[state_name]
for task_name, props in state_config['Tasks'].items():
if props['ScheduleLater']:
schedule = tasko.schedule_later
else:
schedule = tasko.schedule
frequency = 1 / props['Interval']
priority = props['Priority']
task_fn = self.tasks[task_name]._run
self.scheduled_tasks[task_name] = schedule(
frequency, task_fn, priority)
state_machine = StateMachine()