project_inertial-control/lidar_module/lidar_reader.py
2026-04-29 08:31:56 +03:00

104 lines
3.1 KiB
Python

import serial
import time
import sqlite3
import struct
import signal
# Константы
SERIAL_PORT = '/dev/ttyS0'
BAUD_RATE = 230400
DB_PATH = '../inertial_data.db'
# Заголовок пакета LDS02RR
HEADER = 0xFA
PACKET_SIZE = 22
# Глобальные переменные для завершения
running = True
conn = None
ser = None
def shutdown(signum, frame):
"""Обработчик сигнала завершения"""
global running
print("Завершение лидар модуля...")
running = False
# Перехватываем сигналы завершения
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
def init_db():
"""Создать таблицу если не существует и очистить"""
conn = sqlite3.connect(DB_PATH)
conn.execute('''CREATE TABLE IF NOT EXISTS lidar_data (
timestamp REAL,
angle REAL,
distance_mm REAL,
quality INTEGER
)''')
conn.execute('DELETE FROM lidar_data')
conn.commit()
return conn
def parse_packet(data):
"""Парсить один пакет данных лидара"""
if len(data) < PACKET_SIZE or data[0] != HEADER:
return None
index = data[1] - 0xA0
if index < 0 or index > 89:
return None
points = []
for i in range(4):
offset = 4 + i * 4
distance_raw = struct.unpack_from('<H', data, offset)[0]
quality = struct.unpack_from('<H', data, offset + 2)[0] >> 2
distance = distance_raw / 4.0
angle = index * 4 + i
points.append((angle, distance, quality))
return points
def save_to_db():
"""Основной цикл записи данных в БД"""
global conn, ser, running
conn = init_db()
try:
ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
print("Подключено к лидару")
except Exception as e:
print(f"Ошибка подключения: {e}")
conn.close()
return
print("Запись данных лидара...")
buffer = bytearray()
start_time = time.time()
try:
while running:
data = ser.read(PACKET_SIZE)
if not data:
continue
buffer.extend(data)
while len(buffer) >= PACKET_SIZE:
if buffer[0] == HEADER:
packet = bytes(buffer[:PACKET_SIZE])
points = parse_packet(packet)
if points:
t = round(time.time() - start_time, 3)
for angle, distance, quality in points:
conn.execute(
'INSERT INTO lidar_data VALUES (?, ?, ?, ?)',
(t, angle, round(distance, 1), quality)
)
conn.commit()
buffer = buffer[PACKET_SIZE:]
else:
buffer.pop(0)
finally:
ser.close()
conn.close()
print("Лидар модуль остановлен")
save_to_db()