# stm_agps.py
# Copyright(c) 2021 by Angus Ainslee
# Copyright(c) 2021 by Purism SPC
# Copyright(c) 2021 by craftyguy "Clayton Craft" <clayton@craftyguy.net>
# Distributed under GPLv3+ (see COPYING) WITHOUT ANY WARRANTY.
import logging
import os
from datetime import datetime, timezone

import pynmea2
import trio

from .logger import LoggedException


class STM_AGPS:
    """Save and restore AGPS data (almanac and ephemeris) over a serial port.

    Commands are sent as proprietary ``$PSTM...`` NMEA sentences and the
    replies are read line by line from the same port.  The class is an
    async context manager: entering opens the port, leaving closes it.
    """

    def __init__(self, serial_port, baud=None):
        """Remember the serial port to use; the port is not opened yet.

        serial_port: path of the serial device node.
        baud: accepted for interface compatibility, not used by this class.

        Raises LoggedException if ``serial_port`` does not exist.
        """
        self.__log = logging.getLogger(__name__)
        if not os.path.exists(serial_port):
            raise LoggedException("Serial port does not exist: "
                                  f"{serial_port}")
        self._ser_port = serial_port
        self._location = b""
        # reminder: bytearrays are mutable
        self._buf = bytearray()
        # Both are populated by open(); None while the port is closed so
        # that close() is safe to call even if open() never succeeded.
        self._ser = None
        self._ser_read_cmd = None

    async def __aenter__(self):
        await self.open()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self.close()

    async def close(self):
        """Close the serial port; a no-op if it is not open."""
        if self._ser is None:
            return
        ser, self._ser = self._ser, None
        self._ser_read_cmd = None
        await ser.aclose()

    async def open(self):
        """Open the serial port unbuffered for binary reading and writing.

        Raises LoggedException (chained to the original error) on failure.
        """
        try:
            self._ser = await trio.open_file(self._ser_port,
                                             "w+b", buffering=0)
        except Exception as e:
            raise LoggedException(e) from e
        self._ser_read_cmd = self._ser.read

    async def readline(self):
        """Return the next line from the port as bytes, newline included.

        Bytes read past the newline are kept in ``self._buf`` for the next
        call.
        """
        # based on this implementation of readline:
        # https://github.com/pyserial/pyserial/issues/216#issuecomment-369414522
        idx = self._buf.find(b'\n')
        if idx >= 0:
            line = bytes(self._buf[:idx+1])
            del self._buf[:idx+1]
            return line
        while True:
            data = await self._ser_read_cmd(40)
            idx = data.find(b'\n')
            if idx >= 0:
                line = bytes(self._buf + data[:idx+1])
                self._buf = bytearray(data[idx+1:])
                return line
            self._buf.extend(data)
            # sleep to prevent spinning faster than the device can write
            await trio.sleep(0.1)

    async def _write(self, data):
        await self._ser.write(data)

    async def _serial_write_cmd(self, cmd, expect=None):
        """Send ``cmd`` followed by CR LF and poll the port for a reply.

        cmd: a ``bytes``/``bytearray`` sentence, sent verbatim, or any other
            object, whose ``str()`` is sent ASCII-encoded.
        expect: optional ASCII string to look for in the reply lines.  When
            omitted, the sent command itself is looked for instead.

        Returns ``(found, line)`` where ``line`` is the last line read with
        its final byte (the newline) removed.
        """
        # number of times to poll serial output for ACK after sending command
        polling_loops = 50

        # Sentences read back from a stored AGPS file are already raw bytes;
        # str() on them would send the Python repr ("b'...'") to the device.
        if isinstance(cmd, (bytes, bytearray)):
            payload = bytes(cmd)
        else:
            payload = str(cmd).encode("ascii")

        self.__log.info("cmd: %s", cmd)
        await self._write(payload)
        await self._write(b'\r\n')

        needle = expect.encode("ascii") if expect else payload
        line = b""
        for _ in range(polling_loops):
            line = (await self.readline())[:-1]
            self.__log.info("read: %s", line)
            if needle in line:
                if expect:
                    self.__log.info("found: %s: %s", expect, line)
                return True, line

        if expect:
            self.__log.info("not found: %s: %s", expect, line)
        return False, line

    async def _store_to_file(self, cmd, ack, file):
        """Send dump command ``cmd`` and write the ``ack`` lines to ``file``.

        Every line starting with ``ack`` is written to ``file``.  Reading
        stops at the first line containing ``cmd`` (presumably the device
        reporting that the dump is complete -- confirm against the device
        documentation).

        Raises LoggedException if no line containing ``ack`` is seen after
        sending the command.
        """
        msg = pynmea2.GGA('P', cmd, ())
        result, line = await self._serial_write_cmd(msg, ack)
        if not result:
            raise LoggedException("Unable to get data from device")
        end_marker = cmd.encode()
        ack_prefix = ack.encode()
        async with await trio.open_file(file, 'wb') as f:
            while end_marker not in line:
                if line.startswith(ack_prefix):
                    self.__log.info(line)
                    await f.write(line + b'\n')
                line = (await self.readline())[:-1]

    async def _load_from_file(self, ack, file):
        """Send each stored sentence in ``file`` and wait for ``ack``."""
        async with await trio.open_file(file, 'rb') as f:
            while line := await f.readline():
                cmd = line.strip()
                if not cmd:
                    # nothing to send for a blank line
                    continue
                ok, _ = await self._serial_write_cmd(cmd, ack)
                if not ok:
                    self.__log.warning("no %s received for: %s", ack, cmd)

    async def reset(self):
        """Send the STMGPSRESET command and give the device a second."""
        msg = pynmea2.GGA('P', 'STMGPSRESET', ())
        await self._serial_write_cmd(msg)
        await trio.sleep(1)

    async def store(self, dir):
        """Dump almanac and ephemeris into ``almanac.txt`` and
        ``ephemeris.txt`` inside directory ``dir``."""
        almanac_path = os.path.join(dir, 'almanac.txt')
        ephemeris_path = os.path.join(dir, 'ephemeris.txt')

        # reset device in case it is stuck
        await self.reset()

        await self._store_almanac(almanac_path)
        await self._store_ephemeris(ephemeris_path)

    async def load(self, dir):
        """Load almanac and ephemeris from directory ``dir`` into the device.

        If either file is missing, a warning is logged and nothing is sent.
        """
        almanac_path = os.path.join(dir, 'almanac.txt')
        ephemeris_path = os.path.join(dir, 'ephemeris.txt')

        for file in (almanac_path, ephemeris_path):
            if not os.path.exists(file):
                self.__log.warning("AGPS file not found: %s", file)
                self.__log.warning("*NOT* loading AGPS data")
                return

        # reset device in case it is stuck, and set time
        await self.reset()
        await self.set_time()

        await self._load_almanac(almanac_path)
        await self._load_ephemeris(ephemeris_path)

    async def _store_ephemeris(self, file='ephemeris.txt'):
        await self._store_to_file('STMDUMPEPHEMS', '$PSTMEPHEM', file)

    async def _store_almanac(self, file='almanac.txt'):
        await self._store_to_file('STMDUMPALMANAC', '$PSTMALMANAC', file)

    async def _load_ephemeris(self, file='ephemeris.txt'):
        await self._load_from_file('$PSTMEPHEMOK', file)

    async def _load_almanac(self, file='almanac.txt'):
        await self._load_from_file('$PSTMALMANACOK', file)

    async def set_time(self):
        """Reset the device, then send it the current UTC date and time.

        Raises LoggedException if ``STMINITTIMEOK`` is not seen in the reply.
        """
        await self.reset()
        now = datetime.now(timezone.utc)

        # INITTIME expects values to be 2 or 4 digits long
        msg = pynmea2.GGA('P', 'STMINITTIME', (
            now.strftime('%d'),
            now.strftime('%m'),
            now.strftime('%Y'),
            now.strftime('%H'),
            now.strftime('%M'),
            now.strftime('%S'),
            ))
        ret, line = await self._serial_write_cmd(msg, "STMINITTIMEOK")
        if not ret:
            raise LoggedException(f"ERROR: {line}")
        self.__log.info(line)