Commit c9da8e17 authored by Angus Ainslie's avatar Angus Ainslie
Browse files

test_scripts/gnss_test: add a gnss test script



This is formated for the librem5 screen and can display position and
satellites S/N ratio. It can also interface with the gts1800 test fixture.
Signed-off-by: default avatarAngus Ainslie <angus@akkea.ca>
parent d7447232
Pipeline #62731 passed with stages
in 3 minutes and 31 seconds
#!/usr/bin/python
import pynmea2
import os
import threading
import socket
import time
import datetime
import sys
import copy
from os import path
gts1800IP = None
gts1800Port = 8080
NMEAudpPort = None
# this is to get data from gps-share instead of the gnss directly
gsIP = '127.0.0.1'
# gsPort = 10110
gsPort = 0
VERSION = "0.2"
debug = False
seed = False
class GtsData:
def __init__(self):
self.lat = 0.0
self.lon = 0.0
self.altitude = 0.0
self.timestamp = None
self.datestamp = None
self.sats = {}
self.gpSats = []
self.glSats = []
self.leap_seconds = 18
self.accuracy = '99.0'
self.pos_status = 'V'
self.fix_type = '0'
@property
def solSats(self):
sats = copy.deepcopy(self.gpSats) + copy.deepcopy(self.glSats)
return sats
@property
def datetime(self):
if self.datestamp and self.timestamp:
return datetime.datetime.combine(self.datestamp, self.timestamp)
return None
@property
def antenna(self):
ret = 0
if self.pos_status == 'A':
ret = ret | 1
if len(self.solSats) > 0:
ret = ret | 2
return ret
@property
def fixsuc(self):
if self.pos_status == 'A':
return 1
return 0
@property
def fix(self):
if self.fix_type == '2':
return '2D'
elif self.fix_type == '3':
return '3D'
else:
return 'None'
return 'NA'
@property
def fixtime(self):
if self.datetime:
return (self.datetime - datetime.datetime(1980, 1, 6)).total_seconds() + self.leap_seconds
return 0
@property
def utctime(self):
if self.datetime:
return int(time.mktime(self.datetime.timetuple()))
return 0
@property
def timems(self):
if not self.timestamp:
return 0
# we need to spoof milliseconds because the module doesn't provide that
timesec = time.time()
return int((timesec - int(timesec)) * 1000)
@property
def time10ms(self):
if not self.timestamp:
return 0
# we need to spoof milliseconds because the module doesn't provide that
timesec = time.time()
return int((timesec - int(timesec)) * 100)
class GnssThread(threading.Thread):
def __init__(self, ip=None, port=None):
threading.Thread.__init__(self)
self.gts = GtsData()
self.NMEAdata = []
if port != 0:
self.gnss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if ip is not None and port is not None:
self.gnss.connect((ip, port))
else:
self.gnss = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.gnss.connect('/var/run/gps-share.sock')
self.almanac = False
self.running = True
def write_cmd(self, cmd, expect=None):
print("cmd: %s" % cmd)
with open("/dev/gnss0", 'wb') as f:
f.write(str(cmd).encode("ascii"))
f.write(b'\r\n')
if expect:
for i in range(20):
line = self.gnss.recv(1024)
if expect.encode("ascii") in line:
sys.stderr.write('found: %s:%s\n' % (expect, line))
return True, line
sys.stderr.write('not found: %s:%s\n' % (expect, line))
return False, line
# wait for cmd completion
for i in range(20):
line = self.gnss.recv(1024)
if str(cmd).encode("ascii") in line:
return True, line
return False, line
def dump_sat_info(self, cmd, filename, match, finish):
flush = self.gnss.recv(32768)
msg = pynmea2.GGA('P', cmd, ())
self.write_cmd(msg)
data = {}
if path.exists(filename):
with open(filename, 'rb') as f:
for line in f:
# print("dump line: ", line)
if line.startswith(b'$P%s' % cmd):
# print("found existing line: ", line)
index = line.split(',')[1]
data[index] = line
for i in range(0, 400):
line = self.gnss.recv(1024)
# print("dump line: ", line)
if finish in line:
# print("dump %s finished" % cmd)
with open(filename, 'wb') as f:
for index in data:
f.write(data[index])
return
if line.startswith(match):
# print("dump found new line: ", line.split(',')[1], ":", line)
index = int(line.split(',')[1])
data[index] = line
print("dump %s finish %s: failed" % (cmd, finish))
def dump_almanac(self):
self.dump_sat_info('STMDUMPALMANAC', "almanac.txt", b'$PSTMALMANAC', b'STMDUMPALMANAC')
def dump_ephemerides(self):
self.dump_sat_info('STMDUMPEPHEMS', "ephemerides.txt", b'$PSTMEPHEM', b'STMDUMPEPHEMS')
def seed(self, filename):
with open(filename, 'rb') as f:
for line in f:
with open("/dev/gnss0", 'wb') as of:
of.write(str(line).encode("ascii"))
of.write(b'\r\n')
time.sleep(0.04)
def seed_almanac(self):
self.seed("almanac.txt")
def seed_ephemerides(self):
self.seed("ephemerides.txt")
def run(self):
data = ''
lastTime = time.time()
while self.running:
data += self.gnss.recv(32768)
if not data.decode('ascii', errors='replace').endswith("\n"):
# sys.stderr.write('skipping: %s\n' % data)
continue
if debug:
sys.stderr.write('read: %s\n' % data)
lines = data.decode('ascii', errors='replace').split("\n")
lines = filter(None, lines)
data = ''
for line in lines:
if not line.startswith('$'):
continue
msg = None
if NMEAudpPort:
self.NMEAdata.append(line)
try:
msg = pynmea2.parse(line)
if debug:
print(repr(msg))
if isinstance(msg, pynmea2.types.talker.GSV):
if msg.snr_1:
self.gts.sats[msg.sv_prn_num_1] = (msg.elevation_deg_1, msg.azimuth_1, msg.snr_1)
elif len(msg.sv_prn_num_1) and msg.sv_prn_num_1 in self.gts.sats:
del self.gts.sats[msg.sv_prn_num_1]
if msg.snr_2:
self.gts.sats[msg.sv_prn_num_2] = (msg.elevation_deg_2, msg.azimuth_2, msg.snr_2)
elif len(msg.sv_prn_num_2) and msg.sv_prn_num_2 in self.gts.sats:
del self.gts.sats[msg.sv_prn_num_2]
if msg.snr_3:
self.gts.sats[msg.sv_prn_num_3] = (msg.elevation_deg_3, msg.azimuth_3, msg.snr_3)
elif len(msg.sv_prn_num_3) and msg.sv_prn_num_3 in self.gts.sats:
del self.gts.sats[msg.sv_prn_num_3]
if msg.snr_4:
self.gts.sats[msg.sv_prn_num_4] = (msg.elevation_deg_4, msg.azimuth_4, msg.snr_4)
elif len(msg.sv_prn_num_4) and msg.sv_prn_num_4 in self.gts.sats:
del self.gts.sats[msg.sv_prn_num_4]
elif isinstance(msg, pynmea2.types.talker.GGA):
# print(repr(msg))
self.gts.timestamp = msg.timestamp
self.gts.lon = msg.longitude
self.gts.lat = msg.latitude
self.gts.altitude = msg.altitude
self.gts.accuracy = msg.horizontal_dil
elif isinstance(msg, pynmea2.types.talker.RMC):
# print(repr(msg))
self.gts.datestamp = msg.datestamp
self.gts.timestamp = msg.timestamp
self.gts.pos_status = msg.status
elif isinstance(msg, pynmea2.types.talker.GSA):
solSats = []
self.gts.fix_type = msg.mode_fix_type
if len(msg.sv_id01):
solSats.append(msg.sv_id01)
if len(msg.sv_id02):
solSats.append(msg.sv_id02)
if len(msg.sv_id03):
solSats.append(msg.sv_id03)
if len(msg.sv_id04):
solSats.append(msg.sv_id04)
if len(msg.sv_id05):
solSats.append(msg.sv_id05)
if len(msg.sv_id06):
solSats.append(msg.sv_id06)
if len(msg.sv_id07):
solSats.append(msg.sv_id07)
if len(msg.sv_id08):
solSats.append(msg.sv_id08)
if len(msg.sv_id09):
solSats.append(msg.sv_id09)
if len(msg.sv_id10):
solSats.append(msg.sv_id10)
if len(msg.sv_id11):
solSats.append(msg.sv_id11)
if len(msg.sv_id12):
solSats.append(msg.sv_id12)
if len(solSats):
if int(solSats[0]) > 64:
# GLONASS sats
self.gts.glSats = solSats
else:
self.gts.gpSats = solSats
except Exception as e:
print("exception: ", e, "\nNMEA read failed ", data)
if msg:
print(repr(msg))
if self.gts.fix > 2 and (time.time() - lastTime) > 300:
self.dump_almanac()
self.dump_ephemerides()
self.almanac = True
lastTime = time.time()
if __name__ == '__main__':
if NMEAudpPort == gts1800Port:
print('Disabling NMEA transfers')
NMEAudpPort = None
if gsIP is not None:
gnsst = GnssThread(gsIP, gsPort)
while not gnsst.running:
time.sleep(0.5)
if seed and path.exists("almanac.txt"):
gnsst.seed_almanac()
if seed and path.exists("ephemerides.txt"):
gnsst.seed_ephemerides()
gnsst.start()
if gts1800IP is not None:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
print('Fix time ', gnsst.gts.fixtime)
try:
while True:
if not debug:
os.system('clear')
sats = b''
solSats = b''
localSats = copy.deepcopy(gnsst.gts.sats)
localSolSats = gnsst.gts.solSats
for sat in localSats:
if len(sats) > 0:
sats = sats + b'&'
sats = sats + b'S%s:%.1f' % (sat, float(localSats[sat][2]))
if sat in localSolSats:
solSats = solSats + b'S%s:%.1f ' % (sat, float(localSats[sat][2]))
print
if gts1800IP is not None:
print('GTS-GPS %s %s:%d' % (VERSION, gts1800IP, gts1800Port))
else:
print('GPS %s' % (VERSION))
print('---------------------------------------')
print('longitude %3.8f' % gnsst.gts.lon)
print('latitude %2.8f' % gnsst.gts.lat)
print('altitude (m) %.2f' % gnsst.gts.altitude)
print('accuracy %s' % gnsst.gts.accuracy)
print('fix %s' % gnsst.gts.fix)
print('sats ')
print(solSats)
print('timestamp %s.%02d' % (gnsst.gts.datetime, gnsst.gts.time10ms))
print('Fix time %.3f' % gnsst.gts.utctime)
print
# print 'gnsst info', gnsst.gt
if gts1800IP is not None:
msg = b'%s.%03d,%d,0,%d,%d,%s,%3.8f,%2.8f,%.3f,' % (gnsst.gts.datetime, gnsst.gts.timems, gnsst.gts.antenna, gnsst.gts.fixsuc, gnsst.gts.utctime, gnsst.gts.accuracy, gnsst.gts.lon, gnsst.gts.lat, gnsst.gts.altitude) + sats
msg = b'%04d%s\x00' % (len(msg), msg)
print('msg: %d %s' % (len(msg), msg))
sock.sendto(msg, (gts1800IP, gts1800Port))
while (len(gnsst.NMEAdata)):
data = gnsst.NMEAdata.pop()
if debug:
print("nmea send: ", data)
if NMEAudpPort:
sock.sendto(data, (gts1800IP, NMEAudpPort))
time.sleep(0.5)
except Exception as err:
print("\nKilling Thread: ", format(err))
except KeyboardInterrupt:
print("\nKilling Thread")
finally:
gnsst.running = False
gnsst.join()
print("Done.\nExiting.")
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment