stm_agps_serial.py 1.92 KB
Newer Older
1
2
3
# Copyright(c) 2021 by craftyguy "Clayton Craft" <clayton@craftyguy.net>
# Distributed under GPLv3+ (see COPYING) WITHOUT ANY WARRANTY.
import logging
4
import trio
5
6
7
8

from .logger import LoggedException
from .stm_agps import STM_AGPS

9
10
11
12
13
14
try:
    from trio_serial import SerialStream
except ImportError:
    print("warning: trio-serial not found, some drivers may not work "
          "correctly")

15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

class STM_AGPS_SERIAL(STM_AGPS):

    def __init__(self, serial_port, baud=9600):
        super().__init__(serial_port)
        self.__log = logging.getLogger(__name__)
        self._baud = baud
        # reminder: bytearrays are mutable
        self._buf = bytearray()

    async def __aenter__(self):
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()

    async def close(self):
        await self._ser.aclose()

    async def open(self):
        try:
            self._ser = SerialStream(self._ser_port, baudrate=self._baud)
            await self._ser.aopen()
        except Exception as e:
            raise LoggedException(e)

    async def readline(self):
        # based on this implementation of readline:
        # https://github.com/pyserial/pyserial/issues/216#issuecomment-369414522
        idx = self._buf.find(b'\n')
        if idx >= 0:
            line = self._buf[:idx+1]
            self._buf = bytearray(self._buf[idx+1:])
            return bytes(line)
        while True:
51
            data = await self._ser.receive_some(40)
52
53
54
55
56
57
            idx = data.find(b'\n')
            if idx >= 0:
                line = self._buf + data[:idx+1]
                self._buf = bytearray(data[idx+1:])
                return bytes(line)
            else:
58
                self._buf.extend(data)
59
60
            # sleep to prevent spinning faster than the serial device can write
            await trio.sleep(0.1)
61
62
63

    async def _write(self, data):
        await self._ser.send_all(data)