From c313134ba18b9fad4a37bbb1f34f7f73b25f9fcb Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Tue, 29 Dec 2020 19:21:00 -0800 Subject: [PATCH 1/4] Scripts for CSV, InfluxDB, MQTT get_status output Initial support for dumping status info in CSV format or to InfuxDB or MQTT servers. InfluxDB script is largely what Equinox- posted into issue #2, but with a few things renamed, and the state info being recorded as a string of the enum value name, rather than the (integer) enum value. All of these need header comments still and some need a bit more options handling, cleanup, and error handling. --- dishStatusCsv.py | 25 +++++++++++++++ dishStatusInflux.py | 75 +++++++++++++++++++++++++++++++++++++++++++++ dishStatusMqtt.py | 42 +++++++++++++++++++++++++ 3 files changed, 142 insertions(+) create mode 100644 dishStatusCsv.py create mode 100644 dishStatusInflux.py create mode 100644 dishStatusMqtt.py diff --git a/dishStatusCsv.py b/dishStatusCsv.py new file mode 100644 index 0000000..0a32ee8 --- /dev/null +++ b/dishStatusCsv.py @@ -0,0 +1,25 @@ +#!/usr/bin/python3 +import grpc + +import spacex.api.device.device_pb2 +import spacex.api.device.device_pb2_grpc + +with grpc.insecure_channel('192.168.100.1:9200') as channel: + stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) + +status = response.dish_get_status + +# More alerts may be added in future, so rather than list them individually, +# build a bit field based on field numbers of the DishAlerts message. +alert_bits = 0 +for alert in status.alerts.ListFields(): + alert_bits |= (1 if alert[1] else 0) << (alert[0].number - 1) + +print(",".join([status.device_info.id, status.device_info.hardware_version, status.device_info.software_version, + spacex.api.device.dish_pb2.DishState.Name(status.state)]) + "," + + ",".join(str(x) for x in [status.device_state.uptime_s, status.snr, status.seconds_to_first_nonempty_slot, + status.pop_ping_drop_rate, status.downlink_throughput_bps, status.uplink_throughput_bps, + status.pop_ping_latency_ms, alert_bits, status.obstruction_stats.fraction_obstructed, + status.obstruction_stats.currently_obstructed, status.obstruction_stats.last_24h_obstructed_s]) + "," + + ",".join(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed)) diff --git a/dishStatusInflux.py b/dishStatusInflux.py new file mode 100644 index 0000000..7afa3f9 --- /dev/null +++ b/dishStatusInflux.py @@ -0,0 +1,75 @@ +#!/usr/bin/python3 +from influxdb import InfluxDBClient +from influxdb import SeriesHelper + +import grpc + +import spacex.api.device.device_pb2 +import spacex.api.device.device_pb2_grpc + +import time + +class DeviceStatusSeries(SeriesHelper): + class Meta: + series_name = "spacex.starlink.user_terminal.status" + fields = [ + "hardware_version", + "software_version", + "state", + "alert_motors_stuck", + "alert_thermal_throttle", + "alert_thermal_shutdown", + "alert_unexpected_location", + "snr", + "seconds_to_first_nonempty_slot", + "pop_ping_drop_rate", + "downlink_throughput_bps", + "uplink_throughput_bps", + "pop_ping_latency_ms", + "currently_obstructed", + "fraction_obstructed"] + tags = ["id"] + +influxClient = InfluxDBClient("localhost", 8086, "script-user", "password", "dishstats", ssl=False, retries=1, timeout=15) + +try: + dishChannel = grpc.insecure_channel("192.168.100.1:9200") + + pending = 0 + count = 0 + while True: + stub = spacex.api.device.device_pb2_grpc.DeviceStub(dishChannel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) + status = response.dish_get_status + DeviceStatusSeries( + id=status.device_info.id, + hardware_version=status.device_info.hardware_version, + software_version=status.device_info.software_version, + state=status.state, + alert_motors_stuck=status.alerts.motors_stuck, + alert_thermal_throttle=status.alerts.thermal_throttle, + alert_thermal_shutdown=status.alerts.thermal_shutdown, + alert_unexpected_location=status.alerts.unexpected_location, + snr=status.snr, + seconds_to_first_nonempty_slot=status.seconds_to_first_nonempty_slot, + pop_ping_drop_rate=status.pop_ping_drop_rate, + downlink_throughput_bps=status.downlink_throughput_bps, + uplink_throughput_bps=status.uplink_throughput_bps, + pop_ping_latency_ms=status.pop_ping_latency_ms, + currently_obstructed=status.obstruction_stats.currently_obstructed, + fraction_obstructed=status.obstruction_stats.fraction_obstructed) + pending = pending + 1 + print("Samples: " + str(pending)) + if count > 5: + try: + DeviceStatusSeries.commit(influxClient) + print("Wrote " + str(pending)) + pending = 0 + except Exception as e: + print("Failed to write: " + str(e)) + count = 0 + count = count + 1 + time.sleep(5) +finally: + # Flush on error/exit + DeviceStatusSeries.commit(influxClient) diff --git a/dishStatusMqtt.py b/dishStatusMqtt.py new file mode 100644 index 0000000..9b2c52c --- /dev/null +++ b/dishStatusMqtt.py @@ -0,0 +1,42 @@ +#!/usr/bin/python3 +import paho.mqtt.publish + +import grpc + +import spacex.api.device.device_pb2 +import spacex.api.device.device_pb2_grpc + +with grpc.insecure_channel('192.168.100.1:9200') as channel: + stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) + +status = response.dish_get_status + +# More alerts may be added in future, so rather than list them individually, +# build a bit field based on field numbers of the DishAlerts message. +alert_bits = 0 +for alert in status.alerts.ListFields(): + alert_bits |= (1 if alert[1] else 0) << (alert[0].number - 1) + +topicPrefix = "starlink/dish_status/" + status.device_info.id + "/" +msgs = [(topicPrefix + "hardware_version", status.device_info.hardware_version, 0, False), + (topicPrefix + "software_version", status.device_info.software_version, 0, False), + (topicPrefix + "state", spacex.api.device.dish_pb2.DishState.Name(status.state), 0, False), + (topicPrefix + "uptime", status.device_state.uptime_s, 0, False), + (topicPrefix + "snr", status.snr, 0, False), + (topicPrefix + "seconds_to_first_nonempty_slot", status.seconds_to_first_nonempty_slot, 0, False), + (topicPrefix + "pop_ping_drop_rate", status.pop_ping_drop_rate, 0, False), + (topicPrefix + "downlink_throughput_bps", status.downlink_throughput_bps, 0, False), + (topicPrefix + "uplink_throughput_bps", status.uplink_throughput_bps, 0, False), + (topicPrefix + "pop_ping_latency_ms", status.pop_ping_latency_ms, 0, False), + (topicPrefix + "alerts", alert_bits, 0, False), + (topicPrefix + "fraction_obstructed", status.obstruction_stats.fraction_obstructed, 0, False), + (topicPrefix + "currently_obstructed", status.obstruction_stats.currently_obstructed, 0, False), + # While the field name for this one implies it covers 24 hours, the + # empirical evidence suggests it only covers 12 hours. It also resets + # on dish reboot, so may not cover that whole period. Rather than try + # to convey that complexity in the topic label, just be a bit vague: + (topicPrefix + "seconds_obstructed", status.obstruction_stats.last_24h_obstructed_s, 0, False), + (topicPrefix + "wedges_fraction_obstructed", ",".join(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed), 0, False)] + +paho.mqtt.publish.multiple(msgs, hostname="localhost", client_id=status.device_info.id) From 61e9abf7d4c5cb32481e629491e2223b29cf190c Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Tue, 29 Dec 2020 20:45:30 -0800 Subject: [PATCH 2/4] Add header comments and update README Also, add a timestamp to the CSV output, similar to what is in the get_history equivalent. --- README.md | 4 ++++ dishStatusCsv.py | 14 +++++++++++++- dishStatusInflux.py | 8 ++++++++ dishStatusMqtt.py | 8 ++++++++ 4 files changed, 33 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index bb51ea9..e0442ba 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,10 @@ All the tools that pull data from the dish expect to be able to reach it at the The scripts that don't use `grpcurl` to pull data require the `grpcio` Python package at runtime and generating the necessary gRPC protocol code requires the `grpcio-tools` package. Information about how to install both can be found at https://grpc.io/docs/languages/python/quickstart/ +The scripts that use [MQTT](https://mqtt.org/) for output require the `paho-mqtt` Python package. Information about how to install that can be found at https://www.eclipse.org/paho/index.php?page=clients/python/index.php + +The scripts that use [InfluxDB](https://www.influxdata.com/products/influxdb/) for output require the `influxdb` Python package. Information about how to install that can be found at https://github.com/influxdata/influxdb-python. Note that this is the (slightly) older version of the InfluxDB client Python module, not the InfluxDB 2.0 client. It can still be made to work with an InfluxDB 2.0 server, but doing so requires using `influx v1` [CLI commands](https://docs.influxdata.com/influxdb/v2.0/reference/cli/influx/v1/) on the server to map the 1.x username, password, and database names to their 2.0 equivalents. + ## Usage For `parseJsonHistory.py`, the easiest way to use it is to pipe the `grpcurl` command directly into it. For example: diff --git a/dishStatusCsv.py b/dishStatusCsv.py index 0a32ee8..3b8e31b 100644 --- a/dishStatusCsv.py +++ b/dishStatusCsv.py @@ -1,13 +1,24 @@ #!/usr/bin/python3 +###################################################################### +# +# Output get_status info in CSV format. +# +# This script pulls the current status once and prints to stdout. +# +###################################################################### import grpc import spacex.api.device.device_pb2 import spacex.api.device.device_pb2_grpc +import datetime + with grpc.insecure_channel('192.168.100.1:9200') as channel: stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) +timestamp = datetime.datetime.utcnow() + status = response.dish_get_status # More alerts may be added in future, so rather than list them individually, @@ -16,7 +27,8 @@ alert_bits = 0 for alert in status.alerts.ListFields(): alert_bits |= (1 if alert[1] else 0) << (alert[0].number - 1) -print(",".join([status.device_info.id, status.device_info.hardware_version, status.device_info.software_version, +print(",".join([timestamp.replace(microsecond=0).isoformat(), status.device_info.id, + status.device_info.hardware_version, status.device_info.software_version, spacex.api.device.dish_pb2.DishState.Name(status.state)]) + "," + ",".join(str(x) for x in [status.device_state.uptime_s, status.snr, status.seconds_to_first_nonempty_slot, status.pop_ping_drop_rate, status.downlink_throughput_bps, status.uplink_throughput_bps, diff --git a/dishStatusInflux.py b/dishStatusInflux.py index 7afa3f9..e39150c 100644 --- a/dishStatusInflux.py +++ b/dishStatusInflux.py @@ -1,4 +1,12 @@ #!/usr/bin/python3 +###################################################################### +# +# Write get_status info to an InfluxDB database. +# +# This script will periodically poll current status and write it to +# the specified InfluxDB database in a loop. +# +###################################################################### from influxdb import InfluxDBClient from influxdb import SeriesHelper diff --git a/dishStatusMqtt.py b/dishStatusMqtt.py index 9b2c52c..a0993cc 100644 --- a/dishStatusMqtt.py +++ b/dishStatusMqtt.py @@ -1,4 +1,12 @@ #!/usr/bin/python3 +###################################################################### +# +# Publish get_status info to a MQTT broker. +# +# This script pulls the current status once and publishes it to the +# specified MQTT broker. +# +###################################################################### import paho.mqtt.publish import grpc From e1a4c473c8a5541f9f2f48f0789f6630f21194fa Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Wed, 30 Dec 2020 10:17:02 -0800 Subject: [PATCH 3/4] Handle errors on the gRPC connection Also, actually do the thing I said I was doing in the prior checkin by writing state as a string instead of integer. And a bit more cleanup. --- dishDumpStatus.py | 2 + dishStatusInflux.py | 89 +++++++++++++++++++++++++++++++-------------- 2 files changed, 64 insertions(+), 27 deletions(-) diff --git a/dishDumpStatus.py b/dishDumpStatus.py index befa1e5..118d978 100644 --- a/dishDumpStatus.py +++ b/dishDumpStatus.py @@ -10,6 +10,8 @@ import grpc import spacex.api.device.device_pb2 import spacex.api.device.device_pb2_grpc +# Note that if you remove the 'with' clause here, you need to separately +# call channel.close() when you're done with the gRPC connection. with grpc.insecure_channel('192.168.100.1:9200') as channel: stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) diff --git a/dishStatusInflux.py b/dishStatusInflux.py index e39150c..e8658bb 100644 --- a/dishStatusInflux.py +++ b/dishStatusInflux.py @@ -17,6 +17,9 @@ import spacex.api.device.device_pb2_grpc import time +fVerbose = True +sleepTime = 30 + class DeviceStatusSeries(SeriesHelper): class Meta: series_name = "spacex.starlink.user_terminal.status" @@ -38,46 +41,78 @@ class DeviceStatusSeries(SeriesHelper): "fraction_obstructed"] tags = ["id"] -influxClient = InfluxDBClient("localhost", 8086, "script-user", "password", "dishstats", ssl=False, retries=1, timeout=15) +influxClient = InfluxDBClient(host="localhost", port=8086, username="script-user", password="password", database="dishstats", ssl=False, retries=1, timeout=15) try: - dishChannel = grpc.insecure_channel("192.168.100.1:9200") + dishChannel = None + lastId = None + fLastFailed = False pending = 0 count = 0 while True: - stub = spacex.api.device.device_pb2_grpc.DeviceStub(dishChannel) - response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) - status = response.dish_get_status - DeviceStatusSeries( - id=status.device_info.id, - hardware_version=status.device_info.hardware_version, - software_version=status.device_info.software_version, - state=status.state, - alert_motors_stuck=status.alerts.motors_stuck, - alert_thermal_throttle=status.alerts.thermal_throttle, - alert_thermal_shutdown=status.alerts.thermal_shutdown, - alert_unexpected_location=status.alerts.unexpected_location, - snr=status.snr, - seconds_to_first_nonempty_slot=status.seconds_to_first_nonempty_slot, - pop_ping_drop_rate=status.pop_ping_drop_rate, - downlink_throughput_bps=status.downlink_throughput_bps, - uplink_throughput_bps=status.uplink_throughput_bps, - pop_ping_latency_ms=status.pop_ping_latency_ms, - currently_obstructed=status.obstruction_stats.currently_obstructed, - fraction_obstructed=status.obstruction_stats.fraction_obstructed) + try: + if dishChannel is None: + dishChannel = grpc.insecure_channel("192.168.100.1:9200") + stub = spacex.api.device.device_pb2_grpc.DeviceStub(dishChannel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) + status = response.dish_get_status + DeviceStatusSeries( + id=status.device_info.id, + hardware_version=status.device_info.hardware_version, + software_version=status.device_info.software_version, + state=spacex.api.device.dish_pb2.DishState.Name(status.state), + alert_motors_stuck=status.alerts.motors_stuck, + alert_thermal_throttle=status.alerts.thermal_throttle, + alert_thermal_shutdown=status.alerts.thermal_shutdown, + alert_unexpected_location=status.alerts.unexpected_location, + snr=status.snr, + seconds_to_first_nonempty_slot=status.seconds_to_first_nonempty_slot, + pop_ping_drop_rate=status.pop_ping_drop_rate, + downlink_throughput_bps=status.downlink_throughput_bps, + uplink_throughput_bps=status.uplink_throughput_bps, + pop_ping_latency_ms=status.pop_ping_latency_ms, + currently_obstructed=status.obstruction_stats.currently_obstructed, + fraction_obstructed=status.obstruction_stats.fraction_obstructed) + lastId = status.device_info.id + fLastFailed = False + except Exception as e: + if not dishChannel is None: + dishChannel.close() + dishChannel = None + if fLastFailed: + if not lastId is None: + DeviceStatusSeries(id=lastId, state="DISH_UNREACHABLE") + else: + # Retry once, because the connection may have been lost while + # we were sleeping + fLastFailed = True + continue pending = pending + 1 - print("Samples: " + str(pending)) + if fVerbose: + print("Samples: " + str(pending)) + count = count + 1 if count > 5: try: DeviceStatusSeries.commit(influxClient) - print("Wrote " + str(pending)) + if fVerbose: + print("Wrote " + str(pending)) pending = 0 except Exception as e: print("Failed to write: " + str(e)) count = 0 - count = count + 1 - time.sleep(5) + if sleepTime > 0: + time.sleep(sleepTime) + else: + break finally: # Flush on error/exit - DeviceStatusSeries.commit(influxClient) + try: + DeviceStatusSeries.commit(influxClient) + if fVerbose: + print("Wrote " + str(pending)) + except Exception as e: + print("Failed to write: " + str(e)) + influxClient.close() + if not dishChannel is None: + dishChannel.close() From f206a3ad91c5404889227dd2d1bb792518fe8da4 Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Wed, 30 Dec 2020 11:47:03 -0800 Subject: [PATCH 4/4] Add a short blurb for the recently added scripts --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index e0442ba..e655931 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,8 @@ python3 dishDumpStatus.py ``` and revel in copious amounts of dish status information. OK, maybe it's not as impressive as all that. This one is really just meant to be a starting point for real functionality to be added to it. The information this script pulls is mostly what appears related to the dish in the Debug Data section of the Starlink app. +`dishStatusCsv.py`, `dishStatusInflux.py`, and `dishStatusMqtt.py` output the same status data, but to various data backends. These scripts currently lack any way to configure them, such as setting server host or authentication credentials, other than by changing the hard-coded values in the scripts. + ## To Be Done (Maybe) There are `reboot` and `dish_stow` requests in the Device protocol, too, so it should be trivial to write a command that initiates dish reboot and stow operations. These are easy enough to do with `grpcurl`, though, as there is no need to parse through the response data. For that matter, they're easy enough to do with the Starlink app.