Verified Commit a3aa4cf8 authored by Clayton Craft's avatar Clayton Craft
Browse files

stm_agps: add support for slower serial devices by using trio-serial

Reading the device as a file doesn't work well when using a real serial
device at some slow baud, i.e. 'EOF' causes empty strings to be read.
parent f98a335b
......@@ -7,19 +7,23 @@ import os
import pynmea2
import trio
from datetime import datetime
from trio_serial import SerialStream
from .logger import LoggedException
class STM_AGPS:
def __init__(self, serial_port):
def __init__(self, serial_port, baud=9600):
self.__log = logging.getLogger(__name__)
if not os.path.exists(serial_port):
raise LoggedException("Serial port does not exist: "
f"{serial_port}")
self._ser_port = serial_port
self._location = b""
# reminder: bytearrays are mutable
self._buf = bytearray()
self._baud = baud
async def __aenter__(self):
await self.open()
......@@ -33,24 +37,39 @@ class STM_AGPS:
async def open(self):
try:
self._ser = await trio.open_file(self._ser_port,
"w+b", buffering=0)
self._ser = SerialStream(self._ser_port, baudrate=self._baud)
await self._ser.aopen()
except Exception as e:
raise LoggedException(e)
async def readline(self):
return await self._ser.readline()
# based on this implementation of readline:
# https://github.com/pyserial/pyserial/issues/216#issuecomment-369414522
idx = self._buf.find(b'\n')
if idx >= 0:
line = self._buf[:idx+1]
self._buf = bytearray(self._buf[idx+1:])
return bytes(line)
while True:
data = await self._ser.receive_some(10)
idx = data.find(b'\n')
if idx >= 0:
line = self._buf + data[:idx+1]
self._buf = bytearray(data[idx+1:])
return bytes(line)
else:
self._buf.extend(data)
async def _serial_write_cmd(self, cmd, expect=None):
# number of times to poll serial output for ACK after sending command
polling_loops = 50
self.__log.info(f"cmd: {cmd}")
await self._ser.write(str(cmd).encode("ascii"))
await self._ser.write(b'\r\n')
await self._ser.send_all(str(cmd).encode("ascii"))
await self._ser.send_all(b'\r\n')
if expect:
for i in range(polling_loops):
line = await self._ser.readline()
line = await self.readline()
line = line[:-1]
self.__log.info(f"read: {line}")
if expect.encode("ascii") in line:
......@@ -62,7 +81,7 @@ class STM_AGPS:
# wait for cmd completion
for i in range(polling_loops):
line = await self._ser.readline()
line = await self.readline()
line = line[:-1]
self.__log.info(f"read: {line}")
if str(cmd).encode("ascii") in line:
......@@ -82,7 +101,7 @@ class STM_AGPS:
if line.startswith(ack.encode()):
self.__log.info(line)
await f.write(line + b'\n')
line = await self._ser.readline()
line = await self.readline()
line = line[:-1]
async def _load_from_file(self, ack, file):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment