diff --git a/.github/workflows/tools.yml b/.github/workflows/tools.yml index 2559b2a..88d1f25 100644 --- a/.github/workflows/tools.yml +++ b/.github/workflows/tools.yml @@ -19,6 +19,21 @@ jobs: echo -e "t,x,y,z\n0,1,2,3\n1,4,5,6" > log.csv ./csv_to_ulog log.csv test $(stat -c %s log.ulg) -eq 196 + pyflix: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Install Python build tools + run: pip install build + - name: Build pyflix + run: python3 -m build tools + - name: Upload artifacts + uses: actions/upload-artifact@v4 + with: + name: pyflix + path: | + tools/dist/pyflix-*.tar.gz + tools/dist/pyflix-*.whl python_tools: runs-on: ubuntu-latest steps: diff --git a/.gitignore b/.gitignore index 1057537..8c71511 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,8 @@ *.elf build/ tools/log/ +tools/dist/ +*.egg-info/ .dependencies .vscode/* !.vscode/settings.json diff --git a/README.md b/README.md index bd1007a..35c45a1 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ * Precise simulation with Gazebo. * Wi-Fi and MAVLink support. * Wireless command line interface and analyzing. +* Python library. * Textbook on flight control theory and practice ([in development](https://quadcopter.dev)). * *Position control (using external camera) and autonomous flights¹*. @@ -53,6 +54,7 @@ The simulator is implemented using Gazebo and runs the original Arduino code: * [Building and running the code](docs/build.md). * [Troubleshooting](docs/troubleshooting.md). * [Firmware architecture overview](docs/firmware.md). +* [Python library tutorial](tools/pyflix/README.md). * [Log analysis](docs/log.md). * [User builds gallery](docs/user.md). diff --git a/flix/mavlink.ino b/flix/mavlink.ino index 98315ce..88653ed 100644 --- a/flix/mavlink.ino +++ b/flix/mavlink.ino @@ -37,7 +37,7 @@ void sendMavlink() { mavlink_msg_heartbeat_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg, MAV_TYPE_QUADROTOR, MAV_AUTOPILOT_GENERIC, MAV_MODE_FLAG_MANUAL_INPUT_ENABLED | (armed * MAV_MODE_FLAG_SAFETY_ARMED) | ((mode == STAB) * MAV_MODE_FLAG_STABILIZE_ENABLED), - 0, MAV_STATE_STANDBY); + mode, MAV_STATE_STANDBY); sendMessage(&msg); mavlink_msg_extended_sys_state_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg, @@ -208,7 +208,9 @@ void sendMavlinkPrint() { strlcpy(data, str + i, sizeof(data)); mavlink_message_t msg; mavlink_msg_serial_control_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg, - SERIAL_CONTROL_DEV_SHELL, 0, 0, 0, strlen(data), (uint8_t *)data, 0, 0); + SERIAL_CONTROL_DEV_SHELL, + i + MAVLINK_MSG_SERIAL_CONTROL_FIELD_DATA_LEN < strlen(str) ? SERIAL_CONTROL_FLAG_MULTI : 0, // more chunks to go + 0, 0, strlen(data), (uint8_t *)data, 0, 0); sendMessage(&msg); } mavlinkPrintBuffer.clear(); diff --git a/tools/cli.py b/tools/cli.py new file mode 100755 index 0000000..4559932 --- /dev/null +++ b/tools/cli.py @@ -0,0 +1,13 @@ +#!/usr/bin/env python3 + +# Remote CLI for Flix + +from pyflix import Flix + +flix = Flix() + +flix.on('print', lambda text: print(text, end='')) + +while True: + command = input() + flix.cli(command, wait_response=False) diff --git a/tools/example.py b/tools/example.py new file mode 100755 index 0000000..909d408 --- /dev/null +++ b/tools/example.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 + +import math +from pyflix import Flix + +print('=== Connect...') +flix = Flix() + +print('Connected:', flix.connected) +print('Mode:', flix.mode) +print('Armed:', flix.armed) +print('Landed:', flix.landed) +print('Rates:', *[f'{math.degrees(r):.0f}°/s' for r in flix.rates]) +print('Attitude:', *[f'{math.degrees(a):.0f}°' for a in flix.attitude_euler]) +print('Motors:', flix.motors) +print('Acc', flix.acc) +print('Gyro', flix.gyro) + +print('=== Execute commands...') +print('> time') +print(flix.cli('time')) +print('> imu') +print(flix.cli('imu')) + +print('=== Get parameter...') +pitch_p = flix.get_param('PITCH_P') +print('PITCH_P = ', pitch_p) + +print('=== Set parameter...') +flix.set_param('PITCH_P', pitch_p) + +print('=== Wait for gyro update...') +print('Gyro: ', flix.wait('gyro')) + +print('=== Wait for HEARTBEAT message...') +print(flix.wait('mavlink.HEARTBEAT')) + +print('=== When until landed = False (remove drone from the surface)') +flix.wait('landed', value=False) diff --git a/tools/log.py b/tools/log.py new file mode 100755 index 0000000..547f48a --- /dev/null +++ b/tools/log.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 + +# Download flight log remotely and save to file + +import os +import datetime +from pyflix import Flix + +DIR = os.path.dirname(os.path.realpath(__file__)) + +flix = Flix() + +print('Downloading log...') +lines = flix.cli('log').splitlines() + +# sort by timestamp +header = lines.pop(0) +lines.sort(key=lambda line: float(line.split(',')[0])) + +log = open(f'{DIR}/log/{datetime.datetime.now().isoformat()}.csv', 'wb') +content = header.encode() + b'\n' + b'\n'.join(line.encode() for line in lines) +log.write(content) +print(f'Written {os.path.relpath(log.name, os.curdir)}') diff --git a/tools/pyflix/README.md b/tools/pyflix/README.md new file mode 100644 index 0000000..ac06773 --- /dev/null +++ b/tools/pyflix/README.md @@ -0,0 +1,225 @@ +# Flix Python library + +The Flix Python library allows you to remotely connect to a Flix quadcopter. It provides access to telemetry data, supports executing CLI commands, and controlling the drone's flight. + +To use the library, connect to the drone's Wi-Fi. To use it with the simulator, ensure the script runs on the same local network as the simulator. + +## Installation + +If you have cloned the [repo](https://github.com/okalachev/flix), install the library from the repo: + +```bash +cd /path/to/flix/repo +pip install -e tools +``` + +Alternatively, install from pip: + +```bash +pip install pyflix +``` + +## Usage + +The API is accessed through the `Flix` class: + +```python +from flix import Flix +flix = Flix() # create a Flix object and wait for connection +``` + +### Telemetry + +Basic telemetry is available through object properties. The properties names generally match the corresponding variables in the firmware itself: + +```python +print(flix.connected) # True if connected to the drone +print(flix.mode) # current flight mode (str) +print(flix.armed) # True if the drone is armed +print(flix.landed) # True if the drone is landed +print(flix.attitude) # attitude quaternion [w, x, y, z] +print(flix.attitude_euler) # attitude as Euler angles [roll, pitch, yaw] +print(flix.rates) # angular rates [roll_rate, pitch_rate, yaw_rate] +print(flix.channels) # raw RC channels (list) +print(flix.motors) # motors outputs (list) +print(flix.acc) # accelerometer output (list) +print(flix.gyro) # gyroscope output (list) +``` + +> [!NOTE] +> The library uses the Front-Left-Up coordinate system — the same as in the firmware. All angles are in radians. + +### Events + +The Flix object implements the *Observable* pattern, allowing to listen for events. You can subscribe to events using `on` method: + +```python +flix.on('connected', lambda: print('Connected to Flix')) +flix.on('disconnected', lambda: print('Disconnected from Flix')) +flix.on('print', lambda text: print(f'Flix says: {text}')) +``` + +You can also wait for specific events using `wait` method. This method returns the data associated with the event: + +```python +gyro = flix.wait('gyro') # wait for gyroscope update +attitude = flix.wait('attitude', timeout=3) # wait for attitude update, raise TimeoutError after 3 seconds +``` + +The `value` argument specifies a condition for filtering events. It can be either an expected value or a callable: + +```python +flix.wait('armed', value=True) # wait until armed +flix.wait('armed', value=False) # wait until disarmed +flix.wait('motors', value=lambda motors: all(m == 0 for m in motors)) # wait until all motors stop +flix.wait('attitude_euler', value=lambda att: att[0] > 0) # wait until roll angle is positive +``` + +Full list of events: + +|Event|Description|Associated data| +|-----|-----------|----------------| +|`connected`|Connected to the drone|| +|`disconnected`|Connection is lost|| +|`armed`|Armed state update|Armed state (*bool*)| +|`mode`|Flight mode update|Flight mode (*str*)| +|`landed`|Landed state update|Landed state (*bool*)| +|`print`|The drone sends text to the console|Text| +|`attitude`|Attitude update|Attitude quaternion (*list*)| +|`attitude_euler`|Attitude update|Euler angles (*list*)| +|`rates`|Angular rates update|Angular rates (*list*)| +|`channels`|Raw RC channels update|Raw RC channels (*list*)| +|`motors`|Motors outputs update|Motors outputs (*list*)| +|`acc`|Accelerometer update|Accelerometer output (*list*)| +|`gyro`|Gyroscope update|Gyroscope output (*list*)| +|`mavlink`|Received MAVLink message|Message object| +|`mavlink.`|Received specific MAVLink message|Message object| +|`mavlink.`|Received specific MAVLink message|Message object| +|`value`|Named value update (see below)|Name, value| +|`value.`|Specific named value update (see bellow)|Value| + +> [!NOTE] +> Update events trigger on every new data from the drone, and do not mean the value is changed. + +### Common methods + +Get and set firmware parameters using `get_param` and `set_param` methods: + +```python +print(flix.get_param('PITCH_P')) # get parameter value +flix.set_param('PITCH_P', 5) # set parameter value +``` + +Execute CLI commands using `cli` method. This method returns command response: + +```python +imu = flix.cli('imu') # get detailed IMU data +time = flix.cli('time') # get detailed time data +flix.cli('reboot') # reboot the drone +``` + +> [!TIP] +> Use `help` command to get the list of available commands. + +### Automatic flight + +The flight control feature is in development. List of methods intended for automatic flight control: + +* `set_position` +* `set_attitude` +* `set_rates` +* `set_motors` +* `set_controls` +* `set_mode` + +## Usage alongside QGroundControl + +You can use the Flix library alongside the QGroundControl app, using a proxy mode. To do that: + +1. Run proxy for `pyflix` and QGroundControl in background: + + ```bash + flix-proxy + ``` + +2. Go to QGroundControl settings ⇒ *Comm Links*. +3. Add new link with the following settings: + * *Name*: Proxy + * *Automatically Connect on Start*: ✓ + * *Type*: UDP + * *Port*: 14560 +4. Restart QGroundControl. + +Now you can run `pyflix` scripts and QGroundControl simultaneously. + +## Tools + +The following scripts demonstrate how to use the library: + +* [`cli.py`](../cli.py) — remote access to the drone's command line interface. +* [`log.py`](../log.py) — download flight logs from the drone. +* [`example.py`](../example.py) — a simple example, prints telemetry data and waits for events. + +## Advanced usage + +### MAVLink + +You can access the most recently received messages using `messages` property: + +```python +print(flix.messages.get('HEARTBEAT')) # print the latest HEARTBEAT message +``` + +You can wait for a specific message using `wait` method: + +```python +heartbeat = flix.wait('mavlink.HEARTBEAT') +``` + +You can send raw messages using `mavlink` property: + +```python +from pymavlink.dialects.v20 import common as mavlink + +flix.mavlink.heartbeat_send(mavlink.MAV_TYPE_GCS, mavlink.MAV_AUTOPILOT_INVALID, + mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, 0, 0) +``` + +### Named values + +You can pass arbitrary named values from the firmware to the Python script using `NAMED_VALUE_FLOAT`, `NAMED_VALUE_INT`, `DEBUG`, `DEBUG_VECT`, and `DEBUG_FLOAT_ARRAY` MAVLink messages. + +All these named values will appear in the `values` dictionary: + +```python +print(flix.values['some_value']) +print(flix.values['some_vector']) +``` + +You can send values from the firmware like this (`mavlink.ino`): + +```cpp +// Send float named value +mavlink_msg_named_value_float_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg, t, "some_value", loopRate); +sendMessage(&msg); + +// Send vector named value +mavlink_msg_debug_vect_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg, "some_vector", t, gyroBias.x, gyroBias.y, gyroBias.z); +sendMessage(&msg); +``` + +### Logging + +You can control Flix library verbosity using Python's `logging` module: + +```python +import logging + +logger = logging.getLogger('flix') +logger.setLevel(logging.DEBUG) # be more verbose +logger.setLevel(logging.WARNING) # be less verbose +``` + +## Stability + +The library is in development stage. The API is not stable. diff --git a/tools/pyflix/__init__.py b/tools/pyflix/__init__.py new file mode 100644 index 0000000..67ad6a7 --- /dev/null +++ b/tools/pyflix/__init__.py @@ -0,0 +1 @@ +from .flix import Flix diff --git a/tools/pyflix/flix.py b/tools/pyflix/flix.py new file mode 100644 index 0000000..303090b --- /dev/null +++ b/tools/pyflix/flix.py @@ -0,0 +1,324 @@ +# Copyright (c) 2025 Oleg Kalachev +# Repository: https://github.com/okalachev/flix + +"""Python API for Flix drone.""" + +import os +import time +from queue import Queue, Empty +from typing import Literal, Optional, Callable, List, Dict, Any, Union +import logging +import errno +from threading import Thread, Timer +from pymavlink import mavutil +from pymavlink.quaternion import Quaternion +from pymavlink.dialects.v20 import common as mavlink + +logger = logging.getLogger('flix') +if not logger.hasHandlers(): + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter('%(name)s - %(levelname)s - %(message)s')) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + +class Flix: + connected: bool = False + mode: str = '' + armed: bool = False + landed: bool = False + attitude: List[float] + attitude_euler: List[float] # roll, pitch, yaw + rates: List[float] + channels: List[int] + motors: List[float] + acc: List[float] + gyro: List[float] + + mav_id: int + messages: Dict[str, Dict[str, Any]] # MAVLink messages storage + values: Dict[Union[str, int], Union[float, List[float]]] = {} # named values + + _connection_timeout = 3 + _print_buffer: str = '' + + def __init__(self, mav_id: int=1, wait_connection: bool=True): + if not (0 <= mav_id < 256): + raise ValueError('mav_id must be in range [0, 255]') + self._setup_mavlink() + self.mav_id = mav_id + self._init_state() + try: + # Direct connection + logger.debug('Listening on port 14550') + self.connection: mavutil.mavfile = mavutil.mavlink_connection('udpin:0.0.0.0:14550', source_system=255) # type: ignore + except OSError as e: + if e.errno != errno.EADDRINUSE: + raise + # Port busy - using proxy + logger.debug('Listening on port 14560 (proxy)') + self.connection: mavutil.mavfile = mavutil.mavlink_connection('udpin:0.0.0.0:14555', source_system=254) # type: ignore + self.connection.target_system = mav_id + self.mavlink: mavlink.MAVLink = self.connection.mav + self._event_listeners: Dict[str, List[Callable[..., Any]]] = {} + self.messages = {} + self._disconnected_timer = Timer(0, self._disconnected) + self._reader_thread = Thread(target=self._read_mavlink, daemon=True) + self._reader_thread.start() + self._heartbeat_thread = Thread(target=self._send_heartbeat, daemon=True) + self._heartbeat_thread.start() + if wait_connection: + self.wait('mavlink.HEARTBEAT') + time.sleep(0.2) # give some time to receive initial state + + def _init_state(self): + self.attitude = [1, 0, 0, 0] + self.attitude_euler = [0, 0, 0] + self.rates = [0, 0, 0] + self.channels = [0, 0, 0, 0, 0, 0, 0, 0] + self.motors = [0, 0, 0, 0] + self.acc = [0, 0, 0] + self.gyro = [0, 0, 0] + + def on(self, event: str, callback: Callable): + event = event.lower() + if event not in self._event_listeners: + self._event_listeners[event] = [] + self._event_listeners[event].append(callback) + + def off(self, callback: Callable): + for event in self._event_listeners: + if callback in self._event_listeners[event]: + self._event_listeners[event].remove(callback) + + def _trigger(self, event: str, *args): + event = event.lower() + for callback in self._event_listeners.get(event, []): + try: + callback(*args) + except Exception as e: + logger.error(f'Error in event listener for event {event}: {e}') + + def wait(self, event: str, value: Union[Any, Callable[..., bool]] = lambda *args: True, timeout=None) -> Any: + """Wait for an event""" + event = event.lower() + q = Queue() + def callback(*args): + if len(args) == 0: + result = None + elif len(args) == 1: + result = args[0] + else: + result = args + if callable(value) and value(*args): + q.put_nowait(result) + elif value == result: + q.put_nowait(result) + self.on(event, callback) + try: + return q.get(timeout=timeout) + except Empty: + raise TimeoutError + finally: + self.off(callback) + + @staticmethod + def _setup_mavlink(): + # otherwise it will use MAVLink 1.0 until connected + os.environ['MAVLINK20'] = '1' + mavutil.set_dialect('common') + + def _read_mavlink(self): + while True: + try: + msg: Optional[mavlink.MAVLink_message] = self.connection.recv_match(blocking=True) + if msg is None: + continue + self._connected() + msg_dict = msg.to_dict() + msg_dict['_timestamp'] = time.time() # add timestamp + self.messages[msg.get_type()] = msg_dict + self._trigger('mavlink', msg) + self._trigger(f'mavlink.{msg.get_type()}', msg) # trigger mavlink. + self._trigger(f'mavlink.{msg.get_msgId()}', msg) # trigger mavlink. + self._handle_mavlink_message(msg) + + except Exception as e: + logger.error(f'Error reading MAVLink message: {e}') + + def _handle_mavlink_message(self, msg: mavlink.MAVLink_message): + if isinstance(msg, mavlink.MAVLink_heartbeat_message): + self.mode = ['MANUAL', 'ACRO', 'STAB', 'USER'][msg.custom_mode] + self.armed = msg.base_mode & mavlink.MAV_MODE_FLAG_SAFETY_ARMED != 0 + self._trigger('mode', self.mode) + self._trigger('armed', self.armed) + + if isinstance(msg, mavlink.MAVLink_extended_sys_state_message): + self.landed = msg.landed_state == mavlink.MAV_LANDED_STATE_ON_GROUND + self._trigger('landed', self.landed) + + if isinstance(msg, mavlink.MAVLink_attitude_quaternion_message): + self.attitude = self._mavlink_to_flu([msg.q1, msg.q2, msg.q3, msg.q4]) + self.rates = self._mavlink_to_flu([msg.rollspeed, msg.pitchspeed, msg.yawspeed]) + self.attitude_euler = list(Quaternion(self.attitude).euler) # type: ignore + self._trigger('attitude', self.attitude) + self._trigger('attitude_euler', self.attitude_euler) + + if isinstance(msg, mavlink.MAVLink_rc_channels_raw_message): + self.channels = [msg.chan1_raw, msg.chan2_raw, msg.chan3_raw, msg.chan4_raw, + msg.chan5_raw, msg.chan6_raw, msg.chan7_raw, msg.chan8_raw] + self._trigger('channels', self.channels) + + if isinstance(msg, mavlink.MAVLink_actuator_output_status_message): + self.motors = msg.actuator[:4] # type: ignore + self._trigger('motors', self.motors) + + if isinstance(msg, mavlink.MAVLink_scaled_imu_message): + self.acc = self._mavlink_to_flu([msg.xacc / 1000, msg.yacc / 1000, msg.zacc / 1000]) + self.gyro = self._mavlink_to_flu([msg.xgyro / 1000, msg.ygyro / 1000, msg.zgyro / 1000]) + self._trigger('acc', self.acc) + self._trigger('gyro', self.gyro) + + if isinstance(msg, mavlink.MAVLink_serial_control_message): + # new chunk of data + text = bytes(msg.data)[:msg.count].decode('utf-8', errors='ignore') + logger.debug(f'Console: {repr(text)}') + self._trigger('print', text) + self._print_buffer += text + if msg.flags & mavlink.SERIAL_CONTROL_FLAG_MULTI == 0: + # last chunk + self._trigger('print_full', self._print_buffer) + self._print_buffer = '' + + if isinstance(msg, mavlink.MAVLink_statustext_message): + logger.info(f'Flix #{msg.get_srcSystem()}: {msg.text}') + self._trigger('status', msg.text) + + if isinstance(msg, (mavlink.MAVLink_named_value_float_message, mavlink.MAVLink_named_value_int_message)): + self.values[msg.name] = msg.value + self._trigger('value', msg.name, msg.value) + self._trigger(f'value.{msg.name}', msg.value) + + if isinstance(msg, mavlink.MAVLink_debug_message): + self.values[msg.ind] = msg.value + self._trigger('value', msg.ind, msg.value) + self._trigger(f'value.{msg.ind}', msg.value) + + if isinstance(msg, mavlink.MAVLink_debug_vect_message): + self.values[msg.name] = [msg.x, msg.y, msg.z] + self._trigger('value', msg.name, self.values[msg.name]) + self._trigger(f'value.{msg.name}', self.values[msg.name]) + + if isinstance(msg, mavlink.MAVLink_debug_float_array_message): + self.values[msg.name] = list(msg.data) + self._trigger('value', msg.name, self.values[msg.name]) + self._trigger(f'value.{msg.name}', self.values[msg.name]) + + def _send_heartbeat(self): + while True: + self.mavlink.heartbeat_send(mavlink.MAV_TYPE_GCS, mavlink.MAV_AUTOPILOT_INVALID, 0, 0, 0) + time.sleep(1) + + @staticmethod + def _mavlink_to_flu(v: List[float]) -> List[float]: + if len(v) == 3: # vector + return [v[0], -v[1], -v[2]] + elif len(v) == 4: # quaternion + return [v[0], v[1], -v[2], -v[3]] + else: + raise ValueError(f'List must have 3 (vector) or 4 (quaternion) elements') + + @staticmethod + def _flu_to_mavlink(v: List[float]) -> List[float]: + return Flix._mavlink_to_flu(v) + + def _connected(self): + # Reset disconnection timer + self._disconnected_timer.cancel() + self._disconnected_timer = Timer(self._connection_timeout, self._disconnected) + self._disconnected_timer.start() + + if not self.connected: + logger.info('Connection is established') + self.connected = True + self._trigger('connected') + + def _disconnected(self): + logger.info('Connection is lost') + self.connected = False + self._trigger('disconnected') + + def get_param(self, name: str) -> float: + if len(name.encode('ascii')) > 16: + raise ValueError('Parameter name must be 16 characters or less') + for attempt in range(3): + try: + logger.debug(f'Get param {name} (attempt #{attempt + 1})') + self.mavlink.param_request_read_send(self.mav_id, 0, name.encode('ascii'), -1) + msg: mavlink.MAVLink_param_value_message = \ + self.wait('mavlink.PARAM_VALUE', value=lambda msg: msg.param_id == name, timeout=0.1) + return msg.param_value + except TimeoutError: + continue + raise RuntimeError(f'Failed to get parameter {name} after 3 attempts') + + def set_param(self, name: str, value: float): + if len(name.encode('ascii')) > 16: + raise ValueError('Parameter name must be 16 characters or less') + for attempt in range(3): + try: + logger.debug(f'Set param {name} to {value} (attempt #{attempt + 1})') + self.mavlink.param_set_send(self.mav_id, 0, name.encode('ascii'), value, mavlink.MAV_PARAM_TYPE_REAL32) + self.wait('mavlink.PARAM_VALUE', value=lambda msg: msg.param_id == name, timeout=0.1) + return + except TimeoutError: + # on timeout try again + continue + raise RuntimeError(f'Failed to set parameter {name} to {value} after 3 attempts') + + def set_position(self, position: List[float], yaw: Optional[float] = None, wait: bool = False, tolerance: float = 0.1): + raise NotImplementedError('Position control is not implemented yet') + + def set_attitude(self, attitude: List[float], thrust: float): + raise NotImplementedError('Automatic flight is not implemented yet') + + def set_rates(self, rates: List[float], thrust: float): + raise NotImplementedError('Automatic flight is not implemented yet') + + def set_motors(self, motors: List[float]): + if len(motors) != 4: + raise ValueError('motors must have 4 values') + if not all(0 <= m <= 1 for m in motors): + raise ValueError('motors must be in range [0, 1]') + raise NotImplementedError + + def set_controls(self, roll: float, pitch: float, yaw: float, throttle: float): + """Send pilot's controls. Warning: not intended for automatic control""" + if not (-1 <= roll <= 1 and -1 <= pitch <= 1 and -1 <= yaw <= 1): + raise ValueError('roll, pitch, yaw must be in range [-1, 1]') + if not 0 <= throttle <= 1: + raise ValueError('throttle must be in range [0, 1]') + self.mavlink.manual_control_send(self.mav_id, roll * 1000, pitch * 1000, yaw * 1000, throttle * 1000, 0) # type: ignore + + def set_mode(self, mode: Literal['MANUAL', 'ACRO', 'STAB', 'USER']): + raise NotImplementedError('Setting mode is not implemented yet') + + def cli(self, cmd: str, wait_response: bool = True) -> str: + cmd = cmd.strip() + if cmd == 'reboot': + wait_response = False # reboot command doesn't respond + cmd_bytes = (cmd + '\n').encode('utf-8') + if len(cmd_bytes) > 70: + raise ValueError(f'Command is too long: {len(cmd_bytes)} > 70') + cmd_bytes = cmd_bytes.ljust(70, b'\0') + response_prefix = f'> {cmd}\n' + for attempt in range(3): + logger.debug(f'Send command {cmd} (attempt #{attempt + 1})') + try: + self.mavlink.serial_control_send(0, 0, 0, 0, len(cmd_bytes), cmd_bytes) + if not wait_response: + return '' + response = self.wait('print_full', timeout=0.1, value=lambda text: text.startswith(response_prefix)) + return response[len(response_prefix):].strip() + except TimeoutError: + continue + raise RuntimeError(f'Failed to send command {cmd} after 3 attempts') diff --git a/tools/pyflix/proxy.py b/tools/pyflix/proxy.py new file mode 100755 index 0000000..dbd1581 --- /dev/null +++ b/tools/pyflix/proxy.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 + +"""Proxy for running pyflix library alongside QGroundControl app.""" + +import socket + +LOCAL = ('0.0.0.0', 14550) # from Flix +TARGETS = ( + ('127.0.0.1', 14560), # to QGroundControl + ('127.0.0.1', 14555), # to pyflix +) + +def main(): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.bind(LOCAL) + + source_addr = None + packets = 0 + + print('Proxy started - run QGroundControl') + + while True: + data, addr = sock.recvfrom(1024) # read entire UDP packet + if addr in TARGETS: # packet from target + if source_addr is None: + continue + sock.sendto(data, source_addr) + else: # packet from source + source_addr = addr + for target in TARGETS: + sock.sendto(data, target) + + packets += 1 + print(f'\rPackets: {packets}', end='') + +if __name__ == '__main__': + main() diff --git a/tools/pyproject.toml b/tools/pyproject.toml new file mode 100644 index 0000000..29a8d37 --- /dev/null +++ b/tools/pyproject.toml @@ -0,0 +1,28 @@ +[project] +name = "pyflix" +version = "0.5" +description = "Python API for Flix drone" +authors = [{ name="Oleg Kalachev", email="okalachev@gmail.com" }] +license = "MIT" +readme = "pyflix/README.md" +requires-python = ">=3.8" +dependencies = [ + "pymavlink", +] + +[project.scripts] +flix-proxy = "pyflix.proxy:main" + +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[tool.setuptools] +packages = ["pyflix"] + +[tool.setuptools.package-data] +pyflix = ["README.md"] + +[project.urls] +Homepage = "https://github.com/okalachev/flix/tree/master/tools/pyflix" +Issues = "https://github.com/okalachev/flix/issues"