Add InfluxDB and MQTT history stats scripts

Unlike the status info scripts, these include support for setting host and database parameters via command line options. Still to be added is support for HTTPS/SSL.

Add a get_id function to the grpc parser module, so it can be used for tagging purposes.
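For reference, the new history scripts use it for tagging roughly like this (a minimal sketch of the usage shown in the diffs below, omitting option parsing and error handling):

```
import starlink_grpc

dish_id = starlink_grpc.get_id()  # returns None if the dish is unreachable
if dish_id is not None:
    # Used as an InfluxDB tag value...
    tags = {"id": dish_id}
    # ...and as part of the MQTT topic prefix.
    topic_prefix = "starlink/dish_ping_stats/" + dish_id + "/"
```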

Minor cleanups in some of the other scripts to make them consistent with the newly added scripts.
sparky8512 2021-01-09 12:03:37 -08:00
parent 253d6e9250
commit f067f08952
8 changed files with 270 additions and 13 deletions


@@ -26,7 +26,7 @@ For more usage options, run:
 python parseJsonHistory.py -h
 ```
-When used as-is, `parseJsonHistory.py` will summarize packet loss information from the data the dish records. There's other bits of data in there, though, so that script could be used as a starting point or example of how to iterate through it. Most of the data displayed in the Statistics page of the Starlink app appears to come from this same `get_history` gRPC response. See the file `get_history_notes.txt` for some ramblings on how to interpret it.
+When used as-is, `parseJsonHistory.py` will summarize packet loss information from the data the dish records. There's other bits of data in there, though, so that script (or more likely the parsing logic it uses, which now resides in `starlink_json.py`) could be used as a starting point or example of how to iterate through it. Most of the data displayed in the Statistics page of the Starlink app appears to come from this same `get_history` gRPC response. See the file `get_history_notes.txt` for some ramblings on how to interpret it.
 The other scripts can do the gRPC communication directly, but they require some generated code to support the specific gRPC protocol messages used. These would normally be generated from .proto files that specify those messages, but to date (2020-Dec), SpaceX has not publicly released such files. The gRPC service running on the dish appears to have [server reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) enabled, though. `grpcurl` can use that to extract a protoset file, and the `protoc` compiler can use that to make the necessary generated code:
 ```
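As a rough illustration of that starting point, here is a minimal sketch that drives the parsing logic the same way `parseJsonHistory.py` does in this commit; it assumes `history_ping_stats` keeps the (filename, sample count, verbose) signature used there and returns three dicts of stats, with None on failure:

```
import sys

import starlink_json

# Summarize the most recent hour of samples from JSON data piped in on stdin ("-").
g_stats, pd_stats, rl_stats = starlink_json.history_ping_stats("-", 3600, False)
if g_stats is None:
    # Parsing failed; the module reports details when run with verbose enabled.
    sys.exit(1)

for name, value in {**g_stats, **pd_stats}.items():
    print(name, "=", value)
```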

dishHistoryInflux.py (new file, 124 lines)

@@ -0,0 +1,124 @@
#!/usr/bin/python3
######################################################################
#
# Write Starlink user terminal packet loss statistics to an InfluxDB
# database.
#
# This script examines the most recent samples from the history data,
# computes several different metrics related to packet loss, and
# writes those to the specified InfluxDB database.
#
######################################################################
import datetime
import sys
import getopt
from influxdb import InfluxDBClient
import starlink_grpc
arg_error = False

try:
    opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:vD:P:R:U:")
except getopt.GetoptError as err:
    print(str(err))
    arg_error = True

# Default to 1 hour worth of data samples.
samples_default = 3600
samples = samples_default
print_usage = False
verbose = False
run_lengths = False
host_default = "localhost"
database_default = "dishstats"
icargs = {"host": host_default, "timeout": 5, "database": database_default}
rp = None

if not arg_error:
    if len(args) > 0:
        arg_error = True
    else:
        for opt, arg in opts:
            if opt == "-a":
                samples = -1
            elif opt == "-h":
                print_usage = True
            elif opt == "-n":
                icargs["host"] = arg
            elif opt == "-p":
                icargs["port"] = int(arg)
            elif opt == "-r":
                run_lengths = True
            elif opt == "-s":
                samples = int(arg)
            elif opt == "-v":
                verbose = True
            elif opt == "-D":
                icargs["database"] = arg
            elif opt == "-P":
                icargs["password"] = arg
            elif opt == "-R":
                rp = arg
            elif opt == "-U":
                icargs["username"] = arg
        if "password" in icargs and "username" not in icargs:
            print("Password authentication requires username to be set")
            arg_error = True

if print_usage or arg_error:
    print("Usage: " + sys.argv[0] + " [options...]")
    print("Options:")
    print(" -a: Parse all valid samples")
    print(" -h: Be helpful")
    print(" -n <name>: Hostname of InfluxDB server, default: " + host_default)
    print(" -p <num>: Port number to use on InfluxDB server")
    print(" -r: Include ping drop run length stats")
    print(" -s <num>: Number of data samples to parse, default: " + str(samples_default))
    print(" -v: Be verbose")
    print(" -D <name>: Database name to use, default: " + database_default)
    print(" -P <word>: Set password for authentication")
    print(" -R <name>: Retention policy name to use")
    print(" -U <name>: Set username for authentication")
    sys.exit(1 if arg_error else 0)

dish_id = starlink_grpc.get_id()

if dish_id is None:
    if verbose:
        print("Unable to connect to Starlink user terminal")
    sys.exit(1)

timestamp = datetime.datetime.utcnow()

g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose)

if g_stats is None:
    # verbose output already happened, so just bail.
    sys.exit(1)

all_stats = g_stats.copy()
all_stats.update(pd_stats)
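# InfluxDB cannot store a list in a single field, so flatten each run length
# list (keys starting with "run_") into individually numbered fields.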
if run_lengths:
    for k, v in rl_stats.items():
        if k.startswith("run_"):
            for i, subv in enumerate(v, start=1):
                all_stats[k + "_" + str(i)] = subv
        else:
            all_stats[k] = v

points = [{
    "measurement": "spacex.starlink.user_terminal.ping_stats",
    "tags": {"id": dish_id},
    "time": timestamp,
    "fields": all_stats,
}]

influx_client = InfluxDBClient(**icargs)
try:
    influx_client.write_points(points, retention_policy=rp)
finally:
    influx_client.close()

dishHistoryMqtt.py (new file, 113 lines)

@@ -0,0 +1,113 @@
#!/usr/bin/python3
######################################################################
#
# Publish Starlink user terminal packet loss statistics to a MQTT
# broker.
#
# This script examines the most recent samples from the history data,
# computes several different metrics related to packet loss, and
# publishes those to the specified MQTT broker.
#
######################################################################
import sys
import getopt
import paho.mqtt.publish
import starlink_grpc
arg_error = False

try:
    opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:vU:P:")
except getopt.GetoptError as err:
    print(str(err))
    arg_error = True

# Default to 1 hour worth of data samples.
samples_default = 3600
samples = samples_default
print_usage = False
verbose = False
run_lengths = False
host_default = "localhost"
host = host_default
port = None
username = None
password = None

if not arg_error:
    if len(args) > 0:
        arg_error = True
    else:
        for opt, arg in opts:
            if opt == "-a":
                samples = -1
            elif opt == "-h":
                print_usage = True
            elif opt == "-n":
                host = arg
            elif opt == "-p":
                port = int(arg)
            elif opt == "-r":
                run_lengths = True
            elif opt == "-s":
                samples = int(arg)
            elif opt == "-v":
                verbose = True
            elif opt == "-P":
                password = arg
            elif opt == "-U":
                username = arg
        if username is None and password is not None:
            print("Password authentication requires username to be set")
            arg_error = True

if print_usage or arg_error:
    print("Usage: " + sys.argv[0] + " [options...]")
    print("Options:")
    print(" -a: Parse all valid samples")
    print(" -h: Be helpful")
    print(" -n <name>: Hostname of MQTT broker, default: " + host_default)
    print(" -p <num>: Port number to use on MQTT broker")
    print(" -r: Include ping drop run length stats")
    print(" -s <num>: Number of data samples to parse, default: " + str(samples_default))
    print(" -v: Be verbose")
    print(" -P: Set password for username/password authentication")
    print(" -U: Set username for authentication")
    sys.exit(1 if arg_error else 0)

dish_id = starlink_grpc.get_id()

if dish_id is None:
    if verbose:
        print("Unable to connect to Starlink user terminal")
    sys.exit(1)

g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose)

if g_stats is None:
    # verbose output already happened, so just bail.
    sys.exit(1)

topic_prefix = "starlink/dish_ping_stats/" + dish_id + "/"
msgs = [(topic_prefix + k, v, 0, False) for k, v in g_stats.items()]
msgs.extend([(topic_prefix + k, v, 0, False) for k, v in pd_stats.items()])
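# Run length stats are lists, so publish each one as a single comma-separated
# string payload rather than one message per element.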
if run_lengths:
    for k, v in rl_stats.items():
        if k.startswith("run_"):
            msgs.append((topic_prefix + k, ",".join(str(x) for x in v), 0, False))
        else:
            msgs.append((topic_prefix + k, v, 0, False))

optargs = {}
if username is not None:
    auth = {"username": username}
    if password is not None:
        auth["password"] = password
    optargs["auth"] = auth
if port is not None:
    optargs["port"] = port

paho.mqtt.publish.multiple(msgs, hostname=host, client_id=dish_id, **optargs)


@@ -29,7 +29,6 @@ samples_default = 3600
 samples = samples_default
 print_usage = False
 verbose = False
-parse_all = False
 print_header = False
 run_lengths = False

@@ -39,7 +38,7 @@ if not arg_error:
     else:
         for opt, arg in opts:
             if opt == "-a":
-                parse_all = True
+                samples = -1
             elif opt == "-h":
                 print_usage = True
             elif opt == "-r":

@@ -57,7 +56,7 @@ if print_usage or arg_error:
     print(" -a: Parse all valid samples")
     print(" -h: Be helpful")
     print(" -r: Include ping drop run length stats")
-    print(" -s <num>: Parse <num> data samples, default: " + str(samples_default))
+    print(" -s <num>: Number of data samples to parse, default: " + str(samples_default))
     print(" -v: Be verbose")
     print(" -H: print CSV header instead of parsing file")
     sys.exit(1 if arg_error else 0)

@@ -79,8 +78,7 @@ if print_header:
 timestamp = datetime.datetime.utcnow()
-g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(-1 if parse_all else samples,
-                                                               verbose)
+g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose)
 if g_stats is None:
     # verbose output already happened, so just bail.


@@ -1,7 +1,7 @@
 #!/usr/bin/python3
 ######################################################################
 #
-# Write get_status info to an InfluxDB database.
+# Write Starlink user terminal status info to an InfluxDB database.
 #
 # This script will periodically poll current status and write it to
 # the specified InfluxDB database in a loop.


@@ -1,7 +1,7 @@
 #!/usr/bin/python3
 ######################################################################
 #
-# Publish get_status info to a MQTT broker.
+# Publish Starlink user terminal status info to a MQTT broker.
 #
 # This script pulls the current status once and publishes it to the
 # specified MQTT broker.


@@ -32,7 +32,6 @@ samples_default = 3600
 samples = samples_default
 print_usage = False
 verbose = False
-parse_all = False
 print_header = False
 run_lengths = False

@@ -42,7 +41,7 @@ if not arg_error:
     else:
         for opt, arg in opts:
             if opt == "-a":
-                parse_all = True
+                samples = -1
             elif opt == "-h":
                 print_usage = True
             elif opt == "-r":

@@ -61,7 +60,7 @@ if print_usage or arg_error:
     print(" -a: Parse all valid samples")
     print(" -h: Be helpful")
     print(" -r: Include ping drop run length stats")
-    print(" -s <num>: Parse <num> data samples, default: " + str(samples_default))
+    print(" -s <num>: Number of data samples to parse, default: " + str(samples_default))
     print(" -v: Be verbose")
     print(" -H: print CSV header instead of parsing file")
     sys.exit(1 if arg_error else 0)

@@ -84,8 +83,7 @@ if print_header:
 timestamp = datetime.datetime.utcnow()
 g_stats, pd_stats, rl_stats = starlink_json.history_ping_stats(args[0] if args else "-",
-                                                                -1 if parse_all else samples,
-                                                                verbose)
+                                                                samples, verbose)
 if g_stats is None:
     # verbose output already happened, so just bail.


@@ -82,6 +82,30 @@ import grpc
 import spacex.api.device.device_pb2
 import spacex.api.device.device_pb2_grpc
+def get_status():
+    """Fetch status data and return it in grpc structure format.
+
+    Raises:
+        grpc.RpcError: Communication or service error.
+    """
+    with grpc.insecure_channel("192.168.100.1:9200") as channel:
+        stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
+        response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))
+    return response.dish_get_status
+
+
+def get_id():
+    """Return the ID from the dish status information.
+
+    Returns:
+        A string identifying the Starlink user terminal reachable from the
+        local network, or None if no user terminal is currently reachable.
+    """
+    try:
+        status = get_status()
+        return status.device_info.id
+    except grpc.RpcError:
+        return None
 def history_ping_field_names():
     """Return the field names of the packet loss stats.