starlink-grpc-tools/dish_grpc_sqlite.py

#!/usr/bin/python3
"""Write Starlink user terminal data to a sqlite database.

This script pulls the current status info and/or metrics computed from the
history data and writes them to the specified sqlite database either once or
in a periodic loop.

Array data is currently written to the database as text strings of comma-
separated values, which may not be the best method for some use cases. If you
find yourself wishing they were handled better, please open a feature request
at https://github.com/sparky8512/starlink-grpc-tools/issues explaining the use
case and how you would rather see it. This only affects a few fields, since
most of the useful data is not in arrays.

NOTE: The Starlink user terminal does not include time values with its
history or status data, so this script uses current system time to compute
the timestamps it writes into the database. It is recommended to run this
script on a host that has its system clock synced via NTP. Otherwise, the
timestamps may get out of sync with real time.
"""
from datetime import datetime
from datetime import timezone
from itertools import repeat
import logging
import signal
import sqlite3
import sys
import time

import dish_common
import starlink_grpc

SCHEMA_VERSION = 1
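

# Array fields are stored as comma-separated text (see the module docstring).
# The helper below sketches one way a consumer could decode such a column
# back into a list; it is illustrative only and nothing in this script calls
# it. Empty elements encode missing samples, so they come back as None.
def parse_array_field(text, conv=float):
    """Split a comma-separated array column back into a list of values."""
    return [conv(item) if item else None for item in text.split(",")]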


class Terminated(Exception):
    pass


def handle_sigterm(signum, frame):
    # Turn SIGTERM into an exception so the main loop can clean up
    raise Terminated


def parse_args():
    parser = dish_common.create_arg_parser(output_description="write it to a sqlite database")

    parser.add_argument("database", help="Database file to use")

    group = parser.add_argument_group(title="sqlite database options")
    group.add_argument("-k",
                       "--skip-query",
                       action="store_true",
                       help="Skip querying for prior sample write point in bulk mode")

    opts = dish_common.run_arg_parser(parser, need_id=True)

    return opts
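

# Looking up the most recent sample already recorded for a dish lets bulk
# collection resume where the prior run left off; the -k / --skip-query
# option bypasses this lookup.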


def query_counter(opts, gstate):
    now = time.time()
    cur = gstate.sql_conn.cursor()
    cur.execute(
        'SELECT "time", "counter" FROM "history" WHERE "time"<? AND "id"=? '
        'ORDER BY "time" DESC LIMIT 1', (now, gstate.dish_id))
    row = cur.fetchone()
    cur.close()

    if row and row[0] and row[1]:
        if opts.verbose:
            print("Existing time base: {0} -> {1}".format(
                row[1], datetime.fromtimestamp(row[0], tz=timezone.utc)))
        return row
    else:
        return 0, None
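

# For reference, the same lookup can be run by hand from the sqlite3 shell
# (the database file name and dish id below are illustrative):
#
#     sqlite3 starlink.db "SELECT time, counter FROM history \
#         WHERE id='<dish id>' ORDER BY time DESC LIMIT 1"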


def loop_body(opts, gstate):
    tables = {"status": {}, "ping_stats": {}, "usage": {}}
    hist_cols = ["time", "id"]
    hist_rows = []

    def cb_add_item(key, val, category):
        tables[category][key] = val

    def cb_add_sequence(key, val, category, start):
        tables[category][key] = ",".join(str(subv) if subv is not None else "" for subv in val)
    def cb_add_bulk(bulk, count, timestamp, counter):
        nonlocal hist_cols
        if len(hist_cols) == 2:
            hist_cols.extend(bulk.keys())
            hist_cols.append("counter")
        for i in range(count):
            timestamp += 1
            counter += 1
            row = [timestamp, gstate.dish_id]
            row.extend(val[i] for val in bulk.values())
            row.append(counter)
            hist_rows.append(row)

    now = int(time.time())
    rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence)

    if opts.bulk_mode and not rc:
        if gstate.counter is None and opts.samples < 0:
            gstate.timestamp, gstate.counter = query_counter(opts, gstate)
        rc = dish_common.get_bulk_data(opts, gstate, cb_add_bulk)

    rows_written = 0
    try:
        cur = gstate.sql_conn.cursor()
        for category, fields in tables.items():
            if fields:
                sql = 'INSERT OR REPLACE INTO "{0}" ("time","id",{1}) VALUES ({2})'.format(
                    category, ",".join('"' + x + '"' for x in fields),
                    ",".join(repeat("?", len(fields) + 2)))
                values = [now, gstate.dish_id]
                values.extend(fields.values())
                cur.execute(sql, values)
                rows_written += 1
        if hist_rows:
            sql = 'INSERT OR REPLACE INTO "history" ({0}) VALUES({1})'.format(
                ",".join('"' + x + '"' for x in hist_cols), ",".join(repeat("?", len(hist_cols))))
            cur.executemany(sql, hist_rows)
            rows_written += len(hist_rows)
        cur.close()
        gstate.sql_conn.commit()
    except sqlite3.OperationalError as e:
        # These are not necessarily fatal, but there is not much that can be
        # done about them here, so log the error and drop this batch of rows.
        logging.error("Unexpected error from database, discarding %s rows: %s", rows_written, e)
        rc = 1
    else:
        if opts.verbose:
            print("Rows written to db:", rows_written)

    return rc
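

# Because every table's primary key is ("time","id"), the "INSERT OR REPLACE"
# statements above make re-running over an overlapping sample window overwrite
# the matching rows rather than fail with a constraint error. A minimal
# standalone demonstration of that behavior (the table here is hypothetical,
# not part of this script's schema, and nothing in this script calls this):
def _demo_insert_or_replace():
    conn = sqlite3.connect(":memory:")
    conn.execute('CREATE TABLE "t" ("time" INTEGER, "id" TEXT, "v" REAL, '
                 'PRIMARY KEY("time","id"))')
    conn.execute('INSERT OR REPLACE INTO "t" VALUES (1, ?, 1.0)', ("dish",))
    # Same primary key, so this replaces the row written above:
    conn.execute('INSERT OR REPLACE INTO "t" VALUES (1, ?, 2.0)', ("dish",))
    assert conn.execute('SELECT "v" FROM "t"').fetchone()[0] == 2.0
    conn.close()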


def ensure_schema(opts, conn):
    cur = conn.cursor()
    cur.execute("PRAGMA user_version")
    version = cur.fetchone()
    if version and version[0] == SCHEMA_VERSION:
        cur.close()
        return

    if opts.verbose:
        if version[0]:
            print("Upgrading schema from version:", version[0])
        else:
            print("Initializing new database")

    # If/when more fields get added or changed, the schema version will need
    # to be bumped and this will have to handle the upgrade case. For now,
    # just create the new tables.
    tables = {}
    name_groups = starlink_grpc.status_field_names()
    type_groups = starlink_grpc.status_field_types()
    tables["status"] = zip(name_groups, type_groups)

    name_groups = starlink_grpc.history_stats_field_names()
    type_groups = starlink_grpc.history_stats_field_types()
    tables["ping_stats"] = zip(name_groups[0:5], type_groups[0:5])
    tables["usage"] = ((name_groups[5], type_groups[5]),)

    name_groups = starlink_grpc.history_bulk_field_names()
    type_groups = starlink_grpc.history_bulk_field_types()
    tables["history"] = ((name_groups[1], type_groups[1]), (["counter"], [int]))
    def sql_type(type_class):
        if issubclass(type_class, float):
            return "REAL"
        if issubclass(type_class, bool):
            # advisory only, stores as int:
            return "BOOLEAN"
        if issubclass(type_class, int):
            return "INTEGER"
        if issubclass(type_class, str):
            return "TEXT"
        raise TypeError

    for table, group_pairs in tables.items():
        columns = ['"time" INTEGER NOT NULL', '"id" TEXT NOT NULL']
        for name_group, type_group in group_pairs:
            for name_item, type_item in zip(name_group, type_group):
                name_item = dish_common.BRACKETS_RE.match(name_item).group(1)
                if name_item != "id":
                    columns.append('"{0}" {1}'.format(name_item, sql_type(type_item)))
        sql = 'CREATE TABLE "{0}" ({1}, PRIMARY KEY("time","id"))'.format(table, ", ".join(columns))
        cur.execute(sql)

    cur.execute("PRAGMA user_version={0}".format(SCHEMA_VERSION))
    cur.close()
    conn.commit()
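

# For illustration, the generated DDL has roughly this shape (the data columns
# shown here are hypothetical; the real names and types come from starlink_grpc):
#
#     CREATE TABLE "status" ("time" INTEGER NOT NULL, "id" TEXT NOT NULL,
#                            "state" TEXT, "uptime" INTEGER, ...,
#                            PRIMARY KEY("time","id"))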


def main():
    opts = parse_args()

    logging.basicConfig(format="%(levelname)s: %(message)s")

    gstate = dish_common.GlobalState()

    signal.signal(signal.SIGTERM, handle_sigterm)
    gstate.sql_conn = sqlite3.connect(opts.database)
    ensure_schema(opts, gstate.sql_conn)

    rc = 0
    try:
        next_loop = time.monotonic()
        while True:
            rc = loop_body(opts, gstate)
            if opts.loop_interval > 0.0:
                now = time.monotonic()
                next_loop = max(next_loop + opts.loop_interval, now)
                time.sleep(next_loop - now)
            else:
                break
    except sqlite3.Error as e:
        logging.error("Database error: %s", e)
        rc = 1
    except Terminated:
        pass
    finally:
        gstate.sql_conn.close()
        gstate.shutdown()

    sys.exit(rc)


if __name__ == '__main__':
    main()
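
# Example invocation (the database file name is illustrative; loop timing and
# other general options come from dish_common and are not shown here):
#
#     python3 dish_grpc_sqlite.py starlink.db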