diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..b909990 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,25 @@ +FROM python:3.9 +LABEL maintainer="neurocis " + +RUN true && \ +\ +# Install GRPCurl +wget https://github.com/fullstorydev/grpcurl/releases/download/v1.8.0/grpcurl_1.8.0_linux_x86_64.tar.gz && \ +tar -xvf grpcurl_1.8.0_linux_x86_64.tar.gz grpcurl && \ +chown root:root grpcurl && \ +chmod 755 grpcurl && \ +mv grpcurl /usr/bin/. && \ +rm grpcurl_1.8.0_linux_x86_64.tar.gz && \ +\ +# Install python prerequisites +pip3 install grpcio grpcio-tools paho-mqtt influxdb + +ADD . /app +WORKDIR /app + +# run crond as main process of container +ENTRYPOINT ["/bin/sh", "/app/entrypoint.sh"] +CMD ["dishStatusInflux.py"] + +# docker run -d --name='starlink-grpc-tools' -e INFLUXDB_HOST=192.168.1.34 -e INFLUXDB_PORT=8086 -e INFLUXDB_DB=starlink +# --net='br0' --ip='192.168.1.39' neurocis/starlink-grpc-tools dishStatusInflux.py diff --git a/GrafanaDashboard - Starlink Statistics.json b/GrafanaDashboard - Starlink Statistics.json new file mode 100644 index 0000000..94e6da3 --- /dev/null +++ b/GrafanaDashboard - Starlink Statistics.json @@ -0,0 +1,818 @@ +{ + "__inputs": [ + { + "name": "VAR_DS_INFLUXDB", + "type": "constant", + "label": "InfluxDB DataSource", + "value": "InfluxDB-starlinkstats", + "description": "" + }, + { + "name": "VAR_TBL_STATS", + "type": "constant", + "label": "Table name for Statistics", + "value": "spacex.starlink.user_terminal.status", + "description": "" + } + ], + "__requires": [ + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "7.3.6" + }, + { + "type": "panel", + "id": "graph", + "name": "Graph", + "version": "" + }, + { + "type": "datasource", + "id": "influxdb", + "name": "InfluxDB", + "version": "1.0.0" + }, + { + "type": "panel", + "id": "table", + "name": "Table", + "version": "" + } + ], + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + 
"hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "gnetId": null, + "graphTooltip": 0, + "id": null, + "iteration": 1610413551748, + "links": [], + "panels": [ + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$DS_INFLUXDB", + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 11, + "w": 12, + "x": 0, + "y": 0 + }, + "hiddenSeries": false, + "id": 4, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "hideZero": false, + "max": true, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.3.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "groupBy": [], + "measurement": "/^$TBL_STATS$/", + "orderByTime": "ASC", + "policy": "default", + "queryType": "randomWalk", + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "downlink_throughput_bps" + ], + "type": "field" + }, + { + "params": [ + "bps Down" + ], + "type": "alias" + } + ], + [ + { + "params": [ + "uplink_throughput_bps" + ], + "type": "field" + }, + { + "params": [ + "bps Up" + ], + "type": "alias" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Actual Throughput", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "$$hashKey": "object:1099", + "format": "short", + "label": null, 
+ "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "$$hashKey": "object:1100", + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$DS_INFLUXDB", + "description": "", + "fieldConfig": { + "defaults": { + "custom": {} + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 11, + "w": 12, + "x": 12, + "y": 0 + }, + "hiddenSeries": false, + "id": 2, + "legend": { + "alignAsTable": true, + "avg": true, + "current": true, + "max": true, + "min": true, + "show": true, + "total": false, + "values": true + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.3.6", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "groupBy": [], + "measurement": "/^$TBL_STATS$/", + "orderByTime": "ASC", + "policy": "default", + "queryType": "randomWalk", + "refId": "A", + "resultFormat": "time_series", + "select": [ + [ + { + "params": [ + "pop_ping_latency_ms" + ], + "type": "field" + }, + { + "params": [ + "Ping Latency" + ], + "type": "alias" + } + ], + [ + { + "params": [ + "pop_ping_drop_rate" + ], + "type": "field" + }, + { + "params": [ + "Drop Rate" + ], + "type": "alias" + } + ], + [ + { + "params": [ + "fraction_obstructed" + ], + "type": "field" + }, + { + "params": [ + "*100" + ], + "type": "math" + }, + { + "params": [ + "Percent Obstructed" + ], + "type": "alias" + } + ], + [ + { + "params": [ + "snr" + ], + "type": "field" + }, + { + "params": [ + "*10" + ], + "type": "math" + }, + { + "params": [ + "SNR" + ], + "type": "alias" + } + ] + ], + "tags": [] + } + ], + "thresholds": [], + "timeFrom": 
null, + "timeRegions": [], + "timeShift": null, + "title": "Ping Latency, Drop Rate, Percent Obstructed & SNR", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "cacheTimeout": null, + "datasource": "$DS_INFLUXDB", + "description": "", + "fieldConfig": { + "defaults": { + "custom": { + "align": null, + "filterable": false + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "Obstructed" + }, + "properties": [ + { + "id": "custom.width", + "value": 105 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Wrong Location" + }, + "properties": [ + { + "id": "custom.width", + "value": 114 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Thermal Throttle" + }, + "properties": [ + { + "id": "custom.width", + "value": 121 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Thermal Shutdown" + }, + "properties": [ + { + "id": "custom.width", + "value": 136 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Motors Stuck" + }, + "properties": [ + { + "id": "custom.width", + "value": 116 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Time" + }, + "properties": [ + { + "id": "custom.width", + "value": 143 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "State" + }, + "properties": [ + { + "id": "custom.width", + "value": 118 + } + ] + }, + { + "matcher": { + "id": 
"byName", + "options": "Bad Location" + }, + "properties": [ + { + "id": "custom.width", + "value": 122 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Temp Throttle" + }, + "properties": [ + { + "id": "custom.width", + "value": 118 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Temp Shutdown" + }, + "properties": [ + { + "id": "custom.width", + "value": 134 + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Software Version" + }, + "properties": [ + { + "id": "custom.width", + "value": 369 + } + ] + } + ] + }, + "gridPos": { + "h": 7, + "w": 24, + "x": 0, + "y": 11 + }, + "id": 6, + "interval": null, + "links": [], + "options": { + "showHeader": true, + "sortBy": [ + { + "desc": true, + "displayName": "Time (last)" + } + ] + }, + "pluginVersion": "7.3.6", + "targets": [ + { + "groupBy": [], + "hide": false, + "measurement": "/^$TBL_STATS$/", + "orderByTime": "ASC", + "policy": "default", + "query": "SELECT \"currently_obstructed\" AS \"Obstructed\", \"alert_unexpected_location\" AS \"Wrong Location\", \"alert_thermal_throttle\" AS \"Thermal Throttle\", \"alert_thermal_shutdown\" AS \"Thermal Shutdown\", \"alert_motors_stuck\" AS \"Motors Stuck\", \"state\" AS \"State\" FROM \"spacex.starlink.user_terminal.status\" WHERE $timeFilter", + "queryType": "randomWalk", + "rawQuery": false, + "refId": "A", + "resultFormat": "table", + "select": [ + [ + { + "params": [ + "state" + ], + "type": "field" + }, + { + "params": [ + "State" + ], + "type": "alias" + } + ], + [ + { + "params": [ + "currently_obstructed" + ], + "type": "field" + }, + { + "params": [ + "Obstructed" + ], + "type": "alias" + } + ], + [ + { + "params": [ + "alert_unexpected_location" + ], + "type": "field" + }, + { + "params": [ + "Bad Location" + ], + "type": "alias" + } + ], + [ + { + "params": [ + "alert_thermal_throttle" + ], + "type": "field" + }, + { + "params": [ + "Temp Throttled" + ], + "type": "alias" + } + ], + [ + { + "params": [ + 
"alert_thermal_shutdown" + ], + "type": "field" + }, + { + "params": [ + "Temp Shutdown" + ], + "type": "alias" + } + ], + [ + { + "params": [ + "alert_motors_stuck" + ], + "type": "field" + }, + { + "params": [ + "Motors Stuck" + ], + "type": "alias" + } + ], + [ + { + "params": [ + "software_version" + ], + "type": "field" + }, + { + "params": [ + "Software Version" + ], + "type": "alias" + } + ], + [ + { + "params": [ + "hardware_version" + ], + "type": "field" + }, + { + "params": [ + "Hardware Version" + ], + "type": "alias" + } + ] + ], + "tags": [] + } + ], + "timeFrom": null, + "timeShift": null, + "title": "Alerts & Versions", + "transformations": [ + { + "id": "groupBy", + "options": { + "fields": { + "Bad Location": { + "aggregations": [], + "operation": "groupby" + }, + "Hardware Version": { + "aggregations": [], + "operation": "groupby" + }, + "Motors Stuck": { + "aggregations": [], + "operation": "groupby" + }, + "Obstructed": { + "aggregations": [], + "operation": "groupby" + }, + "Software Version": { + "aggregations": [], + "operation": "groupby" + }, + "State": { + "aggregations": [], + "operation": "groupby" + }, + "Temp Shutdown": { + "aggregations": [], + "operation": "groupby" + }, + "Temp Throttle": { + "aggregations": [], + "operation": "groupby" + }, + "Temp Throttled": { + "aggregations": [], + "operation": "groupby" + }, + "Thermal Shutdown": { + "aggregations": [], + "operation": "groupby" + }, + "Thermal Throttle": { + "aggregations": [], + "operation": "groupby" + }, + "Time": { + "aggregations": [ + "last" + ], + "operation": "aggregate" + }, + "Wrong Location": { + "aggregations": [], + "operation": "groupby" + } + } + } + } + ], + "type": "table" + } + ], + "refresh": false, + "schemaVersion": 26, + "style": "dark", + "tags": [], + "templating": { + "list": [ + { + "current": { + "value": "${VAR_DS_INFLUXDB}", + "text": "${VAR_DS_INFLUXDB}", + "selected": false + }, + "error": null, + "hide": 2, + "label": "InfluxDB DataSource", + 
"name": "DS_INFLUXDB", + "options": [ + { + "value": "${VAR_DS_INFLUXDB}", + "text": "${VAR_DS_INFLUXDB}", + "selected": false + } + ], + "query": "${VAR_DS_INFLUXDB}", + "skipUrlSync": false, + "type": "constant" + }, + { + "current": { + "value": "${VAR_TBL_STATS}", + "text": "${VAR_TBL_STATS}", + "selected": false + }, + "error": null, + "hide": 2, + "label": "Table name for Statistics", + "name": "TBL_STATS", + "options": [ + { + "value": "${VAR_TBL_STATS}", + "text": "${VAR_TBL_STATS}", + "selected": false + } + ], + "query": "${VAR_TBL_STATS}", + "skipUrlSync": false, + "type": "constant" + } + ] + }, + "time": { + "from": "now-24h", + "to": "now" + }, + "timepicker": { + "refresh_intervals": [ + "5s", + "10s", + "30s", + "1m", + "5m", + "15m", + "30m", + "1h", + "2h", + "1d" + ] + }, + "timezone": "", + "title": "Starlink Statistics", + "uid": "ymkHwLaMz", + "version": 36 +} \ No newline at end of file diff --git a/README.md b/README.md index e655931..6cee089 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ For more information on what Starlink is, see [starlink.com](https://www.starlin `parseJsonHistory.py` operates on a JSON format data representation of the protocol buffer messages, such as that output by [gRPCurl](https://github.com/fullstorydev/grpcurl). The command lines below assume `grpcurl` is installed in the runtime PATH. If that's not the case, just substitute in the full path to the command. -All the tools that pull data from the dish expect to be able to reach it at the dish's fixed IP address of 192.168.100.1, as do the Starlink [Android app](https://play.google.com/store/apps/details?id=com.starlink.mobile) and [iOS app](https://apps.apple.com/us/app/starlink/id1537177988). When using a router other than the one included with the Starlink installation kit, this usually requires some additional router configuration to make it work. 
That configuration is beyond the scope of this document, but if the Starlink app doesn't work on your home network, then neither will these scripts. That being said, you do not need the Starlink app installed to make use of these scripts. +All the tools that pull data from the dish expect to be able to reach it at the dish's fixed IP address of 192.168.100.1, as do the Starlink [Android app](https://play.google.com/store/apps/details?id=com.starlink.mobile), [iOS app](https://apps.apple.com/us/app/starlink/id1537177988), and the browser app you can run directly from http://192.168.100.1. When using a router other than the one included with the Starlink installation kit, this usually requires some additional router configuration to make it work. That configuration is beyond the scope of this document, but if the Starlink app doesn't work on your home network, then neither will these scripts. That being said, you do not need the Starlink app installed to make use of these scripts. The scripts that don't use `grpcurl` to pull data require the `grpcio` Python package at runtime and generating the necessary gRPC protocol code requires the `grpcio-tools` package. Information about how to install both can be found at https://grpc.io/docs/languages/python/quickstart/ @@ -17,7 +17,11 @@ The scripts that use [InfluxDB](https://www.influxdata.com/products/influxdb/) f ## Usage -For `parseJsonHistory.py`, the easiest way to use it is to pipe the `grpcurl` command directly into it. For example: +Of the 3 groups below, the grpc scripts are really the only ones being actively developed. The others are mostly by way of example of what could be done with the underlying data. + +### The JSON parser script + +`parseJsonHistory.py` takes input from a file and writes its output to standard output. The easiest way to use it is to pipe the `grpcurl` command directly into it. 
For example: ``` grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle | python parseJsonHistory.py ``` @@ -26,9 +30,13 @@ For more usage options, run: python parseJsonHistory.py -h ``` -When used as-is, `parseJsonHistory.py` will summarize packet loss information from the data the dish records. There's other bits of data in there, though, so that script could be used as a starting point or example of how to iterate through it. Most of the data displayed in the Statistics page of the Starlink app appears to come from this same `get_history` gRPC response. See the file `get_history_notes.txt` for some ramblings on how to interpret it. +When used as-is, `parseJsonHistory.py` will summarize packet loss information from the data the dish records. There's other bits of data in there, though, so that script (or more likely the parsing logic it uses, which now resides in `starlink_json.py`) could be used as a starting point or example of how to iterate through it. Most of the data displayed in the Statistics page of the Starlink app appears to come from this same `get_history` gRPC response. See the file `get_history_notes.txt` for some ramblings on how to interpret it. -The other scripts can do the gRPC communication directly, but they require some generated code to support the specific gRPC protocol messages used. These would normally be generated from .proto files that specify those messages, but to date (2020-Dec), SpaceX has not publicly released such files. The gRPC service running on the dish appears to have [server reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) enabled, though. 
`grpcurl` can use that to extract a protoset file, and the `protoc` compiler can use that to make the necessary generated code: +The one bit of functionality this script has over the grpc scripts is that it supports capturing the grpcurl output to a file and reading from that, which may be useful if you're collecting data in one place but analyzing it in another. Otherwise, it's probably better to use `dishHistoryStats.py`, described below. + +### The grpc scripts + +This set of scripts can do the gRPC communication directly, but they require some generated code to support the specific gRPC protocol messages used. These would normally be generated from .proto files that specify those messages, but to date (2020-Dec), SpaceX has not publicly released such files. The gRPC service running on the dish appears to have [server reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) enabled, though. `grpcurl` can use that to extract a protoset file, and the `protoc` compiler can use that to make the necessary generated code: ``` grpcurl -plaintext -protoset-out dish.protoset 192.168.100.1:9200 describe SpaceX.API.Device.Device mkdir src @@ -41,29 +49,66 @@ python3 -m grpc_tools.protoc --descriptor_set_in=../dish.protoset --python_out=. python3 -m grpc_tools.protoc --descriptor_set_in=../dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/wifi.proto python3 -m grpc_tools.protoc --descriptor_set_in=../dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/wifi_config.proto ``` -Then move the resulting files to where the Python scripts can find them, such as in the same directory as the scripts themselves. +Then move the resulting files to where the Python scripts can find them in its import path, such as in the same directory as the scripts themselves. -Once those are available, the `dishHistoryStats.py` script can be used in place of the `grpcurl | parseJsonHistory.py` pipeline, with most of the same command line options. 
+Once those are available, the `dishHistoryStats.py` script can be used in place of the `grpcurl | parseJsonHistory.py` pipeline, with most of the same command line options. For example: +``` +python3 dishHistoryStats.py +``` -To collect and record summary stats every hour, you can put something like the following in your user crontab: +By default, `dishHistoryStats.py` (and `parseJsonHistory.py`) will output the stats in CSV format. You can use the `-v` option to instead output in a (slightly) more human-readable format. + +To collect and record summary stats at the top of every hour, you could put something like the following in your user crontab (assuming you have moved the scripts to ~/bin and made them executable): ``` 00 * * * * [ -e ~/dishStats.csv ] || ~/bin/dishHistoryStats.py -H >~/dishStats.csv; ~/bin/dishHistoryStats.py >>~/dishStats.csv ``` -`dishDumpStatus.py` is even simpler. Just run it as: +`dishHistoryInflux.py` and `dishHistoryMqtt.py` are similar, but they send their output to an InfluxDB server and an MQTT broker, respectively. Run them with `-h` command line option for details on how to specify server and/or database options. + +`dishStatusCsv.py`, `dishStatusInflux.py`, and `dishStatusMqtt.py` output the status data instead of history data, to various data backends. The information they pull is mostly what appears related to the dish in the Debug Data section of the Starlink app. As with the corresponding history scripts, run them with `-h` command line option for usage details. + +By default, all of these scripts will pull data once, send it off to the specified data backend, and then exit. They can instead be made to run in a periodic loop by passing a `-t` option to specify loop interval, in seconds. For example, to capture status information to an InfluxDB server every 30 seconds, you could do something like this: +``` +python3 dishStatusInflux.py -t 30 [... probably other args to specify server options ...] 
+``` + +Some of the scripts (currently only the InfluxDB ones) also support specifying options through environment variables. See details in the scripts for the environment variables that map to options. + +### Other scripts + +`dishDumpStatus.py` is a simple example of how to use the grpc modules (the ones generated by protoc, not `starlink_grpc.py`) directly. Just run it as: ``` python3 dishDumpStatus.py ``` -and revel in copious amounts of dish status information. OK, maybe it's not as impressive as all that. This one is really just meant to be a starting point for real functionality to be added to it. The information this script pulls is mostly what appears related to the dish in the Debug Data section of the Starlink app. +and revel in copious amounts of dish status information. OK, maybe it's not as impressive as all that. This one is really just meant to be a starting point for real functionality to be added to it. -`dishStatusCsv.py`, `dishStatusInflux.py`, and `dishStatusMqtt.py` output the same status data, but to various data backends. These scripts currently lack any way to configure them, such as setting server host or authentication credentials, other than by changing the hard-coded values in the scripts. +Possibly more simple examples to come, as the other scripts have started getting a bit complicated. ## To Be Done (Maybe) There are `reboot` and `dish_stow` requests in the Device protocol, too, so it should be trivial to write a command that initiates dish reboot and stow operations. These are easy enough to do with `grpcurl`, though, as there is no need to parse through the response data. For that matter, they're easy enough to do with the Starlink app. +Proper Python packaging, since some of the scripts are no longer self-contained. + ## Other Tidbits The Starlink Android app actually uses port 9201 instead of 9200. 
Both appear to expose the same gRPC service, but the one on port 9201 uses an HTTP/1.1 wrapper, whereas the one on port 9200 uses HTTP/2.0, which is what most gRPC tools expect. The Starlink router also exposes a gRPC service, on ports 9000 (HTTP/2.0) and 9001 (HTTP/1.1). + +## Docker for InfluxDB ( & MQTT under development ) + +Initialization of the container can be performed with the following command: + +``` +docker run -d --name='starlink-grpc-tools' -e INFLUXDB_HOST={InfluxDB Hostname} \ + -e INFLUXDB_PORT={Port, 8086 usually} \ + -e INFLUXDB_USER={Optional, InfluxDB Username} \ + -e INFLUXDB_PWD={Optional, InfluxDB Password} \ + -e INFLUXDB_DB={Pre-created DB name, starlinkstats works well} \ + neurocis/starlink-grpc-tools dishStatusInflux.py -v +``` + +The trailing `dishStatusInflux.py -v` is optional; if it is omitted, the container runs the same script without verbose output. You can also replace it with one of the other scripts if you wish to run that instead. There is also a `GrafanaDashboard - Starlink Statistics.json` file, which can be imported into Grafana to get some charts like: + +![image](https://user-images.githubusercontent.com/945191/104257179-ae570000-5431-11eb-986e-3fedd04bfcfb.png) diff --git a/dishHistoryInflux.py b/dishHistoryInflux.py new file mode 100644 index 0000000..07e43f7 --- /dev/null +++ b/dishHistoryInflux.py @@ -0,0 +1,243 @@ +#!/usr/bin/python3 +###################################################################### +# +# Write Starlink user terminal packet loss statistics to an InfluxDB +# database. +# +# This script examines the most recent samples from the history data, +# computes several different metrics related to packet loss, and +# writes those to the specified InfluxDB database. 
+# +###################################################################### + +import getopt +import datetime +import logging +import os +import sys +import time +import warnings + +from influxdb import InfluxDBClient + +import starlink_grpc + + +def main(): + arg_error = False + + try: + opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:t:vC:D:IP:R:SU:") + except getopt.GetoptError as err: + print(str(err)) + arg_error = True + + # Default to 1 hour worth of data samples. + samples_default = 3600 + samples = None + print_usage = False + verbose = False + default_loop_time = 0 + loop_time = default_loop_time + run_lengths = False + host_default = "localhost" + database_default = "starlinkstats" + icargs = {"host": host_default, "timeout": 5, "database": database_default} + rp = None + flush_limit = 6 + + # For each of these check they are both set and not empty string + influxdb_host = os.environ.get("INFLUXDB_HOST") + if influxdb_host: + icargs["host"] = influxdb_host + influxdb_port = os.environ.get("INFLUXDB_PORT") + if influxdb_port: + icargs["port"] = int(influxdb_port) + influxdb_user = os.environ.get("INFLUXDB_USER") + if influxdb_user: + icargs["username"] = influxdb_user + influxdb_pwd = os.environ.get("INFLUXDB_PWD") + if influxdb_pwd: + icargs["password"] = influxdb_pwd + influxdb_db = os.environ.get("INFLUXDB_DB") + if influxdb_db: + icargs["database"] = influxdb_db + influxdb_rp = os.environ.get("INFLUXDB_RP") + if influxdb_rp: + rp = influxdb_rp + influxdb_ssl = os.environ.get("INFLUXDB_SSL") + if influxdb_ssl: + icargs["ssl"] = True + if influxdb_ssl.lower() == "secure": + icargs["verify_ssl"] = True + elif influxdb_ssl.lower() == "insecure": + icargs["verify_ssl"] = False + else: + icargs["verify_ssl"] = influxdb_ssl + + if not arg_error: + if len(args) > 0: + arg_error = True + else: + for opt, arg in opts: + if opt == "-a": + samples = -1 + elif opt == "-h": + print_usage = True + elif opt == "-n": + icargs["host"] = arg + elif opt == "-p": + 
icargs["port"] = int(arg) + elif opt == "-r": + run_lengths = True + elif opt == "-s": + samples = int(arg) + elif opt == "-t": + loop_time = float(arg) + elif opt == "-v": + verbose = True + elif opt == "-C": + icargs["ssl"] = True + icargs["verify_ssl"] = arg + elif opt == "-D": + icargs["database"] = arg + elif opt == "-I": + icargs["ssl"] = True + icargs["verify_ssl"] = False + elif opt == "-P": + icargs["password"] = arg + elif opt == "-R": + rp = arg + elif opt == "-S": + icargs["ssl"] = True + icargs["verify_ssl"] = True + elif opt == "-U": + icargs["username"] = arg + + if "password" in icargs and "username" not in icargs: + print("Password authentication requires username to be set") + arg_error = True + + if print_usage or arg_error: + print("Usage: " + sys.argv[0] + " [options...]") + print("Options:") + print(" -a: Parse all valid samples") + print(" -h: Be helpful") + print(" -n : Hostname of InfluxDB server, default: " + host_default) + print(" -p : Port number to use on InfluxDB server") + print(" -r: Include ping drop run length stats") + print(" -s : Number of data samples to parse, default: loop interval,") + print(" if set, else " + str(samples_default)) + print(" -t : Loop interval in seconds or 0 for no loop, default: " + + str(default_loop_time)) + print(" -v: Be verbose") + print(" -C : Enable SSL/TLS using specified CA cert to verify server") + print(" -D : Database name to use, default: " + database_default) + print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)") + print(" -P : Set password for authentication") + print(" -R : Retention policy name to use") + print(" -S: Enable SSL/TLS using default CA cert") + print(" -U : Set username for authentication") + sys.exit(1 if arg_error else 0) + + if samples is None: + samples = int(loop_time) if loop_time > 0 else samples_default + + logging.basicConfig(format="%(levelname)s: %(message)s") + + class GlobalState: + pass + + gstate = GlobalState() + gstate.dish_id = None 
+ gstate.points = [] + + def conn_error(msg, *args): + # Connection errors that happen in an interval loop are not critical + # failures, but are interesting enough to print in non-verbose mode. + if loop_time > 0: + print(msg % args) + else: + logging.error(msg, *args) + + def flush_points(client): + try: + client.write_points(gstate.points, retention_policy=rp) + if verbose: + print("Data points written: " + str(len(gstate.points))) + gstate.points.clear() + except Exception as e: + conn_error("Failed writing to InfluxDB database: %s", str(e)) + return 1 + + return 0 + + def loop_body(client): + if gstate.dish_id is None: + try: + gstate.dish_id = starlink_grpc.get_id() + if verbose: + print("Using dish ID: " + gstate.dish_id) + except starlink_grpc.GrpcError as e: + conn_error("Failure getting dish ID: %s", str(e)) + return 1 + + timestamp = datetime.datetime.utcnow() + + try: + g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose) + except starlink_grpc.GrpcError as e: + conn_error("Failure getting ping stats: %s", str(e)) + return 1 + + all_stats = g_stats.copy() + all_stats.update(pd_stats) + if run_lengths: + for k, v in rl_stats.items(): + if k.startswith("run_"): + for i, subv in enumerate(v, start=1): + all_stats[k + "_" + str(i)] = subv + else: + all_stats[k] = v + + gstate.points.append({ + "measurement": "spacex.starlink.user_terminal.ping_stats", + "tags": { + "id": gstate.dish_id + }, + "time": timestamp, + "fields": all_stats, + }) + if verbose: + print("Data points queued: " + str(len(gstate.points))) + + if len(gstate.points) >= flush_limit: + return flush_points(client) + + return 0 + + if "verify_ssl" in icargs and not icargs["verify_ssl"]: + # user has explicitly said be insecure, so don't warn about it + warnings.filterwarnings("ignore", message="Unverified HTTPS request") + + influx_client = InfluxDBClient(**icargs) + try: + next_loop = time.monotonic() + while True: + rc = loop_body(influx_client) + if loop_time > 
0: + now = time.monotonic() + next_loop = max(next_loop + loop_time, now) + time.sleep(next_loop - now) + else: + break + finally: + if gstate.points: + rc = flush_points(influx_client) + influx_client.close() + + sys.exit(rc) + + +if __name__ == '__main__': + main() diff --git a/dishHistoryMqtt.py b/dishHistoryMqtt.py new file mode 100644 index 0000000..a4349d4 --- /dev/null +++ b/dishHistoryMqtt.py @@ -0,0 +1,185 @@ +#!/usr/bin/python3 +###################################################################### +# +# Publish Starlink user terminal packet loss statistics to a MQTT +# broker. +# +# This script examines the most recent samples from the history data, +# computes several different metrics related to packet loss, and +# publishes those to the specified MQTT broker. +# +###################################################################### + +import getopt +import logging +import sys +import time + +try: + import ssl + ssl_ok = True +except ImportError: + ssl_ok = False + +import paho.mqtt.publish + +import starlink_grpc + + +def main(): + arg_error = False + + try: + opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:t:vC:ISP:U:") + except getopt.GetoptError as err: + print(str(err)) + arg_error = True + + # Default to 1 hour worth of data samples. 
+ samples_default = 3600 + samples = None + print_usage = False + verbose = False + default_loop_time = 0 + loop_time = default_loop_time + run_lengths = False + host_default = "localhost" + mqargs = {"hostname": host_default} + username = None + password = None + + if not arg_error: + if len(args) > 0: + arg_error = True + else: + for opt, arg in opts: + if opt == "-a": + samples = -1 + elif opt == "-h": + print_usage = True + elif opt == "-n": + mqargs["hostname"] = arg + elif opt == "-p": + mqargs["port"] = int(arg) + elif opt == "-r": + run_lengths = True + elif opt == "-s": + samples = int(arg) + elif opt == "-t": + loop_time = float(arg) + elif opt == "-v": + verbose = True + elif opt == "-C": + mqargs["tls"] = {"ca_certs": arg} + elif opt == "-I": + if ssl_ok: + mqargs["tls"] = {"cert_reqs": ssl.CERT_NONE} + else: + print("No SSL support found") + sys.exit(1) + elif opt == "-P": + password = arg + elif opt == "-S": + mqargs["tls"] = {} + elif opt == "-U": + username = arg + + if username is None and password is not None: + print("Password authentication requires username to be set") + arg_error = True + + if print_usage or arg_error: + print("Usage: " + sys.argv[0] + " [options...]") + print("Options:") + print(" -a: Parse all valid samples") + print(" -h: Be helpful") + print(" -n : Hostname of MQTT broker, default: " + host_default) + print(" -p : Port number to use on MQTT broker") + print(" -r: Include ping drop run length stats") + print(" -s : Number of data samples to parse, default: loop interval,") + print(" if set, else " + str(samples_default)) + print(" -t : Loop interval in seconds or 0 for no loop, default: " + + str(default_loop_time)) + print(" -v: Be verbose") + print(" -C : Enable SSL/TLS using specified CA cert to verify broker") + print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)") + print(" -P: Set password for username/password authentication") + print(" -S: Enable SSL/TLS using default CA cert") + print(" -U: 
Set username for authentication") + sys.exit(1 if arg_error else 0) + + if samples is None: + samples = int(loop_time) if loop_time > 0 else samples_default + + if username is not None: + mqargs["auth"] = {"username": username} + if password is not None: + mqargs["auth"]["password"] = password + + logging.basicConfig(format="%(levelname)s: %(message)s") + + class GlobalState: + pass + + gstate = GlobalState() + gstate.dish_id = None + + def conn_error(msg, *args): + # Connection errors that happen in an interval loop are not critical + # failures, but are interesting enough to print in non-verbose mode. + if loop_time > 0: + print(msg % args) + else: + logging.error(msg, *args) + + def loop_body(): + if gstate.dish_id is None: + try: + gstate.dish_id = starlink_grpc.get_id() + if verbose: + print("Using dish ID: " + gstate.dish_id) + except starlink_grpc.GrpcError as e: + conn_error("Failure getting dish ID: %s", str(e)) + return 1 + + try: + g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose) + except starlink_grpc.GrpcError as e: + conn_error("Failure getting ping stats: %s", str(e)) + return 1 + + topic_prefix = "starlink/dish_ping_stats/" + gstate.dish_id + "/" + msgs = [(topic_prefix + k, v, 0, False) for k, v in g_stats.items()] + msgs.extend([(topic_prefix + k, v, 0, False) for k, v in pd_stats.items()]) + if run_lengths: + for k, v in rl_stats.items(): + if k.startswith("run_"): + msgs.append((topic_prefix + k, ",".join(str(x) for x in v), 0, False)) + else: + msgs.append((topic_prefix + k, v, 0, False)) + + try: + paho.mqtt.publish.multiple(msgs, client_id=gstate.dish_id, **mqargs) + if verbose: + print("Successfully published to MQTT broker") + except Exception as e: + conn_error("Failed publishing to MQTT broker: %s", str(e)) + return 1 + + return 0 + + next_loop = time.monotonic() + while True: + rc = loop_body() + if loop_time > 0: + now = time.monotonic() + next_loop = max(next_loop + loop_time, now) + time.sleep(next_loop 
- now) + else: + break + + sys.exit(rc) + + +if __name__ == '__main__': + main() diff --git a/dishHistoryStats.py b/dishHistoryStats.py index b9aeb58..45a4ee1 100644 --- a/dishHistoryStats.py +++ b/dishHistoryStats.py @@ -11,102 +11,141 @@ ###################################################################### import datetime -import sys import getopt +import logging +import sys +import time import starlink_grpc -arg_error = False -try: - opts, args = getopt.getopt(sys.argv[1:], "ahrs:vH") -except getopt.GetoptError as err: - print(str(err)) - arg_error = True +def main(): + arg_error = False -# Default to 1 hour worth of data samples. -samples_default = 3600 -samples = samples_default -print_usage = False -verbose = False -parse_all = False -print_header = False -run_lengths = False - -if not arg_error: - if len(args) > 0: + try: + opts, args = getopt.getopt(sys.argv[1:], "ahrs:t:vH") + except getopt.GetoptError as err: + print(str(err)) arg_error = True - else: - for opt, arg in opts: - if opt == "-a": - parse_all = True - elif opt == "-h": - print_usage = True - elif opt == "-r": - run_lengths = True - elif opt == "-s": - samples = int(arg) - elif opt == "-v": - verbose = True - elif opt == "-H": - print_header = True -if print_usage or arg_error: - print("Usage: " + sys.argv[0] + " [options...]") - print("Options:") - print(" -a: Parse all valid samples") - print(" -h: Be helpful") - print(" -r: Include ping drop run length stats") - print(" -s : Parse data samples, default: " + str(samples_default)) - print(" -v: Be verbose") - print(" -H: print CSV header instead of parsing file") - sys.exit(1 if arg_error else 0) + # Default to 1 hour worth of data samples. 
+ samples_default = 3600 + samples = None + print_usage = False + verbose = False + default_loop_time = 0 + loop_time = default_loop_time + run_lengths = False + print_header = False -fields, rl_fields = starlink_grpc.history_ping_field_names() + if not arg_error: + if len(args) > 0: + arg_error = True + else: + for opt, arg in opts: + if opt == "-a": + samples = -1 + elif opt == "-h": + print_usage = True + elif opt == "-r": + run_lengths = True + elif opt == "-s": + samples = int(arg) + elif opt == "-t": + loop_time = float(arg) + elif opt == "-v": + verbose = True + elif opt == "-H": + print_header = True -if print_header: - header = ["datetimestamp_utc"] - header.extend(fields) - if run_lengths: - for field in rl_fields: - if field.startswith("run_"): - header.extend(field + "_" + str(x) for x in range(1, 61)) - else: - header.append(field) - print(",".join(header)) - sys.exit(0) + if print_usage or arg_error: + print("Usage: " + sys.argv[0] + " [options...]") + print("Options:") + print(" -a: Parse all valid samples") + print(" -h: Be helpful") + print(" -r: Include ping drop run length stats") + print(" -s : Number of data samples to parse, default: loop interval,") + print(" if set, else " + str(samples_default)) + print(" -t : Loop interval in seconds or 0 for no loop, default: " + + str(default_loop_time)) + print(" -v: Be verbose") + print(" -H: print CSV header instead of parsing history data") + sys.exit(1 if arg_error else 0) -timestamp = datetime.datetime.utcnow() + if samples is None: + samples = int(loop_time) if loop_time > 0 else samples_default -stats, rl_stats = starlink_grpc.history_ping_stats(-1 if parse_all else samples, - verbose) + logging.basicConfig(format="%(levelname)s: %(message)s") -if stats is None or rl_stats is None: - # verbose output already happened, so just bail. 
- sys.exit(1) + g_fields, pd_fields, rl_fields = starlink_grpc.history_ping_field_names() -if verbose: - print("Parsed samples: " + str(stats["samples"])) - print("Total ping drop: " + str(stats["total_ping_drop"])) - print("Count of drop == 1: " + str(stats["count_full_ping_drop"])) - print("Obstructed: " + str(stats["count_obstructed"])) - print("Obstructed ping drop: " + str(stats["total_obstructed_ping_drop"])) - print("Obstructed drop == 1: " + str(stats["count_full_obstructed_ping_drop"])) - print("Unscheduled: " + str(stats["count_unscheduled"])) - print("Unscheduled ping drop: " + str(stats["total_unscheduled_ping_drop"])) - print("Unscheduled drop == 1: " + str(stats["count_full_unscheduled_ping_drop"])) - if run_lengths: - print("Initial drop run fragment: " + str(rl_stats["init_run_fragment"])) - print("Final drop run fragment: " + str(rl_stats["final_run_fragment"])) - print("Per-second drop runs: " + ", ".join(str(x) for x in rl_stats["run_seconds"])) - print("Per-minute drop runs: " + ", ".join(str(x) for x in rl_stats["run_minutes"])) -else: - csv_data = [timestamp.replace(microsecond=0).isoformat()] - csv_data.extend(str(stats[field]) for field in fields) - if run_lengths: - for field in rl_fields: - if field.startswith("run_"): - csv_data.extend(str(substat) for substat in rl_stats[field]) - else: - csv_data.append(str(rl_stats[field])) - print(",".join(csv_data)) + if print_header: + header = ["datetimestamp_utc"] + header.extend(g_fields) + header.extend(pd_fields) + if run_lengths: + for field in rl_fields: + if field.startswith("run_"): + header.extend(field + "_" + str(x) for x in range(1, 61)) + else: + header.append(field) + print(",".join(header)) + sys.exit(0) + + def loop_body(): + timestamp = datetime.datetime.utcnow() + + try: + g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose) + except starlink_grpc.GrpcError as e: + logging.error("Failure getting ping stats: %s", str(e)) + return 1 + + if verbose: + 
print("Parsed samples: " + str(g_stats["samples"])) + print("Total ping drop: " + str(pd_stats["total_ping_drop"])) + print("Count of drop == 1: " + str(pd_stats["count_full_ping_drop"])) + print("Obstructed: " + str(pd_stats["count_obstructed"])) + print("Obstructed ping drop: " + str(pd_stats["total_obstructed_ping_drop"])) + print("Obstructed drop == 1: " + str(pd_stats["count_full_obstructed_ping_drop"])) + print("Unscheduled: " + str(pd_stats["count_unscheduled"])) + print("Unscheduled ping drop: " + str(pd_stats["total_unscheduled_ping_drop"])) + print("Unscheduled drop == 1: " + str(pd_stats["count_full_unscheduled_ping_drop"])) + if run_lengths: + print("Initial drop run fragment: " + str(rl_stats["init_run_fragment"])) + print("Final drop run fragment: " + str(rl_stats["final_run_fragment"])) + print("Per-second drop runs: " + + ", ".join(str(x) for x in rl_stats["run_seconds"])) + print("Per-minute drop runs: " + + ", ".join(str(x) for x in rl_stats["run_minutes"])) + if loop_time > 0: + print() + else: + csv_data = [timestamp.replace(microsecond=0).isoformat()] + csv_data.extend(str(g_stats[field]) for field in g_fields) + csv_data.extend(str(pd_stats[field]) for field in pd_fields) + if run_lengths: + for field in rl_fields: + if field.startswith("run_"): + csv_data.extend(str(substat) for substat in rl_stats[field]) + else: + csv_data.append(str(rl_stats[field])) + print(",".join(csv_data)) + + return 0 + + next_loop = time.monotonic() + while True: + rc = loop_body() + if loop_time > 0: + now = time.monotonic() + next_loop = max(next_loop + loop_time, now) + time.sleep(next_loop - now) + else: + break + + sys.exit(rc) + + +if __name__ == '__main__': + main() diff --git a/dishStatusCsv.py b/dishStatusCsv.py index 98269e8..55443b5 100644 --- a/dishStatusCsv.py +++ b/dishStatusCsv.py @@ -1,51 +1,147 @@ #!/usr/bin/python3 ###################################################################### # -# Output get_status info in CSV format. 
+# Output Starlink user terminal status info in CSV format. # -# This script pulls the current status once and prints to stdout. +# This script pulls the current status and prints to stdout either +# once or in a periodic loop. # ###################################################################### + import datetime +import getopt +import logging +import sys +import time import grpc import spacex.api.device.device_pb2 import spacex.api.device.device_pb2_grpc -with grpc.insecure_channel("192.168.100.1:9200") as channel: - stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) - response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) -timestamp = datetime.datetime.utcnow() +def main(): + arg_error = False -status = response.dish_get_status + try: + opts, args = getopt.getopt(sys.argv[1:], "ht:H") + except getopt.GetoptError as err: + print(str(err)) + arg_error = True -# More alerts may be added in future, so rather than list them individually, -# build a bit field based on field numbers of the DishAlerts message. 
-alert_bits = 0 -for alert in status.alerts.ListFields(): - alert_bits |= (1 if alert[1] else 0) << (alert[0].number - 1) + print_usage = False + default_loop_time = 0 + loop_time = default_loop_time + print_header = False -csv_data = [ - timestamp.replace(microsecond=0).isoformat(), - status.device_info.id, - status.device_info.hardware_version, - status.device_info.software_version, - spacex.api.device.dish_pb2.DishState.Name(status.state) -] -csv_data.extend(str(x) for x in [ - status.device_state.uptime_s, - status.snr, - status.seconds_to_first_nonempty_slot, - status.pop_ping_drop_rate, - status.downlink_throughput_bps, - status.uplink_throughput_bps, - status.pop_ping_latency_ms, - alert_bits, - status.obstruction_stats.fraction_obstructed, - status.obstruction_stats.currently_obstructed, - status.obstruction_stats.last_24h_obstructed_s -]) -csv_data.extend(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed) -print(",".join(csv_data)) + if not arg_error: + if len(args) > 0: + arg_error = True + else: + for opt, arg in opts: + if opt == "-h": + print_usage = True + elif opt == "-t": + loop_time = float(arg) + elif opt == "-H": + print_header = True + + if print_usage or arg_error: + print("Usage: " + sys.argv[0] + " [options...]") + print("Options:") + print(" -h: Be helpful") + print(" -t : Loop interval in seconds or 0 for no loop, default: " + + str(default_loop_time)) + print(" -H: print CSV header instead of parsing file") + sys.exit(1 if arg_error else 0) + + logging.basicConfig(format="%(levelname)s: %(message)s") + + if print_header: + header = [ + "datetimestamp_utc", + "hardware_version", + "software_version", + "state", + "uptime", + "snr", + "seconds_to_first_nonempty_slot", + "pop_ping_drop_rate", + "downlink_throughput_bps", + "uplink_throughput_bps", + "pop_ping_latency_ms", + "alerts", + "fraction_obstructed", + "currently_obstructed", + "seconds_obstructed", + ] + header.extend("wedges_fraction_obstructed_" + str(x) for x 
in range(12)) + print(",".join(header)) + sys.exit(0) + + def loop_body(): + timestamp = datetime.datetime.utcnow() + + try: + with grpc.insecure_channel("192.168.100.1:9200") as channel: + stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) + + status = response.dish_get_status + + # More alerts may be added in future, so rather than list them individually, + # build a bit field based on field numbers of the DishAlerts message. + alert_bits = 0 + for alert in status.alerts.ListFields(): + alert_bits |= (1 if alert[1] else 0) << (alert[0].number - 1) + + csv_data = [ + timestamp.replace(microsecond=0).isoformat(), + status.device_info.id, + status.device_info.hardware_version, + status.device_info.software_version, + spacex.api.device.dish_pb2.DishState.Name(status.state), + ] + csv_data.extend( + str(x) for x in [ + status.device_state.uptime_s, + status.snr, + status.seconds_to_first_nonempty_slot, + status.pop_ping_drop_rate, + status.downlink_throughput_bps, + status.uplink_throughput_bps, + status.pop_ping_latency_ms, + alert_bits, + status.obstruction_stats.fraction_obstructed, + status.obstruction_stats.currently_obstructed, + status.obstruction_stats.last_24h_obstructed_s, + ]) + csv_data.extend(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed) + rc = 0 + except grpc.RpcError: + if loop_time <= 0: + logging.error("Failed getting status info") + csv_data = [ + timestamp.replace(microsecond=0).isoformat(), "", "", "", "DISH_UNREACHABLE" + ] + rc = 1 + + print(",".join(csv_data)) + + return rc + + next_loop = time.monotonic() + while True: + rc = loop_body() + if loop_time > 0: + now = time.monotonic() + next_loop = max(next_loop + loop_time, now) + time.sleep(next_loop - now) + else: + break + + sys.exit(rc) + + +if __name__ == '__main__': + main() diff --git a/dishStatusInflux.py b/dishStatusInflux.py index b6ab371..6a708f3 100644 --- 
a/dishStatusInflux.py +++ b/dishStatusInflux.py @@ -1,118 +1,270 @@ #!/usr/bin/python3 ###################################################################### # -# Write get_status info to an InfluxDB database. +# Write Starlink user terminal status info to an InfluxDB database. # -# This script will periodically poll current status and write it to -# the specified InfluxDB database in a loop. +# This script will poll current status and write it to the specified +# InfluxDB database either once or in a periodic loop. # ###################################################################### -import time -from influxdb import InfluxDBClient -from influxdb import SeriesHelper +import getopt +import logging +import os +import sys +import time +import warnings import grpc +from influxdb import InfluxDBClient +from influxdb import SeriesHelper import spacex.api.device.device_pb2 import spacex.api.device.device_pb2_grpc -verbose = True -sleep_time = 30 -class DeviceStatusSeries(SeriesHelper): - class Meta: - series_name = "spacex.starlink.user_terminal.status" - fields = [ - "hardware_version", - "software_version", - "state", - "alert_motors_stuck", - "alert_thermal_throttle", - "alert_thermal_shutdown", - "alert_unexpected_location", - "snr", - "seconds_to_first_nonempty_slot", - "pop_ping_drop_rate", - "downlink_throughput_bps", - "uplink_throughput_bps", - "pop_ping_latency_ms", - "currently_obstructed", - "fraction_obstructed"] - tags = ["id"] +def main(): + arg_error = False -influx_client = InfluxDBClient(host="localhost", port=8086, username="script-user", password="password", database="dishstats", ssl=False, retries=1, timeout=15) - -try: - dish_channel = None - last_id = None - last_failed = False - - pending = 0 - count = 0 - while True: - try: - if dish_channel is None: - dish_channel = grpc.insecure_channel("192.168.100.1:9200") - stub = spacex.api.device.device_pb2_grpc.DeviceStub(dish_channel) - response = 
stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) - status = response.dish_get_status - DeviceStatusSeries( - id=status.device_info.id, - hardware_version=status.device_info.hardware_version, - software_version=status.device_info.software_version, - state=spacex.api.device.dish_pb2.DishState.Name(status.state), - alert_motors_stuck=status.alerts.motors_stuck, - alert_thermal_throttle=status.alerts.thermal_throttle, - alert_thermal_shutdown=status.alerts.thermal_shutdown, - alert_unexpected_location=status.alerts.unexpected_location, - snr=status.snr, - seconds_to_first_nonempty_slot=status.seconds_to_first_nonempty_slot, - pop_ping_drop_rate=status.pop_ping_drop_rate, - downlink_throughput_bps=status.downlink_throughput_bps, - uplink_throughput_bps=status.uplink_throughput_bps, - pop_ping_latency_ms=status.pop_ping_latency_ms, - currently_obstructed=status.obstruction_stats.currently_obstructed, - fraction_obstructed=status.obstruction_stats.fraction_obstructed) - last_id = status.device_info.id - last_failed = False - except grpc.RpcError: - if dish_channel is not None: - dish_channel.close() - dish_channel = None - if last_failed: - if last_id is not None: - DeviceStatusSeries(id=last_id, state="DISH_UNREACHABLE") - else: - # Retry once, because the connection may have been lost while - # we were sleeping - last_failed = True - continue - pending = pending + 1 - if verbose: - print("Samples: " + str(pending)) - count = count + 1 - if count > 5: - try: - DeviceStatusSeries.commit(influx_client) - if verbose: - print("Wrote " + str(pending)) - pending = 0 - except Exception as e: - print("Failed to write: " + str(e)) - count = 0 - if sleep_time > 0: - time.sleep(sleep_time) - else: - break -finally: - # Flush on error/exit try: - DeviceStatusSeries.commit(influx_client) + opts, args = getopt.getopt(sys.argv[1:], "hn:p:t:vC:D:IP:R:SU:") + except getopt.GetoptError as err: + print(str(err)) + arg_error = True + + print_usage = False + verbose = False + 
default_loop_time = 0 + loop_time = default_loop_time + host_default = "localhost" + database_default = "starlinkstats" + icargs = {"host": host_default, "timeout": 5, "database": database_default} + rp = None + flush_limit = 6 + + # For each of these check they are both set and not empty string + influxdb_host = os.environ.get("INFLUXDB_HOST") + if influxdb_host: + icargs["host"] = influxdb_host + influxdb_port = os.environ.get("INFLUXDB_PORT") + if influxdb_port: + icargs["port"] = int(influxdb_port) + influxdb_user = os.environ.get("INFLUXDB_USER") + if influxdb_user: + icargs["username"] = influxdb_user + influxdb_pwd = os.environ.get("INFLUXDB_PWD") + if influxdb_pwd: + icargs["password"] = influxdb_pwd + influxdb_db = os.environ.get("INFLUXDB_DB") + if influxdb_db: + icargs["database"] = influxdb_db + influxdb_rp = os.environ.get("INFLUXDB_RP") + if influxdb_rp: + rp = influxdb_rp + influxdb_ssl = os.environ.get("INFLUXDB_SSL") + if influxdb_ssl: + icargs["ssl"] = True + if influxdb_ssl.lower() == "secure": + icargs["verify_ssl"] = True + elif influxdb_ssl.lower() == "insecure": + icargs["verify_ssl"] = False + else: + icargs["verify_ssl"] = influxdb_ssl + + if not arg_error: + if len(args) > 0: + arg_error = True + else: + for opt, arg in opts: + if opt == "-h": + print_usage = True + elif opt == "-n": + icargs["host"] = arg + elif opt == "-p": + icargs["port"] = int(arg) + elif opt == "-t": + loop_time = int(arg) + elif opt == "-v": + verbose = True + elif opt == "-C": + icargs["ssl"] = True + icargs["verify_ssl"] = arg + elif opt == "-D": + icargs["database"] = arg + elif opt == "-I": + icargs["ssl"] = True + icargs["verify_ssl"] = False + elif opt == "-P": + icargs["password"] = arg + elif opt == "-R": + rp = arg + elif opt == "-S": + icargs["ssl"] = True + icargs["verify_ssl"] = True + elif opt == "-U": + icargs["username"] = arg + + if "password" in icargs and "username" not in icargs: + print("Password authentication requires username to be set") + 
arg_error = True + + if print_usage or arg_error: + print("Usage: " + sys.argv[0] + " [options...]") + print("Options:") + print(" -h: Be helpful") + print(" -n : Hostname of InfluxDB server, default: " + host_default) + print(" -p : Port number to use on InfluxDB server") + print(" -t : Loop interval in seconds or 0 for no loop, default: " + + str(default_loop_time)) + print(" -v: Be verbose") + print(" -C : Enable SSL/TLS using specified CA cert to verify server") + print(" -D : Database name to use, default: " + database_default) + print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)") + print(" -P : Set password for authentication") + print(" -R : Retention policy name to use") + print(" -S: Enable SSL/TLS using default CA cert") + print(" -U : Set username for authentication") + sys.exit(1 if arg_error else 0) + + logging.basicConfig(format="%(levelname)s: %(message)s") + + class GlobalState: + pass + + gstate = GlobalState() + gstate.dish_channel = None + gstate.dish_id = None + gstate.pending = 0 + + class DeviceStatusSeries(SeriesHelper): + class Meta: + series_name = "spacex.starlink.user_terminal.status" + fields = [ + "hardware_version", + "software_version", + "state", + "alert_motors_stuck", + "alert_thermal_throttle", + "alert_thermal_shutdown", + "alert_unexpected_location", + "snr", + "seconds_to_first_nonempty_slot", + "pop_ping_drop_rate", + "downlink_throughput_bps", + "uplink_throughput_bps", + "pop_ping_latency_ms", + "currently_obstructed", + "fraction_obstructed", + ] + tags = ["id"] + retention_policy = rp + + def conn_error(msg, *args): + # Connection errors that happen in an interval loop are not critical + # failures, but are interesting enough to print in non-verbose mode. 
+ if loop_time > 0: + print(msg % args) + else: + logging.error(msg, *args) + + def flush_pending(client): + try: + DeviceStatusSeries.commit(client) + if verbose: + print("Data points written: " + str(gstate.pending)) + gstate.pending = 0 + except Exception as e: + conn_error("Failed writing to InfluxDB database: %s", str(e)) + return 1 + + return 0 + + def get_status_retry(): + """Try getting the status at most twice""" + + channel_reused = True + while True: + try: + if gstate.dish_channel is None: + gstate.dish_channel = grpc.insecure_channel("192.168.100.1:9200") + channel_reused = False + stub = spacex.api.device.device_pb2_grpc.DeviceStub(gstate.dish_channel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) + return response.dish_get_status + except grpc.RpcError: + gstate.dish_channel.close() + gstate.dish_channel = None + if channel_reused: + # If the channel was open already, the connection may have + # been lost in the time since prior loop iteration, so after + # closing it, retry once, in case the dish is now reachable. 
+ if verbose: + print("Dish RPC channel error") + else: + raise + + def loop_body(client): + try: + status = get_status_retry() + DeviceStatusSeries(id=status.device_info.id, + hardware_version=status.device_info.hardware_version, + software_version=status.device_info.software_version, + state=spacex.api.device.dish_pb2.DishState.Name(status.state), + alert_motors_stuck=status.alerts.motors_stuck, + alert_thermal_throttle=status.alerts.thermal_throttle, + alert_thermal_shutdown=status.alerts.thermal_shutdown, + alert_unexpected_location=status.alerts.unexpected_location, + snr=status.snr, + seconds_to_first_nonempty_slot=status.seconds_to_first_nonempty_slot, + pop_ping_drop_rate=status.pop_ping_drop_rate, + downlink_throughput_bps=status.downlink_throughput_bps, + uplink_throughput_bps=status.uplink_throughput_bps, + pop_ping_latency_ms=status.pop_ping_latency_ms, + currently_obstructed=status.obstruction_stats.currently_obstructed, + fraction_obstructed=status.obstruction_stats.fraction_obstructed) + gstate.dish_id = status.device_info.id + except grpc.RpcError: + if gstate.dish_id is None: + conn_error("Dish unreachable and ID unknown, so not recording state") + return 1 + else: + if verbose: + print("Dish unreachable") + DeviceStatusSeries(id=gstate.dish_id, state="DISH_UNREACHABLE") + + gstate.pending += 1 if verbose: - print("Wrote " + str(pending)) - except Exception as e: - print("Failed to write: " + str(e)) - influx_client.close() - if dish_channel is not None: - dish_channel.close() + print("Data points queued: " + str(gstate.pending)) + if gstate.pending >= flush_limit: + return flush_pending(client) + + return 0 + + if "verify_ssl" in icargs and not icargs["verify_ssl"]: + # user has explicitly said be insecure, so don't warn about it + warnings.filterwarnings("ignore", message="Unverified HTTPS request") + + influx_client = InfluxDBClient(**icargs) + try: + next_loop = time.monotonic() + while True: + rc = loop_body(influx_client) + if loop_time > 0: 
+ now = time.monotonic() + next_loop = max(next_loop + loop_time, now) + time.sleep(next_loop - now) + else: + break + finally: + # Flush on error/exit + if gstate.pending: + rc = flush_pending(influx_client) + influx_client.close() + if gstate.dish_channel is not None: + gstate.dish_channel.close() + + sys.exit(rc) + + +if __name__ == '__main__': + main() diff --git a/dishStatusMqtt.py b/dishStatusMqtt.py index 9baaddd..06a1324 100644 --- a/dishStatusMqtt.py +++ b/dishStatusMqtt.py @@ -1,50 +1,188 @@ #!/usr/bin/python3 ###################################################################### # -# Publish get_status info to a MQTT broker. +# Publish Starlink user terminal status info to a MQTT broker. # -# This script pulls the current status once and publishes it to the -# specified MQTT broker. +# This script pulls the current status and publishes it to the +# specified MQTT broker either once or in a periodic loop. # ###################################################################### -import paho.mqtt.publish + +import getopt +import logging +import sys +import time + +try: + import ssl + ssl_ok = True +except ImportError: + ssl_ok = False import grpc +import paho.mqtt.publish import spacex.api.device.device_pb2 import spacex.api.device.device_pb2_grpc -with grpc.insecure_channel("192.168.100.1:9200") as channel: - stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) - response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) -status = response.dish_get_status +def main(): + arg_error = False -# More alerts may be added in future, so rather than list them individually, -# build a bit field based on field numbers of the DishAlerts message. 
-alert_bits = 0 -for alert in status.alerts.ListFields(): - alert_bits |= (1 if alert[1] else 0) << (alert[0].number - 1) + try: + opts, args = getopt.getopt(sys.argv[1:], "hn:p:t:vC:ISP:U:") + except getopt.GetoptError as err: + print(str(err)) + arg_error = True -topic_prefix = "starlink/dish_status/" + status.device_info.id + "/" -msgs = [(topic_prefix + "hardware_version", status.device_info.hardware_version, 0, False), - (topic_prefix + "software_version", status.device_info.software_version, 0, False), - (topic_prefix + "state", spacex.api.device.dish_pb2.DishState.Name(status.state), 0, False), - (topic_prefix + "uptime", status.device_state.uptime_s, 0, False), - (topic_prefix + "snr", status.snr, 0, False), - (topic_prefix + "seconds_to_first_nonempty_slot", status.seconds_to_first_nonempty_slot, 0, False), - (topic_prefix + "pop_ping_drop_rate", status.pop_ping_drop_rate, 0, False), - (topic_prefix + "downlink_throughput_bps", status.downlink_throughput_bps, 0, False), - (topic_prefix + "uplink_throughput_bps", status.uplink_throughput_bps, 0, False), - (topic_prefix + "pop_ping_latency_ms", status.pop_ping_latency_ms, 0, False), - (topic_prefix + "alerts", alert_bits, 0, False), - (topic_prefix + "fraction_obstructed", status.obstruction_stats.fraction_obstructed, 0, False), - (topic_prefix + "currently_obstructed", status.obstruction_stats.currently_obstructed, 0, False), - # While the field name for this one implies it covers 24 hours, the - # empirical evidence suggests it only covers 12 hours. It also resets - # on dish reboot, so may not cover that whole period. 
Rather than try - # to convey that complexity in the topic label, just be a bit vague: - (topic_prefix + "seconds_obstructed", status.obstruction_stats.last_24h_obstructed_s, 0, False), - (topic_prefix + "wedges_fraction_obstructed", ",".join(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed), 0, False)] + print_usage = False + verbose = False + default_loop_time = 0 + loop_time = default_loop_time + host_default = "localhost" + mqargs = {"hostname": host_default} + username = None + password = None -paho.mqtt.publish.multiple(msgs, hostname="localhost", client_id=status.device_info.id) + if not arg_error: + if len(args) > 0: + arg_error = True + else: + for opt, arg in opts: + if opt == "-h": + print_usage = True + elif opt == "-n": + mqargs["hostname"] = arg + elif opt == "-p": + mqargs["port"] = int(arg) + elif opt == "-t": + loop_time = float(arg) + elif opt == "-v": + verbose = True + elif opt == "-C": + mqargs["tls"] = {"ca_certs": arg} + elif opt == "-I": + if ssl_ok: + mqargs["tls"] = {"cert_reqs": ssl.CERT_NONE} + else: + print("No SSL support found") + sys.exit(1) + elif opt == "-P": + password = arg + elif opt == "-S": + mqargs["tls"] = {} + elif opt == "-U": + username = arg + + if username is None and password is not None: + print("Password authentication requires username to be set") + arg_error = True + + if print_usage or arg_error: + print("Usage: " + sys.argv[0] + " [options...]") + print("Options:") + print(" -h: Be helpful") + print(" -n : Hostname of MQTT broker, default: " + host_default) + print(" -p : Port number to use on MQTT broker") + print(" -t : Loop interval in seconds or 0 for no loop, default: " + + str(default_loop_time)) + print(" -v: Be verbose") + print(" -C : Enable SSL/TLS using specified CA cert to verify broker") + print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)") + print(" -P: Set password for username/password authentication") + print(" -S: Enable SSL/TLS using default CA 
cert") + print(" -U: Set username for authentication") + sys.exit(1 if arg_error else 0) + + if username is not None: + mqargs["auth"] = {"username": username} + if password is not None: + mqargs["auth"]["password"] = password + + logging.basicConfig(format="%(levelname)s: %(message)s") + + class GlobalState: + pass + + gstate = GlobalState() + gstate.dish_id = None + + def conn_error(msg, *args): + # Connection errors that happen in an interval loop are not critical + # failures, but are interesting enough to print in non-verbose mode. + if loop_time > 0: + print(msg % args) + else: + logging.error(msg, *args) + + def loop_body(): + try: + with grpc.insecure_channel("192.168.100.1:9200") as channel: + stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) + + status = response.dish_get_status + + # More alerts may be added in future, so rather than list them individually, + # build a bit field based on field numbers of the DishAlerts message. 
+ alert_bits = 0 + for alert in status.alerts.ListFields(): + alert_bits |= (1 if alert[1] else 0) << (alert[0].number - 1) + + gstate.dish_id = status.device_info.id + topic_prefix = "starlink/dish_status/" + gstate.dish_id + "/" + msgs = [ + (topic_prefix + "hardware_version", status.device_info.hardware_version, 0, False), + (topic_prefix + "software_version", status.device_info.software_version, 0, False), + (topic_prefix + "state", spacex.api.device.dish_pb2.DishState.Name(status.state), 0, False), + (topic_prefix + "uptime", status.device_state.uptime_s, 0, False), + (topic_prefix + "snr", status.snr, 0, False), + (topic_prefix + "seconds_to_first_nonempty_slot", status.seconds_to_first_nonempty_slot, 0, False), + (topic_prefix + "pop_ping_drop_rate", status.pop_ping_drop_rate, 0, False), + (topic_prefix + "downlink_throughput_bps", status.downlink_throughput_bps, 0, False), + (topic_prefix + "uplink_throughput_bps", status.uplink_throughput_bps, 0, False), + (topic_prefix + "pop_ping_latency_ms", status.pop_ping_latency_ms, 0, False), + (topic_prefix + "alerts", alert_bits, 0, False), + (topic_prefix + "fraction_obstructed", status.obstruction_stats.fraction_obstructed, 0, False), + (topic_prefix + "currently_obstructed", status.obstruction_stats.currently_obstructed, 0, False), + # While the field name for this one implies it covers 24 hours, the + # empirical evidence suggests it only covers 12 hours. It also resets + # on dish reboot, so may not cover that whole period. 
Rather than try + # to convey that complexity in the topic label, just be a bit vague: + (topic_prefix + "seconds_obstructed", status.obstruction_stats.last_24h_obstructed_s, 0, False), + (topic_prefix + "wedges_fraction_obstructed", ",".join(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed), 0, False), + ] + except grpc.RpcError: + if gstate.dish_id is None: + conn_error("Dish unreachable and ID unknown, so not recording state") + return 1 + if verbose: + print("Dish unreachable") + topic_prefix = "starlink/dish_status/" + gstate.dish_id + "/" + msgs = [(topic_prefix + "state", "DISH_UNREACHABLE", 0, False)] + + try: + paho.mqtt.publish.multiple(msgs, client_id=gstate.dish_id, **mqargs) + if verbose: + print("Successfully published to MQTT broker") + except Exception as e: + conn_error("Failed publishing to MQTT broker: %s", str(e)) + return 1 + + return 0 + + next_loop = time.monotonic() + while True: + rc = loop_body() + if loop_time > 0: + now = time.monotonic() + next_loop = max(next_loop + loop_time, now) + time.sleep(next_loop - now) + else: + break + + sys.exit(rc) + + +if __name__ == '__main__': + main() diff --git a/entrypoint.sh b/entrypoint.sh new file mode 100644 index 0000000..5cce7fe --- /dev/null +++ b/entrypoint.sh @@ -0,0 +1,13 @@ +#!/bin/sh + +printenv >> /etc/environment +ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone +grpcurl -plaintext -protoset-out dish.protoset 192.168.100.1:9200 describe SpaceX.API.Device.Device +python3 -m grpc_tools.protoc --descriptor_set_in=dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/device.proto +python3 -m grpc_tools.protoc --descriptor_set_in=dish.protoset --python_out=. --grpc_python_out=. spacex/api/common/status/status.proto +python3 -m grpc_tools.protoc --descriptor_set_in=dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/command.proto +python3 -m grpc_tools.protoc --descriptor_set_in=dish.protoset --python_out=. 
--grpc_python_out=. spacex/api/device/common.proto +python3 -m grpc_tools.protoc --descriptor_set_in=dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/dish.proto +python3 -m grpc_tools.protoc --descriptor_set_in=dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/wifi.proto +python3 -m grpc_tools.protoc --descriptor_set_in=dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/wifi_config.proto +/usr/local/bin/python3 $@ diff --git a/parseJsonHistory.py b/parseJsonHistory.py index 9357847..e12d676 100644 --- a/parseJsonHistory.py +++ b/parseJsonHistory.py @@ -16,6 +16,7 @@ import datetime import sys import getopt +import logging import starlink_json @@ -32,7 +33,6 @@ samples_default = 3600 samples = samples_default print_usage = False verbose = False -parse_all = False print_header = False run_lengths = False @@ -42,7 +42,7 @@ if not arg_error: else: for opt, arg in opts: if opt == "-a": - parse_all = True + samples = -1 elif opt == "-h": print_usage = True elif opt == "-r": @@ -61,16 +61,19 @@ if print_usage or arg_error: print(" -a: Parse all valid samples") print(" -h: Be helpful") print(" -r: Include ping drop run length stats") - print(" -s : Parse data samples, default: " + str(samples_default)) + print(" -s : Number of data samples to parse, default: " + str(samples_default)) print(" -v: Be verbose") print(" -H: print CSV header instead of parsing file") sys.exit(1 if arg_error else 0) -fields, rl_fields = starlink_json.history_ping_field_names() +logging.basicConfig(format="%(levelname)s: %(message)s") + +g_fields, pd_fields, rl_fields = starlink_json.history_ping_field_names() if print_header: header = ["datetimestamp_utc"] - header.extend(fields) + header.extend(g_fields) + header.extend(pd_fields) if run_lengths: for field in rl_fields: if field.startswith("run_"): @@ -82,24 +85,23 @@ if print_header: timestamp = datetime.datetime.utcnow() -stats, rl_stats = starlink_json.history_ping_stats(args[0] if args else 
"-", - -1 if parse_all else samples, - verbose) - -if stats is None or rl_stats is None: - # verbose output already happened, so just bail. +try: + g_stats, pd_stats, rl_stats = starlink_json.history_ping_stats(args[0] if args else "-", + samples, verbose) +except starlink_json.JsonError as e: + logging.error("Failure getting ping stats: %s", str(e)) sys.exit(1) if verbose: - print("Parsed samples: " + str(stats["samples"])) - print("Total ping drop: " + str(stats["total_ping_drop"])) - print("Count of drop == 1: " + str(stats["count_full_ping_drop"])) - print("Obstructed: " + str(stats["count_obstructed"])) - print("Obstructed ping drop: " + str(stats["total_obstructed_ping_drop"])) - print("Obstructed drop == 1: " + str(stats["count_full_obstructed_ping_drop"])) - print("Unscheduled: " + str(stats["count_unscheduled"])) - print("Unscheduled ping drop: " + str(stats["total_unscheduled_ping_drop"])) - print("Unscheduled drop == 1: " + str(stats["count_full_unscheduled_ping_drop"])) + print("Parsed samples: " + str(g_stats["samples"])) + print("Total ping drop: " + str(pd_stats["total_ping_drop"])) + print("Count of drop == 1: " + str(pd_stats["count_full_ping_drop"])) + print("Obstructed: " + str(pd_stats["count_obstructed"])) + print("Obstructed ping drop: " + str(pd_stats["total_obstructed_ping_drop"])) + print("Obstructed drop == 1: " + str(pd_stats["count_full_obstructed_ping_drop"])) + print("Unscheduled: " + str(pd_stats["count_unscheduled"])) + print("Unscheduled ping drop: " + str(pd_stats["total_unscheduled_ping_drop"])) + print("Unscheduled drop == 1: " + str(pd_stats["count_full_unscheduled_ping_drop"])) if run_lengths: print("Initial drop run fragment: " + str(rl_stats["init_run_fragment"])) print("Final drop run fragment: " + str(rl_stats["final_run_fragment"])) @@ -107,7 +109,8 @@ if verbose: print("Per-minute drop runs: " + ", ".join(str(x) for x in rl_stats["run_minutes"])) else: csv_data = [timestamp.replace(microsecond=0).isoformat()] - 
csv_data.extend(str(stats[field]) for field in fields) + csv_data.extend(str(g_stats[field]) for field in g_fields) + csv_data.extend(str(pd_stats[field]) for field in pd_fields) if run_lengths: for field in rl_fields: if field.startswith("run_"): diff --git a/starlink_grpc.py b/starlink_grpc.py index 3401863..40e3572 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -4,13 +4,17 @@ This module may eventually contain more expansive parsing logic, but for now it contains functions to parse the history data for some specific packet loss statistics. -General ping drop (packet loss) statistics: - This group of statistics characterize the packet loss (labeled "ping drop" - in the field names of the Starlink gRPC service protocol) in various ways. +General statistics: + This group of statistics contains data relevant to all the other groups. The sample interval is currently 1 second. samples: The number of valid samples analyzed. + +General ping drop (packet loss) statistics: + This group of statistics characterize the packet loss (labeled "ping drop" + in the field names of the Starlink gRPC service protocol) in various ways. + total_ping_drop: The total amount of time, in sample intervals, that experienced ping drop. count_full_ping_drop: The number of samples that experienced 100% @@ -62,13 +66,13 @@ Ping drop run length statistics: No sample should be counted in more than one of the run length stats or stat elements, so the total of all of them should be equal to - count_full_ping_drop from the general stats. + count_full_ping_drop from the ping drop stats. Samples that experience less than 100% ping drop are not counted in this group of stats, even if they happen at the beginning or end of a run of 100% ping drop samples. To compute the amount of time that experienced ping loss in less than a single run of 100% ping drop, use - (total_ping_drop - count_full_ping_drop) from the general stats. + (total_ping_drop - count_full_ping_drop) from the ping drop stats. 
""" from itertools import chain @@ -78,15 +82,61 @@ import grpc import spacex.api.device.device_pb2 import spacex.api.device.device_pb2_grpc + +class GrpcError(Exception): + """Provides error info when something went wrong with a gRPC call.""" + def __init__(self, e, *args, **kwargs): + # grpc.RpcError is too verbose to print in whole, but it may also be + # a Call object, and that class has some minimally useful info. + if isinstance(e, grpc.Call): + msg = e.details() + elif isinstance(e, grpc.RpcError): + msg = "Unknown communication or service error" + else: + msg = str(e) + super().__init__(msg, *args, **kwargs) + + +def get_status(): + """Fetch status data and return it in grpc structure format. + + Raises: + grpc.RpcError: Communication or service error. + """ + with grpc.insecure_channel("192.168.100.1:9200") as channel: + stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) + return response.dish_get_status + + +def get_id(): + """Return the ID from the dish status information. + + Returns: + A string identifying the Starlink user terminal reachable from the + local network. + + Raises: + GrpcError: No user terminal is currently reachable. + """ + try: + status = get_status() + return status.device_info.id + except grpc.RpcError as e: + raise GrpcError(e) + + def history_ping_field_names(): """Return the field names of the packet loss stats. Returns: - A tuple with 2 lists, the first with general stat names and the - second with ping drop run length stat names. + A tuple with 3 lists, the first with general stat names, the second + with ping drop stat names, and the third with ping drop run length + stat names. 
""" return [ "samples", + ], [ "total_ping_drop", "count_full_ping_drop", "count_obstructed", @@ -94,14 +144,15 @@ def history_ping_field_names(): "count_full_obstructed_ping_drop", "count_unscheduled", "total_unscheduled_ping_drop", - "count_full_unscheduled_ping_drop" + "count_full_unscheduled_ping_drop", ], [ "init_run_fragment", "final_run_fragment", "run_seconds", - "run_minutes" + "run_minutes", ] + def get_history(): """Fetch history data and return it in grpc structure format. @@ -113,6 +164,7 @@ def get_history(): response = stub.Handle(spacex.api.device.device_pb2.Request(get_history={})) return response.dish_get_history + def history_ping_stats(parse_samples, verbose=False): """Fetch, parse, and compute the packet loss stats. @@ -122,19 +174,18 @@ def history_ping_stats(parse_samples, verbose=False): verbose (bool): Optionally produce verbose output. Returns: - On success, a tuple with 2 dicts, the first mapping general stat names - to their values and the second mapping ping drop run length stat names - to their values. + A tuple with 3 dicts, the first mapping general stat names to their + values, the second mapping ping drop stat names to their values and + the third mapping ping drop run length stat names to their values. - On failure, the tuple (None, None). + Raises: + GrpcError: Failed getting history info from the Starlink user + terminal. """ try: history = get_history() - except grpc.RpcError: - if verbose: - # RpcError is too verbose to print the details. - print("Failed getting history") - return None, None + except grpc.RpcError as e: + raise GrpcError(e) # 'current' is the count of data samples written to the ring buffer, # irrespective of buffer wrap. @@ -154,13 +205,13 @@ def history_ping_stats(parse_samples, verbose=False): # index to next data sample after the newest one. 
offset = current % samples - tot = 0 + tot = 0.0 count_full_drop = 0 count_unsched = 0 - total_unsched_drop = 0 + total_unsched_drop = 0.0 count_full_unsched = 0 count_obstruct = 0 - total_obstruct_drop = 0 + total_obstruct_drop = 0.0 count_full_obstruct = 0 second_runs = [0] * 60 @@ -180,9 +231,10 @@ def history_ping_stats(parse_samples, verbose=False): for i in sample_range: d = history.pop_ping_drop_rate[i] - tot += d if d >= 1: - count_full_drop += d + # just in case... + d = 1 + count_full_drop += 1 run_length += 1 elif run_length > 0: if init_run_length is None: @@ -191,7 +243,7 @@ def history_ping_stats(parse_samples, verbose=False): if run_length <= 60: second_runs[run_length - 1] += run_length else: - minute_runs[min((run_length - 1)//60 - 1, 59)] += run_length + minute_runs[min((run_length-1) // 60 - 1, 59)] += run_length run_length = 0 elif init_run_length is None: init_run_length = 0 @@ -199,7 +251,7 @@ def history_ping_stats(parse_samples, verbose=False): count_unsched += 1 total_unsched_drop += d if d >= 1: - count_full_unsched += d + count_full_unsched += 1 # scheduled=false and obstructed=true do not ever appear to overlap, # but in case they do in the future, treat that as just unscheduled # in order to avoid double-counting it. 
@@ -207,7 +259,8 @@ def history_ping_stats(parse_samples, verbose=False): count_obstruct += 1 total_obstruct_drop += d if d >= 1: - count_full_obstruct += d + count_full_obstruct += 1 + tot += d # If the entire sample set is one big drop run, it will be both initial # fragment (continued from prior sample range) and final one (continued @@ -219,6 +272,7 @@ def history_ping_stats(parse_samples, verbose=False): return { "samples": parse_samples, + }, { "total_ping_drop": tot, "count_full_ping_drop": count_full_drop, "count_obstructed": count_obstruct, @@ -226,10 +280,10 @@ def history_ping_stats(parse_samples, verbose=False): "count_full_obstructed_ping_drop": count_full_obstruct, "count_unscheduled": count_unsched, "total_unscheduled_ping_drop": total_unsched_drop, - "count_full_unscheduled_ping_drop": count_full_unsched + "count_full_unscheduled_ping_drop": count_full_unsched, }, { "init_run_fragment": init_run_length, "final_run_fragment": run_length, "run_seconds": second_runs, - "run_minutes": minute_runs + "run_minutes": minute_runs, } diff --git a/starlink_json.py b/starlink_json.py index ca70547..7365430 100644 --- a/starlink_json.py +++ b/starlink_json.py @@ -14,15 +14,22 @@ import sys from itertools import chain + +class JsonError(Exception): + """Provides error info when something went wrong with JSON parsing.""" + + def history_ping_field_names(): """Return the field names of the packet loss stats. Returns: - A tuple with 2 lists, the first with general stat names and the - second with ping drop run length stat names. + A tuple with 3 lists, the first with general stat names, the second + with ping drop stat names, and the third with ping drop run length + stat names. 
""" return [ "samples", + ], [ "total_ping_drop", "count_full_ping_drop", "count_obstructed", @@ -30,31 +37,34 @@ def history_ping_field_names(): "count_full_obstructed_ping_drop", "count_unscheduled", "total_unscheduled_ping_drop", - "count_full_unscheduled_ping_drop" + "count_full_unscheduled_ping_drop", ], [ "init_run_fragment", "final_run_fragment", "run_seconds", - "run_minutes" + "run_minutes", ] + def get_history(filename): """Read JSON data and return the raw history in dict format. Args: filename (str): Filename from which to read JSON data, or "-" to read from standard input. + + Raises: + Various exceptions depending on Python version: Failure to open or + read input or invalid JSON read on input. """ if filename == "-": json_data = json.load(sys.stdin) else: - json_file = open(filename) - try: + with open(filename) as json_file: json_data = json.load(json_file) - finally: - json_file.close() return json_data["dishGetHistory"] + def history_ping_stats(filename, parse_samples, verbose=False): """Fetch, parse, and compute the packet loss stats. @@ -66,18 +76,19 @@ def history_ping_stats(filename, parse_samples, verbose=False): verbose (bool): Optionally produce verbose output. Returns: - On success, a tuple with 2 dicts, the first mapping general stat names - to their values and the second mapping ping drop run length stat names - to their values. + A tuple with 3 dicts, the first mapping general stat names to their + values, the second mapping ping drop stat names to their values and + the third mapping ping drop run length stat names to their values. - On failure, the tuple (None, None). + Raises: + JsonError: Failure to open, read, or parse JSON on input. 
""" try: history = get_history(filename) + except ValueError as e: + raise JsonError("Failed to parse JSON: " + str(e)) except Exception as e: - if verbose: - print("Failed getting history: " + str(e)) - return None, None + raise JsonError(e) # "current" is the count of data samples written to the ring buffer, # irrespective of buffer wrap. @@ -97,13 +108,13 @@ def history_ping_stats(filename, parse_samples, verbose=False): # index to next data sample after the newest one. offset = current % samples - tot = 0 + tot = 0.0 count_full_drop = 0 count_unsched = 0 - total_unsched_drop = 0 + total_unsched_drop = 0.0 count_full_unsched = 0 count_obstruct = 0 - total_obstruct_drop = 0 + total_obstruct_drop = 0.0 count_full_obstruct = 0 second_runs = [0] * 60 @@ -123,9 +134,10 @@ def history_ping_stats(filename, parse_samples, verbose=False): for i in sample_range: d = history["popPingDropRate"][i] - tot += d if d >= 1: - count_full_drop += d + # just in case... + d = 1 + count_full_drop += 1 run_length += 1 elif run_length > 0: if init_run_length is None: @@ -134,7 +146,7 @@ def history_ping_stats(filename, parse_samples, verbose=False): if run_length <= 60: second_runs[run_length - 1] += run_length else: - minute_runs[min((run_length - 1)//60 - 1, 59)] += run_length + minute_runs[min((run_length-1) // 60 - 1, 59)] += run_length run_length = 0 elif init_run_length is None: init_run_length = 0 @@ -142,7 +154,7 @@ def history_ping_stats(filename, parse_samples, verbose=False): count_unsched += 1 total_unsched_drop += d if d >= 1: - count_full_unsched += d + count_full_unsched += 1 # scheduled=false and obstructed=true do not ever appear to overlap, # but in case they do in the future, treat that as just unscheduled # in order to avoid double-counting it. 
@@ -150,7 +162,8 @@ def history_ping_stats(filename, parse_samples, verbose=False): count_obstruct += 1 total_obstruct_drop += d if d >= 1: - count_full_obstruct += d + count_full_obstruct += 1 + tot += d # If the entire sample set is one big drop run, it will be both initial # fragment (continued from prior sample range) and final one (continued @@ -162,6 +175,7 @@ def history_ping_stats(filename, parse_samples, verbose=False): return { "samples": parse_samples, + }, { "total_ping_drop": tot, "count_full_ping_drop": count_full_drop, "count_obstructed": count_obstruct, @@ -169,10 +183,10 @@ def history_ping_stats(filename, parse_samples, verbose=False): "count_full_obstructed_ping_drop": count_full_obstruct, "count_unscheduled": count_unsched, "total_unscheduled_ping_drop": total_unsched_drop, - "count_full_unscheduled_ping_drop": count_full_unsched + "count_full_unscheduled_ping_drop": count_full_unsched, }, { "init_run_fragment": init_run_length, "final_run_fragment": run_length, "run_seconds": second_runs, - "run_minutes": minute_runs + "run_minutes": minute_runs, }