mirror of
https://github.com/okalachev/flix.git
synced 2025-07-28 20:08:53 +00:00
Add pyflix library
This commit is contained in:
parent
779fa13e80
commit
570dc6eac9
15
.github/workflows/tools.yml
vendored
15
.github/workflows/tools.yml
vendored
@ -19,6 +19,21 @@ jobs:
|
||||
echo -e "t,x,y,z\n0,1,2,3\n1,4,5,6" > log.csv
|
||||
./csv_to_ulog log.csv
|
||||
test $(stat -c %s log.ulg) -eq 196
|
||||
pyflix:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Install Python build tools
|
||||
run: pip install build
|
||||
- name: Build pyflix
|
||||
run: python3 -m build tools
|
||||
- name: Upload artifacts
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: pyflix
|
||||
path: |
|
||||
tools/dist/pyflix-*.tar.gz
|
||||
tools/dist/pyflix-*.whl
|
||||
python_tools:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
|
13
tools/cli.py
Executable file
13
tools/cli.py
Executable file
@ -0,0 +1,13 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# Remote CLI for Flix
|
||||
|
||||
from pyflix import Flix
|
||||
|
||||
flix = Flix()
|
||||
|
||||
flix.on('print', lambda text: print(text, end=''))
|
||||
|
||||
while True:
|
||||
command = input()
|
||||
flix.cli(command, wait_response=False)
|
39
tools/example.py
Executable file
39
tools/example.py
Executable file
@ -0,0 +1,39 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import math
|
||||
from pyflix import Flix
|
||||
|
||||
print('=== Connect...')
|
||||
flix = Flix()
|
||||
|
||||
print('Connected:', flix.connected)
|
||||
print('Mode:', flix.mode)
|
||||
print('Armed:', flix.armed)
|
||||
print('Landed:', flix.landed)
|
||||
print('Rates:', *[f'{math.degrees(r):.0f}°/s' for r in flix.rates])
|
||||
print('Attitude:', *[f'{math.degrees(a):.0f}°' for a in flix.attitude_euler])
|
||||
print('Motors:', flix.motors)
|
||||
print('Acc', flix.acc)
|
||||
print('Gyro', flix.gyro)
|
||||
|
||||
print('=== Execute commands...')
|
||||
print('> time')
|
||||
print(flix.cli('time'))
|
||||
print('> imu')
|
||||
print(flix.cli('imu'))
|
||||
|
||||
print('=== Get parameter...')
|
||||
pitch_p = flix.get_param('PITCH_P')
|
||||
print('PITCH_P = ', pitch_p)
|
||||
|
||||
print('=== Set parameter...')
|
||||
flix.set_param('PITCH_P', pitch_p)
|
||||
|
||||
print('=== Wait for gyro update...')
|
||||
print('Gyro: ', flix.wait('gyro'))
|
||||
|
||||
print('=== Wait for HEARTBEAT message...')
|
||||
print(flix.wait('mavlink.HEARTBEAT'))
|
||||
|
||||
print('=== When until landed = False (remove drone from the surface)')
|
||||
flix.wait('landed', value=False)
|
196
tools/pyflix/README.md
Normal file
196
tools/pyflix/README.md
Normal file
@ -0,0 +1,196 @@
|
||||
# Flix Python library
|
||||
|
||||
The Flix Python library allows you to remotely connect to a Flix quadcopter from a Python script. It provides access to telemetry data, supports executing CLI commands, and controlling the drone's flight.
|
||||
|
||||
To use the library, connect to the drone's Wi-Fi. To use it with the simulator, ensure the script runs on the same local network as the simulator.
|
||||
|
||||
You also can run the script alongside the QGroundControl app. To do this, go to QGroundControl settings ⇒ *Telemetry* (*MAVLink* in older versions), and enable *MAVLink forwarding*. Set the host to `localhost:14445. The Flix library automatically detects the connection type (direct or forwarded).
|
||||
|
||||
## Installation
|
||||
|
||||
If you have cloned the repo, install the library from the repo:
|
||||
|
||||
```bash
|
||||
cd /path/to/flix/repo
|
||||
pip install -e tools/pkg
|
||||
```
|
||||
|
||||
Alternatively, install from pip:
|
||||
|
||||
```bash
|
||||
pip install pyflix
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
The API is accessed through the `Flix` class:
|
||||
|
||||
```python
|
||||
from flix import Flix
|
||||
flix = Flix() # create a Flix object and wait for connection
|
||||
```
|
||||
|
||||
### Telemetry
|
||||
|
||||
Basic telemetry is available through object properties. Their names generally match the corresponding variables in the firmware itself:
|
||||
|
||||
```python
|
||||
print(flix.connected) # True if connected to the drone
|
||||
print(flix.mode) # current flight mode (str)
|
||||
print(flix.armed) # True if the drone is armed
|
||||
print(flix.landed) # True if the drone is landed
|
||||
print(flix.attitude) # attitude quaternion [w, x, y, z]
|
||||
print(flix.attitude_euler) # attitude as Euler angles [roll, pitch, yaw]
|
||||
print(flix.rates) # angular rates [roll_rate, pitch_rate, yaw_rate]
|
||||
print(flix.channels) # raw RC channels (list)
|
||||
print(flix.motors) # motors outputs (list)
|
||||
print(flix.acc) # accelerometer output (list)
|
||||
print(flix.gyro) # gyroscope output (list)
|
||||
```
|
||||
|
||||
> [!NOTE]
|
||||
> The library uses the Front-Left-Up coordinate system — the same as in the firmware. All angles are in radians.
|
||||
|
||||
### Events
|
||||
|
||||
The Flix object implements the *Observable* pattern, allowing to listen for events. You can subscribe to events using `on` method:
|
||||
|
||||
```python
|
||||
flix.on('connected', lambda: print('Connected to Flix'))
|
||||
flix.on('disconnected', lambda: print('Disconnected from Flix'))
|
||||
flix.on('print', lambda text: print(f'Flix says: {text}'))
|
||||
```
|
||||
|
||||
You can also wait for specific events using `wait` method. This method returns the data associated with the event:
|
||||
|
||||
```python
|
||||
gyro = flix.wait('gyro') # wait for gyroscope update
|
||||
attitude = flix.wait('attitude', timeout=3) # wait for attitude update, raise TimeoutError after 3 seconds
|
||||
```
|
||||
|
||||
The `value` argument specifies a condition for filtering events. It can be either an expected value or a callable:
|
||||
|
||||
```python
|
||||
flix.wait('armed', value=True) # wait until armed
|
||||
flix.wait('armed', value=False) # wait until disarmed
|
||||
flix.wait('motors', value=lambda motors: all(m == 0 for m in motors)) # wait until all motors stop
|
||||
flix.wait('attitude_euler', value=lambda att: att[0] > 0) # wait until roll angle is positive
|
||||
```
|
||||
|
||||
Full list of events:
|
||||
|
||||
|Event|Description|Associated data|
|
||||
|-----|-----------|----------------|
|
||||
|`connected`|Connected to the drone||
|
||||
|`disconnected`|Connection is lost||
|
||||
|`armed`|Armed state update|Armed state (*bool*)|
|
||||
|`mode`|Flight mode update|Flight mode (*str*)|
|
||||
|`landed`|Landed state update|Landed state (*bool*)|
|
||||
|`print`|The drone sends text to the console|Text|
|
||||
|`attitude`|Attitude update|Attitude quaternion (*list*)|
|
||||
|`attitude_euler`|Attitude update|Euler angles (*list*)|
|
||||
|`rates`|Angular rates update|Angular rates (*list*)|
|
||||
|`channels`|Raw RC channels update|Raw RC channels (*list*)|
|
||||
|`motors`|Motors outputs update|Motors outputs (*list*)|
|
||||
|`acc`|Accelerometer update|Accelerometer output (*list*)|
|
||||
|`gyro`|Gyroscope update|Gyroscope output (*list*)|
|
||||
|`mavlink`|Received MAVLink message|Message object|
|
||||
|`mavlink.<message_name>`|Received specific MAVLink message|Message object|
|
||||
|`mavlink.<message_id>`|Received specific MAVLink message|Message object|
|
||||
|`value`|Named value update (see below)|Name, value|
|
||||
|`value.<name>`|Specific named value update (see bellow)|Value|
|
||||
|
||||
> [!NOTE]
|
||||
> Update events trigger on every new data from the drone, and do not mean the value is changed.
|
||||
|
||||
### Common methods
|
||||
|
||||
Get and set firmware parameters using `get_param` and `set_param` methods:
|
||||
|
||||
```python
|
||||
print(flix.get_param('PITCH_P')) # get parameter value
|
||||
flix.set_param('PITCH_P', 5) # set parameter value
|
||||
```
|
||||
|
||||
Execute CLI commands using `cli` method. This method returns command response:
|
||||
|
||||
```python
|
||||
imu = flix.cli('imu') # get detailed IMU data
|
||||
time = flix.cli('time') # get detailed time data
|
||||
flix.cli('reboot') # reboot the drone
|
||||
```
|
||||
|
||||
> [!TIP]
|
||||
> Use `help` command to get the list of available commands.
|
||||
|
||||
### Automatic flight
|
||||
|
||||
The flight control feature is in development. List of methods intended for automatic flight control:
|
||||
|
||||
* `set_position`
|
||||
* `set_attitude`
|
||||
* `set_rates`
|
||||
* `set_motors`
|
||||
* `set_controls`
|
||||
* `set_mode`
|
||||
|
||||
## Tools
|
||||
|
||||
The following scripts demonstrate how to use the library:
|
||||
|
||||
* [`cli.py`](../cli.py) — remote access to the drone's command line interface.
|
||||
* `log.py` — download flight logs from the drone.
|
||||
* [`example.py`](../example.py) — a simple example, prints telemetry data and waits for events.
|
||||
* `flight.py` — a simple flight script example.
|
||||
|
||||
## Advanced usage
|
||||
|
||||
### MAVLink
|
||||
|
||||
You can access the most recently received messages using `messages` property:
|
||||
|
||||
```python
|
||||
print(flix.messages.get('HEARTBEAT')) # print the latest HEARTBEAT message
|
||||
```
|
||||
|
||||
You can wait for a specific message using `wait` method:
|
||||
|
||||
```python
|
||||
heartbeat = flix.wait('mavlink.HEARTBEAT')
|
||||
```
|
||||
|
||||
You can send raw messages using `mavlink` property:
|
||||
|
||||
```python
|
||||
from pymavlink.dialects.v20 import common as mavlink
|
||||
|
||||
flix.mavlink.heartbeat_send(mavlink.MAV_TYPE_GCS, mavlink.MAV_AUTOPILOT_INVALID,
|
||||
0, mavlink.MAV_MODE_FLAG_CUSTOM_MODE_ENABLED, 0, 0)
|
||||
```
|
||||
|
||||
### Named values
|
||||
|
||||
You can pass arbitrary named values from the firmware to the Python script using `NAMED_VALUE_FLOAT`, `NAMED_VALUE_INT`, `DEBUG`, `DEBUG_VECT`, and `DEBUG_FLOAT_ARRAY` MAVLink messages.
|
||||
|
||||
All these named values will appear in the `values` dictionary:
|
||||
|
||||
```python
|
||||
print(flix.values['some_value'])
|
||||
print(flix.values['some_vector'])
|
||||
```
|
||||
|
||||
You can send values from the firmware like this (`mavlink.ino`):
|
||||
|
||||
```cpp
|
||||
// Send float named value
|
||||
mavlink_msg_named_value_float_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg, t, "some_value", loopRate);
|
||||
sendMessage(&msg);
|
||||
|
||||
// Send vector named value
|
||||
mavlink_msg_debug_vect_pack(SYSTEM_ID, MAV_COMP_ID_AUTOPILOT1, &msg, "some_vector", t, gyroBias.x, gyroBias.y, gyroBias.z);
|
||||
sendMessage(&msg);
|
||||
```
|
||||
|
||||
## Stability
|
||||
|
||||
The library is in development stage. The API is not stable.
|
1
tools/pyflix/__init__.py
Normal file
1
tools/pyflix/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .flix import Flix, logger
|
314
tools/pyflix/flix.py
Normal file
314
tools/pyflix/flix.py
Normal file
@ -0,0 +1,314 @@
|
||||
# Copyright (c) 2025 Oleg Kalachev <okalachev@gmail.com>
|
||||
# Repository: https://github.com/okalachev/flix
|
||||
|
||||
"""Python API for Flix drone."""
|
||||
|
||||
import os
|
||||
import time
|
||||
from queue import Queue, Empty
|
||||
from typing import Literal, Optional, Callable, List, Any, Union
|
||||
import logging
|
||||
import errno
|
||||
from threading import Thread, Timer
|
||||
from pymavlink import mavutil
|
||||
from pymavlink.quaternion import Quaternion
|
||||
from pymavlink.dialects.v20 import common as mavlink
|
||||
|
||||
logger = logging.getLogger('flix')
|
||||
if not logger.hasHandlers():
|
||||
handler = logging.StreamHandler()
|
||||
handler.setFormatter(logging.Formatter('%(name)s - %(levelname)s - %(message)s'))
|
||||
logger.addHandler(handler)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
class Flix:
|
||||
connected: bool = False
|
||||
mode: str = ''
|
||||
armed: bool = False
|
||||
landed: bool = False
|
||||
attitude: List[float] = [1, 0, 0, 0]
|
||||
attitude_euler: List[float] = [0, 0, 0] # roll, pitch, yaw
|
||||
rates: List[float] = [0, 0, 0]
|
||||
channels: List[int] = [0, 0, 0, 0, 0, 0, 0, 0]
|
||||
motors: List[float] = [0, 0, 0, 0]
|
||||
acc: List[float] = [0, 0, 0]
|
||||
gyro: List[float] = [0, 0, 0]
|
||||
|
||||
mav_id: int
|
||||
messages: dict[str, dict[str, Any]] # MAVLink messages storage
|
||||
values: dict[Union[str, int], Union[float, List[float]]] = {} # debug values
|
||||
|
||||
_connection_timeout = 3
|
||||
_print_buffer: str = ''
|
||||
|
||||
def __init__(self, mav_id: int=1, wait_connection: bool=True):
|
||||
if not (0 <= mav_id < 256):
|
||||
raise ValueError('mav_id must be in range [0, 255]')
|
||||
self._setup_mavlink()
|
||||
self.mav_id = mav_id
|
||||
try:
|
||||
# Direct connection
|
||||
logger.debug('Listening on port 14550')
|
||||
self.connection: mavutil.mavfile = mavutil.mavlink_connection('udpin:0.0.0.0:14550', source_system=255) # type: ignore
|
||||
except OSError as e:
|
||||
if e.errno != errno.EADDRINUSE:
|
||||
raise
|
||||
# Port busy with QGC - using forwarding
|
||||
logger.debug('Listening on port 14445 (QGC forwarding)')
|
||||
self.connection: mavutil.mavfile = mavutil.mavlink_connection('udpin:0.0.0.0:14445', source_system=254) # type: ignore
|
||||
self.connection.target_system = mav_id
|
||||
self.mavlink: mavlink.MAVLink = self.connection.mav
|
||||
self._event_listeners: dict[str, List[Callable[..., Any]]] = {}
|
||||
self.messages = {}
|
||||
self._reader_thread = Thread(target=self._read_mavlink, daemon=True)
|
||||
self._reader_thread.start()
|
||||
self._heartbeat_thread = Thread(target=self._send_heartbeat, daemon=True)
|
||||
self._heartbeat_thread.start()
|
||||
self._disconnected_timer = Timer(0, self._disconnected)
|
||||
if wait_connection:
|
||||
self.wait('mavlink.HEARTBEAT')
|
||||
time.sleep(0.2) # give some time to receive initial state
|
||||
|
||||
def on(self, event: str, callback: Callable):
|
||||
event = event.lower()
|
||||
if event not in self._event_listeners:
|
||||
self._event_listeners[event] = []
|
||||
self._event_listeners[event].append(callback)
|
||||
|
||||
def off(self, callback: Callable):
|
||||
for event in self._event_listeners:
|
||||
if callback in self._event_listeners[event]:
|
||||
self._event_listeners[event].remove(callback)
|
||||
|
||||
def _trigger(self, event: str, *args):
|
||||
event = event.lower()
|
||||
for callback in self._event_listeners.get(event, []):
|
||||
try:
|
||||
callback(*args)
|
||||
except Exception as e:
|
||||
logger.error(f'Error in event listener for event {event}: {e}')
|
||||
|
||||
def wait(self, event: str, value: Union[Any, Callable[..., bool]] = lambda *args: True, timeout=None) -> Any:
|
||||
"""Wait for an event"""
|
||||
event = event.lower()
|
||||
q = Queue()
|
||||
def callback(*args):
|
||||
if len(args) == 0:
|
||||
result = None
|
||||
elif len(args) == 1:
|
||||
result = args[0]
|
||||
else:
|
||||
result = args
|
||||
if callable(value) and value(*args):
|
||||
q.put_nowait(result)
|
||||
elif value == result:
|
||||
q.put_nowait(result)
|
||||
self.on(event, callback)
|
||||
try:
|
||||
return q.get(timeout=timeout)
|
||||
except Empty:
|
||||
raise TimeoutError
|
||||
finally:
|
||||
self.off(callback)
|
||||
|
||||
@staticmethod
|
||||
def _setup_mavlink():
|
||||
# otherwise it will use MAVLink 1.0 until connected
|
||||
os.environ['MAVLINK20'] = '1'
|
||||
mavutil.set_dialect('common')
|
||||
|
||||
def _read_mavlink(self):
|
||||
while True:
|
||||
try:
|
||||
msg: Optional[mavlink.MAVLink_message] = self.connection.recv_match(blocking=True)
|
||||
if msg is None:
|
||||
continue
|
||||
self._connected()
|
||||
msg_dict = msg.to_dict()
|
||||
msg_dict['_timestamp'] = time.time() # add timestamp
|
||||
self.messages[msg.get_type()] = msg_dict
|
||||
self._trigger('mavlink', msg)
|
||||
self._trigger(f'mavlink.{msg.get_type()}', msg) # trigger mavlink.<message_type>
|
||||
self._trigger(f'mavlink.{msg.get_msgId()}', msg) # trigger mavlink.<message_id>
|
||||
self._handle_mavlink_message(msg)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f'Error reading MAVLink message: {e}')
|
||||
|
||||
def _handle_mavlink_message(self, msg: mavlink.MAVLink_message):
|
||||
if isinstance(msg, mavlink.MAVLink_heartbeat_message):
|
||||
self.mode = ['MANUAL', 'ACRO', 'STAB', 'AUTO', 'USER'][msg.custom_mode]
|
||||
self.armed = msg.base_mode & mavlink.MAV_MODE_FLAG_SAFETY_ARMED != 0
|
||||
self._trigger('mode', self.mode)
|
||||
self._trigger('armed', self.armed)
|
||||
|
||||
if isinstance(msg, mavlink.MAVLink_extended_sys_state_message):
|
||||
self.landed = msg.landed_state == mavlink.MAV_LANDED_STATE_ON_GROUND
|
||||
self._trigger('landed', self.landed)
|
||||
|
||||
if isinstance(msg, mavlink.MAVLink_attitude_quaternion_message):
|
||||
self.attitude = self._mavlink_to_flu([msg.q1, msg.q2, msg.q3, msg.q4])
|
||||
self.rates = self._mavlink_to_flu([msg.rollspeed, msg.pitchspeed, msg.yawspeed])
|
||||
self.attitude_euler = list(Quaternion(self.attitude).euler) # type: ignore
|
||||
self._trigger('attitude', self.attitude)
|
||||
self._trigger('attitude_euler', self.attitude_euler)
|
||||
|
||||
if isinstance(msg, mavlink.MAVLink_rc_channels_raw_message):
|
||||
self.channels = [msg.chan1_raw, msg.chan2_raw, msg.chan3_raw, msg.chan4_raw,
|
||||
msg.chan5_raw, msg.chan6_raw, msg.chan7_raw, msg.chan8_raw]
|
||||
self._trigger('channels', self.channels)
|
||||
|
||||
if isinstance(msg, mavlink.MAVLink_actuator_output_status_message):
|
||||
self.motors = msg.actuator[:4] # type: ignore
|
||||
self._trigger('motors', self.motors)
|
||||
|
||||
if isinstance(msg, mavlink.MAVLink_scaled_imu_message):
|
||||
self.acc = self._mavlink_to_flu([msg.xacc / 1000, msg.yacc / 1000, msg.zacc / 1000])
|
||||
self.gyro = self._mavlink_to_flu([msg.xgyro / 1000, msg.ygyro / 1000, msg.zgyro / 1000])
|
||||
self._trigger('acc', self.acc)
|
||||
self._trigger('gyro', self.gyro)
|
||||
|
||||
if isinstance(msg, mavlink.MAVLink_serial_control_message):
|
||||
# new chunk of data
|
||||
text = bytes(msg.data)[:msg.count].decode('utf-8', errors='ignore')
|
||||
self._trigger('print', text)
|
||||
self._print_buffer += text
|
||||
if msg.flags & mavlink.SERIAL_CONTROL_FLAG_MULTI == 0:
|
||||
# last chunk
|
||||
self._trigger('print_full', self._print_buffer)
|
||||
self._print_buffer = ''
|
||||
|
||||
if isinstance(msg, mavlink.MAVLink_statustext_message):
|
||||
logger.info(f'Flix #{msg.get_srcSystem()}: {msg.text}')
|
||||
self._trigger('status', msg.text)
|
||||
|
||||
if isinstance(msg, (mavlink.MAVLink_named_value_float_message, mavlink.MAVLink_named_value_int_message)):
|
||||
self.values[msg.name] = msg.value
|
||||
self._trigger('value', msg.name, msg.value)
|
||||
self._trigger(f'value.{msg.name}', msg.value)
|
||||
|
||||
if isinstance(msg, mavlink.MAVLink_debug_message):
|
||||
self.values[msg.ind] = msg.value
|
||||
self._trigger('value', msg.ind, msg.value)
|
||||
self._trigger(f'value.{msg.ind}', msg.value)
|
||||
|
||||
if isinstance(msg, mavlink.MAVLink_debug_vect_message):
|
||||
self.values[msg.name] = [msg.x, msg.y, msg.z]
|
||||
self._trigger('value', msg.name, self.values[msg.name])
|
||||
self._trigger(f'value.{msg.name}', self.values[msg.name])
|
||||
|
||||
if isinstance(msg, mavlink.MAVLink_debug_float_array_message):
|
||||
self.values[msg.name] = list(msg.data)
|
||||
self._trigger('value', msg.name, self.values[msg.name])
|
||||
self._trigger(f'value.{msg.name}', self.values[msg.name])
|
||||
|
||||
def _send_heartbeat(self):
|
||||
while True:
|
||||
self.mavlink.heartbeat_send(mavlink.MAV_TYPE_GCS, mavlink.MAV_AUTOPILOT_INVALID, 0, 0, 0)
|
||||
time.sleep(1)
|
||||
|
||||
@staticmethod
|
||||
def _mavlink_to_flu(v: List[float]) -> List[float]:
|
||||
if len(v) == 3: # vector
|
||||
return [v[0], -v[1], -v[2]]
|
||||
elif len(v) == 4: # quaternion
|
||||
return [v[0], v[1], -v[2], -v[3]]
|
||||
else:
|
||||
raise ValueError(f'List must have 3 (vector) or 4 (quaternion) elements')
|
||||
|
||||
@staticmethod
|
||||
def _flu_to_mavlink(v: List[float]) -> List[float]:
|
||||
return Flix._mavlink_to_flu(v)
|
||||
|
||||
def _connected(self):
|
||||
# Reset disconnection timer
|
||||
self._disconnected_timer.cancel()
|
||||
self._disconnected_timer = Timer(self._connection_timeout, self._disconnected)
|
||||
self._disconnected_timer.start()
|
||||
|
||||
if not self.connected:
|
||||
logger.info('Connection is established')
|
||||
self.connected = True
|
||||
self._trigger('connected')
|
||||
|
||||
def _disconnected(self):
|
||||
logger.info('Connection is lost')
|
||||
self.connected = False
|
||||
self._trigger('disconnected')
|
||||
|
||||
def get_param(self, name: str) -> float:
|
||||
if len(name.encode('ascii')) > 16:
|
||||
raise ValueError('Parameter name must be 16 characters or less')
|
||||
for attempt in range(3):
|
||||
try:
|
||||
self.mavlink.param_request_read_send(self.mav_id, 0, name.encode('ascii'), -1)
|
||||
msg: mavlink.MAVLink_param_value_message = \
|
||||
self.wait('mavlink.PARAM_VALUE', value=lambda msg: msg.param_id == name, timeout=0.1)
|
||||
return msg.param_value
|
||||
except TimeoutError:
|
||||
continue
|
||||
raise RuntimeError(f'Failed to get parameter {name} after 3 attempts')
|
||||
|
||||
def set_param(self, name: str, value: float):
|
||||
if len(name.encode('ascii')) > 16:
|
||||
raise ValueError('Parameter name must be 16 characters or less')
|
||||
for attempt in range(3):
|
||||
try:
|
||||
self.mavlink.param_set_send(self.mav_id, 0, name.encode('ascii'), value, mavlink.MAV_PARAM_TYPE_REAL32)
|
||||
self.wait('mavlink.PARAM_VALUE', value=lambda msg: msg.param_id == name, timeout=0.1)
|
||||
return
|
||||
except TimeoutError:
|
||||
# on timeout try again
|
||||
continue
|
||||
raise RuntimeError(f'Failed to set parameter {name} to {value} after 3 attempts')
|
||||
|
||||
def set_position(self, position: List[float], yaw: Optional[float] = None, wait: bool = False, tolerance: float = 0.1):
|
||||
raise NotImplementedError('Position control is not implemented yet')
|
||||
|
||||
def set_attitude(self, attitude: List[float], thrust: float):
|
||||
raise NotImplementedError('Automatic flight is not implemented yet')
|
||||
|
||||
def set_rates(self, rates: List[float], thrust: float):
|
||||
raise NotImplementedError('Automatic flight is not implemented yet')
|
||||
|
||||
def set_motors(self, motors: list[float]):
|
||||
if len(motors) != 4:
|
||||
raise ValueError('motors must have 4 values')
|
||||
if not all(0 <= m <= 1 for m in motors):
|
||||
raise ValueError('motors must be in range [0, 1]')
|
||||
raise NotImplementedError
|
||||
|
||||
def set_controls(self, roll: float, pitch: float, yaw: float, throttle: float):
|
||||
"""Send pilot's controls. Warning: not intended for automatic control"""
|
||||
if -1 <= roll <= 1 and -1 <= pitch <= 1 and -1 <= yaw <= 1:
|
||||
raise ValueError('roll, pitch, yaw must be in range [-1, 1]')
|
||||
if not 0 <= throttle <= 1:
|
||||
raise ValueError('throttle must be in range [0, 1]')
|
||||
self.mavlink.manual_control_send(self.mav_id, roll * 1000, pitch * 1000, yaw * 1000, throttle * 1000, 0) # type: ignore
|
||||
|
||||
def set_mode(self, mode: Literal['MANUAL', 'ACRO', 'STAB', 'AUTO']):
|
||||
raise NotImplementedError('Setting mode is not implemented yet')
|
||||
|
||||
def cli(self, cmd: str, wait_response: bool = True) -> str:
|
||||
cmd = cmd.strip()
|
||||
cmd_bytes = (cmd + '\n').encode('utf-8')
|
||||
if len(cmd_bytes) > 70:
|
||||
raise ValueError(f'Command is too long: {len(cmd_bytes)} > 70')
|
||||
cmd_bytes = cmd_bytes.ljust(70, b'\0')
|
||||
response_prefix = f'> {cmd}\n'
|
||||
for attempt in range(3):
|
||||
try:
|
||||
self.mavlink.serial_control_send(0, 0, 0, 0, len(cmd_bytes), cmd_bytes)
|
||||
if not wait_response:
|
||||
return ''
|
||||
response = self.wait('print_full', timeout=0.1, value=lambda text: text.startswith(response_prefix))
|
||||
return response[len(response_prefix):].strip()
|
||||
except TimeoutError:
|
||||
continue
|
||||
raise RuntimeError(f'Failed to send command {cmd} after 3 attempts')
|
||||
|
||||
def reboot(self):
|
||||
self.cli('reboot', wait_response=False)
|
||||
|
||||
def reset(self):
|
||||
self.cli('reset')
|
22
tools/pyproject.toml
Normal file
22
tools/pyproject.toml
Normal file
@ -0,0 +1,22 @@
|
||||
[project]
|
||||
name = "pyflix"
|
||||
version = "0.1.0"
|
||||
description = "Python API for Flix drone"
|
||||
authors = [{ name="Oleg Kalachev", email="okalachev@gmail.com" }]
|
||||
license = "MIT"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.8"
|
||||
dependencies = [
|
||||
"pymavlink",
|
||||
]
|
||||
|
||||
[build-system]
|
||||
requires = ["setuptools>=61.0"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[tool.setuptools]
|
||||
packages = ["pyflix"]
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://github.com/okalachev/flix/tree/master/tools/pyflix"
|
||||
Issues = "https://github.com/okalachev/flix/issues"
|
Loading…
x
Reference in New Issue
Block a user