Merge pull request #3 from sparky8512/working

Add scripts for status output to CSV, InfluxDB, and MQTT
This commit is contained in:
sparky8512 2020-12-30 11:57:05 -08:00 committed by GitHub
commit 5c762e754b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 213 additions and 0 deletions

View file

@ -11,6 +11,10 @@ All the tools that pull data from the dish expect to be able to reach it at the
The scripts that don't use `grpcurl` to pull data require the `grpcio` Python package at runtime and generating the necessary gRPC protocol code requires the `grpcio-tools` package. Information about how to install both can be found at https://grpc.io/docs/languages/python/quickstart/
The scripts that use [MQTT](https://mqtt.org/) for output require the `paho-mqtt` Python package. Information about how to install that can be found at https://www.eclipse.org/paho/index.php?page=clients/python/index.php
The scripts that use [InfluxDB](https://www.influxdata.com/products/influxdb/) for output require the `influxdb` Python package. Information about how to install that can be found at https://github.com/influxdata/influxdb-python. Note that this is the (slightly) older version of the InfluxDB client Python module, not the InfluxDB 2.0 client. It can still be made to work with an InfluxDB 2.0 server, but doing so requires using `influx v1` [CLI commands](https://docs.influxdata.com/influxdb/v2.0/reference/cli/influx/v1/) on the server to map the 1.x username, password, and database names to their 2.0 equivalents.
## Usage
For `parseJsonHistory.py`, the easiest way to use it is to pipe the `grpcurl` command directly into it. For example:
@ -52,6 +56,8 @@ python3 dishDumpStatus.py
```
and revel in copious amounts of dish status information. OK, maybe it's not as impressive as all that. This one is really just meant to be a starting point for real functionality to be added to it. The information this script pulls is mostly what appears related to the dish in the Debug Data section of the Starlink app.
`dishStatusCsv.py`, `dishStatusInflux.py`, and `dishStatusMqtt.py` output the same status data, but to various data backends. These scripts currently lack any way to configure them, such as setting server host or authentication credentials, other than by changing the hard-coded values in the scripts.
## To Be Done (Maybe)
There are `reboot` and `dish_stow` requests in the Device protocol, too, so it should be trivial to write a command that initiates dish reboot and stow operations. These are easy enough to do with `grpcurl`, though, as there is no need to parse through the response data. For that matter, they're easy enough to do with the Starlink app.

View file

@ -10,6 +10,8 @@ import grpc
import spacex.api.device.device_pb2
import spacex.api.device.device_pb2_grpc
# Note that if you remove the 'with' clause here, you need to separately
# call channel.close() when you're done with the gRPC connection.
with grpc.insecure_channel('192.168.100.1:9200') as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))

37
dishStatusCsv.py Normal file
View file

@ -0,0 +1,37 @@
#!/usr/bin/python3
######################################################################
#
# Output get_status info in CSV format.
#
# This script pulls the current status once and prints to stdout.
#
######################################################################
import grpc
import spacex.api.device.device_pb2
import spacex.api.device.device_pb2_grpc
import datetime
with grpc.insecure_channel('192.168.100.1:9200') as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))
timestamp = datetime.datetime.utcnow()
status = response.dish_get_status
# More alerts may be added in future, so rather than list them individually,
# build a bit field based on field numbers of the DishAlerts message.
alert_bits = 0
for alert in status.alerts.ListFields():
alert_bits |= (1 if alert[1] else 0) << (alert[0].number - 1)
print(",".join([timestamp.replace(microsecond=0).isoformat(), status.device_info.id,
status.device_info.hardware_version, status.device_info.software_version,
spacex.api.device.dish_pb2.DishState.Name(status.state)]) + "," +
",".join(str(x) for x in [status.device_state.uptime_s, status.snr, status.seconds_to_first_nonempty_slot,
status.pop_ping_drop_rate, status.downlink_throughput_bps, status.uplink_throughput_bps,
status.pop_ping_latency_ms, alert_bits, status.obstruction_stats.fraction_obstructed,
status.obstruction_stats.currently_obstructed, status.obstruction_stats.last_24h_obstructed_s]) + "," +
",".join(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed))

118
dishStatusInflux.py Normal file
View file

@ -0,0 +1,118 @@
#!/usr/bin/python3
######################################################################
#
# Write get_status info to an InfluxDB database.
#
# This script will periodically poll current status and write it to
# the specified InfluxDB database in a loop.
#
######################################################################
from influxdb import InfluxDBClient
from influxdb import SeriesHelper
import grpc
import spacex.api.device.device_pb2
import spacex.api.device.device_pb2_grpc
import time
fVerbose = True
sleepTime = 30
class DeviceStatusSeries(SeriesHelper):
class Meta:
series_name = "spacex.starlink.user_terminal.status"
fields = [
"hardware_version",
"software_version",
"state",
"alert_motors_stuck",
"alert_thermal_throttle",
"alert_thermal_shutdown",
"alert_unexpected_location",
"snr",
"seconds_to_first_nonempty_slot",
"pop_ping_drop_rate",
"downlink_throughput_bps",
"uplink_throughput_bps",
"pop_ping_latency_ms",
"currently_obstructed",
"fraction_obstructed"]
tags = ["id"]
influxClient = InfluxDBClient(host="localhost", port=8086, username="script-user", password="password", database="dishstats", ssl=False, retries=1, timeout=15)
try:
dishChannel = None
lastId = None
fLastFailed = False
pending = 0
count = 0
while True:
try:
if dishChannel is None:
dishChannel = grpc.insecure_channel("192.168.100.1:9200")
stub = spacex.api.device.device_pb2_grpc.DeviceStub(dishChannel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))
status = response.dish_get_status
DeviceStatusSeries(
id=status.device_info.id,
hardware_version=status.device_info.hardware_version,
software_version=status.device_info.software_version,
state=spacex.api.device.dish_pb2.DishState.Name(status.state),
alert_motors_stuck=status.alerts.motors_stuck,
alert_thermal_throttle=status.alerts.thermal_throttle,
alert_thermal_shutdown=status.alerts.thermal_shutdown,
alert_unexpected_location=status.alerts.unexpected_location,
snr=status.snr,
seconds_to_first_nonempty_slot=status.seconds_to_first_nonempty_slot,
pop_ping_drop_rate=status.pop_ping_drop_rate,
downlink_throughput_bps=status.downlink_throughput_bps,
uplink_throughput_bps=status.uplink_throughput_bps,
pop_ping_latency_ms=status.pop_ping_latency_ms,
currently_obstructed=status.obstruction_stats.currently_obstructed,
fraction_obstructed=status.obstruction_stats.fraction_obstructed)
lastId = status.device_info.id
fLastFailed = False
except Exception as e:
if not dishChannel is None:
dishChannel.close()
dishChannel = None
if fLastFailed:
if not lastId is None:
DeviceStatusSeries(id=lastId, state="DISH_UNREACHABLE")
else:
# Retry once, because the connection may have been lost while
# we were sleeping
fLastFailed = True
continue
pending = pending + 1
if fVerbose:
print("Samples: " + str(pending))
count = count + 1
if count > 5:
try:
DeviceStatusSeries.commit(influxClient)
if fVerbose:
print("Wrote " + str(pending))
pending = 0
except Exception as e:
print("Failed to write: " + str(e))
count = 0
if sleepTime > 0:
time.sleep(sleepTime)
else:
break
finally:
# Flush on error/exit
try:
DeviceStatusSeries.commit(influxClient)
if fVerbose:
print("Wrote " + str(pending))
except Exception as e:
print("Failed to write: " + str(e))
influxClient.close()
if not dishChannel is None:
dishChannel.close()

50
dishStatusMqtt.py Normal file
View file

@ -0,0 +1,50 @@
#!/usr/bin/python3
######################################################################
#
# Publish get_status info to a MQTT broker.
#
# This script pulls the current status once and publishes it to the
# specified MQTT broker.
#
######################################################################
import paho.mqtt.publish
import grpc
import spacex.api.device.device_pb2
import spacex.api.device.device_pb2_grpc
with grpc.insecure_channel('192.168.100.1:9200') as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))
status = response.dish_get_status
# More alerts may be added in future, so rather than list them individually,
# build a bit field based on field numbers of the DishAlerts message.
alert_bits = 0
for alert in status.alerts.ListFields():
alert_bits |= (1 if alert[1] else 0) << (alert[0].number - 1)
topicPrefix = "starlink/dish_status/" + status.device_info.id + "/"
msgs = [(topicPrefix + "hardware_version", status.device_info.hardware_version, 0, False),
(topicPrefix + "software_version", status.device_info.software_version, 0, False),
(topicPrefix + "state", spacex.api.device.dish_pb2.DishState.Name(status.state), 0, False),
(topicPrefix + "uptime", status.device_state.uptime_s, 0, False),
(topicPrefix + "snr", status.snr, 0, False),
(topicPrefix + "seconds_to_first_nonempty_slot", status.seconds_to_first_nonempty_slot, 0, False),
(topicPrefix + "pop_ping_drop_rate", status.pop_ping_drop_rate, 0, False),
(topicPrefix + "downlink_throughput_bps", status.downlink_throughput_bps, 0, False),
(topicPrefix + "uplink_throughput_bps", status.uplink_throughput_bps, 0, False),
(topicPrefix + "pop_ping_latency_ms", status.pop_ping_latency_ms, 0, False),
(topicPrefix + "alerts", alert_bits, 0, False),
(topicPrefix + "fraction_obstructed", status.obstruction_stats.fraction_obstructed, 0, False),
(topicPrefix + "currently_obstructed", status.obstruction_stats.currently_obstructed, 0, False),
# While the field name for this one implies it covers 24 hours, the
# empirical evidence suggests it only covers 12 hours. It also resets
# on dish reboot, so may not cover that whole period. Rather than try
# to convey that complexity in the topic label, just be a bit vague:
(topicPrefix + "seconds_obstructed", status.obstruction_stats.last_24h_obstructed_s, 0, False),
(topicPrefix + "wedges_fraction_obstructed", ",".join(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed), 0, False)]
paho.mqtt.publish.multiple(msgs, hostname="localhost", client_id=status.device_info.id)