diff --git a/tools/pyflix/flix.py b/tools/pyflix/flix.py index cb73cf1..fff2505 100644 --- a/tools/pyflix/flix.py +++ b/tools/pyflix/flix.py @@ -6,7 +6,7 @@ import os import time from queue import Queue, Empty -from typing import Literal, Optional, Callable, List, Dict, Any, Union +from typing import Literal, Optional, Callable, List, Dict, Any, Union, Sequence import logging import errno from threading import Thread, Timer @@ -40,6 +40,7 @@ class Flix: _connection_timeout = 3 _print_buffer: str = '' + _modes = ['MANUAL', 'ACRO', 'STAB', 'AUTO'] def __init__(self, system_id: int=1, wait_connection: bool=True): if not (0 <= system_id < 256): @@ -147,7 +148,7 @@ class Flix: def _handle_mavlink_message(self, msg: mavlink.MAVLink_message): if isinstance(msg, mavlink.MAVLink_heartbeat_message): - self.mode = ['MANUAL', 'ACRO', 'STAB', 'USER'][msg.custom_mode] + self.mode = self._modes[msg.custom_mode] self.armed = msg.base_mode & mavlink.MAV_MODE_FLAG_SAFETY_ARMED != 0 self._trigger('mode', self.mode) self._trigger('armed', self.armed) @@ -231,6 +232,19 @@ class Flix: def _flu_to_mavlink(v: List[float]) -> List[float]: return Flix._mavlink_to_flu(v) + def _command_send(self, command: int, params: Sequence[float]): + if len(params) != 7: + raise ValueError('Command must have 7 parameters') + for attempt in range(3): + try: + logger.debug(f'Send command {command} with params {params} (attempt #{attempt + 1})') + self.mavlink.command_long_send(self.system_id, 0, command, 0, *params) # type: ignore + self.wait('mavlink.COMMAND_ACK', value=lambda msg: msg.command == command and msg.result == mavlink.MAV_RESULT_ACCEPTED, timeout=0.1) + return + except TimeoutError: + continue + raise RuntimeError(f'Failed to send command {command} after 3 attempts') + def _connected(self): # Reset disconnection timer self._disconnected_timer.cancel() @@ -275,6 +289,11 @@ class Flix: continue raise RuntimeError(f'Failed to set parameter {name} to {value} after 3 attempts') + def set_mode(self, mode: Union[str, int]): + if isinstance(mode, str): + mode = self._modes.index(mode.upper()) + self._command_send(mavlink.MAV_CMD_DO_SET_MODE, (0, mode, 0, 0, 0, 0, 0)) + def set_position(self, position: List[float], yaw: Optional[float] = None, wait: bool = False, tolerance: float = 0.1): raise NotImplementedError('Position control is not implemented yet') @@ -282,17 +301,37 @@ class Flix: raise NotImplementedError('Velocity control is not implemented yet') def set_attitude(self, attitude: List[float], thrust: float): - raise NotImplementedError('Automatic flight is not implemented yet') + if len(attitude) == 3: + attitude = Quaternion([attitude[0], attitude[1], attitude[2]]).q # type: ignore + elif len(attitude) != 4: + raise ValueError('Attitude must be [roll, pitch, yaw] or [w, x, y, z] quaternion') + if not (0 <= thrust <= 1): + raise ValueError('Thrust must be in range [0, 1]') + attitude = self._flu_to_mavlink(attitude) + for _ in range(2): # duplicate to ensure delivery + self.mavlink.set_attitude_target_send(0, self.system_id, 0, 0, + [attitude[0], attitude[1], attitude[2], attitude[3]], + 0, 0, 0, thrust) def set_rates(self, rates: List[float], thrust: float): - raise NotImplementedError('Automatic flight is not implemented yet') + if len(rates) != 3: + raise ValueError('Rates must be [roll_rate, pitch_rate, yaw_rate]') + if not (0 <= thrust <= 1): + raise ValueError('Thrust must be in range [0, 1]') + rates = self._flu_to_mavlink(rates) + for _ in range(2): # duplicate to ensure delivery + self.mavlink.set_attitude_target_send(0, self.system_id, 0, + mavlink.ATTITUDE_TARGET_TYPEMASK_ATTITUDE_IGNORE, + [1, 0, 0, 0], + rates[0], rates[1], rates[2], thrust) def set_motors(self, motors: List[float]): if len(motors) != 4: raise ValueError('motors must have 4 values') if not all(0 <= m <= 1 for m in motors): raise ValueError('motors must be in range [0, 1]') - raise NotImplementedError + for _ in range(2): # duplicate to ensure delivery + self.mavlink.set_actuator_control_target_send(time.time() * 1000, 0, self.system_id, 0, motors + [0] * 4) # type: ignore def set_controls(self, roll: float, pitch: float, yaw: float, throttle: float): """Send pilot's controls. Warning: not intended for automatic control""" @@ -302,9 +341,6 @@ class Flix: raise ValueError('throttle must be in range [0, 1]') self.mavlink.manual_control_send(self.system_id, roll * 1000, pitch * 1000, yaw * 1000, throttle * 1000, 0) # type: ignore - def set_mode(self, mode: Literal['MANUAL', 'ACRO', 'STAB', 'USER']): - raise NotImplementedError('Setting mode is not implemented yet') - def cli(self, cmd: str, wait_response: bool = True) -> str: cmd = cmd.strip() if cmd == 'reboot':