mirror of
https://github.com/okalachev/flix.git
synced 2026-01-11 21:46:55 +00:00
Implement AUTO mode for automatic flights
Support SET_ATTITUDE_TARGET, SET_ACTUATOR_CONTROL_TARGET in mavlink. ACTUATOR_OUTPUT_STATUS is changed ACTUATOR_CONTROL_TARGET to match used message for setting motor outputs. Add support for changing mode from mavlink. Support automatic flights in pyflix.
This commit is contained in:
@@ -66,13 +66,14 @@ gyro = flix.wait('gyro') # wait for gyroscope update
|
||||
attitude = flix.wait('attitude', timeout=3) # wait for attitude update, raise TimeoutError after 3 seconds
|
||||
```
|
||||
|
||||
The `value` argument specifies a condition for filtering events. It can be either an expected value or a callable:
|
||||
The second argument (`value`) specifies a condition for filtering events. It can be either an expected value or a callable:
|
||||
|
||||
```python
|
||||
flix.wait('armed', value=True) # wait until armed
|
||||
flix.wait('armed', value=False) # wait until disarmed
|
||||
flix.wait('motors', value=lambda motors: not any(motors)) # wait until all motors stop
|
||||
flix.wait('attitude_euler', value=lambda att: att[0] > 0) # wait until roll angle is positive
|
||||
flix.wait('armed', True) # wait until armed
|
||||
flix.wait('armed', False) # wait until disarmed
|
||||
flix.wait('mode', 'AUTO') # wait until flight mode is switched to AUTO
|
||||
flix.wait('motors', lambda motors: not any(motors)) # wait until all motors stop
|
||||
flix.wait('attitude_euler', lambda att: att[0] > 0) # wait until roll angle is positive
|
||||
```
|
||||
|
||||
Full list of events:
|
||||
@@ -107,7 +108,7 @@ Get and set firmware parameters using `get_param` and `set_param` methods:
|
||||
|
||||
```python
|
||||
pitch_p = flix.get_param('PITCH_P') # get parameter value
|
||||
flix.set_param('PITCH_P', 5) # set parameter value
|
||||
flix.set_param('PITCH_P', 5) # set parameter value
|
||||
```
|
||||
|
||||
Execute CLI commands using `cli` method. This method returns command response:
|
||||
@@ -121,17 +122,54 @@ flix.cli('reboot') # reboot the drone
|
||||
> [!TIP]
|
||||
> Use `help` command to get the list of available commands.
|
||||
|
||||
You can imitate pilot's controls using `set_controls` method:
|
||||
|
||||
```python
|
||||
flix.set_controls(roll=0, pitch=0, yaw=0, throttle=0.6)
|
||||
```
|
||||
|
||||
> [!WARNING]
|
||||
> This method **is not intended for automatic flights**, only for adding support for a custom pilot input device.
|
||||
|
||||
### Automatic flight
|
||||
|
||||
The flight control feature is in development. List of methods intended for automatic flight control:
|
||||
To perform automatic flight, switch the mode to *AUTO*, either from the remote control, or from the code:
|
||||
|
||||
* `set_position`
|
||||
* `set_velocity`
|
||||
* `set_attitude`
|
||||
* `set_rates`
|
||||
* `set_motors`
|
||||
* `set_controls`
|
||||
* `set_mode`
|
||||
```python
|
||||
flix.set_mode('AUTO')
|
||||
```
|
||||
|
||||
In this mode you can set flight control targets. Setting attitude target:
|
||||
|
||||
```python
|
||||
flix.set_attitude([0.1, 0.2, 0.3], 0.6) # set target roll, pitch, yaw and thrust
|
||||
flix.set_attitude([1, 0, 0, 0], 0.6) # set target attitude quaternion and thrust
|
||||
```
|
||||
|
||||
Setting angular rates target:
|
||||
|
||||
```python
|
||||
flix.set_rates([0.1, 0.2, 0.3], 0.6) # set target roll rate, pitch rate, yaw rate and thrust
|
||||
```
|
||||
|
||||
You also can control raw motors outputs directly:
|
||||
|
||||
```python
|
||||
flix.set_motors([0.5, 0.5, 0.5, 0.5]) # set motors outputs in range [0, 1]
|
||||
```
|
||||
|
||||
In *AUTO* mode, the drone will arm automatically if the thrust is greater than zero, and disarm if thrust is zero. Therefore, to disarm the drone, set thrust to zero:
|
||||
|
||||
```python
|
||||
flix.set_attitude([0, 0, 0], 0) # disarm the drone
|
||||
```
|
||||
|
||||
The following methods are in development and are not functional yet:
|
||||
|
||||
* `set_position` — set target position.
|
||||
* `set_velocity` — set target velocity.
|
||||
|
||||
To exit from *AUTO* mode move control sticks and the drone will switch to *STAB* mode.
|
||||
|
||||
## Usage alongside QGroundControl
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
import os
|
||||
import time
|
||||
from queue import Queue, Empty
|
||||
from typing import Literal, Optional, Callable, List, Dict, Any, Union
|
||||
from typing import Literal, Optional, Callable, List, Dict, Any, Union, Sequence
|
||||
import logging
|
||||
import errno
|
||||
from threading import Thread, Timer
|
||||
@@ -40,6 +40,7 @@ class Flix:
|
||||
|
||||
_connection_timeout = 3
|
||||
_print_buffer: str = ''
|
||||
_modes = ['MANUAL', 'ACRO', 'STAB', 'AUTO']
|
||||
|
||||
def __init__(self, system_id: int=1, wait_connection: bool=True):
|
||||
if not (0 <= system_id < 256):
|
||||
@@ -55,7 +56,7 @@ class Flix:
|
||||
if e.errno != errno.EADDRINUSE:
|
||||
raise
|
||||
# Port busy - using proxy
|
||||
logger.debug('Listening on port 14560 (proxy)')
|
||||
logger.debug('Listening on port 14555 (proxy)')
|
||||
self.connection: mavutil.mavfile = mavutil.mavlink_connection('udpin:0.0.0.0:14555', source_system=254) # type: ignore
|
||||
self.connection.target_system = system_id
|
||||
self.mavlink: mavlink.MAVLink = self.connection.mav
|
||||
@@ -147,7 +148,7 @@ class Flix:
|
||||
|
||||
def _handle_mavlink_message(self, msg: mavlink.MAVLink_message):
|
||||
if isinstance(msg, mavlink.MAVLink_heartbeat_message):
|
||||
self.mode = ['MANUAL', 'ACRO', 'STAB', 'USER'][msg.custom_mode]
|
||||
self.mode = self._modes[msg.custom_mode]
|
||||
self.armed = msg.base_mode & mavlink.MAV_MODE_FLAG_SAFETY_ARMED != 0
|
||||
self._trigger('mode', self.mode)
|
||||
self._trigger('armed', self.armed)
|
||||
@@ -168,6 +169,11 @@ class Flix:
|
||||
msg.chan5_raw, msg.chan6_raw, msg.chan7_raw, msg.chan8_raw]
|
||||
self._trigger('channels', self.channels)
|
||||
|
||||
if isinstance(msg, mavlink.MAVLink_actuator_control_target_message):
|
||||
self.motors = msg.controls[:4] # type: ignore
|
||||
self._trigger('motors', self.motors)
|
||||
|
||||
# TODO: to be removed: the old way of passing motor outputs
|
||||
if isinstance(msg, mavlink.MAVLink_actuator_output_status_message):
|
||||
self.motors = msg.actuator[:4] # type: ignore
|
||||
self._trigger('motors', self.motors)
|
||||
@@ -231,6 +237,19 @@ class Flix:
|
||||
def _flu_to_mavlink(v: List[float]) -> List[float]:
|
||||
return Flix._mavlink_to_flu(v)
|
||||
|
||||
def _command_send(self, command: int, params: Sequence[float]):
|
||||
if len(params) != 7:
|
||||
raise ValueError('Command must have 7 parameters')
|
||||
for attempt in range(3):
|
||||
try:
|
||||
logger.debug(f'Send command {command} with params {params} (attempt #{attempt + 1})')
|
||||
self.mavlink.command_long_send(self.system_id, 0, command, 0, *params) # type: ignore
|
||||
self.wait('mavlink.COMMAND_ACK', value=lambda msg: msg.command == command and msg.result == mavlink.MAV_RESULT_ACCEPTED, timeout=0.1)
|
||||
return
|
||||
except TimeoutError:
|
||||
continue
|
||||
raise RuntimeError(f'Failed to send command {command} after 3 attempts')
|
||||
|
||||
def _connected(self):
|
||||
# Reset disconnection timer
|
||||
self._disconnected_timer.cancel()
|
||||
@@ -275,6 +294,11 @@ class Flix:
|
||||
continue
|
||||
raise RuntimeError(f'Failed to set parameter {name} to {value} after 3 attempts')
|
||||
|
||||
def set_mode(self, mode: Union[str, int]):
|
||||
if isinstance(mode, str):
|
||||
mode = self._modes.index(mode.upper())
|
||||
self._command_send(mavlink.MAV_CMD_DO_SET_MODE, (0, mode, 0, 0, 0, 0, 0))
|
||||
|
||||
def set_position(self, position: List[float], yaw: Optional[float] = None, wait: bool = False, tolerance: float = 0.1):
|
||||
raise NotImplementedError('Position control is not implemented yet')
|
||||
|
||||
@@ -282,17 +306,37 @@ class Flix:
|
||||
raise NotImplementedError('Velocity control is not implemented yet')
|
||||
|
||||
def set_attitude(self, attitude: List[float], thrust: float):
|
||||
raise NotImplementedError('Automatic flight is not implemented yet')
|
||||
if len(attitude) == 3:
|
||||
attitude = Quaternion([attitude[0], attitude[1], attitude[2]]).q # type: ignore
|
||||
elif len(attitude) != 4:
|
||||
raise ValueError('Attitude must be [roll, pitch, yaw] or [w, x, y, z] quaternion')
|
||||
if not (0 <= thrust <= 1):
|
||||
raise ValueError('Thrust must be in range [0, 1]')
|
||||
attitude = self._flu_to_mavlink(attitude)
|
||||
for _ in range(2): # duplicate to ensure delivery
|
||||
self.mavlink.set_attitude_target_send(0, self.system_id, 0, 0,
|
||||
[attitude[0], attitude[1], attitude[2], attitude[3]],
|
||||
0, 0, 0, thrust)
|
||||
|
||||
def set_rates(self, rates: List[float], thrust: float):
|
||||
raise NotImplementedError('Automatic flight is not implemented yet')
|
||||
if len(rates) != 3:
|
||||
raise ValueError('Rates must be [roll_rate, pitch_rate, yaw_rate]')
|
||||
if not (0 <= thrust <= 1):
|
||||
raise ValueError('Thrust must be in range [0, 1]')
|
||||
rates = self._flu_to_mavlink(rates)
|
||||
for _ in range(2): # duplicate to ensure delivery
|
||||
self.mavlink.set_attitude_target_send(0, self.system_id, 0,
|
||||
mavlink.ATTITUDE_TARGET_TYPEMASK_ATTITUDE_IGNORE,
|
||||
[1, 0, 0, 0],
|
||||
rates[0], rates[1], rates[2], thrust)
|
||||
|
||||
def set_motors(self, motors: List[float]):
|
||||
if len(motors) != 4:
|
||||
raise ValueError('motors must have 4 values')
|
||||
if not all(0 <= m <= 1 for m in motors):
|
||||
raise ValueError('motors must be in range [0, 1]')
|
||||
raise NotImplementedError
|
||||
for _ in range(2): # duplicate to ensure delivery
|
||||
self.mavlink.set_actuator_control_target_send(int(time.time() * 1000000), 0, self.system_id, 0, motors + [0] * 4) # type: ignore
|
||||
|
||||
def set_controls(self, roll: float, pitch: float, yaw: float, throttle: float):
|
||||
"""Send pilot's controls. Warning: not intended for automatic control"""
|
||||
@@ -300,10 +344,7 @@ class Flix:
|
||||
raise ValueError('roll, pitch, yaw must be in range [-1, 1]')
|
||||
if not 0 <= throttle <= 1:
|
||||
raise ValueError('throttle must be in range [0, 1]')
|
||||
self.mavlink.manual_control_send(self.system_id, roll * 1000, pitch * 1000, yaw * 1000, throttle * 1000, 0) # type: ignore
|
||||
|
||||
def set_mode(self, mode: Literal['MANUAL', 'ACRO', 'STAB', 'USER']):
|
||||
raise NotImplementedError('Setting mode is not implemented yet')
|
||||
self.mavlink.manual_control_send(self.system_id, int(roll * 1000), int(pitch * 1000), int(yaw * 1000), int(throttle * 1000), 0) # type: ignore
|
||||
|
||||
def cli(self, cmd: str, wait_response: bool = True) -> str:
|
||||
cmd = cmd.strip()
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "pyflix"
|
||||
version = "0.5"
|
||||
version = "0.6"
|
||||
description = "Python API for Flix drone"
|
||||
authors = [{ name="Oleg Kalachev", email="okalachev@gmail.com" }]
|
||||
license = "MIT"
|
||||
|
||||
Reference in New Issue
Block a user