Some fixes and updates in pyflix

Fix set_controls
Add set_armed method
This commit is contained in:
Oleg Kalachev
2025-09-25 16:53:49 +03:00
parent 09bf09e520
commit 0268c8ebcf
2 changed files with 19 additions and 8 deletions

View File

@@ -122,6 +122,13 @@ flix.cli('reboot') # reboot the drone
> [!TIP] > [!TIP]
> Use `help` command to get the list of available commands. > Use `help` command to get the list of available commands.
You can arm and disarm the drone using `set_armed` method (warning: the drone will fall if disarmed in the air):
```python
flix.set_armed(True) # arm the drone
flix.set_armed(False) # disarm the drone
```
You can imitate pilot's controls using `set_controls` method: You can imitate pilot's controls using `set_controls` method:
```python ```python
@@ -173,7 +180,7 @@ To exit from *AUTO* mode move control sticks and the drone will switch to *STAB*
## Usage alongside QGroundControl ## Usage alongside QGroundControl
You can use the Flix library alongside the QGroundControl app, using a proxy mode. To do that: You can use the Flix library alongside the QGroundControl app, using proxy mode. To do that:
1. Run proxy for `pyflix` and QGroundControl in background: 1. Run proxy for `pyflix` and QGroundControl in background:
@@ -241,11 +248,11 @@ You can send values from the firmware like this (`mavlink.ino`):
```cpp ```cpp
// Send float named value // Send float named value
mavlink_msg_named_value_float_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg, t, "some_value", loopRate); mavlink_msg_named_value_float_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg, t, "loop_rate", loopRate);
sendMessage(&msg); sendMessage(&msg);
// Send vector named value // Send vector named value
mavlink_msg_debug_vect_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg, "some_vector", t, gyroBias.x, gyroBias.y, gyroBias.z); mavlink_msg_debug_vect_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg, "gyro_bias", t, gyroBias.x, gyroBias.y, gyroBias.z);
sendMessage(&msg); sendMessage(&msg);
``` ```

View File

@@ -6,7 +6,7 @@
import os import os
import time import time
from queue import Queue, Empty from queue import Queue, Empty
from typing import Literal, Optional, Callable, List, Dict, Any, Union, Sequence from typing import Optional, Callable, List, Dict, Any, Union, Sequence
import logging import logging
import errno import errno
from threading import Thread, Timer from threading import Thread, Timer
@@ -36,7 +36,7 @@ class Flix:
system_id: int system_id: int
messages: Dict[str, Dict[str, Any]] # MAVLink messages storage messages: Dict[str, Dict[str, Any]] # MAVLink messages storage
values: Dict[Union[str, int], Union[float, List[float]]] = {} # named values values: Dict[Union[str, int], Union[float, List[float]]] # named values
_connection_timeout = 3 _connection_timeout = 3
_print_buffer: str = '' _print_buffer: str = ''
@@ -61,7 +61,6 @@ class Flix:
self.connection.target_system = system_id self.connection.target_system = system_id
self.mavlink: mavlink.MAVLink = self.connection.mav self.mavlink: mavlink.MAVLink = self.connection.mav
self._event_listeners: Dict[str, List[Callable[..., Any]]] = {} self._event_listeners: Dict[str, List[Callable[..., Any]]] = {}
self.messages = {}
self._disconnected_timer = Timer(0, self._disconnected) self._disconnected_timer = Timer(0, self._disconnected)
self._reader_thread = Thread(target=self._read_mavlink, daemon=True) self._reader_thread = Thread(target=self._read_mavlink, daemon=True)
self._reader_thread.start() self._reader_thread.start()
@@ -79,6 +78,8 @@ class Flix:
self.motors = [0, 0, 0, 0] self.motors = [0, 0, 0, 0]
self.acc = [0, 0, 0] self.acc = [0, 0, 0]
self.gyro = [0, 0, 0] self.gyro = [0, 0, 0]
self.messages = {}
self.values = {}
def on(self, event: str, callback: Callable): def on(self, event: str, callback: Callable):
event = event.lower() event = event.lower()
@@ -148,7 +149,7 @@ class Flix:
def _handle_mavlink_message(self, msg: mavlink.MAVLink_message): def _handle_mavlink_message(self, msg: mavlink.MAVLink_message):
if isinstance(msg, mavlink.MAVLink_heartbeat_message): if isinstance(msg, mavlink.MAVLink_heartbeat_message):
self.mode = self._modes[msg.custom_mode] self.mode = self._modes[msg.custom_mode] if msg.custom_mode < len(self._modes) else f'UNKNOWN({msg.custom_mode})'
self.armed = msg.base_mode & mavlink.MAV_MODE_FLAG_SAFETY_ARMED != 0 self.armed = msg.base_mode & mavlink.MAV_MODE_FLAG_SAFETY_ARMED != 0
self._trigger('mode', self.mode) self._trigger('mode', self.mode)
self._trigger('armed', self.armed) self._trigger('armed', self.armed)
@@ -299,6 +300,9 @@ class Flix:
mode = self._modes.index(mode.upper()) mode = self._modes.index(mode.upper())
self._command_send(mavlink.MAV_CMD_DO_SET_MODE, (0, mode, 0, 0, 0, 0, 0)) self._command_send(mavlink.MAV_CMD_DO_SET_MODE, (0, mode, 0, 0, 0, 0, 0))
def set_armed(self, armed: bool):
self._command_send(mavlink.MAV_CMD_COMPONENT_ARM_DISARM, (1 if armed else 0, 0, 0, 0, 0, 0, 0))
def set_position(self, position: List[float], yaw: Optional[float] = None, wait: bool = False, tolerance: float = 0.1): def set_position(self, position: List[float], yaw: Optional[float] = None, wait: bool = False, tolerance: float = 0.1):
raise NotImplementedError('Position control is not implemented yet') raise NotImplementedError('Position control is not implemented yet')
@@ -344,7 +348,7 @@ class Flix:
raise ValueError('roll, pitch, yaw must be in range [-1, 1]') raise ValueError('roll, pitch, yaw must be in range [-1, 1]')
if not 0 <= throttle <= 1: if not 0 <= throttle <= 1:
raise ValueError('throttle must be in range [0, 1]') raise ValueError('throttle must be in range [0, 1]')
self.mavlink.manual_control_send(self.system_id, int(roll * 1000), int(pitch * 1000), int(yaw * 1000), int(throttle * 1000), 0) # type: ignore self.mavlink.manual_control_send(self.system_id, int(pitch * 1000), int(roll * 1000), int(throttle * 1000), int(yaw * 1000), 0) # type: ignore
def cli(self, cmd: str, wait_response: bool = True) -> str: def cli(self, cmd: str, wait_response: bool = True) -> str:
cmd = cmd.strip() cmd = cmd.strip()