Verified Commit d38ca1b0 authored by Clayton Craft's avatar Clayton Craft
Browse files

stm_agps_serial: add new driver for STM serial devices

Extends STM_AGPS, and uses trio-serial for managing the serial
parent c8c25119
......@@ -36,7 +36,7 @@ following signals:
### Dependencies:
- [Trio](
- [Trio-Serial](
- [Trio-Serial]( (only for STM serial devices)
- [pynmea2](
This project uses meson to "build" and install things:
......@@ -6,7 +6,7 @@ socket=/var/run/gnss_share.sock
# GPS device driver to use
# Supported values: stm
# Supported values: stm, stm_serial
# Path to GPS device to use
......@@ -14,6 +14,7 @@ from logging.handlers import SysLogHandler
from .logger import LoggedException
from .stm_agps import STM_AGPS
from .stm_agps_serial import STM_AGPS_SERIAL
# List of NMEA prefixes to send to clients.
# This list is the same one that geoclue monitors.
......@@ -32,7 +33,8 @@ logger = logging.getLogger("gnss_share")
class GnssShare:
drivers = {
'stm': STM_AGPS
'stm': STM_AGPS,
'stm_serial': STM_AGPS_SERIAL,
def __init__(self, config):
# Copyright(c) 2021 by craftyguy "Clayton Craft" <>
# Distributed under GPLv3+ (see COPYING) WITHOUT ANY WARRANTY.
import logging
from trio_serial import SerialStream
from .logger import LoggedException
from .stm_agps import STM_AGPS
def __init__(self, serial_port, baud=9600):
self.__log = logging.getLogger(__name__)
self._baud = baud
# reminder: bytearrays are mutable
self._buf = bytearray()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_value, traceback):
await self.close()
async def close(self):
await self._ser.aclose()
async def open(self):
self._ser = SerialStream(self._ser_port, baudrate=self._baud)
await self._ser.aopen()
except Exception as e:
raise LoggedException(e)
async def readline(self):
# based on this implementation of readline:
idx = self._buf.find(b'\n')
if idx >= 0:
line = self._buf[:idx+1]
self._buf = bytearray(self._buf[idx+1:])
return bytes(line)
while True:
data = await self._ser.receive_some(10)
idx = data.find(b'\n')
if idx >= 0:
line = self._buf + data[:idx+1]
self._buf = bytearray(data[idx+1:])
return bytes(line)
async def _write(self, data):
await self._ser.send_all(data)
......@@ -27,6 +27,7 @@ pkgsources = [
install_data(pkgsources, install_dir : moduledir)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment