Add Python library (#20)

This commit is contained in:
Oleg Kalachev 2025-07-22 14:17:08 +03:00 committed by GitHub
parent 779fa13e80
commit 1f47aa6d62
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 713 additions and 2 deletions

View File

@ -19,6 +19,21 @@ jobs:
echo -e "t,x,y,z\n0,1,2,3\n1,4,5,6" > log.csv
./csv_to_ulog log.csv
test $(stat -c %s log.ulg) -eq 196
pyflix:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Python build tools
run: pip install build
- name: Build pyflix
run: python3 -m build tools
- name: Upload artifacts
uses: actions/upload-artifact@v4
with:
name: pyflix
path: |
tools/dist/pyflix-*.tar.gz
tools/dist/pyflix-*.whl
python_tools:
runs-on: ubuntu-latest
steps:

2
.gitignore vendored
View File

@ -2,6 +2,8 @@
*.elf
build/
tools/log/
tools/dist/
*.egg-info/
.dependencies
.vscode/*
!.vscode/settings.json

View File

@ -22,6 +22,7 @@
* Precise simulation with Gazebo.
* Wi-Fi and MAVLink support.
* Wireless command line interface and analyzing.
* Python library.
* Textbook on flight control theory and practice ([in development](https://quadcopter.dev)).
* *Position control (using external camera) and autonomous flights¹*.
@ -53,6 +54,7 @@ The simulator is implemented using Gazebo and runs the original Arduino code:
* [Building and running the code](docs/build.md).
* [Troubleshooting](docs/troubleshooting.md).
* [Firmware architecture overview](docs/firmware.md).
* [Python library tutorial](tools/pyflix/README.md).
* [Log analysis](docs/log.md).
* [User builds gallery](docs/user.md).

View File

@ -37,7 +37,7 @@ void sendMavlink() {
mavlink_msg_heartbeat_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg, MAV_TYPE_QUADROTOR, MAV_AUTOPILOT_GENERIC,
MAV_MODE_FLAG_MANUAL_INPUT_ENABLED | (armed * MAV_MODE_FLAG_SAFETY_ARMED) | ((mode == STAB) * MAV_MODE_FLAG_STABILIZE_ENABLED),
0, MAV_STATE_STANDBY);
mode, MAV_STATE_STANDBY);
sendMessage(&msg);
mavlink_msg_extended_sys_state_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg,
@ -208,7 +208,9 @@ void sendMavlinkPrint() {
strlcpy(data, str + i, sizeof(data));
mavlink_message_t msg;
mavlink_msg_serial_control_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg,
SERIAL_CONTROL_DEV_SHELL, 0, 0, 0, strlen(data), (uint8_t *)data, 0, 0);
SERIAL_CONTROL_DEV_SHELL,
i + MAVLINK_MSG_SERIAL_CONTROL_FIELD_DATA_LEN < strlen(str) ? SERIAL_CONTROL_FLAG_MULTI : 0, // more chunks to go
0, 0, strlen(data), (uint8_t *)data, 0, 0);
sendMessage(&msg);
}
mavlinkPrintBuffer.clear();

13
tools/cli.py Executable file
View File

@ -0,0 +1,13 @@
#!/usr/bin/env python3
# Remote CLI for Flix
from pyflix import Flix
flix = Flix()
flix.on('print', lambda text: print(text, end=''))
while True:
command = input()
flix.cli(command, wait_response=False)

39
tools/example.py Executable file
View File

@ -0,0 +1,39 @@
#!/usr/bin/env python3
import math
from pyflix import Flix
print('=== Connect...')
flix = Flix()
print('Connected:', flix.connected)
print('Mode:', flix.mode)
print('Armed:', flix.armed)
print('Landed:', flix.landed)
print('Rates:', *[f'{math.degrees(r):.0f}°/s' for r in flix.rates])
print('Attitude:', *[f'{math.degrees(a):.0f}°' for a in flix.attitude_euler])
print('Motors:', flix.motors)
print('Acc', flix.acc)
print('Gyro', flix.gyro)
print('=== Execute commands...')
print('> time')
print(flix.cli('time'))
print('> imu')
print(flix.cli('imu'))
print('=== Get parameter...')
pitch_p = flix.get_param('PITCH_P')
print('PITCH_P = ', pitch_p)
print('=== Set parameter...')
flix.set_param('PITCH_P', pitch_p)
print('=== Wait for gyro update...')
print('Gyro: ', flix.wait('gyro'))
print('=== Wait for HEARTBEAT message...')
print(flix.wait('mavlink.HEARTBEAT'))
print('=== When until landed = False (remove drone from the surface)')
flix.wait('landed', value=False)

23
tools/log.py Executable file
View File

@ -0,0 +1,23 @@
#!/usr/bin/env python3
# Download flight log remotely and save to file
import os
import datetime
from pyflix import Flix
DIR = os.path.dirname(os.path.realpath(__file__))
flix = Flix()
print('Downloading log...')
lines = flix.cli('log').splitlines()
# sort by timestamp
header = lines.pop(0)
lines.sort(key=lambda line: float(line.split(',')[0]))
log = open(f'{DIR}/log/{datetime.datetime.now().isoformat()}.csv', 'wb')
content = header.encode() + b'\n' + b'\n'.join(line.encode() for line in lines)
log.write(content)
print(f'Written {os.path.relpath(log.name, os.curdir)}')

225
tools/pyflix/README.md Normal file
View File

@ -0,0 +1,225 @@
# Flix Python library
The Flix Python library allows you to remotely connect to a Flix quadcopter. It provides access to telemetry data, supports executing CLI commands, and controlling the drone's flight.
To use the library, connect to the drone's Wi-Fi. To use it with the simulator, ensure the script runs on the same local network as the simulator.
## Installation
If you have cloned the [repo](https://github.com/okalachev/flix), install the library from the repo:
```bash
cd /path/to/flix/repo
pip install -e tools
```
Alternatively, install from pip:
```bash
pip install pyflix
```
## Usage
The API is accessed through the `Flix` class:
```python
from flix import Flix
flix = Flix() # create a Flix object and wait for connection
```
### Telemetry
Basic telemetry is available through object properties. The properties names generally match the corresponding variables in the firmware itself:
```python
print(flix.connected) # True if connected to the drone
print(flix.mode) # current flight mode (str)
print(flix.armed) # True if the drone is armed
print(flix.landed) # True if the drone is landed
print(flix.attitude) # attitude quaternion [w, x, y, z]
print(flix.attitude_euler) # attitude as Euler angles [roll, pitch, yaw]
print(flix.rates) # angular rates [roll_rate, pitch_rate, yaw_rate]
print(flix.channels) # raw RC channels (list)
print(flix.motors) # motors outputs (list)
print(flix.acc) # accelerometer output (list)
print(flix.gyro) # gyroscope output (list)
```
> [!NOTE]
> The library uses the Front-Left-Up coordinate system — the same as in the firmware. All angles are in radians.
### Events
The Flix object implements the *Observable* pattern, allowing to listen for events. You can subscribe to events using `on` method:
```python
flix.on('connected', lambda: print('Connected to Flix'))
flix.on('disconnected', lambda: print('Disconnected from Flix'))
flix.on('print', lambda text: print(f'Flix says: {text}'))
```
You can also wait for specific events using `wait` method. This method returns the data associated with the event:
```python
gyro = flix.wait('gyro') # wait for gyroscope update
attitude = flix.wait('attitude', timeout=3) # wait for attitude update, raise TimeoutError after 3 seconds
```
The `value` argument specifies a condition for filtering events. It can be either an expected value or a callable:
```python
flix.wait('armed', value=True) # wait until armed
flix.wait('armed', value=False) # wait until disarmed
flix.wait('motors', value=lambda motors: all(m == 0 for m in motors)) # wait until all motors stop
flix.wait('attitude_euler', value=lambda att: att[0] > 0) # wait until roll angle is positive
```
Full list of events:
|Event|Description|Associated data|
|-----|-----------|----------------|
|`connected`|Connected to the drone||
|`disconnected`|Connection is lost||
|`armed`|Armed state update|Armed state (*bool*)|
|`mode`|Flight mode update|Flight mode (*str*)|
|`landed`|Landed state update|Landed state (*bool*)|
|`print`|The drone sends text to the console|Text|
|`attitude`|Attitude update|Attitude quaternion (*list*)|
|`attitude_euler`|Attitude update|Euler angles (*list*)|
|`rates`|Angular rates update|Angular rates (*list*)|
|`channels`|Raw RC channels update|Raw RC channels (*list*)|
|`motors`|Motors outputs update|Motors outputs (*list*)|
|`acc`|Accelerometer update|Accelerometer output (*list*)|
|`gyro`|Gyroscope update|Gyroscope output (*list*)|
|`mavlink`|Received MAVLink message|Message object|
|`mavlink.<message_name>`|Received specific MAVLink message|Message object|
|`mavlink.<message_id>`|Received specific MAVLink message|Message object|
|`value`|Named value update (see below)|Name, value|
|`value.<name>`|Specific named value update (see bellow)|Value|
> [!NOTE]
> Update events trigger on every new data from the drone, and do not mean the value is changed.
### Common methods
Get and set firmware parameters using `get_param` and `set_param` methods:
```python
print(flix.get_param('PITCH_P')) # get parameter value
flix.set_param('PITCH_P', 5) # set parameter value
```
Execute CLI commands using `cli` method. This method returns command response:
```python
imu = flix.cli('imu') # get detailed IMU data
time = flix.cli('time') # get detailed time data
flix.cli('reboot') # reboot the drone
```
> [!TIP]
> Use `help` command to get the list of available commands.
### Automatic flight
The flight control feature is in development. List of methods intended for automatic flight control:
* `set_position`
* `set_attitude`
* `set_rates`
* `set_motors`
* `set_controls`
* `set_mode`
## Usage alongside QGroundControl
You can use the Flix library alongside the QGroundControl app, using a proxy mode. To do that:
1. Run proxy for `pyflix` and QGroundControl in background:
```bash
flix-proxy
```
2. Go to QGroundControl settings ⇒ *Comm Links*.
3. Add new link with the following settings:
* *Name*: Proxy
* *Automatically Connect on Start*: ✓
* *Type*: UDP
* *Port*: 14560
4. Restart QGroundControl.
Now you can run `pyflix` scripts and QGroundControl simultaneously.
## Tools
The following scripts demonstrate how to use the library:
* [`cli.py`](../cli.py) — remote access to the drone's command line interface.
* [`log.py`](../log.py) — download flight logs from the drone.
* [`example.py`](../example.py) — a simple example, prints telemetry data and waits for events.
## Advanced usage
### MAVLink
You can access the most recently received messages using `messages` property:
```python
print(flix.messages.get('HEARTBEAT')) # print the latest HEARTBEAT message
```
You can wait for a specific message using `wait` method:
```python
heartbeat = flix.wait('mavlink.HEARTBEAT')
```
You can send raw messages using `mavlink` property:
```python
from pymavlink.dialects.v20 import common as mavlink
flix.mavlink.heartbeat_send(mavlink.MAV_TYPE_GCS, mavlink.MAV_AUTOPILOT_INVALID,
mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, 0, 0)
```
### Named values
You can pass arbitrary named values from the firmware to the Python script using `NAMED_VALUE_FLOAT`, `NAMED_VALUE_INT`, `DEBUG`, `DEBUG_VECT`, and `DEBUG_FLOAT_ARRAY` MAVLink messages.
All these named values will appear in the `values` dictionary:
```python
print(flix.values['some_value'])
print(flix.values['some_vector'])
```
You can send values from the firmware like this (`mavlink.ino`):
```cpp
// Send float named value
mavlink_msg_named_value_float_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg, t, "some_value", loopRate);
sendMessage(&msg);
// Send vector named value
mavlink_msg_debug_vect_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg, "some_vector", t, gyroBias.x, gyroBias.y, gyroBias.z);
sendMessage(&msg);
```
### Logging
You can control Flix library verbosity using Python's `logging` module:
```python
import logging
logger = logging.getLogger('flix')
logger.setLevel(logging.DEBUG) # be more verbose
logger.setLevel(logging.WARNING) # be less verbose
```
## Stability
The library is in development stage. The API is not stable.

1
tools/pyflix/__init__.py Normal file
View File

@ -0,0 +1 @@
from .flix import Flix

324
tools/pyflix/flix.py Normal file
View File

@ -0,0 +1,324 @@
# Copyright (c) 2025 Oleg Kalachev <okalachev@gmail.com>
# Repository: https://github.com/okalachev/flix
"""Python API for Flix drone."""
import os
import time
from queue import Queue, Empty
from typing import Literal, Optional, Callable, List, Dict, Any, Union
import logging
import errno
from threading import Thread, Timer
from pymavlink import mavutil
from pymavlink.quaternion import Quaternion
from pymavlink.dialects.v20 import common as mavlink
logger = logging.getLogger('flix')
if not logger.hasHandlers():
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.INFO)
class Flix:
connected: bool = False
mode: str = ''
armed: bool = False
landed: bool = False
attitude: List[float]
attitude_euler: List[float] # roll, pitch, yaw
rates: List[float]
channels: List[int]
motors: List[float]
acc: List[float]
gyro: List[float]
mav_id: int
messages: Dict[str, Dict[str, Any]] # MAVLink messages storage
values: Dict[Union[str, int], Union[float, List[float]]] = {} # named values
_connection_timeout = 3
_print_buffer: str = ''
def __init__(self, mav_id: int=1, wait_connection: bool=True):
if not (0 <= mav_id < 256):
raise ValueError('mav_id must be in range [0, 255]')
self._setup_mavlink()
self.mav_id = mav_id
self._init_state()
try:
# Direct connection
logger.debug('Listening on port 14550')
self.connection: mavutil.mavfile = mavutil.mavlink_connection('udpin:0.0.0.0:14550', source_system=255) # type: ignore
except OSError as e:
if e.errno != errno.EADDRINUSE:
raise
# Port busy - using proxy
logger.debug('Listening on port 14560 (proxy)')
self.connection: mavutil.mavfile = mavutil.mavlink_connection('udpin:0.0.0.0:14555', source_system=254) # type: ignore
self.connection.target_system = mav_id
self.mavlink: mavlink.MAVLink = self.connection.mav
self._event_listeners: Dict[str, List[Callable[..., Any]]] = {}
self.messages = {}
self._disconnected_timer = Timer(0, self._disconnected)
self._reader_thread = Thread(target=self._read_mavlink, daemon=True)
self._reader_thread.start()
self._heartbeat_thread = Thread(target=self._send_heartbeat, daemon=True)
self._heartbeat_thread.start()
if wait_connection:
self.wait('mavlink.HEARTBEAT')
time.sleep(0.2) # give some time to receive initial state
def _init_state(self):
self.attitude = [1, 0, 0, 0]
self.attitude_euler = [0, 0, 0]
self.rates = [0, 0, 0]
self.channels = [0, 0, 0, 0, 0, 0, 0, 0]
self.motors = [0, 0, 0, 0]
self.acc = [0, 0, 0]
self.gyro = [0, 0, 0]
def on(self, event: str, callback: Callable):
event = event.lower()
if event not in self._event_listeners:
self._event_listeners[event] = []
self._event_listeners[event].append(callback)
def off(self, callback: Callable):
for event in self._event_listeners:
if callback in self._event_listeners[event]:
self._event_listeners[event].remove(callback)
def _trigger(self, event: str, *args):
event = event.lower()
for callback in self._event_listeners.get(event, []):
try:
callback(*args)
except Exception as e:
logger.error(f'Error in event listener for event {event}: {e}')
def wait(self, event: str, value: Union[Any, Callable[..., bool]] = lambda *args: True, timeout=None) -> Any:
"""Wait for an event"""
event = event.lower()
q = Queue()
def callback(*args):
if len(args) == 0:
result = None
elif len(args) == 1:
result = args[0]
else:
result = args
if callable(value) and value(*args):
q.put_nowait(result)
elif value == result:
q.put_nowait(result)
self.on(event, callback)
try:
return q.get(timeout=timeout)
except Empty:
raise TimeoutError
finally:
self.off(callback)
@staticmethod
def _setup_mavlink():
# otherwise it will use MAVLink 1.0 until connected
os.environ['MAVLINK20'] = '1'
mavutil.set_dialect('common')
def _read_mavlink(self):
while True:
try:
msg: Optional[mavlink.MAVLink_message] = self.connection.recv_match(blocking=True)
if msg is None:
continue
self._connected()
msg_dict = msg.to_dict()
msg_dict['_timestamp'] = time.time() # add timestamp
self.messages[msg.get_type()] = msg_dict
self._trigger('mavlink', msg)
self._trigger(f'mavlink.{msg.get_type()}', msg) # trigger mavlink.<message_type>
self._trigger(f'mavlink.{msg.get_msgId()}', msg) # trigger mavlink.<message_id>
self._handle_mavlink_message(msg)
except Exception as e:
logger.error(f'Error reading MAVLink message: {e}')
def _handle_mavlink_message(self, msg: mavlink.MAVLink_message):
if isinstance(msg, mavlink.MAVLink_heartbeat_message):
self.mode = ['MANUAL', 'ACRO', 'STAB', 'USER'][msg.custom_mode]
self.armed = msg.base_mode & mavlink.MAV_MODE_FLAG_SAFETY_ARMED != 0
self._trigger('mode', self.mode)
self._trigger('armed', self.armed)
if isinstance(msg, mavlink.MAVLink_extended_sys_state_message):
self.landed = msg.landed_state == mavlink.MAV_LANDED_STATE_ON_GROUND
self._trigger('landed', self.landed)
if isinstance(msg, mavlink.MAVLink_attitude_quaternion_message):
self.attitude = self._mavlink_to_flu([msg.q1, msg.q2, msg.q3, msg.q4])
self.rates = self._mavlink_to_flu([msg.rollspeed, msg.pitchspeed, msg.yawspeed])
self.attitude_euler = list(Quaternion(self.attitude).euler) # type: ignore
self._trigger('attitude', self.attitude)
self._trigger('attitude_euler', self.attitude_euler)
if isinstance(msg, mavlink.MAVLink_rc_channels_raw_message):
self.channels = [msg.chan1_raw, msg.chan2_raw, msg.chan3_raw, msg.chan4_raw,
msg.chan5_raw, msg.chan6_raw, msg.chan7_raw, msg.chan8_raw]
self._trigger('channels', self.channels)
if isinstance(msg, mavlink.MAVLink_actuator_output_status_message):
self.motors = msg.actuator[:4] # type: ignore
self._trigger('motors', self.motors)
if isinstance(msg, mavlink.MAVLink_scaled_imu_message):
self.acc = self._mavlink_to_flu([msg.xacc / 1000, msg.yacc / 1000, msg.zacc / 1000])
self.gyro = self._mavlink_to_flu([msg.xgyro / 1000, msg.ygyro / 1000, msg.zgyro / 1000])
self._trigger('acc', self.acc)
self._trigger('gyro', self.gyro)
if isinstance(msg, mavlink.MAVLink_serial_control_message):
# new chunk of data
text = bytes(msg.data)[:msg.count].decode('utf-8', errors='ignore')
logger.debug(f'Console: {repr(text)}')
self._trigger('print', text)
self._print_buffer += text
if msg.flags & mavlink.SERIAL_CONTROL_FLAG_MULTI == 0:
# last chunk
self._trigger('print_full', self._print_buffer)
self._print_buffer = ''
if isinstance(msg, mavlink.MAVLink_statustext_message):
logger.info(f'Flix #{msg.get_srcSystem()}: {msg.text}')
self._trigger('status', msg.text)
if isinstance(msg, (mavlink.MAVLink_named_value_float_message, mavlink.MAVLink_named_value_int_message)):
self.values[msg.name] = msg.value
self._trigger('value', msg.name, msg.value)
self._trigger(f'value.{msg.name}', msg.value)
if isinstance(msg, mavlink.MAVLink_debug_message):
self.values[msg.ind] = msg.value
self._trigger('value', msg.ind, msg.value)
self._trigger(f'value.{msg.ind}', msg.value)
if isinstance(msg, mavlink.MAVLink_debug_vect_message):
self.values[msg.name] = [msg.x, msg.y, msg.z]
self._trigger('value', msg.name, self.values[msg.name])
self._trigger(f'value.{msg.name}', self.values[msg.name])
if isinstance(msg, mavlink.MAVLink_debug_float_array_message):
self.values[msg.name] = list(msg.data)
self._trigger('value', msg.name, self.values[msg.name])
self._trigger(f'value.{msg.name}', self.values[msg.name])
def _send_heartbeat(self):
while True:
self.mavlink.heartbeat_send(mavlink.MAV_TYPE_GCS, mavlink.MAV_AUTOPILOT_INVALID, 0, 0, 0)
time.sleep(1)
@staticmethod
def _mavlink_to_flu(v: List[float]) -> List[float]:
if len(v) == 3: # vector
return [v[0], -v[1], -v[2]]
elif len(v) == 4: # quaternion
return [v[0], v[1], -v[2], -v[3]]
else:
raise ValueError(f'List must have 3 (vector) or 4 (quaternion) elements')
@staticmethod
def _flu_to_mavlink(v: List[float]) -> List[float]:
return Flix._mavlink_to_flu(v)
def _connected(self):
# Reset disconnection timer
self._disconnected_timer.cancel()
self._disconnected_timer = Timer(self._connection_timeout, self._disconnected)
self._disconnected_timer.start()
if not self.connected:
logger.info('Connection is established')
self.connected = True
self._trigger('connected')
def _disconnected(self):
logger.info('Connection is lost')
self.connected = False
self._trigger('disconnected')
def get_param(self, name: str) -> float:
if len(name.encode('ascii')) > 16:
raise ValueError('Parameter name must be 16 characters or less')
for attempt in range(3):
try:
logger.debug(f'Get param {name} (attempt #{attempt + 1})')
self.mavlink.param_request_read_send(self.mav_id, 0, name.encode('ascii'), -1)
msg: mavlink.MAVLink_param_value_message = \
self.wait('mavlink.PARAM_VALUE', value=lambda msg: msg.param_id == name, timeout=0.1)
return msg.param_value
except TimeoutError:
continue
raise RuntimeError(f'Failed to get parameter {name} after 3 attempts')
def set_param(self, name: str, value: float):
if len(name.encode('ascii')) > 16:
raise ValueError('Parameter name must be 16 characters or less')
for attempt in range(3):
try:
logger.debug(f'Set param {name} to {value} (attempt #{attempt + 1})')
self.mavlink.param_set_send(self.mav_id, 0, name.encode('ascii'), value, mavlink.MAV_PARAM_TYPE_REAL32)
self.wait('mavlink.PARAM_VALUE', value=lambda msg: msg.param_id == name, timeout=0.1)
return
except TimeoutError:
# on timeout try again
continue
raise RuntimeError(f'Failed to set parameter {name} to {value} after 3 attempts')
def set_position(self, position: List[float], yaw: Optional[float] = None, wait: bool = False, tolerance: float = 0.1):
raise NotImplementedError('Position control is not implemented yet')
def set_attitude(self, attitude: List[float], thrust: float):
raise NotImplementedError('Automatic flight is not implemented yet')
def set_rates(self, rates: List[float], thrust: float):
raise NotImplementedError('Automatic flight is not implemented yet')
def set_motors(self, motors: List[float]):
if len(motors) != 4:
raise ValueError('motors must have 4 values')
if not all(0 <= m <= 1 for m in motors):
raise ValueError('motors must be in range [0, 1]')
raise NotImplementedError
def set_controls(self, roll: float, pitch: float, yaw: float, throttle: float):
"""Send pilot's controls. Warning: not intended for automatic control"""
if not (-1 <= roll <= 1 and -1 <= pitch <= 1 and -1 <= yaw <= 1):
raise ValueError('roll, pitch, yaw must be in range [-1, 1]')
if not 0 <= throttle <= 1:
raise ValueError('throttle must be in range [0, 1]')
self.mavlink.manual_control_send(self.mav_id, roll * 1000, pitch * 1000, yaw * 1000, throttle * 1000, 0) # type: ignore
def set_mode(self, mode: Literal['MANUAL', 'ACRO', 'STAB', 'USER']):
raise NotImplementedError('Setting mode is not implemented yet')
def cli(self, cmd: str, wait_response: bool = True) -> str:
cmd = cmd.strip()
if cmd == 'reboot':
wait_response = False # reboot command doesn't respond
cmd_bytes = (cmd + '\n').encode('utf-8')
if len(cmd_bytes) > 70:
raise ValueError(f'Command is too long: {len(cmd_bytes)} > 70')
cmd_bytes = cmd_bytes.ljust(70, b'\0')
response_prefix = f'> {cmd}\n'
for attempt in range(3):
logger.debug(f'Send command {cmd} (attempt #{attempt + 1})')
try:
self.mavlink.serial_control_send(0, 0, 0, 0, len(cmd_bytes), cmd_bytes)
if not wait_response:
return ''
response = self.wait('print_full', timeout=0.1, value=lambda text: text.startswith(response_prefix))
return response[len(response_prefix):].strip()
except TimeoutError:
continue
raise RuntimeError(f'Failed to send command {cmd} after 3 attempts')

37
tools/pyflix/proxy.py Executable file
View File

@ -0,0 +1,37 @@
#!/usr/bin/env python3
"""Proxy for running pyflix library alongside QGroundControl app."""
import socket
LOCAL = ('0.0.0.0', 14550) # from Flix
TARGETS = (
('127.0.0.1', 14560), # to QGroundControl
('127.0.0.1', 14555), # to pyflix
)
def main():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(LOCAL)
source_addr = None
packets = 0
print('Proxy started - run QGroundControl')
while True:
data, addr = sock.recvfrom(1024) # read entire UDP packet
if addr in TARGETS: # packet from target
if source_addr is None:
continue
sock.sendto(data, source_addr)
else: # packet from source
source_addr = addr
for target in TARGETS:
sock.sendto(data, target)
packets += 1
print(f'\rPackets: {packets}', end='')
if __name__ == '__main__':
main()

28
tools/pyproject.toml Normal file
View File

@ -0,0 +1,28 @@
[project]
name = "pyflix"
version = "0.5"
description = "Python API for Flix drone"
authors = [{ name="Oleg Kalachev", email="okalachev@gmail.com" }]
license = "MIT"
readme = "pyflix/README.md"
requires-python = ">=3.8"
dependencies = [
"pymavlink",
]
[project.scripts]
flix-proxy = "pyflix.proxy:main"
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"
[tool.setuptools]
packages = ["pyflix"]
[tool.setuptools.package-data]
pyflix = ["README.md"]
[project.urls]
Homepage = "https://github.com/okalachev/flix/tree/master/tools/pyflix"
Issues = "https://github.com/okalachev/flix/issues"