Add new dish_grpc script for sqlite output

I'm sure this isn't a particularly optimal implementation, but it's functional.

This required exporting knowledge about the types that will be returned per field from starlink_grpc and moving things around a little in dish_common.
sparky8512 2021-02-03 17:23:01 -08:00
parent 188733e4fb
commit 549a46ae56
4 changed files with 413 additions and 53 deletions
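The interesting part of the change is the set of new `*_field_types()` functions exported from `starlink_grpc`, which return type classes grouped the same way as the existing `*_field_names()` functions. A rough sketch of how a consumer can pair the two (a simplified version of what `ensure_schema()` in the new script does; the print format is just for illustration):

```python
import starlink_grpc

# The new *_field_types() functions mirror the grouping of the existing
# *_field_names() functions, so names and types can simply be zipped together.
name_groups = starlink_grpc.status_field_names()
type_groups = starlink_grpc.status_field_types()

for names, types in zip(name_groups, type_groups):
    for name, type_class in zip(names, types):
        print("{0} -> {1}".format(name, type_class.__name__))
```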


@@ -69,9 +69,9 @@ To collect and record packet loss summary stats at the top of every hour, you co
00 * * * * [ -e ~/dishStats.csv ] || ~/bin/dish_grpc_text.py -H >~/dishStats.csv; ~/bin/dish_grpc_text.py ping_drop >>~/dishStats.csv
```

-`dish_grpc_influx.py` and `dish_grpc_mqtt.py` are similar, but they send their output to an InfluxDB server and a MQTT broker, respectively. Run them with `-h` command line option for details on how to specify server and/or database options.
+`dish_grpc_influx.py`, `dish_grpc_sqlite.py`, and `dish_grpc_mqtt.py` are similar, but they send their output to an InfluxDB server, a sqlite database, and a MQTT broker, respectively. Run them with `-h` command line option for details on how to specify server and/or database options.

-All 3 scripts support processing status data in addition to the history data. The status data is mostly what appears related to the dish in the Debug Data section of the Starlink app. Specific status or history data groups can be selected by including their mode names on the command line. Run the scripts with `-h` command line option to get a list of available modes. See the documentation at the top of `starlink_grpc.py` for detail on what each of the fields means within each mode group.
+All 4 scripts support processing status data in addition to the history data. The status data is mostly what appears related to the dish in the Debug Data section of the Starlink app. Specific status or history data groups can be selected by including their mode names on the command line. Run the scripts with `-h` command line option to get a list of available modes. See the documentation at the top of `starlink_grpc.py` for detail on what each of the fields means within each mode group.

By default, all of these scripts will pull data once, send it off to the specified data backend, and then exit. They can instead be made to run in a periodic loop by passing a `-t` option to specify loop interval, in seconds. For example, to capture status information to a InfluxDB server every 30 seconds, you could do something like this:
```
@@ -82,7 +82,7 @@ Some of the scripts (currently only the InfluxDB one) also support specifying op

#### Bulk history data collection

-`dish_grpc_influx.py` and `dish_grpc_text.py` also support a bulk history mode that collects and writes the full second-by-second data instead of summary stats. To select bulk mode, use `bulk_history` for the mode argument. You'll probably also want to use the `-t` option to have it run in a loop.
+`dish_grpc_influx.py`, `dish_grpc_sqlite.py`, and `dish_grpc_text.py` also support a bulk history mode that collects and writes the full second-by-second data instead of summary stats. To select bulk mode, use `bulk_history` for the mode argument. You'll probably also want to use the `-t` option to have it run in a loop.

### Other scripts
@@ -100,7 +100,7 @@ Possibly more simple examples to come, as the other scripts have started getting

## To Be Done (Maybe)

-More data backend options, including local database support.
+Maybe more data backend options. If there's one you'd like to see supported, please open a feature request issue.

There are `reboot` and `dish_stow` requests in the Device protocol, too, so it should be trivial to write a command that initiates dish reboot and stow operations. These are easy enough to do with `grpcurl`, though, as there is no need to parse through the response data. For that matter, they're easy enough to do with the Starlink app.
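Once the new script has run, the result is an ordinary sqlite file that can be read back with Python's standard library. A minimal sketch (the `starlink.db` path is hypothetical, and it assumes at least one status sample has already been written):

```python
import sqlite3

# "starlink.db" is a hypothetical path; use whatever file was passed to dish_grpc_sqlite.py.
conn = sqlite3.connect("starlink.db")
try:
    cur = conn.execute('SELECT "time", "id", "state", "pop_ping_drop_rate" FROM "status" '
                       'ORDER BY "time" DESC LIMIT 1')
    print("latest status sample:", cur.fetchone())
finally:
    conn.close()
```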


@@ -34,19 +34,13 @@ def create_arg_parser(output_description, bulk_history=True):
    parser = argparse.ArgumentParser(
        description="Collect status and/or history data from a Starlink user terminal and " +
        output_description,
-        epilog="Additional arguments can be read from a file by including the @FILENAME as an "
+        epilog="Additional arguments can be read from a file by including @FILENAME as an "
        "option, where FILENAME is a path to a file that contains arguments, one per line.",
        fromfile_prefix_chars="@",
        add_help=False)

-    all_modes = STATUS_MODES + HISTORY_STATS_MODES + UNGROUPED_MODES
-    if bulk_history:
-        all_modes.append("bulk_history")
-    parser.add_argument("mode",
-                        nargs="+",
-                        choices=all_modes,
-                        help="The data group to record, one or more of: " + ", ".join(all_modes),
-                        metavar="mode")
+    # need to remember this for later
+    parser.bulk_history = bulk_history

    group = parser.add_argument_group(title="General options")
    group.add_argument("-h", "--help", action="help", help="Be helpful")
@@ -91,6 +85,15 @@ def run_arg_parser(parser, need_id=False, no_stdout_errors=False):
    Returns:
        An argparse Namespace object with the parsed options set as attributes.
    """
+    all_modes = STATUS_MODES + HISTORY_STATS_MODES + UNGROUPED_MODES
+    if parser.bulk_history:
+        all_modes.append("bulk_history")
+    parser.add_argument("mode",
+                        nargs="+",
+                        choices=all_modes,
+                        help="The data group to record, one or more of: " + ", ".join(all_modes),
+                        metavar="mode")
+
    opts = parser.parse_args()

    # for convenience, set flags for whether any mode in a group is selected
@@ -216,6 +219,17 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
        add_data(usage, "usage")

    if opts.bulk_mode and add_bulk:
+        return get_bulk_data(opts, gstate, add_bulk)
+
+    return 0
+
+
+def get_bulk_data(opts, gstate, add_bulk):
+    """Fetch bulk data. See `get_data` for details.
+
+    This was split out in case bulk data needs to be handled separately, for
+    example, if dish_id needs to be known before calling.
+    """
    before = time.time()

    start = gstate.counter
@@ -254,5 +268,3 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None):
    gstate.counter = new_counter
    gstate.timestamp = timestamp + parsed_samples
-
-    return 0
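The net effect of the `dish_common` changes above: `create_arg_parser` no longer adds the `mode` positional itself (it just remembers the `bulk_history` flag on the parser), `run_arg_parser` adds `mode` last, and the bulk history fetch is now callable on its own via `get_bulk_data`. That lets a new script insert its own positional arguments between the two calls, which is roughly how `dish_grpc_sqlite.py` below uses it:

```python
import dish_common

# Sketch of the calling pattern enabled by the dish_common changes (modeled on
# the new sqlite script, not a complete program): the script-specific
# "database" positional is added after create_arg_parser() but before
# run_arg_parser() appends the "mode" positional.
parser = dish_common.create_arg_parser(output_description="demonstrate the parser flow")
parser.add_argument("database", help="Database file to use")
opts = dish_common.run_arg_parser(parser, need_id=True)
print(opts.database, opts.mode)
```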

dish_grpc_sqlite.py (new file, 243 lines)

@@ -0,0 +1,243 @@
#!/usr/bin/python3
"""Write Starlink user terminal data to a sqlite database.

This script pulls the current status info and/or metrics computed from the
history data and writes them to the specified sqlite database either once or
in a periodic loop.

Array data is currently written to the database as text strings of comma-
separated values, which may not be the best method for some use cases. If you
find yourself wishing they were handled better, please open a feature request
at https://github.com/sparky8512/starlink-grpc-tools/issues explaining the use
case and how you would rather see it. This only affects a few fields, since
most of the useful data is not in arrays.

NOTE: The Starlink user terminal does not include time values with its
history or status data, so this script uses current system time to compute
the timestamps it writes into the database. It is recommended to run this
script on a host that has its system clock synced via NTP. Otherwise, the
timestamps may get out of sync with real time.
"""

from datetime import datetime
from datetime import timezone
from itertools import repeat
import logging
import signal
import sqlite3
import sys
import time

import dish_common
import starlink_grpc

SCHEMA_VERSION = 1


class Terminated(Exception):
    pass


def handle_sigterm(signum, frame):
    # Turn SIGTERM into an exception so main loop can clean up
    raise Terminated
def parse_args():
    parser = dish_common.create_arg_parser(output_description="write it to a sqlite database")

    parser.add_argument("database", help="Database file to use")

    group = parser.add_argument_group(title="sqlite database options")
    group.add_argument("-k",
                       "--skip-query",
                       action="store_true",
                       help="Skip querying for prior sample write point in bulk mode")

    opts = dish_common.run_arg_parser(parser, need_id=True)

    return opts
def query_counter(opts, gstate):
    now = time.time()
    cur = gstate.sql_conn.cursor()
    cur.execute(
        'SELECT "time", "counter" FROM "history" WHERE "time"<? AND "id"=? '
        'ORDER BY "time" DESC LIMIT 1', (now, gstate.dish_id))
    row = cur.fetchone()
    cur.close()
    if row and row[0] and row[1]:
        if opts.verbose:
            print("Existing time base: {0} -> {1}".format(
                row[1], datetime.fromtimestamp(row[0], tz=timezone.utc)))
        return row
    else:
        return 0, None
def loop_body(opts, gstate):
    tables = {"status": {}, "ping_stats": {}, "usage": {}}
    hist_cols = ["time", "id"]
    hist_rows = []

    def cb_add_item(key, val, category):
        tables[category][key] = val

    def cb_add_sequence(key, val, category, start):
        tables[category][key] = ",".join(str(subv) if subv is not None else "" for subv in val)

    def cb_add_bulk(bulk, count, timestamp, counter):
        nonlocal hist_cols
        if len(hist_cols) == 2:
            hist_cols.extend(bulk.keys())
            hist_cols.append("counter")
        for i in range(count):
            timestamp += 1
            counter += 1
            row = [timestamp, gstate.dish_id]
            row.extend(val[i] for val in bulk.values())
            row.append(counter)
            hist_rows.append(row)

    now = int(time.time())

    rc = dish_common.get_data(opts, gstate, cb_add_item, cb_add_sequence)

    if opts.bulk_mode and not rc:
        if gstate.counter is None and opts.samples < 0:
            gstate.timestamp, gstate.counter = query_counter(opts, gstate)
        rc = dish_common.get_bulk_data(opts, gstate, cb_add_bulk)

    rows_written = 0

    try:
        cur = gstate.sql_conn.cursor()
        for category, fields in tables.items():
            if fields:
                sql = 'INSERT OR REPLACE INTO "{0}" ("time","id",{1}) VALUES ({2})'.format(
                    category, ",".join('"' + x + '"' for x in fields),
                    ",".join(repeat("?",
                                    len(fields) + 2)))
                values = [now, gstate.dish_id]
                values.extend(fields.values())
                cur.execute(sql, values)
                rows_written += 1
        if hist_rows:
            sql = 'INSERT OR REPLACE INTO "history" ({0}) VALUES({1})'.format(
                ",".join('"' + x + '"' for x in hist_cols), ",".join(repeat("?", len(hist_cols))))
            cur.executemany(sql, hist_rows)
            rows_written += len(hist_rows)
        cur.close()
        gstate.sql_conn.commit()
    except sqlite3.OperationalError as e:
        # these are not necessarily fatal, but there's also not much we can do about them
        logging.error("Unexpected error from database, discarding %s rows: %s", rows_written, e)
        rc = 1
    else:
        if opts.verbose:
            print("Rows written to db:", rows_written)

    return rc
def ensure_schema(opts, conn):
    cur = conn.cursor()
    cur.execute("PRAGMA user_version")
    version = cur.fetchone()
    if version and version[0] == SCHEMA_VERSION:
        cur.close()
        return

    if opts.verbose:
        if version[0]:
            print("Upgrading schema from version:", version)
        else:
            print("Initializing new database")

    # If/when more fields get added or changed, the schema will have changed
    # and this will have to handle the upgrade case. For now, just create the
    # new tables.
    tables = {}
    name_groups = starlink_grpc.status_field_names()
    type_groups = starlink_grpc.status_field_types()
    tables["status"] = zip(name_groups, type_groups)

    name_groups = starlink_grpc.history_stats_field_names()
    type_groups = starlink_grpc.history_stats_field_types()
    tables["ping_stats"] = zip(name_groups[0:5], type_groups[0:5])
    tables["usage"] = ((name_groups[5], type_groups[5]),)

    name_groups = starlink_grpc.history_bulk_field_names()
    type_groups = starlink_grpc.history_bulk_field_types()
    tables["history"] = ((name_groups[1], type_groups[1]), (["counter"], [int]))

    def sql_type(type_class):
        if issubclass(type_class, float):
            return "REAL"
        if issubclass(type_class, bool):
            # advisory only, stores as int:
            return "BOOLEAN"
        if issubclass(type_class, int):
            return "INTEGER"
        if issubclass(type_class, str):
            return "TEXT"
        raise TypeError

    for table, group_pairs in tables.items():
        columns = ['"time" INTEGER NOT NULL', '"id" TEXT NOT NULL']
        for name_group, type_group in group_pairs:
            for name_item, type_item in zip(name_group, type_group):
                name_item = dish_common.BRACKETS_RE.match(name_item).group(1)
                if name_item != "id":
                    columns.append('"{0}" {1}'.format(name_item, sql_type(type_item)))
        sql = 'CREATE TABLE "{0}" ({1}, PRIMARY KEY("time","id"))'.format(table, ", ".join(columns))
        cur.execute(sql)

    cur.execute("PRAGMA user_version={0}".format(SCHEMA_VERSION))
    cur.close()
    conn.commit()
def main():
    opts = parse_args()

    logging.basicConfig(format="%(levelname)s: %(message)s")

    gstate = dish_common.GlobalState()
    gstate.points = []
    gstate.deferred_points = []

    signal.signal(signal.SIGTERM, handle_sigterm)
    gstate.sql_conn = sqlite3.connect(opts.database)
    ensure_schema(opts, gstate.sql_conn)

    rc = 0
    try:
        next_loop = time.monotonic()
        while True:
            rc = loop_body(opts, gstate)
            if opts.loop_interval > 0.0:
                now = time.monotonic()
                next_loop = max(next_loop + opts.loop_interval, now)
                time.sleep(next_loop - now)
            else:
                break
    except sqlite3.Error as e:
        logging.error("Database error: %s", e)
        rc = 1
    except Terminated:
        pass
    finally:
        gstate.sql_conn.close()
        gstate.shutdown()

    sys.exit(rc)


if __name__ == '__main__':
    main()
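Since `ensure_schema()` stamps the file with `PRAGMA user_version`, an existing database can be checked for its schema version and tables before deciding whether an upgrade path is needed. A quick sketch (again assuming a hypothetical `starlink.db`):

```python
import sqlite3

conn = sqlite3.connect("starlink.db")  # hypothetical path
try:
    version = conn.execute("PRAGMA user_version").fetchone()[0]
    tables = [row[0] for row in
              conn.execute("SELECT name FROM sqlite_master WHERE type='table'")]
    print("schema version:", version)  # 1 for databases created by this commit
    print("tables:", tables)  # should include status, ping_stats, usage, history
finally:
    conn.close()
```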


@@ -349,8 +349,7 @@ def status_field_names():
    Returns:
        A tuple with 3 lists, with status data field names, alert detail field
-        names, and obstruction detail field names to their respective values,
-        in that order.
+        names, and obstruction detail field names, in that order.
    """
    alert_names = []
    for field in spacex.api.device.dish_pb2.DishAlerts.DESCRIPTOR.fields:
@@ -378,6 +377,38 @@ def status_field_names():
    ], alert_names


+def status_field_types():
+    """Return the field types of the status data.
+
+    Return the type classes for each field. For sequence types, the type of
+    element in the sequence is returned, not the type of the sequence.
+
+    Returns:
+        A tuple with 3 lists, with status data field types, alert detail field
+        types, and obstruction detail field types, in that order.
+    """
+    return [
+        str,  # id
+        str,  # hardware_version
+        str,  # software_version
+        str,  # state
+        int,  # uptime
+        float,  # snr
+        float,  # seconds_to_first_nonempty_slot
+        float,  # pop_ping_drop_rate
+        float,  # downlink_throughput_bps
+        float,  # uplink_throughput_bps
+        float,  # pop_ping_latency_ms
+        int,  # alerts
+        float,  # fraction_obstructed
+        bool,  # currently_obstructed
+        float,  # seconds_obstructed
+    ], [
+        float,  # wedges_fraction_obstructed[]
+        float,  # valid_s
+    ], [bool] * len(spacex.api.device.dish_pb2.DishAlerts.DESCRIPTOR.fields)
+
+
def get_status(context=None):
    """Fetch status data and return it in grpc structure format.
@@ -506,6 +537,30 @@ def history_bulk_field_names():
    ]


+def history_bulk_field_types():
+    """Return the field types of the bulk history data.
+
+    Return the type classes for each field. For sequence types, the type of
+    element in the sequence is returned, not the type of the sequence.
+
+    Returns:
+        A tuple with 2 lists, the first with general data types, the second
+        with bulk history data types.
+    """
+    return [
+        int,  # samples
+        int,  # end_counter
+    ], [
+        float,  # pop_ping_drop_rate[]
+        float,  # pop_ping_latency_ms[]
+        float,  # downlink_throughput_bps[]
+        float,  # uplink_throughput_bps[]
+        float,  # snr[]
+        bool,  # scheduled[]
+        bool,  # obstructed[]
+    ]
+
+
def history_ping_field_names():
    """Deprecated. Use history_stats_field_names instead."""
    return history_stats_field_names()[0:3]
@@ -561,6 +616,56 @@ def history_stats_field_names():
    ]


+def history_stats_field_types():
+    """Return the field types of the packet loss stats.
+
+    Return the type classes for each field. For sequence types, the type of
+    element in the sequence is returned, not the type of the sequence.
+
+    Returns:
+        A tuple with 6 lists, with general data types, ping drop stat types,
+        ping drop run length stat types, ping latency stat types, loaded ping
+        latency stat types, and bandwidth usage stat types, in that order.
+
+    Note:
+        Additional lists may be added to this tuple in the future with
+        additional data groups, so it is not recommended for the caller to
+        assume exactly 6 elements.
+    """
+    return [
+        int,  # samples
+        int,  # end_counter
+    ], [
+        float,  # total_ping_drop
+        int,  # count_full_ping_drop
+        int,  # count_obstructed
+        float,  # total_obstructed_ping_drop
+        int,  # count_full_obstructed_ping_drop
+        int,  # count_unscheduled
+        float,  # total_unscheduled_ping_drop
+        int,  # count_full_unscheduled_ping_drop
+    ], [
+        int,  # init_run_fragment
+        int,  # final_run_fragment
+        int,  # run_seconds[]
+        int,  # run_minutes[]
+    ], [
+        float,  # mean_all_ping_latency
+        float,  # deciles_all_ping_latency[]
+        float,  # mean_full_ping_latency
+        float,  # deciles_full_ping_latency[]
+        float,  # stdev_full_ping_latency
+    ], [
+        int,  # load_bucket_samples[]
+        float,  # load_bucket_min_latency[]
+        float,  # load_bucket_median_latency[]
+        float,  # load_bucket_max_latency[]
+    ], [
+        int,  # download_usage
+        int,  # upload_usage
+    ]
+
+
def get_history(context=None):
    """Fetch history data and return it in grpc structure format.