2025-08-13 18:05:26 +02:00

286 lines
9.7 KiB
Python

from typing import Any, List, Optional
from datetime import datetime, timedelta
from fastapi import APIRouter, Depends, HTTPException, Body
from sqlalchemy.orm import Session
from sqlalchemy import desc
from app.db.session import get_db
from app.models.device import Device
from app.models.telemetry import Telemetry
from app.schemas.telemetry import TelemetryCreate, TelemetryResponse, IoTDeviceData, TelemetryWithDevice
router = APIRouter()
def convert_battery_voltage_to_percentage(voltage: float) -> float:
"""
Convert battery voltage to percentage for 12V lead-acid battery system (10.8V - 12.6V range)
"""
if voltage is None:
return None
# 12V battery voltage range: 10.8V (0%) to 12.6V (100%)
min_voltage = 10.8 # Minimum usable voltage
max_voltage = 12.6 # Full charge resting voltage
if voltage <= min_voltage:
return 0.0
elif voltage >= max_voltage:
return 100.0
else:
return ((voltage - min_voltage) / (max_voltage - min_voltage)) * 100.0
def convert_iot_data_to_telemetry(iot_data: IoTDeviceData) -> TelemetryCreate:
"""
Convert IoT device data format to internal telemetry format
"""
# Convert device timestamp if provided
device_timestamp = None
if iot_data.timestamp:
device_timestamp = datetime.fromtimestamp(iot_data.timestamp)
# Convert battery voltage to percentage
battery_level = None
if iot_data.battery_volt:
battery_level = convert_battery_voltage_to_percentage(iot_data.battery_volt)
return TelemetryCreate(
device_id=iot_data.device_id,
latitude=iot_data.latitude,
longitude=iot_data.longitude,
altitude=iot_data.altitude,
speed=None, # Not provided by IoT device
heading=None, # Not provided by IoT device
satellites=iot_data.satellites,
gps_fixed=iot_data.gps_fixed,
battery_level=battery_level,
battery_voltage=iot_data.battery_volt,
signal_strength=iot_data.signal,
temperature=iot_data.temp,
humidity=iot_data.hum,
solar_voltage=iot_data.solar_volt,
device_timestamp=device_timestamp,
extra_data={
"firmware": iot_data.firmware,
"device_name": iot_data.device_name
} if iot_data.firmware or iot_data.device_name else None
)
@router.post("/", response_model=TelemetryResponse)
def create_telemetry(
*,
db: Session = Depends(get_db),
telemetry_in: TelemetryCreate,
) -> Any:
"""
Create new telemetry data entry (original format).
"""
# Check if device exists
device = db.query(Device).filter(Device.id == telemetry_in.device_id).first()
if not device:
# Auto-create device if it doesn't exist
device = Device(id=telemetry_in.device_id, name=f"Device {telemetry_in.device_id}")
db.add(device)
db.commit()
db.refresh(device)
# Create telemetry entry
telemetry = Telemetry(
device_id=telemetry_in.device_id,
latitude=telemetry_in.latitude,
longitude=telemetry_in.longitude,
altitude=telemetry_in.altitude,
speed=telemetry_in.speed,
heading=telemetry_in.heading,
satellites=telemetry_in.satellites,
gps_fixed=telemetry_in.gps_fixed,
battery_level=telemetry_in.battery_level,
battery_voltage=telemetry_in.battery_voltage,
signal_strength=telemetry_in.signal_strength,
temperature=telemetry_in.temperature,
humidity=telemetry_in.humidity,
solar_voltage=telemetry_in.solar_voltage,
extra_data=telemetry_in.extra_data,
device_timestamp=telemetry_in.device_timestamp,
)
db.add(telemetry)
# Update device with latest telemetry
device.last_seen = datetime.now()
if telemetry_in.battery_level is not None:
device.battery_level = telemetry_in.battery_level
if telemetry_in.signal_strength is not None:
device.signal_strength = telemetry_in.signal_strength
if telemetry_in.latitude and telemetry_in.longitude:
device.latitude = telemetry_in.latitude
device.longitude = telemetry_in.longitude
if telemetry_in.altitude:
device.altitude = telemetry_in.altitude
# Update device info from extra_data if available
if telemetry_in.extra_data:
if "device_name" in telemetry_in.extra_data and telemetry_in.extra_data["device_name"]:
device.name = telemetry_in.extra_data["device_name"]
if "firmware" in telemetry_in.extra_data and telemetry_in.extra_data["firmware"]:
device.firmware_version = telemetry_in.extra_data["firmware"]
db.add(device)
db.commit()
db.refresh(telemetry)
return telemetry
@router.post("/iot", response_model=TelemetryResponse)
def create_iot_telemetry(
*,
db: Session = Depends(get_db),
iot_data: IoTDeviceData,
) -> Any:
"""
Create new telemetry data entry from IoT device (LILYGO format).
This endpoint handles the specific JSON format sent by LILYGO T-A7670G devices.
"""
# Convert IoT data format to internal format
telemetry_data = convert_iot_data_to_telemetry(iot_data)
# Check if device exists
device = db.query(Device).filter(Device.id == iot_data.device_id).first()
if not device:
# Auto-create device with IoT device info
device = Device(
id=iot_data.device_id,
name=iot_data.device_name or f"LILYGO {iot_data.device_id}",
model="LILYGO T-A7670G",
firmware_version=iot_data.firmware
)
db.add(device)
db.commit()
db.refresh(device)
else:
# Update device info if provided
if iot_data.device_name:
device.name = iot_data.device_name
if iot_data.firmware:
device.firmware_version = iot_data.firmware
# Create telemetry entry
telemetry = Telemetry(
device_id=telemetry_data.device_id,
latitude=telemetry_data.latitude,
longitude=telemetry_data.longitude,
altitude=telemetry_data.altitude,
speed=telemetry_data.speed,
heading=telemetry_data.heading,
satellites=telemetry_data.satellites,
gps_fixed=telemetry_data.gps_fixed,
battery_level=telemetry_data.battery_level,
battery_voltage=telemetry_data.battery_voltage,
signal_strength=telemetry_data.signal_strength,
temperature=telemetry_data.temperature,
humidity=telemetry_data.humidity,
solar_voltage=telemetry_data.solar_voltage,
extra_data=telemetry_data.extra_data,
device_timestamp=telemetry_data.device_timestamp,
)
db.add(telemetry)
# Update device with latest telemetry
device.last_seen = datetime.now()
if telemetry_data.battery_level is not None:
device.battery_level = telemetry_data.battery_level
if telemetry_data.signal_strength is not None:
device.signal_strength = telemetry_data.signal_strength
if telemetry_data.latitude and telemetry_data.longitude:
device.latitude = telemetry_data.latitude
device.longitude = telemetry_data.longitude
if telemetry_data.altitude:
device.altitude = telemetry_data.altitude
db.add(device)
db.commit()
db.refresh(telemetry)
return telemetry
@router.get("/device/{device_id}", response_model=List[TelemetryWithDevice])
def get_device_telemetry(
*,
db: Session = Depends(get_db),
device_id: str,
skip: int = 0,
limit: int = 100,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
) -> Any:
"""
Get telemetry data for a specific device.
"""
# Check if device exists
device = db.query(Device).filter(Device.id == device_id).first()
if not device:
raise HTTPException(status_code=404, detail="Device not found")
# Build query
query = db.query(Telemetry).filter(Telemetry.device_id == device_id)
if start_date:
query = query.filter(Telemetry.timestamp >= start_date)
if end_date:
query = query.filter(Telemetry.timestamp <= end_date)
# Get results
telemetry_data = query.order_by(desc(Telemetry.timestamp)).offset(skip).limit(limit).all()
# Add device info to each telemetry record
result = []
for telem in telemetry_data:
telem_dict = TelemetryWithDevice.from_orm(telem).dict()
telem_dict.update({
"device_name": device.name,
"device_model": device.model,
"firmware_version": device.firmware_version
})
result.append(TelemetryWithDevice(**telem_dict))
return result
@router.get("/latest", response_model=List[TelemetryWithDevice])
def get_latest_telemetry(
*,
db: Session = Depends(get_db),
limit: int = 10,
) -> Any:
"""
Get latest telemetry data across all devices.
"""
# Get latest telemetry for each device
subquery = db.query(
Telemetry.device_id,
desc(Telemetry.timestamp).label('max_timestamp')
).group_by(Telemetry.device_id).subquery()
# Join with telemetry and device tables
latest_telemetry = db.query(Telemetry, Device).join(
subquery,
(Telemetry.device_id == subquery.c.device_id) &
(Telemetry.timestamp == subquery.c.max_timestamp)
).join(Device, Telemetry.device_id == Device.id).limit(limit).all()
# Format response
result = []
for telem, device in latest_telemetry:
telem_dict = TelemetryWithDevice.from_orm(telem).dict()
telem_dict.update({
"device_name": device.name,
"device_model": device.model,
"firmware_version": device.firmware_version
})
result.append(TelemetryWithDevice(**telem_dict))
return result