mirror of
https://github.com/okalachev/flix.git
synced 2025-07-29 20:38:59 +00:00
Implement set_mode, set_attitude and set_rates in pyflix
This commit is contained in:
parent
7e8bd3e834
commit
2066d05a60
@ -6,7 +6,7 @@
|
||||
import os
|
||||
import time
|
||||
from queue import Queue, Empty
|
||||
from typing import Literal, Optional, Callable, List, Dict, Any, Union
|
||||
from typing import Literal, Optional, Callable, List, Dict, Any, Union, Sequence
|
||||
import logging
|
||||
import errno
|
||||
from threading import Thread, Timer
|
||||
@ -40,6 +40,7 @@ class Flix:
|
||||
|
||||
_connection_timeout = 3
|
||||
_print_buffer: str = ''
|
||||
_modes = ['MANUAL', 'ACRO', 'STAB', 'AUTO']
|
||||
|
||||
def __init__(self, system_id: int=1, wait_connection: bool=True):
|
||||
if not (0 <= system_id < 256):
|
||||
@ -147,7 +148,7 @@ class Flix:
|
||||
|
||||
def _handle_mavlink_message(self, msg: mavlink.MAVLink_message):
|
||||
if isinstance(msg, mavlink.MAVLink_heartbeat_message):
|
||||
self.mode = ['MANUAL', 'ACRO', 'STAB', 'USER'][msg.custom_mode]
|
||||
self.mode = self._modes[msg.custom_mode]
|
||||
self.armed = msg.base_mode & mavlink.MAV_MODE_FLAG_SAFETY_ARMED != 0
|
||||
self._trigger('mode', self.mode)
|
||||
self._trigger('armed', self.armed)
|
||||
@ -231,6 +232,19 @@ class Flix:
|
||||
def _flu_to_mavlink(v: List[float]) -> List[float]:
|
||||
return Flix._mavlink_to_flu(v)
|
||||
|
||||
def _command_send(self, command: int, params: Sequence[float]):
|
||||
if len(params) != 7:
|
||||
raise ValueError('Command must have 7 parameters')
|
||||
for attempt in range(3):
|
||||
try:
|
||||
logger.debug(f'Send command {command} with params {params} (attempt #{attempt + 1})')
|
||||
self.mavlink.command_long_send(self.system_id, 0, command, 0, *params) # type: ignore
|
||||
self.wait('mavlink.COMMAND_ACK', value=lambda msg: msg.command == command and msg.result == mavlink.MAV_RESULT_ACCEPTED, timeout=0.1)
|
||||
return
|
||||
except TimeoutError:
|
||||
continue
|
||||
raise RuntimeError(f'Failed to send command {command} after 3 attempts')
|
||||
|
||||
def _connected(self):
|
||||
# Reset disconnection timer
|
||||
self._disconnected_timer.cancel()
|
||||
@ -275,6 +289,11 @@ class Flix:
|
||||
continue
|
||||
raise RuntimeError(f'Failed to set parameter {name} to {value} after 3 attempts')
|
||||
|
||||
def set_mode(self, mode: Union[str, int]):
|
||||
if isinstance(mode, str):
|
||||
mode = self._modes.index(mode.upper())
|
||||
self._command_send(mavlink.MAV_CMD_DO_SET_MODE, (0, mode, 0, 0, 0, 0, 0))
|
||||
|
||||
def set_position(self, position: List[float], yaw: Optional[float] = None, wait: bool = False, tolerance: float = 0.1):
|
||||
raise NotImplementedError('Position control is not implemented yet')
|
||||
|
||||
@ -282,17 +301,37 @@ class Flix:
|
||||
raise NotImplementedError('Velocity control is not implemented yet')
|
||||
|
||||
def set_attitude(self, attitude: List[float], thrust: float):
|
||||
raise NotImplementedError('Automatic flight is not implemented yet')
|
||||
if len(attitude) == 3:
|
||||
attitude = Quaternion([attitude[0], attitude[1], attitude[2]]).q # type: ignore
|
||||
elif len(attitude) != 4:
|
||||
raise ValueError('Attitude must be [roll, pitch, yaw] or [w, x, y, z] quaternion')
|
||||
if not (0 <= thrust <= 1):
|
||||
raise ValueError('Thrust must be in range [0, 1]')
|
||||
attitude = self._flu_to_mavlink(attitude)
|
||||
for _ in range(2): # duplicate to ensure delivery
|
||||
self.mavlink.set_attitude_target_send(0, self.system_id, 0, 0,
|
||||
[attitude[0], attitude[1], attitude[2], attitude[3]],
|
||||
0, 0, 0, thrust)
|
||||
|
||||
def set_rates(self, rates: List[float], thrust: float):
|
||||
raise NotImplementedError('Automatic flight is not implemented yet')
|
||||
if len(rates) != 3:
|
||||
raise ValueError('Rates must be [roll_rate, pitch_rate, yaw_rate]')
|
||||
if not (0 <= thrust <= 1):
|
||||
raise ValueError('Thrust must be in range [0, 1]')
|
||||
rates = self._flu_to_mavlink(rates)
|
||||
for _ in range(2): # duplicate to ensure delivery
|
||||
self.mavlink.set_attitude_target_send(0, self.system_id, 0,
|
||||
mavlink.ATTITUDE_TARGET_TYPEMASK_ATTITUDE_IGNORE,
|
||||
[1, 0, 0, 0],
|
||||
rates[0], rates[1], rates[2], thrust)
|
||||
|
||||
def set_motors(self, motors: List[float]):
|
||||
if len(motors) != 4:
|
||||
raise ValueError('motors must have 4 values')
|
||||
if not all(0 <= m <= 1 for m in motors):
|
||||
raise ValueError('motors must be in range [0, 1]')
|
||||
raise NotImplementedError
|
||||
for _ in range(2): # duplicate to ensure delivery
|
||||
self.mavlink.set_actuator_control_target_send(time.time() * 1000, 0, self.system_id, 0, motors + [0] * 4) # type: ignore
|
||||
|
||||
def set_controls(self, roll: float, pitch: float, yaw: float, throttle: float):
|
||||
"""Send pilot's controls. Warning: not intended for automatic control"""
|
||||
@ -302,9 +341,6 @@ class Flix:
|
||||
raise ValueError('throttle must be in range [0, 1]')
|
||||
self.mavlink.manual_control_send(self.system_id, roll * 1000, pitch * 1000, yaw * 1000, throttle * 1000, 0) # type: ignore
|
||||
|
||||
def set_mode(self, mode: Literal['MANUAL', 'ACRO', 'STAB', 'USER']):
|
||||
raise NotImplementedError('Setting mode is not implemented yet')
|
||||
|
||||
def cli(self, cmd: str, wait_response: bool = True) -> str:
|
||||
cmd = cmd.strip()
|
||||
if cmd == 'reboot':
|
||||
|
Loading…
x
Reference in New Issue
Block a user