Merge pull request #13 from sparky8512/main

Make current with main branch
This commit is contained in:
sparky8512 2021-01-16 07:34:24 -08:00 committed by GitHub
commit 51f1193bd2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 2163 additions and 338 deletions

25
Dockerfile Normal file
View file

@ -0,0 +1,25 @@
FROM python:3.9
LABEL maintainer="neurocis <neurocis@neurocis.me>"

# grpcurl release to install; override with --build-arg GRPCURL_VERSION=x.y.z
ARG GRPCURL_VERSION=1.8.0

# Install grpcurl from its release tarball, then the Python prerequisites.
# Everything is done in one RUN so the downloaded tarball never persists in a
# layer, and --no-cache-dir keeps pip's download cache out of the image.
RUN wget https://github.com/fullstorydev/grpcurl/releases/download/v${GRPCURL_VERSION}/grpcurl_${GRPCURL_VERSION}_linux_x86_64.tar.gz && \
    tar -xvf grpcurl_${GRPCURL_VERSION}_linux_x86_64.tar.gz grpcurl && \
    chown root:root grpcurl && \
    chmod 755 grpcurl && \
    mv grpcurl /usr/bin/. && \
    rm grpcurl_${GRPCURL_VERSION}_linux_x86_64.tar.gz && \
    pip3 install --no-cache-dir grpcio grpcio-tools paho-mqtt influxdb

# COPY (rather than ADD) because this is a plain copy of the build context;
# none of ADD's URL-fetch or archive-extraction behavior is wanted here.
COPY . /app
WORKDIR /app

# entrypoint.sh is the container's main process; the CMD value (or whatever is
# given after the image name on `docker run`) is passed to it as arguments.
# NOTE(review): presumably entrypoint.sh runs the named script - confirm there.
ENTRYPOINT ["/bin/sh", "/app/entrypoint.sh"]
CMD ["dishStatusInflux.py"]

# docker run -d --name='starlink-grpc-tools' -e INFLUXDB_HOST=192.168.1.34 -e INFLUXDB_PORT=8086 -e INFLUXDB_DB=starlink
# --net='br0' --ip='192.168.1.39' neurocis/starlink-grpc-tools dishStatusInflux.py

View file

@ -0,0 +1,818 @@
{
"__inputs": [
{
"name": "VAR_DS_INFLUXDB",
"type": "constant",
"label": "InfluxDB DataSource",
"value": "InfluxDB-starlinkstats",
"description": ""
},
{
"name": "VAR_TBL_STATS",
"type": "constant",
"label": "Table name for Statistics",
"value": "spacex.starlink.user_terminal.status",
"description": ""
}
],
"__requires": [
{
"type": "grafana",
"id": "grafana",
"name": "Grafana",
"version": "7.3.6"
},
{
"type": "panel",
"id": "graph",
"name": "Graph",
"version": ""
},
{
"type": "datasource",
"id": "influxdb",
"name": "InfluxDB",
"version": "1.0.0"
},
{
"type": "panel",
"id": "table",
"name": "Table",
"version": ""
}
],
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": "-- Grafana --",
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"gnetId": null,
"graphTooltip": 0,
"id": null,
"iteration": 1610413551748,
"links": [],
"panels": [
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "$DS_INFLUXDB",
"fieldConfig": {
"defaults": {
"custom": {}
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 11,
"w": 12,
"x": 0,
"y": 0
},
"hiddenSeries": false,
"id": 4,
"legend": {
"alignAsTable": true,
"avg": true,
"current": true,
"hideZero": false,
"max": true,
"min": false,
"rightSide": false,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.3.6",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"groupBy": [],
"measurement": "/^$TBL_STATS$/",
"orderByTime": "ASC",
"policy": "default",
"queryType": "randomWalk",
"refId": "A",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"downlink_throughput_bps"
],
"type": "field"
},
{
"params": [
"bps Down"
],
"type": "alias"
}
],
[
{
"params": [
"uplink_throughput_bps"
],
"type": "field"
},
{
"params": [
"bps Up"
],
"type": "alias"
}
]
],
"tags": []
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Actual Throughput",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"$$hashKey": "object:1099",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"$$hashKey": "object:1100",
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "$DS_INFLUXDB",
"description": "",
"fieldConfig": {
"defaults": {
"custom": {}
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 11,
"w": 12,
"x": 12,
"y": 0
},
"hiddenSeries": false,
"id": 2,
"legend": {
"alignAsTable": true,
"avg": true,
"current": true,
"max": true,
"min": true,
"show": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"nullPointMode": "null",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.3.6",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"groupBy": [],
"measurement": "/^$TBL_STATS$/",
"orderByTime": "ASC",
"policy": "default",
"queryType": "randomWalk",
"refId": "A",
"resultFormat": "time_series",
"select": [
[
{
"params": [
"pop_ping_latency_ms"
],
"type": "field"
},
{
"params": [
"Ping Latency"
],
"type": "alias"
}
],
[
{
"params": [
"pop_ping_drop_rate"
],
"type": "field"
},
{
"params": [
"Drop Rate"
],
"type": "alias"
}
],
[
{
"params": [
"fraction_obstructed"
],
"type": "field"
},
{
"params": [
"*100"
],
"type": "math"
},
{
"params": [
"Percent Obstructed"
],
"type": "alias"
}
],
[
{
"params": [
"snr"
],
"type": "field"
},
{
"params": [
"*10"
],
"type": "math"
},
{
"params": [
"SNR"
],
"type": "alias"
}
]
],
"tags": []
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Ping Latency, Drop Rate, Percent Obstructed & SNR",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"cacheTimeout": null,
"datasource": "$DS_INFLUXDB",
"description": "",
"fieldConfig": {
"defaults": {
"custom": {
"align": null,
"filterable": false
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": [
{
"matcher": {
"id": "byName",
"options": "Obstructed"
},
"properties": [
{
"id": "custom.width",
"value": 105
}
]
},
{
"matcher": {
"id": "byName",
"options": "Wrong Location"
},
"properties": [
{
"id": "custom.width",
"value": 114
}
]
},
{
"matcher": {
"id": "byName",
"options": "Thermal Throttle"
},
"properties": [
{
"id": "custom.width",
"value": 121
}
]
},
{
"matcher": {
"id": "byName",
"options": "Thermal Shutdown"
},
"properties": [
{
"id": "custom.width",
"value": 136
}
]
},
{
"matcher": {
"id": "byName",
"options": "Motors Stuck"
},
"properties": [
{
"id": "custom.width",
"value": 116
}
]
},
{
"matcher": {
"id": "byName",
"options": "Time"
},
"properties": [
{
"id": "custom.width",
"value": 143
}
]
},
{
"matcher": {
"id": "byName",
"options": "State"
},
"properties": [
{
"id": "custom.width",
"value": 118
}
]
},
{
"matcher": {
"id": "byName",
"options": "Bad Location"
},
"properties": [
{
"id": "custom.width",
"value": 122
}
]
},
{
"matcher": {
"id": "byName",
"options": "Temp Throttle"
},
"properties": [
{
"id": "custom.width",
"value": 118
}
]
},
{
"matcher": {
"id": "byName",
"options": "Temp Shutdown"
},
"properties": [
{
"id": "custom.width",
"value": 134
}
]
},
{
"matcher": {
"id": "byName",
"options": "Software Version"
},
"properties": [
{
"id": "custom.width",
"value": 369
}
]
}
]
},
"gridPos": {
"h": 7,
"w": 24,
"x": 0,
"y": 11
},
"id": 6,
"interval": null,
"links": [],
"options": {
"showHeader": true,
"sortBy": [
{
"desc": true,
"displayName": "Time (last)"
}
]
},
"pluginVersion": "7.3.6",
"targets": [
{
"groupBy": [],
"hide": false,
"measurement": "/^$TBL_STATS$/",
"orderByTime": "ASC",
"policy": "default",
"query": "SELECT \"currently_obstructed\" AS \"Obstructed\", \"alert_unexpected_location\" AS \"Wrong Location\", \"alert_thermal_throttle\" AS \"Thermal Throttle\", \"alert_thermal_shutdown\" AS \"Thermal Shutdown\", \"alert_motors_stuck\" AS \"Motors Stuck\", \"state\" AS \"State\" FROM \"spacex.starlink.user_terminal.status\" WHERE $timeFilter",
"queryType": "randomWalk",
"rawQuery": false,
"refId": "A",
"resultFormat": "table",
"select": [
[
{
"params": [
"state"
],
"type": "field"
},
{
"params": [
"State"
],
"type": "alias"
}
],
[
{
"params": [
"currently_obstructed"
],
"type": "field"
},
{
"params": [
"Obstructed"
],
"type": "alias"
}
],
[
{
"params": [
"alert_unexpected_location"
],
"type": "field"
},
{
"params": [
"Bad Location"
],
"type": "alias"
}
],
[
{
"params": [
"alert_thermal_throttle"
],
"type": "field"
},
{
"params": [
"Temp Throttled"
],
"type": "alias"
}
],
[
{
"params": [
"alert_thermal_shutdown"
],
"type": "field"
},
{
"params": [
"Temp Shutdown"
],
"type": "alias"
}
],
[
{
"params": [
"alert_motors_stuck"
],
"type": "field"
},
{
"params": [
"Motors Stuck"
],
"type": "alias"
}
],
[
{
"params": [
"software_version"
],
"type": "field"
},
{
"params": [
"Software Version"
],
"type": "alias"
}
],
[
{
"params": [
"hardware_version"
],
"type": "field"
},
{
"params": [
"Hardware Version"
],
"type": "alias"
}
]
],
"tags": []
}
],
"timeFrom": null,
"timeShift": null,
"title": "Alerts & Versions",
"transformations": [
{
"id": "groupBy",
"options": {
"fields": {
"Bad Location": {
"aggregations": [],
"operation": "groupby"
},
"Hardware Version": {
"aggregations": [],
"operation": "groupby"
},
"Motors Stuck": {
"aggregations": [],
"operation": "groupby"
},
"Obstructed": {
"aggregations": [],
"operation": "groupby"
},
"Software Version": {
"aggregations": [],
"operation": "groupby"
},
"State": {
"aggregations": [],
"operation": "groupby"
},
"Temp Shutdown": {
"aggregations": [],
"operation": "groupby"
},
"Temp Throttle": {
"aggregations": [],
"operation": "groupby"
},
"Temp Throttled": {
"aggregations": [],
"operation": "groupby"
},
"Thermal Shutdown": {
"aggregations": [],
"operation": "groupby"
},
"Thermal Throttle": {
"aggregations": [],
"operation": "groupby"
},
"Time": {
"aggregations": [
"last"
],
"operation": "aggregate"
},
"Wrong Location": {
"aggregations": [],
"operation": "groupby"
}
}
}
}
],
"type": "table"
}
],
"refresh": false,
"schemaVersion": 26,
"style": "dark",
"tags": [],
"templating": {
"list": [
{
"current": {
"value": "${VAR_DS_INFLUXDB}",
"text": "${VAR_DS_INFLUXDB}",
"selected": false
},
"error": null,
"hide": 2,
"label": "InfluxDB DataSource",
"name": "DS_INFLUXDB",
"options": [
{
"value": "${VAR_DS_INFLUXDB}",
"text": "${VAR_DS_INFLUXDB}",
"selected": false
}
],
"query": "${VAR_DS_INFLUXDB}",
"skipUrlSync": false,
"type": "constant"
},
{
"current": {
"value": "${VAR_TBL_STATS}",
"text": "${VAR_TBL_STATS}",
"selected": false
},
"error": null,
"hide": 2,
"label": "Table name for Statistics",
"name": "TBL_STATS",
"options": [
{
"value": "${VAR_TBL_STATS}",
"text": "${VAR_TBL_STATS}",
"selected": false
}
],
"query": "${VAR_TBL_STATS}",
"skipUrlSync": false,
"type": "constant"
}
]
},
"time": {
"from": "now-24h",
"to": "now"
},
"timepicker": {
"refresh_intervals": [
"5s",
"10s",
"30s",
"1m",
"5m",
"15m",
"30m",
"1h",
"2h",
"1d"
]
},
"timezone": "",
"title": "Starlink Statistics",
"uid": "ymkHwLaMz",
"version": 36
}

View file

@ -7,7 +7,7 @@ For more information on what Starlink is, see [starlink.com](https://www.starlin
`parseJsonHistory.py` operates on a JSON format data representation of the protocol buffer messages, such as that output by [gRPCurl](https://github.com/fullstorydev/grpcurl). The command lines below assume `grpcurl` is installed in the runtime PATH. If that's not the case, just substitute in the full path to the command. `parseJsonHistory.py` operates on a JSON format data representation of the protocol buffer messages, such as that output by [gRPCurl](https://github.com/fullstorydev/grpcurl). The command lines below assume `grpcurl` is installed in the runtime PATH. If that's not the case, just substitute in the full path to the command.
All the tools that pull data from the dish expect to be able to reach it at the dish's fixed IP address of 192.168.100.1, as do the Starlink [Android app](https://play.google.com/store/apps/details?id=com.starlink.mobile) and [iOS app](https://apps.apple.com/us/app/starlink/id1537177988). When using a router other than the one included with the Starlink installation kit, this usually requires some additional router configuration to make it work. That configuration is beyond the scope of this document, but if the Starlink app doesn't work on your home network, then neither will these scripts. That being said, you do not need the Starlink app installed to make use of these scripts. All the tools that pull data from the dish expect to be able to reach it at the dish's fixed IP address of 192.168.100.1, as do the Starlink [Android app](https://play.google.com/store/apps/details?id=com.starlink.mobile), [iOS app](https://apps.apple.com/us/app/starlink/id1537177988), and the browser app you can run directly from http://192.168.100.1. When using a router other than the one included with the Starlink installation kit, this usually requires some additional router configuration to make it work. That configuration is beyond the scope of this document, but if the Starlink app doesn't work on your home network, then neither will these scripts. That being said, you do not need the Starlink app installed to make use of these scripts.
The scripts that don't use `grpcurl` to pull data require the `grpcio` Python package at runtime and generating the necessary gRPC protocol code requires the `grpcio-tools` package. Information about how to install both can be found at https://grpc.io/docs/languages/python/quickstart/ The scripts that don't use `grpcurl` to pull data require the `grpcio` Python package at runtime and generating the necessary gRPC protocol code requires the `grpcio-tools` package. Information about how to install both can be found at https://grpc.io/docs/languages/python/quickstart/
@ -17,7 +17,11 @@ The scripts that use [InfluxDB](https://www.influxdata.com/products/influxdb/) f
## Usage ## Usage
For `parseJsonHistory.py`, the easiest way to use it is to pipe the `grpcurl` command directly into it. For example: Of the 3 groups below, the grpc scripts are really the only ones being actively developed. The others are mostly by way of example of what could be done with the underlying data.
### The JSON parser script
`parseJsonHistory.py` takes input from a file and writes its output to standard output. The easiest way to use it is to pipe the `grpcurl` command directly into it. For example:
``` ```
grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle | python parseJsonHistory.py grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle | python parseJsonHistory.py
``` ```
@ -26,9 +30,13 @@ For more usage options, run:
python parseJsonHistory.py -h python parseJsonHistory.py -h
``` ```
When used as-is, `parseJsonHistory.py` will summarize packet loss information from the data the dish records. There's other bits of data in there, though, so that script could be used as a starting point or example of how to iterate through it. Most of the data displayed in the Statistics page of the Starlink app appears to come from this same `get_history` gRPC response. See the file `get_history_notes.txt` for some ramblings on how to interpret it. When used as-is, `parseJsonHistory.py` will summarize packet loss information from the data the dish records. There's other bits of data in there, though, so that script (or more likely the parsing logic it uses, which now resides in `starlink_json.py`) could be used as a starting point or example of how to iterate through it. Most of the data displayed in the Statistics page of the Starlink app appears to come from this same `get_history` gRPC response. See the file `get_history_notes.txt` for some ramblings on how to interpret it.
The other scripts can do the gRPC communication directly, but they require some generated code to support the specific gRPC protocol messages used. These would normally be generated from .proto files that specify those messages, but to date (2020-Dec), SpaceX has not publicly released such files. The gRPC service running on the dish appears to have [server reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) enabled, though. `grpcurl` can use that to extract a protoset file, and the `protoc` compiler can use that to make the necessary generated code: The one bit of functionality this script has over the grpc scripts is that it supports capturing the grpcurl output to a file and reading from that, which may be useful if you're collecting data in one place but analyzing it in another. Otherwise, it's probably better to use `dishHistoryStats.py`, described below.
### The grpc scripts
This set of scripts can do the gRPC communication directly, but they require some generated code to support the specific gRPC protocol messages used. These would normally be generated from .proto files that specify those messages, but to date (2020-Dec), SpaceX has not publicly released such files. The gRPC service running on the dish appears to have [server reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) enabled, though. `grpcurl` can use that to extract a protoset file, and the `protoc` compiler can use that to make the necessary generated code:
``` ```
grpcurl -plaintext -protoset-out dish.protoset 192.168.100.1:9200 describe SpaceX.API.Device.Device grpcurl -plaintext -protoset-out dish.protoset 192.168.100.1:9200 describe SpaceX.API.Device.Device
mkdir src mkdir src
@ -41,29 +49,66 @@ python3 -m grpc_tools.protoc --descriptor_set_in=../dish.protoset --python_out=.
python3 -m grpc_tools.protoc --descriptor_set_in=../dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/wifi.proto python3 -m grpc_tools.protoc --descriptor_set_in=../dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/wifi.proto
python3 -m grpc_tools.protoc --descriptor_set_in=../dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/wifi_config.proto python3 -m grpc_tools.protoc --descriptor_set_in=../dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/wifi_config.proto
``` ```
Then move the resulting files to where the Python scripts can find them, such as in the same directory as the scripts themselves. Then move the resulting files to where the Python scripts can find them in its import path, such as in the same directory as the scripts themselves.
Once those are available, the `dishHistoryStats.py` script can be used in place of the `grpcurl | parseJsonHistory.py` pipeline, with most of the same command line options. Once those are available, the `dishHistoryStats.py` script can be used in place of the `grpcurl | parseJsonHistory.py` pipeline, with most of the same command line options. For example:
```
python3 dishHistoryStats.py
```
To collect and record summary stats every hour, you can put something like the following in your user crontab: By default, `dishHistoryStats.py` (and `parseJsonHistory.py`) will output the stats in CSV format. You can use the `-v` option to instead output in a (slightly) more human-readable format.
To collect and record summary stats at the top of every hour, you could put something like the following in your user crontab (assuming you have moved the scripts to ~/bin and made them executable):
``` ```
00 * * * * [ -e ~/dishStats.csv ] || ~/bin/dishHistoryStats.py -H >~/dishStats.csv; ~/bin/dishHistoryStats.py >>~/dishStats.csv 00 * * * * [ -e ~/dishStats.csv ] || ~/bin/dishHistoryStats.py -H >~/dishStats.csv; ~/bin/dishHistoryStats.py >>~/dishStats.csv
``` ```
`dishDumpStatus.py` is even simpler. Just run it as: `dishHistoryInflux.py` and `dishHistoryMqtt.py` are similar, but they send their output to an InfluxDB server and a MQTT broker, respectively. Run them with `-h` command line option for details on how to specify server and/or database options.
`dishStatusCsv.py`, `dishStatusInflux.py`, and `dishStatusMqtt.py` output the status data instead of history data, to various data backends. The information they pull is mostly what appears related to the dish in the Debug Data section of the Starlink app. As with the corresponding history scripts, run them with `-h` command line option for usage details.
By default, all of these scripts will pull data once, send it off to the specified data backend, and then exit. They can instead be made to run in a periodic loop by passing a `-t` option to specify loop interval, in seconds. For example, to capture status information to an InfluxDB server every 30 seconds, you could do something like this:
```
python3 dishStatusInflux.py -t 30 [... probably other args to specify server options ...]
```
Some of the scripts (currently only the InfluxDB ones) also support specifying options through environment variables. See details in the scripts for the environment variables that map to options.
### Other scripts
`dishDumpStatus.py` is a simple example of how to use the grpc modules (the ones generated by protoc, not `starlink_grpc.py`) directly. Just run it as:
``` ```
python3 dishDumpStatus.py python3 dishDumpStatus.py
``` ```
and revel in copious amounts of dish status information. OK, maybe it's not as impressive as all that. This one is really just meant to be a starting point for real functionality to be added to it. The information this script pulls is mostly what appears related to the dish in the Debug Data section of the Starlink app. and revel in copious amounts of dish status information. OK, maybe it's not as impressive as all that. This one is really just meant to be a starting point for real functionality to be added to it.
`dishStatusCsv.py`, `dishStatusInflux.py`, and `dishStatusMqtt.py` output the same status data, but to various data backends. These scripts currently lack any way to configure them, such as setting server host or authentication credentials, other than by changing the hard-coded values in the scripts. Possibly more simple examples to come, as the other scripts have started getting a bit complicated.
## To Be Done (Maybe) ## To Be Done (Maybe)
There are `reboot` and `dish_stow` requests in the Device protocol, too, so it should be trivial to write a command that initiates dish reboot and stow operations. These are easy enough to do with `grpcurl`, though, as there is no need to parse through the response data. For that matter, they're easy enough to do with the Starlink app. There are `reboot` and `dish_stow` requests in the Device protocol, too, so it should be trivial to write a command that initiates dish reboot and stow operations. These are easy enough to do with `grpcurl`, though, as there is no need to parse through the response data. For that matter, they're easy enough to do with the Starlink app.
Proper Python packaging, since some of the scripts are no longer self-contained.
## Other Tidbits ## Other Tidbits
The Starlink Android app actually uses port 9201 instead of 9200. Both appear to expose the same gRPC service, but the one on port 9201 uses an HTTP/1.1 wrapper, whereas the one on port 9200 uses HTTP/2.0, which is what most gRPC tools expect. The Starlink Android app actually uses port 9201 instead of 9200. Both appear to expose the same gRPC service, but the one on port 9201 uses an HTTP/1.1 wrapper, whereas the one on port 9200 uses HTTP/2.0, which is what most gRPC tools expect.
The Starlink router also exposes a gRPC service, on ports 9000 (HTTP/2.0) and 9001 (HTTP/1.1). The Starlink router also exposes a gRPC service, on ports 9000 (HTTP/2.0) and 9001 (HTTP/1.1).
## Docker for InfluxDB ( & MQTT under development )
Initialization of the container can be performed with the following command:
```
docker run -d --name='starlink-grpc-tools' -e INFLUXDB_HOST={InfluxDB Hostname} \
-e INFLUXDB_PORT={Port, 8086 usually} \
-e INFLUXDB_USER={Optional, InfluxDB Username} \
-e INFLUXDB_PWD={Optional, InfluxDB Password} \
-e INFLUXDB_DB={Pre-created DB name, starlinkstats works well} \
neurocis/starlink-grpc-tools dishStatusInflux.py -v
```
The trailing `dishStatusInflux.py -v` is optional; omitting it runs the same script without verbose output, or you can replace it with one of the other scripts if you wish to run that instead. There is also a `GrafanaDashboard - Starlink Statistics.json` file which can be imported into Grafana to get some charts like:
![image](https://user-images.githubusercontent.com/945191/104257179-ae570000-5431-11eb-986e-3fedd04bfcfb.png)

243
dishHistoryInflux.py Normal file
View file

@ -0,0 +1,243 @@
#!/usr/bin/python3
######################################################################
#
# Write Starlink user terminal packet loss statistics to an InfluxDB
# database.
#
# This script examines the most recent samples from the history data,
# computes several different metrics related to packet loss, and
# writes those to the specified InfluxDB database.
#
######################################################################
import getopt
import datetime
import logging
import os
import sys
import time
import warnings
from influxdb import InfluxDBClient
import starlink_grpc
def main():
    """Gather ping statistics from the dish history and write them to InfluxDB.

    Settings are taken from built-in defaults, then from the INFLUXDB_*
    environment variables, then from command line options, with each later
    source overriding the earlier ones.

    With no loop interval (the default) the history is sampled once and the
    resulting point is written before exiting. With ``-t <seconds>`` the
    history is sampled repeatedly; points are queued and written once
    ``flush_limit`` of them have accumulated, and any still queued are
    written on the way out.

    This function does not return: it always ends in ``sys.exit()``. The
    status is 0 after ``-h``, 1 after an argument error, and otherwise the
    result of the last sample or flush (0 on success, 1 on failure).
    """
    arg_error = False

    try:
        opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:t:vC:D:IP:R:SU:")
    except getopt.GetoptError as err:
        print(str(err))
        arg_error = True

    # Default to 1 hour worth of data samples.
    samples_default = 3600
    samples = None
    print_usage = False
    verbose = False
    default_loop_time = 0
    loop_time = default_loop_time
    run_lengths = False
    host_default = "localhost"
    database_default = "starlinkstats"
    icargs = {"host": host_default, "timeout": 5, "database": database_default}
    rp = None
    # Number of queued points that triggers a write to the database.
    flush_limit = 6

    # For each of these check they are both set and not empty string
    influxdb_host = os.environ.get("INFLUXDB_HOST")
    if influxdb_host:
        icargs["host"] = influxdb_host
    influxdb_port = os.environ.get("INFLUXDB_PORT")
    if influxdb_port:
        try:
            icargs["port"] = int(influxdb_port)
        except ValueError:
            # Report a bad value as a usage error instead of a traceback.
            print("Invalid value for INFLUXDB_PORT: " + influxdb_port)
            arg_error = True
    influxdb_user = os.environ.get("INFLUXDB_USER")
    if influxdb_user:
        icargs["username"] = influxdb_user
    influxdb_pwd = os.environ.get("INFLUXDB_PWD")
    if influxdb_pwd:
        icargs["password"] = influxdb_pwd
    influxdb_db = os.environ.get("INFLUXDB_DB")
    if influxdb_db:
        icargs["database"] = influxdb_db
    influxdb_rp = os.environ.get("INFLUXDB_RP")
    if influxdb_rp:
        rp = influxdb_rp
    influxdb_ssl = os.environ.get("INFLUXDB_SSL")
    if influxdb_ssl:
        icargs["ssl"] = True
        if influxdb_ssl.lower() == "secure":
            icargs["verify_ssl"] = True
        elif influxdb_ssl.lower() == "insecure":
            icargs["verify_ssl"] = False
        else:
            # Any other value is handed through as-is, the same way the -C
            # option hands through a CA cert filename.
            icargs["verify_ssl"] = influxdb_ssl

    if not arg_error:
        if len(args) > 0:
            arg_error = True
        else:
            try:
                for opt, arg in opts:
                    if opt == "-a":
                        samples = -1
                    elif opt == "-h":
                        print_usage = True
                    elif opt == "-n":
                        icargs["host"] = arg
                    elif opt == "-p":
                        icargs["port"] = int(arg)
                    elif opt == "-r":
                        run_lengths = True
                    elif opt == "-s":
                        samples = int(arg)
                    elif opt == "-t":
                        loop_time = float(arg)
                    elif opt == "-v":
                        verbose = True
                    elif opt == "-C":
                        icargs["ssl"] = True
                        icargs["verify_ssl"] = arg
                    elif opt == "-D":
                        icargs["database"] = arg
                    elif opt == "-I":
                        icargs["ssl"] = True
                        icargs["verify_ssl"] = False
                    elif opt == "-P":
                        icargs["password"] = arg
                    elif opt == "-R":
                        rp = arg
                    elif opt == "-S":
                        icargs["ssl"] = True
                        icargs["verify_ssl"] = True
                    elif opt == "-U":
                        icargs["username"] = arg
            except ValueError:
                # int()/float() rejected the value given to -p, -s or -t;
                # opt and arg still hold the offending pair.
                print("Invalid value for option " + opt + ": " + arg)
                arg_error = True

    if "password" in icargs and "username" not in icargs:
        print("Password authentication requires username to be set")
        arg_error = True

    if print_usage or arg_error:
        print("Usage: " + sys.argv[0] + " [options...]")
        print("Options:")
        print(" -a: Parse all valid samples")
        print(" -h: Be helpful")
        print(" -n <name>: Hostname of InfluxDB server, default: " + host_default)
        print(" -p <num>: Port number to use on InfluxDB server")
        print(" -r: Include ping drop run length stats")
        print(" -s <num>: Number of data samples to parse, default: loop interval,")
        print(" if set, else " + str(samples_default))
        print(" -t <num>: Loop interval in seconds or 0 for no loop, default: " +
              str(default_loop_time))
        print(" -v: Be verbose")
        print(" -C <filename>: Enable SSL/TLS using specified CA cert to verify server")
        print(" -D <name>: Database name to use, default: " + database_default)
        print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)")
        print(" -P <word>: Set password for authentication")
        print(" -R <name>: Retention policy name to use")
        print(" -S: Enable SSL/TLS using default CA cert")
        print(" -U <name>: Set username for authentication")
        sys.exit(1 if arg_error else 0)

    # Unless -a or -s was given, parse as many samples as there are seconds
    # in the loop interval, or the one hour default when not looping.
    if samples is None:
        samples = int(loop_time) if loop_time > 0 else samples_default

    logging.basicConfig(format="%(levelname)s: %(message)s")

    class GlobalState:
        pass

    # Mutable state shared by the nested helpers below.
    gstate = GlobalState()
    gstate.dish_id = None
    gstate.points = []

    def conn_error(msg, *args):
        # Connection errors that happen in an interval loop are not critical
        # failures, but are interesting enough to print in non-verbose mode.
        if loop_time > 0:
            print(msg % args)
        else:
            logging.error(msg, *args)

    def flush_points(client):
        # Write every queued point; return 0 on success, 1 on failure.
        try:
            client.write_points(gstate.points, retention_policy=rp)
            if verbose:
                print("Data points written: " + str(len(gstate.points)))
            gstate.points.clear()
        except Exception as e:
            # The queue is left intact, so a later flush attempts these
            # points again.
            conn_error("Failed writing to InfluxDB database: %s", str(e))
            return 1

        return 0

    def loop_body(client):
        # Take one sample of the history stats and queue it as a data point;
        # return 0 on success, 1 on failure.
        if gstate.dish_id is None:
            # The dish ID is fetched once and reused for later iterations.
            try:
                gstate.dish_id = starlink_grpc.get_id()
                if verbose:
                    print("Using dish ID: " + gstate.dish_id)
            except starlink_grpc.GrpcError as e:
                conn_error("Failure getting dish ID: %s", str(e))
                return 1

        timestamp = datetime.datetime.utcnow()

        try:
            g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose)
        except starlink_grpc.GrpcError as e:
            conn_error("Failure getting ping stats: %s", str(e))
            return 1

        all_stats = g_stats.copy()
        all_stats.update(pd_stats)
        if run_lengths:
            for k, v in rl_stats.items():
                if k.startswith("run_"):
                    # Sequence-valued stats are flattened into one numbered
                    # field per element, counting from 1.
                    for i, subv in enumerate(v, start=1):
                        all_stats[k + "_" + str(i)] = subv
                else:
                    all_stats[k] = v

        gstate.points.append({
            "measurement": "spacex.starlink.user_terminal.ping_stats",
            "tags": {
                "id": gstate.dish_id
            },
            "time": timestamp,
            "fields": all_stats,
        })
        if verbose:
            print("Data points queued: " + str(len(gstate.points)))

        if len(gstate.points) >= flush_limit:
            return flush_points(client)

        return 0

    if "verify_ssl" in icargs and not icargs["verify_ssl"]:
        # user has explicitly said be insecure, so don't warn about it
        warnings.filterwarnings("ignore", message="Unverified HTTPS request")

    influx_client = InfluxDBClient(**icargs)
    try:
        next_loop = time.monotonic()
        while True:
            rc = loop_body(influx_client)
            if loop_time > 0:
                # Schedule against the previous target time so the interval
                # does not drift; if an iteration overran, continue at once.
                now = time.monotonic()
                next_loop = max(next_loop + loop_time, now)
                time.sleep(next_loop - now)
            else:
                break
    finally:
        # Runs on normal exit and on exceptions alike, so queued points get
        # one last write attempt before the client is closed.
        if gstate.points:
            rc = flush_points(influx_client)
        influx_client.close()

    sys.exit(rc)


if __name__ == '__main__':
    main()

185
dishHistoryMqtt.py Normal file
View file

@ -0,0 +1,185 @@
#!/usr/bin/python3
######################################################################
#
# Publish Starlink user terminal packet loss statistics to a MQTT
# broker.
#
# This script examines the most recent samples from the history data,
# computes several different metrics related to packet loss, and
# publishes those to the specified MQTT broker.
#
######################################################################
import getopt
import logging
import sys
import time
try:
import ssl
ssl_ok = True
except ImportError:
ssl_ok = False
import paho.mqtt.publish
import starlink_grpc
def main():
    """Publish ping drop statistics from the dish history to an MQTT broker.

    Options are read from ``sys.argv`` using ``getopt``. The statistics are
    published once, or repeatedly every ``-t`` seconds when a positive loop
    interval is given.

    This function always ends through ``sys.exit``: status 1 for a usage
    error (including a non-numeric value for ``-p``, ``-s`` or ``-t``),
    status 0 after printing help, and otherwise the return value of the
    last ``loop_body`` call (0 on success, 1 on failure).
    """
    arg_error = False

    try:
        opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:t:vC:ISP:U:")
    except getopt.GetoptError as err:
        print(str(err))
        arg_error = True

    # Default to 1 hour worth of data samples.
    samples_default = 3600
    samples = None
    print_usage = False
    verbose = False
    default_loop_time = 0
    loop_time = default_loop_time
    run_lengths = False
    host_default = "localhost"
    # Keyword arguments handed to paho.mqtt.publish.multiple().
    mqargs = {"hostname": host_default}
    username = None
    password = None

    if not arg_error:
        if len(args) > 0:
            # Positional arguments are not supported.
            arg_error = True
        else:
            try:
                for opt, arg in opts:
                    if opt == "-a":
                        # -1 is passed through to history_ping_stats() for
                        # "all valid samples", as described in the usage text.
                        samples = -1
                    elif opt == "-h":
                        print_usage = True
                    elif opt == "-n":
                        mqargs["hostname"] = arg
                    elif opt == "-p":
                        mqargs["port"] = int(arg)
                    elif opt == "-r":
                        run_lengths = True
                    elif opt == "-s":
                        samples = int(arg)
                    elif opt == "-t":
                        loop_time = float(arg)
                    elif opt == "-v":
                        verbose = True
                    elif opt == "-C":
                        mqargs["tls"] = {"ca_certs": arg}
                    elif opt == "-I":
                        if ssl_ok:
                            mqargs["tls"] = {"cert_reqs": ssl.CERT_NONE}
                        else:
                            print("No SSL support found")
                            sys.exit(1)
                    elif opt == "-P":
                        password = arg
                    elif opt == "-S":
                        mqargs["tls"] = {}
                    elif opt == "-U":
                        username = arg
            except ValueError:
                # int()/float() rejected the argument of -p, -s or -t. Report
                # it as a usage error instead of dying with a traceback.
                print("Invalid numeric argument for " + opt + ": " + arg)
                arg_error = True

    if username is None and password is not None:
        print("Password authentication requires username to be set")
        arg_error = True

    if print_usage or arg_error:
        print("Usage: " + sys.argv[0] + " [options...]")
        print("Options:")
        print(" -a: Parse all valid samples")
        print(" -h: Be helpful")
        print(" -n <name>: Hostname of MQTT broker, default: " + host_default)
        print(" -p <num>: Port number to use on MQTT broker")
        print(" -r: Include ping drop run length stats")
        print(" -s <num>: Number of data samples to parse, default: loop interval,")
        print(" if set, else " + str(samples_default))
        print(" -t <num>: Loop interval in seconds or 0 for no loop, default: " +
              str(default_loop_time))
        print(" -v: Be verbose")
        print(" -C <filename>: Enable SSL/TLS using specified CA cert to verify broker")
        print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)")
        print(" -P <word>: Set password for username/password authentication")
        print(" -S: Enable SSL/TLS using default CA cert")
        print(" -U <name>: Set username for authentication")
        sys.exit(1 if arg_error else 0)

    if samples is None:
        # NOTE(review): a loop interval below one second truncates to 0
        # samples here; confirm how history_ping_stats() treats 0.
        samples = int(loop_time) if loop_time > 0 else samples_default

    if username is not None:
        mqargs["auth"] = {"username": username}
        if password is not None:
            mqargs["auth"]["password"] = password

    logging.basicConfig(format="%(levelname)s: %(message)s")

    class GlobalState:
        pass

    # Mutable state shared between loop iterations.
    gstate = GlobalState()
    gstate.dish_id = None

    def conn_error(msg, *args):
        # Connection errors that happen in an interval loop are not critical
        # failures, but are interesting enough to print in non-verbose mode.
        if loop_time > 0:
            print(msg % args)
        else:
            logging.error(msg, *args)

    def loop_body():
        """Fetch the ping statistics once and publish them; return 0 or 1."""
        if gstate.dish_id is None:
            # The dish ID is looked up once and then reused; it is retried on
            # the next iteration if this attempt fails.
            try:
                gstate.dish_id = starlink_grpc.get_id()
                if verbose:
                    print("Using dish ID: " + gstate.dish_id)
            except starlink_grpc.GrpcError as e:
                conn_error("Failure getting dish ID: %s", str(e))
                return 1

        try:
            g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose)
        except starlink_grpc.GrpcError as e:
            conn_error("Failure getting ping stats: %s", str(e))
            return 1

        # Each message is a (topic, payload, qos, retain) tuple.
        topic_prefix = "starlink/dish_ping_stats/" + gstate.dish_id + "/"
        msgs = [(topic_prefix + k, v, 0, False) for k, v in g_stats.items()]
        msgs.extend([(topic_prefix + k, v, 0, False) for k, v in pd_stats.items()])
        if run_lengths:
            for k, v in rl_stats.items():
                if k.startswith("run_"):
                    # Sequence-valued stats are flattened to one
                    # comma-separated payload.
                    msgs.append((topic_prefix + k, ",".join(str(x) for x in v), 0, False))
                else:
                    msgs.append((topic_prefix + k, v, 0, False))

        try:
            paho.mqtt.publish.multiple(msgs, client_id=gstate.dish_id, **mqargs)
            if verbose:
                print("Successfully published to MQTT broker")
        except Exception as e:
            conn_error("Failed publishing to MQTT broker: %s", str(e))
            return 1

        return 0

    # Schedule against the monotonic clock so the interval does not drift
    # with the time spent in loop_body(); if an iteration overruns, the next
    # one starts immediately.
    next_loop = time.monotonic()
    while True:
        rc = loop_body()
        if loop_time > 0:
            now = time.monotonic()
            next_loop = max(next_loop + loop_time, now)
            time.sleep(next_loop - now)
        else:
            break

    sys.exit(rc)
if __name__ == '__main__':
main()

View file

@ -11,102 +11,141 @@
###################################################################### ######################################################################
import datetime import datetime
import sys
import getopt import getopt
import logging
import sys
import time
import starlink_grpc import starlink_grpc
arg_error = False
try: def main():
opts, args = getopt.getopt(sys.argv[1:], "ahrs:vH") arg_error = False
except getopt.GetoptError as err:
print(str(err))
arg_error = True
# Default to 1 hour worth of data samples. try:
samples_default = 3600 opts, args = getopt.getopt(sys.argv[1:], "ahrs:t:vH")
samples = samples_default except getopt.GetoptError as err:
print_usage = False print(str(err))
verbose = False
parse_all = False
print_header = False
run_lengths = False
if not arg_error:
if len(args) > 0:
arg_error = True arg_error = True
else:
for opt, arg in opts:
if opt == "-a":
parse_all = True
elif opt == "-h":
print_usage = True
elif opt == "-r":
run_lengths = True
elif opt == "-s":
samples = int(arg)
elif opt == "-v":
verbose = True
elif opt == "-H":
print_header = True
if print_usage or arg_error: # Default to 1 hour worth of data samples.
print("Usage: " + sys.argv[0] + " [options...]") samples_default = 3600
print("Options:") samples = None
print(" -a: Parse all valid samples") print_usage = False
print(" -h: Be helpful") verbose = False
print(" -r: Include ping drop run length stats") default_loop_time = 0
print(" -s <num>: Parse <num> data samples, default: " + str(samples_default)) loop_time = default_loop_time
print(" -v: Be verbose") run_lengths = False
print(" -H: print CSV header instead of parsing file") print_header = False
sys.exit(1 if arg_error else 0)
fields, rl_fields = starlink_grpc.history_ping_field_names() if not arg_error:
if len(args) > 0:
arg_error = True
else:
for opt, arg in opts:
if opt == "-a":
samples = -1
elif opt == "-h":
print_usage = True
elif opt == "-r":
run_lengths = True
elif opt == "-s":
samples = int(arg)
elif opt == "-t":
loop_time = float(arg)
elif opt == "-v":
verbose = True
elif opt == "-H":
print_header = True
if print_header: if print_usage or arg_error:
header = ["datetimestamp_utc"] print("Usage: " + sys.argv[0] + " [options...]")
header.extend(fields) print("Options:")
if run_lengths: print(" -a: Parse all valid samples")
for field in rl_fields: print(" -h: Be helpful")
if field.startswith("run_"): print(" -r: Include ping drop run length stats")
header.extend(field + "_" + str(x) for x in range(1, 61)) print(" -s <num>: Number of data samples to parse, default: loop interval,")
else: print(" if set, else " + str(samples_default))
header.append(field) print(" -t <num>: Loop interval in seconds or 0 for no loop, default: " +
print(",".join(header)) str(default_loop_time))
sys.exit(0) print(" -v: Be verbose")
print(" -H: print CSV header instead of parsing history data")
sys.exit(1 if arg_error else 0)
timestamp = datetime.datetime.utcnow() if samples is None:
samples = int(loop_time) if loop_time > 0 else samples_default
stats, rl_stats = starlink_grpc.history_ping_stats(-1 if parse_all else samples, logging.basicConfig(format="%(levelname)s: %(message)s")
verbose)
if stats is None or rl_stats is None: g_fields, pd_fields, rl_fields = starlink_grpc.history_ping_field_names()
# verbose output already happened, so just bail.
sys.exit(1)
if verbose: if print_header:
print("Parsed samples: " + str(stats["samples"])) header = ["datetimestamp_utc"]
print("Total ping drop: " + str(stats["total_ping_drop"])) header.extend(g_fields)
print("Count of drop == 1: " + str(stats["count_full_ping_drop"])) header.extend(pd_fields)
print("Obstructed: " + str(stats["count_obstructed"])) if run_lengths:
print("Obstructed ping drop: " + str(stats["total_obstructed_ping_drop"])) for field in rl_fields:
print("Obstructed drop == 1: " + str(stats["count_full_obstructed_ping_drop"])) if field.startswith("run_"):
print("Unscheduled: " + str(stats["count_unscheduled"])) header.extend(field + "_" + str(x) for x in range(1, 61))
print("Unscheduled ping drop: " + str(stats["total_unscheduled_ping_drop"])) else:
print("Unscheduled drop == 1: " + str(stats["count_full_unscheduled_ping_drop"])) header.append(field)
if run_lengths: print(",".join(header))
print("Initial drop run fragment: " + str(rl_stats["init_run_fragment"])) sys.exit(0)
print("Final drop run fragment: " + str(rl_stats["final_run_fragment"]))
print("Per-second drop runs: " + ", ".join(str(x) for x in rl_stats["run_seconds"])) def loop_body():
print("Per-minute drop runs: " + ", ".join(str(x) for x in rl_stats["run_minutes"])) timestamp = datetime.datetime.utcnow()
else:
csv_data = [timestamp.replace(microsecond=0).isoformat()] try:
csv_data.extend(str(stats[field]) for field in fields) g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose)
if run_lengths: except starlink_grpc.GrpcError as e:
for field in rl_fields: logging.error("Failure getting ping stats: %s", str(e))
if field.startswith("run_"): return 1
csv_data.extend(str(substat) for substat in rl_stats[field])
else: if verbose:
csv_data.append(str(rl_stats[field])) print("Parsed samples: " + str(g_stats["samples"]))
print(",".join(csv_data)) print("Total ping drop: " + str(pd_stats["total_ping_drop"]))
print("Count of drop == 1: " + str(pd_stats["count_full_ping_drop"]))
print("Obstructed: " + str(pd_stats["count_obstructed"]))
print("Obstructed ping drop: " + str(pd_stats["total_obstructed_ping_drop"]))
print("Obstructed drop == 1: " + str(pd_stats["count_full_obstructed_ping_drop"]))
print("Unscheduled: " + str(pd_stats["count_unscheduled"]))
print("Unscheduled ping drop: " + str(pd_stats["total_unscheduled_ping_drop"]))
print("Unscheduled drop == 1: " + str(pd_stats["count_full_unscheduled_ping_drop"]))
if run_lengths:
print("Initial drop run fragment: " + str(rl_stats["init_run_fragment"]))
print("Final drop run fragment: " + str(rl_stats["final_run_fragment"]))
print("Per-second drop runs: " +
", ".join(str(x) for x in rl_stats["run_seconds"]))
print("Per-minute drop runs: " +
", ".join(str(x) for x in rl_stats["run_minutes"]))
if loop_time > 0:
print()
else:
csv_data = [timestamp.replace(microsecond=0).isoformat()]
csv_data.extend(str(g_stats[field]) for field in g_fields)
csv_data.extend(str(pd_stats[field]) for field in pd_fields)
if run_lengths:
for field in rl_fields:
if field.startswith("run_"):
csv_data.extend(str(substat) for substat in rl_stats[field])
else:
csv_data.append(str(rl_stats[field]))
print(",".join(csv_data))
return 0
next_loop = time.monotonic()
while True:
rc = loop_body()
if loop_time > 0:
now = time.monotonic()
next_loop = max(next_loop + loop_time, now)
time.sleep(next_loop - now)
else:
break
sys.exit(rc)
if __name__ == '__main__':
main()

View file

@ -1,51 +1,147 @@
#!/usr/bin/python3 #!/usr/bin/python3
###################################################################### ######################################################################
# #
# Output get_status info in CSV format. # Output Starlink user terminal status info in CSV format.
# #
# This script pulls the current status once and prints to stdout. # This script pulls the current status and prints to stdout either
# once or in a periodic loop.
# #
###################################################################### ######################################################################
import datetime import datetime
import getopt
import logging
import sys
import time
import grpc import grpc
import spacex.api.device.device_pb2 import spacex.api.device.device_pb2
import spacex.api.device.device_pb2_grpc import spacex.api.device.device_pb2_grpc
with grpc.insecure_channel("192.168.100.1:9200") as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))
timestamp = datetime.datetime.utcnow() def main():
arg_error = False
status = response.dish_get_status try:
opts, args = getopt.getopt(sys.argv[1:], "ht:H")
except getopt.GetoptError as err:
print(str(err))
arg_error = True
# More alerts may be added in future, so rather than list them individually, print_usage = False
# build a bit field based on field numbers of the DishAlerts message. default_loop_time = 0
alert_bits = 0 loop_time = default_loop_time
for alert in status.alerts.ListFields(): print_header = False
alert_bits |= (1 if alert[1] else 0) << (alert[0].number - 1)
csv_data = [ if not arg_error:
timestamp.replace(microsecond=0).isoformat(), if len(args) > 0:
status.device_info.id, arg_error = True
status.device_info.hardware_version, else:
status.device_info.software_version, for opt, arg in opts:
spacex.api.device.dish_pb2.DishState.Name(status.state) if opt == "-h":
] print_usage = True
csv_data.extend(str(x) for x in [ elif opt == "-t":
status.device_state.uptime_s, loop_time = float(arg)
status.snr, elif opt == "-H":
status.seconds_to_first_nonempty_slot, print_header = True
status.pop_ping_drop_rate,
status.downlink_throughput_bps, if print_usage or arg_error:
status.uplink_throughput_bps, print("Usage: " + sys.argv[0] + " [options...]")
status.pop_ping_latency_ms, print("Options:")
alert_bits, print(" -h: Be helpful")
status.obstruction_stats.fraction_obstructed, print(" -t <num>: Loop interval in seconds or 0 for no loop, default: " +
status.obstruction_stats.currently_obstructed, str(default_loop_time))
status.obstruction_stats.last_24h_obstructed_s print(" -H: print CSV header instead of parsing file")
]) sys.exit(1 if arg_error else 0)
csv_data.extend(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed)
print(",".join(csv_data)) logging.basicConfig(format="%(levelname)s: %(message)s")
if print_header:
    # Column names, in the same order as the values emitted for each
    # status row. "id" must follow the timestamp because every data row
    # carries the dish ID as its second field.
    header = [
        "datetimestamp_utc",
        "id",
        "hardware_version",
        "software_version",
        "state",
        "uptime",
        "snr",
        "seconds_to_first_nonempty_slot",
        "pop_ping_drop_rate",
        "downlink_throughput_bps",
        "uplink_throughput_bps",
        "pop_ping_latency_ms",
        "alerts",
        "fraction_obstructed",
        "currently_obstructed",
        "seconds_obstructed",
    ]
    # One column per entry of wedge_abs_fraction_obstructed.
    # NOTE(review): 12 wedges is assumed here; confirm against the dish API.
    header.extend("wedges_fraction_obstructed_" + str(x) for x in range(12))
    print(",".join(header))
    sys.exit(0)
def loop_body():
timestamp = datetime.datetime.utcnow()
try:
with grpc.insecure_channel("192.168.100.1:9200") as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))
status = response.dish_get_status
# More alerts may be added in future, so rather than list them individually,
# build a bit field based on field numbers of the DishAlerts message.
alert_bits = 0
for alert in status.alerts.ListFields():
alert_bits |= (1 if alert[1] else 0) << (alert[0].number - 1)
csv_data = [
timestamp.replace(microsecond=0).isoformat(),
status.device_info.id,
status.device_info.hardware_version,
status.device_info.software_version,
spacex.api.device.dish_pb2.DishState.Name(status.state),
]
csv_data.extend(
str(x) for x in [
status.device_state.uptime_s,
status.snr,
status.seconds_to_first_nonempty_slot,
status.pop_ping_drop_rate,
status.downlink_throughput_bps,
status.uplink_throughput_bps,
status.pop_ping_latency_ms,
alert_bits,
status.obstruction_stats.fraction_obstructed,
status.obstruction_stats.currently_obstructed,
status.obstruction_stats.last_24h_obstructed_s,
])
csv_data.extend(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed)
rc = 0
except grpc.RpcError:
if loop_time <= 0:
logging.error("Failed getting status info")
csv_data = [
timestamp.replace(microsecond=0).isoformat(), "", "", "", "DISH_UNREACHABLE"
]
rc = 1
print(",".join(csv_data))
return rc
next_loop = time.monotonic()
while True:
rc = loop_body()
if loop_time > 0:
now = time.monotonic()
next_loop = max(next_loop + loop_time, now)
time.sleep(next_loop - now)
else:
break
sys.exit(rc)
if __name__ == '__main__':
main()

View file

@ -1,118 +1,270 @@
#!/usr/bin/python3 #!/usr/bin/python3
###################################################################### ######################################################################
# #
# Write get_status info to an InfluxDB database. # Write Starlink user terminal status info to an InfluxDB database.
# #
# This script will periodically poll current status and write it to # This script will poll current status and write it to the specified
# the specified InfluxDB database in a loop. # InfluxDB database either once or in a periodic loop.
# #
###################################################################### ######################################################################
import time
from influxdb import InfluxDBClient import getopt
from influxdb import SeriesHelper import logging
import os
import sys
import time
import warnings
import grpc import grpc
from influxdb import InfluxDBClient
from influxdb import SeriesHelper
import spacex.api.device.device_pb2 import spacex.api.device.device_pb2
import spacex.api.device.device_pb2_grpc import spacex.api.device.device_pb2_grpc
verbose = True
sleep_time = 30
class DeviceStatusSeries(SeriesHelper): def main():
class Meta: arg_error = False
series_name = "spacex.starlink.user_terminal.status"
fields = [
"hardware_version",
"software_version",
"state",
"alert_motors_stuck",
"alert_thermal_throttle",
"alert_thermal_shutdown",
"alert_unexpected_location",
"snr",
"seconds_to_first_nonempty_slot",
"pop_ping_drop_rate",
"downlink_throughput_bps",
"uplink_throughput_bps",
"pop_ping_latency_ms",
"currently_obstructed",
"fraction_obstructed"]
tags = ["id"]
influx_client = InfluxDBClient(host="localhost", port=8086, username="script-user", password="password", database="dishstats", ssl=False, retries=1, timeout=15)
try:
dish_channel = None
last_id = None
last_failed = False
pending = 0
count = 0
while True:
try:
if dish_channel is None:
dish_channel = grpc.insecure_channel("192.168.100.1:9200")
stub = spacex.api.device.device_pb2_grpc.DeviceStub(dish_channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))
status = response.dish_get_status
DeviceStatusSeries(
id=status.device_info.id,
hardware_version=status.device_info.hardware_version,
software_version=status.device_info.software_version,
state=spacex.api.device.dish_pb2.DishState.Name(status.state),
alert_motors_stuck=status.alerts.motors_stuck,
alert_thermal_throttle=status.alerts.thermal_throttle,
alert_thermal_shutdown=status.alerts.thermal_shutdown,
alert_unexpected_location=status.alerts.unexpected_location,
snr=status.snr,
seconds_to_first_nonempty_slot=status.seconds_to_first_nonempty_slot,
pop_ping_drop_rate=status.pop_ping_drop_rate,
downlink_throughput_bps=status.downlink_throughput_bps,
uplink_throughput_bps=status.uplink_throughput_bps,
pop_ping_latency_ms=status.pop_ping_latency_ms,
currently_obstructed=status.obstruction_stats.currently_obstructed,
fraction_obstructed=status.obstruction_stats.fraction_obstructed)
last_id = status.device_info.id
last_failed = False
except grpc.RpcError:
if dish_channel is not None:
dish_channel.close()
dish_channel = None
if last_failed:
if last_id is not None:
DeviceStatusSeries(id=last_id, state="DISH_UNREACHABLE")
else:
# Retry once, because the connection may have been lost while
# we were sleeping
last_failed = True
continue
pending = pending + 1
if verbose:
print("Samples: " + str(pending))
count = count + 1
if count > 5:
try:
DeviceStatusSeries.commit(influx_client)
if verbose:
print("Wrote " + str(pending))
pending = 0
except Exception as e:
print("Failed to write: " + str(e))
count = 0
if sleep_time > 0:
time.sleep(sleep_time)
else:
break
finally:
# Flush on error/exit
try: try:
DeviceStatusSeries.commit(influx_client) opts, args = getopt.getopt(sys.argv[1:], "hn:p:t:vC:D:IP:R:SU:")
except getopt.GetoptError as err:
print(str(err))
arg_error = True
print_usage = False
verbose = False
default_loop_time = 0
loop_time = default_loop_time
host_default = "localhost"
database_default = "starlinkstats"
icargs = {"host": host_default, "timeout": 5, "database": database_default}
rp = None
flush_limit = 6
# For each of these check they are both set and not empty string
influxdb_host = os.environ.get("INFLUXDB_HOST")
if influxdb_host:
icargs["host"] = influxdb_host
influxdb_port = os.environ.get("INFLUXDB_PORT")
if influxdb_port:
icargs["port"] = int(influxdb_port)
influxdb_user = os.environ.get("INFLUXDB_USER")
if influxdb_user:
icargs["username"] = influxdb_user
influxdb_pwd = os.environ.get("INFLUXDB_PWD")
if influxdb_pwd:
icargs["password"] = influxdb_pwd
influxdb_db = os.environ.get("INFLUXDB_DB")
if influxdb_db:
icargs["database"] = influxdb_db
influxdb_rp = os.environ.get("INFLUXDB_RP")
if influxdb_rp:
rp = influxdb_rp
influxdb_ssl = os.environ.get("INFLUXDB_SSL")
if influxdb_ssl:
icargs["ssl"] = True
if influxdb_ssl.lower() == "secure":
icargs["verify_ssl"] = True
elif influxdb_ssl.lower() == "insecure":
icargs["verify_ssl"] = False
else:
icargs["verify_ssl"] = influxdb_ssl
if not arg_error:
if len(args) > 0:
arg_error = True
else:
for opt, arg in opts:
if opt == "-h":
print_usage = True
elif opt == "-n":
icargs["host"] = arg
elif opt == "-p":
icargs["port"] = int(arg)
elif opt == "-t":
loop_time = int(arg)
elif opt == "-v":
verbose = True
elif opt == "-C":
icargs["ssl"] = True
icargs["verify_ssl"] = arg
elif opt == "-D":
icargs["database"] = arg
elif opt == "-I":
icargs["ssl"] = True
icargs["verify_ssl"] = False
elif opt == "-P":
icargs["password"] = arg
elif opt == "-R":
rp = arg
elif opt == "-S":
icargs["ssl"] = True
icargs["verify_ssl"] = True
elif opt == "-U":
icargs["username"] = arg
if "password" in icargs and "username" not in icargs:
print("Password authentication requires username to be set")
arg_error = True
if print_usage or arg_error:
print("Usage: " + sys.argv[0] + " [options...]")
print("Options:")
print(" -h: Be helpful")
print(" -n <name>: Hostname of InfluxDB server, default: " + host_default)
print(" -p <num>: Port number to use on InfluxDB server")
print(" -t <num>: Loop interval in seconds or 0 for no loop, default: " +
str(default_loop_time))
print(" -v: Be verbose")
print(" -C <filename>: Enable SSL/TLS using specified CA cert to verify server")
print(" -D <name>: Database name to use, default: " + database_default)
print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)")
print(" -P <word>: Set password for authentication")
print(" -R <name>: Retention policy name to use")
print(" -S: Enable SSL/TLS using default CA cert")
print(" -U <name>: Set username for authentication")
sys.exit(1 if arg_error else 0)
logging.basicConfig(format="%(levelname)s: %(message)s")
class GlobalState:
pass
gstate = GlobalState()
gstate.dish_channel = None
gstate.dish_id = None
gstate.pending = 0
class DeviceStatusSeries(SeriesHelper):
class Meta:
series_name = "spacex.starlink.user_terminal.status"
fields = [
"hardware_version",
"software_version",
"state",
"alert_motors_stuck",
"alert_thermal_throttle",
"alert_thermal_shutdown",
"alert_unexpected_location",
"snr",
"seconds_to_first_nonempty_slot",
"pop_ping_drop_rate",
"downlink_throughput_bps",
"uplink_throughput_bps",
"pop_ping_latency_ms",
"currently_obstructed",
"fraction_obstructed",
]
tags = ["id"]
retention_policy = rp
def conn_error(msg, *args):
# Connection errors that happen in an interval loop are not critical
# failures, but are interesting enough to print in non-verbose mode.
if loop_time > 0:
print(msg % args)
else:
logging.error(msg, *args)
def flush_pending(client):
try:
DeviceStatusSeries.commit(client)
if verbose:
print("Data points written: " + str(gstate.pending))
gstate.pending = 0
except Exception as e:
conn_error("Failed writing to InfluxDB database: %s", str(e))
return 1
return 0
def get_status_retry():
"""Try getting the status at most twice"""
channel_reused = True
while True:
try:
if gstate.dish_channel is None:
gstate.dish_channel = grpc.insecure_channel("192.168.100.1:9200")
channel_reused = False
stub = spacex.api.device.device_pb2_grpc.DeviceStub(gstate.dish_channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))
return response.dish_get_status
except grpc.RpcError:
gstate.dish_channel.close()
gstate.dish_channel = None
if channel_reused:
# If the channel was open already, the connection may have
# been lost in the time since prior loop iteration, so after
# closing it, retry once, in case the dish is now reachable.
if verbose:
print("Dish RPC channel error")
else:
raise
def loop_body(client):
try:
status = get_status_retry()
DeviceStatusSeries(id=status.device_info.id,
hardware_version=status.device_info.hardware_version,
software_version=status.device_info.software_version,
state=spacex.api.device.dish_pb2.DishState.Name(status.state),
alert_motors_stuck=status.alerts.motors_stuck,
alert_thermal_throttle=status.alerts.thermal_throttle,
alert_thermal_shutdown=status.alerts.thermal_shutdown,
alert_unexpected_location=status.alerts.unexpected_location,
snr=status.snr,
seconds_to_first_nonempty_slot=status.seconds_to_first_nonempty_slot,
pop_ping_drop_rate=status.pop_ping_drop_rate,
downlink_throughput_bps=status.downlink_throughput_bps,
uplink_throughput_bps=status.uplink_throughput_bps,
pop_ping_latency_ms=status.pop_ping_latency_ms,
currently_obstructed=status.obstruction_stats.currently_obstructed,
fraction_obstructed=status.obstruction_stats.fraction_obstructed)
gstate.dish_id = status.device_info.id
except grpc.RpcError:
if gstate.dish_id is None:
conn_error("Dish unreachable and ID unknown, so not recording state")
return 1
else:
if verbose:
print("Dish unreachable")
DeviceStatusSeries(id=gstate.dish_id, state="DISH_UNREACHABLE")
gstate.pending += 1
if verbose: if verbose:
print("Wrote " + str(pending)) print("Data points queued: " + str(gstate.pending))
except Exception as e: if gstate.pending >= flush_limit:
print("Failed to write: " + str(e)) return flush_pending(client)
influx_client.close()
if dish_channel is not None: return 0
dish_channel.close()
if "verify_ssl" in icargs and not icargs["verify_ssl"]:
# user has explicitly said be insecure, so don't warn about it
warnings.filterwarnings("ignore", message="Unverified HTTPS request")
influx_client = InfluxDBClient(**icargs)
try:
next_loop = time.monotonic()
while True:
rc = loop_body(influx_client)
if loop_time > 0:
now = time.monotonic()
next_loop = max(next_loop + loop_time, now)
time.sleep(next_loop - now)
else:
break
finally:
# Flush on error/exit
if gstate.pending:
rc = flush_pending(influx_client)
influx_client.close()
if gstate.dish_channel is not None:
gstate.dish_channel.close()
sys.exit(rc)
if __name__ == '__main__':
main()

View file

@ -1,50 +1,188 @@
#!/usr/bin/python3 #!/usr/bin/python3
###################################################################### ######################################################################
# #
# Publish get_status info to a MQTT broker. # Publish Starlink user terminal status info to a MQTT broker.
# #
# This script pulls the current status once and publishes it to the # This script pulls the current status and publishes it to the
# specified MQTT broker. # specified MQTT broker either once or in a periodic loop.
# #
###################################################################### ######################################################################
import paho.mqtt.publish
import getopt
import logging
import sys
import time
try:
import ssl
ssl_ok = True
except ImportError:
ssl_ok = False
import grpc import grpc
import paho.mqtt.publish
import spacex.api.device.device_pb2 import spacex.api.device.device_pb2
import spacex.api.device.device_pb2_grpc import spacex.api.device.device_pb2_grpc
with grpc.insecure_channel("192.168.100.1:9200") as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))
status = response.dish_get_status def main():
arg_error = False
# More alerts may be added in future, so rather than list them individually, try:
# build a bit field based on field numbers of the DishAlerts message. opts, args = getopt.getopt(sys.argv[1:], "hn:p:t:vC:ISP:U:")
alert_bits = 0 except getopt.GetoptError as err:
for alert in status.alerts.ListFields(): print(str(err))
alert_bits |= (1 if alert[1] else 0) << (alert[0].number - 1) arg_error = True
topic_prefix = "starlink/dish_status/" + status.device_info.id + "/" print_usage = False
msgs = [(topic_prefix + "hardware_version", status.device_info.hardware_version, 0, False), verbose = False
(topic_prefix + "software_version", status.device_info.software_version, 0, False), default_loop_time = 0
(topic_prefix + "state", spacex.api.device.dish_pb2.DishState.Name(status.state), 0, False), loop_time = default_loop_time
(topic_prefix + "uptime", status.device_state.uptime_s, 0, False), host_default = "localhost"
(topic_prefix + "snr", status.snr, 0, False), mqargs = {"hostname": host_default}
(topic_prefix + "seconds_to_first_nonempty_slot", status.seconds_to_first_nonempty_slot, 0, False), username = None
(topic_prefix + "pop_ping_drop_rate", status.pop_ping_drop_rate, 0, False), password = None
(topic_prefix + "downlink_throughput_bps", status.downlink_throughput_bps, 0, False),
(topic_prefix + "uplink_throughput_bps", status.uplink_throughput_bps, 0, False),
(topic_prefix + "pop_ping_latency_ms", status.pop_ping_latency_ms, 0, False),
(topic_prefix + "alerts", alert_bits, 0, False),
(topic_prefix + "fraction_obstructed", status.obstruction_stats.fraction_obstructed, 0, False),
(topic_prefix + "currently_obstructed", status.obstruction_stats.currently_obstructed, 0, False),
# While the field name for this one implies it covers 24 hours, the
# empirical evidence suggests it only covers 12 hours. It also resets
# on dish reboot, so may not cover that whole period. Rather than try
# to convey that complexity in the topic label, just be a bit vague:
(topic_prefix + "seconds_obstructed", status.obstruction_stats.last_24h_obstructed_s, 0, False),
(topic_prefix + "wedges_fraction_obstructed", ",".join(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed), 0, False)]
paho.mqtt.publish.multiple(msgs, hostname="localhost", client_id=status.device_info.id) if not arg_error:
if len(args) > 0:
arg_error = True
else:
for opt, arg in opts:
if opt == "-h":
print_usage = True
elif opt == "-n":
mqargs["hostname"] = arg
elif opt == "-p":
mqargs["port"] = int(arg)
elif opt == "-t":
loop_time = float(arg)
elif opt == "-v":
verbose = True
elif opt == "-C":
mqargs["tls"] = {"ca_certs": arg}
elif opt == "-I":
if ssl_ok:
mqargs["tls"] = {"cert_reqs": ssl.CERT_NONE}
else:
print("No SSL support found")
sys.exit(1)
elif opt == "-P":
password = arg
elif opt == "-S":
mqargs["tls"] = {}
elif opt == "-U":
username = arg
if username is None and password is not None:
print("Password authentication requires username to be set")
arg_error = True
if print_usage or arg_error:
print("Usage: " + sys.argv[0] + " [options...]")
print("Options:")
print(" -h: Be helpful")
print(" -n <name>: Hostname of MQTT broker, default: " + host_default)
print(" -p <num>: Port number to use on MQTT broker")
print(" -t <num>: Loop interval in seconds or 0 for no loop, default: " +
str(default_loop_time))
print(" -v: Be verbose")
print(" -C <filename>: Enable SSL/TLS using specified CA cert to verify broker")
print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)")
print(" -P: Set password for username/password authentication")
print(" -S: Enable SSL/TLS using default CA cert")
print(" -U: Set username for authentication")
sys.exit(1 if arg_error else 0)
if username is not None:
mqargs["auth"] = {"username": username}
if password is not None:
mqargs["auth"]["password"] = password
logging.basicConfig(format="%(levelname)s: %(message)s")
class GlobalState:
pass
gstate = GlobalState()
gstate.dish_id = None
def conn_error(msg, *args):
    """Report a connection failure at a severity that depends on loop mode.

    Args:
        msg: printf-style format string describing the failure.
        *args: Values substituted into msg.
    """
    # A single-shot run (loop_time not positive) treats the failure as a
    # real error and sends it to the logging module.
    if not loop_time > 0:
        logging.error(msg, *args)
        return
    # Inside an interval loop the failure is not critical, but it is still
    # worth showing even when verbose output is off, so print it plainly.
    print(msg % args)
def loop_body():
    """Query the dish status once and publish it to the MQTT broker.

    If the gRPC request fails, a lone "DISH_UNREACHABLE" state message is
    published instead, provided a dish ID was stored in gstate by an earlier
    successful query.

    Returns:
        0 if the messages were published, 1 if the dish was unreachable with
        no known ID or if publishing to the broker failed.
    """
    try:
        with grpc.insecure_channel("192.168.100.1:9200") as channel:
            stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
            request = spacex.api.device.device_pb2.Request(get_status={})
            response = stub.Handle(request)

        status = response.dish_get_status

        # Fold the alerts into a bit field keyed by protobuf field number
        # (field N lands on bit N-1), so alerts added to the DishAlerts
        # message in future are covered without listing each one here.
        alert_bits = 0
        for field, value in status.alerts.ListFields():
            if value:
                alert_bits |= 1 << (field.number - 1)

        gstate.dish_id = status.device_info.id
        topic_prefix = "starlink/dish_status/" + gstate.dish_id + "/"

        info = status.device_info
        obstruction = status.obstruction_stats
        readings = [
            ("hardware_version", info.hardware_version),
            ("software_version", info.software_version),
            ("state", spacex.api.device.dish_pb2.DishState.Name(status.state)),
            ("uptime", status.device_state.uptime_s),
            ("snr", status.snr),
            ("seconds_to_first_nonempty_slot", status.seconds_to_first_nonempty_slot),
            ("pop_ping_drop_rate", status.pop_ping_drop_rate),
            ("downlink_throughput_bps", status.downlink_throughput_bps),
            ("uplink_throughput_bps", status.uplink_throughput_bps),
            ("pop_ping_latency_ms", status.pop_ping_latency_ms),
            ("alerts", alert_bits),
            ("fraction_obstructed", obstruction.fraction_obstructed),
            ("currently_obstructed", obstruction.currently_obstructed),
            # The source field name says 24 hours, but empirical evidence
            # suggests it covers only 12, and it resets on dish reboot. The
            # topic name stays deliberately vague instead of encoding that.
            ("seconds_obstructed", obstruction.last_24h_obstructed_s),
            ("wedges_fraction_obstructed",
             ",".join(map(str, obstruction.wedge_abs_fraction_obstructed))),
        ]
        # Each entry is (topic, payload, qos, retain).
        msgs = [(topic_prefix + name, value, 0, False) for name, value in readings]
    except grpc.RpcError:
        # Without a previously seen dish ID there is no topic to publish to.
        if gstate.dish_id is None:
            conn_error("Dish unreachable and ID unknown, so not recording state")
            return 1
        if verbose:
            print("Dish unreachable")
        topic_prefix = "starlink/dish_status/" + gstate.dish_id + "/"
        msgs = [(topic_prefix + "state", "DISH_UNREACHABLE", 0, False)]

    try:
        paho.mqtt.publish.multiple(msgs, client_id=gstate.dish_id, **mqargs)
    except Exception as e:
        conn_error("Failed publishing to MQTT broker: %s", str(e))
        return 1
    else:
        if verbose:
            print("Successfully published to MQTT broker")

    return 0
next_loop = time.monotonic()
while True:
rc = loop_body()
if loop_time > 0:
now = time.monotonic()
next_loop = max(next_loop + loop_time, now)
time.sleep(next_loop - now)
else:
break
sys.exit(rc)
if __name__ == '__main__':
main()

13
entrypoint.sh Normal file
View file

@ -0,0 +1,13 @@
#!/bin/sh
# Container entry point: prepare the environment, generate the gRPC Python
# modules from the dish's service descriptors, then run the requested script.

# Append the container's environment variables to /etc/environment.
printenv >> /etc/environment

# Apply the time zone named by TZ, but only when it names an existing
# zoneinfo entry. With TZ unset or invalid, the unguarded link would leave
# /etc/localtime pointing at the zoneinfo directory or at a missing file.
if [ -n "${TZ:-}" ] && [ -f "/usr/share/zoneinfo/$TZ" ]; then
    ln -snf "/usr/share/zoneinfo/$TZ" /etc/localtime && echo "$TZ" > /etc/timezone
fi

# Save the dish's protocol descriptors, then generate one set of Python
# modules per proto file listed below.
grpcurl -plaintext -protoset-out dish.protoset 192.168.100.1:9200 describe SpaceX.API.Device.Device
for proto in \
    spacex/api/device/device.proto \
    spacex/api/common/status/status.proto \
    spacex/api/device/command.proto \
    spacex/api/device/common.proto \
    spacex/api/device/dish.proto \
    spacex/api/device/wifi.proto \
    spacex/api/device/wifi_config.proto
do
    python3 -m grpc_tools.protoc --descriptor_set_in=dish.protoset --python_out=. --grpc_python_out=. "$proto"
done

# Quote "$@" so arguments containing whitespace reach Python intact, and
# exec so the Python process replaces this shell and receives signals
# sent to the container directly.
exec /usr/local/bin/python3 "$@"

View file

@ -16,6 +16,7 @@
import datetime import datetime
import sys import sys
import getopt import getopt
import logging
import starlink_json import starlink_json
@ -32,7 +33,6 @@ samples_default = 3600
samples = samples_default samples = samples_default
print_usage = False print_usage = False
verbose = False verbose = False
parse_all = False
print_header = False print_header = False
run_lengths = False run_lengths = False
@ -42,7 +42,7 @@ if not arg_error:
else: else:
for opt, arg in opts: for opt, arg in opts:
if opt == "-a": if opt == "-a":
parse_all = True samples = -1
elif opt == "-h": elif opt == "-h":
print_usage = True print_usage = True
elif opt == "-r": elif opt == "-r":
@ -61,16 +61,19 @@ if print_usage or arg_error:
print(" -a: Parse all valid samples") print(" -a: Parse all valid samples")
print(" -h: Be helpful") print(" -h: Be helpful")
print(" -r: Include ping drop run length stats") print(" -r: Include ping drop run length stats")
print(" -s <num>: Parse <num> data samples, default: " + str(samples_default)) print(" -s <num>: Number of data samples to parse, default: " + str(samples_default))
print(" -v: Be verbose") print(" -v: Be verbose")
print(" -H: print CSV header instead of parsing file") print(" -H: print CSV header instead of parsing file")
sys.exit(1 if arg_error else 0) sys.exit(1 if arg_error else 0)
fields, rl_fields = starlink_json.history_ping_field_names() logging.basicConfig(format="%(levelname)s: %(message)s")
g_fields, pd_fields, rl_fields = starlink_json.history_ping_field_names()
if print_header: if print_header:
header = ["datetimestamp_utc"] header = ["datetimestamp_utc"]
header.extend(fields) header.extend(g_fields)
header.extend(pd_fields)
if run_lengths: if run_lengths:
for field in rl_fields: for field in rl_fields:
if field.startswith("run_"): if field.startswith("run_"):
@ -82,24 +85,23 @@ if print_header:
timestamp = datetime.datetime.utcnow() timestamp = datetime.datetime.utcnow()
stats, rl_stats = starlink_json.history_ping_stats(args[0] if args else "-", try:
-1 if parse_all else samples, g_stats, pd_stats, rl_stats = starlink_json.history_ping_stats(args[0] if args else "-",
verbose) samples, verbose)
except starlink_json.JsonError as e:
if stats is None or rl_stats is None: logging.error("Failure getting ping stats: %s", str(e))
# verbose output already happened, so just bail.
sys.exit(1) sys.exit(1)
if verbose: if verbose:
print("Parsed samples: " + str(stats["samples"])) print("Parsed samples: " + str(g_stats["samples"]))
print("Total ping drop: " + str(stats["total_ping_drop"])) print("Total ping drop: " + str(pd_stats["total_ping_drop"]))
print("Count of drop == 1: " + str(stats["count_full_ping_drop"])) print("Count of drop == 1: " + str(pd_stats["count_full_ping_drop"]))
print("Obstructed: " + str(stats["count_obstructed"])) print("Obstructed: " + str(pd_stats["count_obstructed"]))
print("Obstructed ping drop: " + str(stats["total_obstructed_ping_drop"])) print("Obstructed ping drop: " + str(pd_stats["total_obstructed_ping_drop"]))
print("Obstructed drop == 1: " + str(stats["count_full_obstructed_ping_drop"])) print("Obstructed drop == 1: " + str(pd_stats["count_full_obstructed_ping_drop"]))
print("Unscheduled: " + str(stats["count_unscheduled"])) print("Unscheduled: " + str(pd_stats["count_unscheduled"]))
print("Unscheduled ping drop: " + str(stats["total_unscheduled_ping_drop"])) print("Unscheduled ping drop: " + str(pd_stats["total_unscheduled_ping_drop"]))
print("Unscheduled drop == 1: " + str(stats["count_full_unscheduled_ping_drop"])) print("Unscheduled drop == 1: " + str(pd_stats["count_full_unscheduled_ping_drop"]))
if run_lengths: if run_lengths:
print("Initial drop run fragment: " + str(rl_stats["init_run_fragment"])) print("Initial drop run fragment: " + str(rl_stats["init_run_fragment"]))
print("Final drop run fragment: " + str(rl_stats["final_run_fragment"])) print("Final drop run fragment: " + str(rl_stats["final_run_fragment"]))
@ -107,7 +109,8 @@ if verbose:
print("Per-minute drop runs: " + ", ".join(str(x) for x in rl_stats["run_minutes"])) print("Per-minute drop runs: " + ", ".join(str(x) for x in rl_stats["run_minutes"]))
else: else:
csv_data = [timestamp.replace(microsecond=0).isoformat()] csv_data = [timestamp.replace(microsecond=0).isoformat()]
csv_data.extend(str(stats[field]) for field in fields) csv_data.extend(str(g_stats[field]) for field in g_fields)
csv_data.extend(str(pd_stats[field]) for field in pd_fields)
if run_lengths: if run_lengths:
for field in rl_fields: for field in rl_fields:
if field.startswith("run_"): if field.startswith("run_"):

View file

@ -4,13 +4,17 @@ This module may eventually contain more expansive parsing logic, but for now
it contains functions to parse the history data for some specific packet loss it contains functions to parse the history data for some specific packet loss
statistics. statistics.
General ping drop (packet loss) statistics: General statistics:
This group of statistics characterize the packet loss (labeled "ping drop" This group of statistics contains data relevant to all the other groups.
in the field names of the Starlink gRPC service protocol) in various ways.
The sample interval is currently 1 second. The sample interval is currently 1 second.
samples: The number of valid samples analyzed. samples: The number of valid samples analyzed.
General ping drop (packet loss) statistics:
This group of statistics characterize the packet loss (labeled "ping drop"
in the field names of the Starlink gRPC service protocol) in various ways.
total_ping_drop: The total amount of time, in sample intervals, that total_ping_drop: The total amount of time, in sample intervals, that
experienced ping drop. experienced ping drop.
count_full_ping_drop: The number of samples that experienced 100% count_full_ping_drop: The number of samples that experienced 100%
@ -62,13 +66,13 @@ Ping drop run length statistics:
No sample should be counted in more than one of the run length stats or No sample should be counted in more than one of the run length stats or
stat elements, so the total of all of them should be equal to stat elements, so the total of all of them should be equal to
count_full_ping_drop from the general stats. count_full_ping_drop from the ping drop stats.
Samples that experience less than 100% ping drop are not counted in this Samples that experience less than 100% ping drop are not counted in this
group of stats, even if they happen at the beginning or end of a run of group of stats, even if they happen at the beginning or end of a run of
100% ping drop samples. To compute the amount of time that experienced 100% ping drop samples. To compute the amount of time that experienced
ping loss in less than a single run of 100% ping drop, use ping loss in less than a single run of 100% ping drop, use
(total_ping_drop - count_full_ping_drop) from the general stats. (total_ping_drop - count_full_ping_drop) from the ping drop stats.
""" """
from itertools import chain from itertools import chain
@ -78,15 +82,61 @@ import grpc
import spacex.api.device.device_pb2 import spacex.api.device.device_pb2
import spacex.api.device.device_pb2_grpc import spacex.api.device.device_pb2_grpc
class GrpcError(Exception):
    """Error raised when a gRPC call did not succeed.

    The exception message is a condensed description derived from the
    object passed to the constructor.
    """

    def __init__(self, e, *args, **kwargs):
        # A full grpc.RpcError is too verbose to print, but when the object
        # is also a grpc.Call, its details() give a short, usable summary.
        if isinstance(e, grpc.Call):
            summary = e.details()
        else:
            summary = ("Unknown communication or service error"
                       if isinstance(e, grpc.RpcError) else str(e))
        super().__init__(summary, *args, **kwargs)
def get_status():
    """Request the current status from the dish and return the reply payload.

    Returns:
        The dish_get_status member of the gRPC response.

    Raises:
        grpc.RpcError: Communication or service error.
    """
    with grpc.insecure_channel("192.168.100.1:9200") as channel:
        device = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
        request = spacex.api.device.device_pb2.Request(get_status={})
        reply = device.Handle(request)
    return reply.dish_get_status
def get_id():
    """Return the ID from the dish status information.

    Returns:
        A string identifying the Starlink user terminal reachable from the
        local network.

    Raises:
        GrpcError: No user terminal is currently reachable.
    """
    # Only the status request is guarded here; reading the ID off the reply
    # happens after the call has already succeeded.
    try:
        status = get_status()
    except grpc.RpcError as e:
        # Chain explicitly so the original gRPC failure is recorded as the
        # cause of the condensed GrpcError.
        raise GrpcError(e) from e
    return status.device_info.id
def history_ping_field_names(): def history_ping_field_names():
"""Return the field names of the packet loss stats. """Return the field names of the packet loss stats.
Returns: Returns:
A tuple with 2 lists, the first with general stat names and the A tuple with 3 lists, the first with general stat names, the second
second with ping drop run length stat names. with ping drop stat names, and the third with ping drop run length
stat names.
""" """
return [ return [
"samples", "samples",
], [
"total_ping_drop", "total_ping_drop",
"count_full_ping_drop", "count_full_ping_drop",
"count_obstructed", "count_obstructed",
@ -94,14 +144,15 @@ def history_ping_field_names():
"count_full_obstructed_ping_drop", "count_full_obstructed_ping_drop",
"count_unscheduled", "count_unscheduled",
"total_unscheduled_ping_drop", "total_unscheduled_ping_drop",
"count_full_unscheduled_ping_drop" "count_full_unscheduled_ping_drop",
], [ ], [
"init_run_fragment", "init_run_fragment",
"final_run_fragment", "final_run_fragment",
"run_seconds", "run_seconds",
"run_minutes" "run_minutes",
] ]
def get_history(): def get_history():
"""Fetch history data and return it in grpc structure format. """Fetch history data and return it in grpc structure format.
@ -113,6 +164,7 @@ def get_history():
response = stub.Handle(spacex.api.device.device_pb2.Request(get_history={})) response = stub.Handle(spacex.api.device.device_pb2.Request(get_history={}))
return response.dish_get_history return response.dish_get_history
def history_ping_stats(parse_samples, verbose=False): def history_ping_stats(parse_samples, verbose=False):
"""Fetch, parse, and compute the packet loss stats. """Fetch, parse, and compute the packet loss stats.
@ -122,19 +174,18 @@ def history_ping_stats(parse_samples, verbose=False):
verbose (bool): Optionally produce verbose output. verbose (bool): Optionally produce verbose output.
Returns: Returns:
On success, a tuple with 2 dicts, the first mapping general stat names A tuple with 3 dicts, the first mapping general stat names to their
to their values and the second mapping ping drop run length stat names values, the second mapping ping drop stat names to their values and
to their values. the third mapping ping drop run length stat names to their values.
On failure, the tuple (None, None). Raises:
GrpcError: Failed getting history info from the Starlink user
terminal.
""" """
try: try:
history = get_history() history = get_history()
except grpc.RpcError: except grpc.RpcError as e:
if verbose: raise GrpcError(e)
# RpcError is too verbose to print the details.
print("Failed getting history")
return None, None
# 'current' is the count of data samples written to the ring buffer, # 'current' is the count of data samples written to the ring buffer,
# irrespective of buffer wrap. # irrespective of buffer wrap.
@ -154,13 +205,13 @@ def history_ping_stats(parse_samples, verbose=False):
# index to next data sample after the newest one. # index to next data sample after the newest one.
offset = current % samples offset = current % samples
tot = 0 tot = 0.0
count_full_drop = 0 count_full_drop = 0
count_unsched = 0 count_unsched = 0
total_unsched_drop = 0 total_unsched_drop = 0.0
count_full_unsched = 0 count_full_unsched = 0
count_obstruct = 0 count_obstruct = 0
total_obstruct_drop = 0 total_obstruct_drop = 0.0
count_full_obstruct = 0 count_full_obstruct = 0
second_runs = [0] * 60 second_runs = [0] * 60
@ -180,9 +231,10 @@ def history_ping_stats(parse_samples, verbose=False):
for i in sample_range: for i in sample_range:
d = history.pop_ping_drop_rate[i] d = history.pop_ping_drop_rate[i]
tot += d
if d >= 1: if d >= 1:
count_full_drop += d # just in case...
d = 1
count_full_drop += 1
run_length += 1 run_length += 1
elif run_length > 0: elif run_length > 0:
if init_run_length is None: if init_run_length is None:
@ -191,7 +243,7 @@ def history_ping_stats(parse_samples, verbose=False):
if run_length <= 60: if run_length <= 60:
second_runs[run_length - 1] += run_length second_runs[run_length - 1] += run_length
else: else:
minute_runs[min((run_length - 1)//60 - 1, 59)] += run_length minute_runs[min((run_length-1) // 60 - 1, 59)] += run_length
run_length = 0 run_length = 0
elif init_run_length is None: elif init_run_length is None:
init_run_length = 0 init_run_length = 0
@ -199,7 +251,7 @@ def history_ping_stats(parse_samples, verbose=False):
count_unsched += 1 count_unsched += 1
total_unsched_drop += d total_unsched_drop += d
if d >= 1: if d >= 1:
count_full_unsched += d count_full_unsched += 1
# scheduled=false and obstructed=true do not ever appear to overlap, # scheduled=false and obstructed=true do not ever appear to overlap,
# but in case they do in the future, treat that as just unscheduled # but in case they do in the future, treat that as just unscheduled
# in order to avoid double-counting it. # in order to avoid double-counting it.
@ -207,7 +259,8 @@ def history_ping_stats(parse_samples, verbose=False):
count_obstruct += 1 count_obstruct += 1
total_obstruct_drop += d total_obstruct_drop += d
if d >= 1: if d >= 1:
count_full_obstruct += d count_full_obstruct += 1
tot += d
# If the entire sample set is one big drop run, it will be both initial # If the entire sample set is one big drop run, it will be both initial
# fragment (continued from prior sample range) and final one (continued # fragment (continued from prior sample range) and final one (continued
@ -219,6 +272,7 @@ def history_ping_stats(parse_samples, verbose=False):
return { return {
"samples": parse_samples, "samples": parse_samples,
}, {
"total_ping_drop": tot, "total_ping_drop": tot,
"count_full_ping_drop": count_full_drop, "count_full_ping_drop": count_full_drop,
"count_obstructed": count_obstruct, "count_obstructed": count_obstruct,
@ -226,10 +280,10 @@ def history_ping_stats(parse_samples, verbose=False):
"count_full_obstructed_ping_drop": count_full_obstruct, "count_full_obstructed_ping_drop": count_full_obstruct,
"count_unscheduled": count_unsched, "count_unscheduled": count_unsched,
"total_unscheduled_ping_drop": total_unsched_drop, "total_unscheduled_ping_drop": total_unsched_drop,
"count_full_unscheduled_ping_drop": count_full_unsched "count_full_unscheduled_ping_drop": count_full_unsched,
}, { }, {
"init_run_fragment": init_run_length, "init_run_fragment": init_run_length,
"final_run_fragment": run_length, "final_run_fragment": run_length,
"run_seconds": second_runs, "run_seconds": second_runs,
"run_minutes": minute_runs "run_minutes": minute_runs,
} }

View file

@ -14,15 +14,22 @@ import sys
from itertools import chain from itertools import chain
class JsonError(Exception):
    """Provides error info when something went wrong with JSON parsing.

    Raised by this module in place of the underlying exception when the
    JSON input cannot be opened, read, or parsed.
    """
def history_ping_field_names(): def history_ping_field_names():
"""Return the field names of the packet loss stats. """Return the field names of the packet loss stats.
Returns: Returns:
A tuple with 2 lists, the first with general stat names and the A tuple with 3 lists, the first with general stat names, the second
second with ping drop run length stat names. with ping drop stat names, and the third with ping drop run length
stat names.
""" """
return [ return [
"samples", "samples",
], [
"total_ping_drop", "total_ping_drop",
"count_full_ping_drop", "count_full_ping_drop",
"count_obstructed", "count_obstructed",
@ -30,31 +37,34 @@ def history_ping_field_names():
"count_full_obstructed_ping_drop", "count_full_obstructed_ping_drop",
"count_unscheduled", "count_unscheduled",
"total_unscheduled_ping_drop", "total_unscheduled_ping_drop",
"count_full_unscheduled_ping_drop" "count_full_unscheduled_ping_drop",
], [ ], [
"init_run_fragment", "init_run_fragment",
"final_run_fragment", "final_run_fragment",
"run_seconds", "run_seconds",
"run_minutes" "run_minutes",
] ]
def get_history(filename): def get_history(filename):
"""Read JSON data and return the raw history in dict format. """Read JSON data and return the raw history in dict format.
Args: Args:
filename (str): Filename from which to read JSON data, or "-" to read filename (str): Filename from which to read JSON data, or "-" to read
from standard input. from standard input.
Raises:
Various exceptions depending on Python version: Failure to open or
read input or invalid JSON read on input.
""" """
if filename == "-": if filename == "-":
json_data = json.load(sys.stdin) json_data = json.load(sys.stdin)
else: else:
json_file = open(filename) with open(filename) as json_file:
try:
json_data = json.load(json_file) json_data = json.load(json_file)
finally:
json_file.close()
return json_data["dishGetHistory"] return json_data["dishGetHistory"]
def history_ping_stats(filename, parse_samples, verbose=False): def history_ping_stats(filename, parse_samples, verbose=False):
"""Fetch, parse, and compute the packet loss stats. """Fetch, parse, and compute the packet loss stats.
@ -66,18 +76,19 @@ def history_ping_stats(filename, parse_samples, verbose=False):
verbose (bool): Optionally produce verbose output. verbose (bool): Optionally produce verbose output.
Returns: Returns:
On success, a tuple with 2 dicts, the first mapping general stat names A tuple with 3 dicts, the first mapping general stat names to their
to their values and the second mapping ping drop run length stat names values, the second mapping ping drop stat names to their values and
to their values. the third mapping ping drop run length stat names to their values.
On failure, the tuple (None, None). Raises:
JsonError: Failure to open, read, or parse JSON on input.
""" """
try: try:
history = get_history(filename) history = get_history(filename)
except ValueError as e:
raise JsonError("Failed to parse JSON: " + str(e))
except Exception as e: except Exception as e:
if verbose: raise JsonError(e)
print("Failed getting history: " + str(e))
return None, None
# "current" is the count of data samples written to the ring buffer, # "current" is the count of data samples written to the ring buffer,
# irrespective of buffer wrap. # irrespective of buffer wrap.
@ -97,13 +108,13 @@ def history_ping_stats(filename, parse_samples, verbose=False):
# index to next data sample after the newest one. # index to next data sample after the newest one.
offset = current % samples offset = current % samples
tot = 0 tot = 0.0
count_full_drop = 0 count_full_drop = 0
count_unsched = 0 count_unsched = 0
total_unsched_drop = 0 total_unsched_drop = 0.0
count_full_unsched = 0 count_full_unsched = 0
count_obstruct = 0 count_obstruct = 0
total_obstruct_drop = 0 total_obstruct_drop = 0.0
count_full_obstruct = 0 count_full_obstruct = 0
second_runs = [0] * 60 second_runs = [0] * 60
@ -123,9 +134,10 @@ def history_ping_stats(filename, parse_samples, verbose=False):
for i in sample_range: for i in sample_range:
d = history["popPingDropRate"][i] d = history["popPingDropRate"][i]
tot += d
if d >= 1: if d >= 1:
count_full_drop += d # just in case...
d = 1
count_full_drop += 1
run_length += 1 run_length += 1
elif run_length > 0: elif run_length > 0:
if init_run_length is None: if init_run_length is None:
@ -134,7 +146,7 @@ def history_ping_stats(filename, parse_samples, verbose=False):
if run_length <= 60: if run_length <= 60:
second_runs[run_length - 1] += run_length second_runs[run_length - 1] += run_length
else: else:
minute_runs[min((run_length - 1)//60 - 1, 59)] += run_length minute_runs[min((run_length-1) // 60 - 1, 59)] += run_length
run_length = 0 run_length = 0
elif init_run_length is None: elif init_run_length is None:
init_run_length = 0 init_run_length = 0
@ -142,7 +154,7 @@ def history_ping_stats(filename, parse_samples, verbose=False):
count_unsched += 1 count_unsched += 1
total_unsched_drop += d total_unsched_drop += d
if d >= 1: if d >= 1:
count_full_unsched += d count_full_unsched += 1
# scheduled=false and obstructed=true do not ever appear to overlap, # scheduled=false and obstructed=true do not ever appear to overlap,
# but in case they do in the future, treat that as just unscheduled # but in case they do in the future, treat that as just unscheduled
# in order to avoid double-counting it. # in order to avoid double-counting it.
@ -150,7 +162,8 @@ def history_ping_stats(filename, parse_samples, verbose=False):
count_obstruct += 1 count_obstruct += 1
total_obstruct_drop += d total_obstruct_drop += d
if d >= 1: if d >= 1:
count_full_obstruct += d count_full_obstruct += 1
tot += d
# If the entire sample set is one big drop run, it will be both initial # If the entire sample set is one big drop run, it will be both initial
# fragment (continued from prior sample range) and final one (continued # fragment (continued from prior sample range) and final one (continued
@ -162,6 +175,7 @@ def history_ping_stats(filename, parse_samples, verbose=False):
return { return {
"samples": parse_samples, "samples": parse_samples,
}, {
"total_ping_drop": tot, "total_ping_drop": tot,
"count_full_ping_drop": count_full_drop, "count_full_ping_drop": count_full_drop,
"count_obstructed": count_obstruct, "count_obstructed": count_obstruct,
@ -169,10 +183,10 @@ def history_ping_stats(filename, parse_samples, verbose=False):
"count_full_obstructed_ping_drop": count_full_obstruct, "count_full_obstructed_ping_drop": count_full_obstruct,
"count_unscheduled": count_unsched, "count_unscheduled": count_unsched,
"total_unscheduled_ping_drop": total_unsched_drop, "total_unscheduled_ping_drop": total_unsched_drop,
"count_full_unscheduled_ping_drop": count_full_unsched "count_full_unscheduled_ping_drop": count_full_unsched,
}, { }, {
"init_run_fragment": init_run_length, "init_run_fragment": init_run_length,
"final_run_fragment": run_length, "final_run_fragment": run_length,
"run_seconds": second_runs, "run_seconds": second_runs,
"run_minutes": minute_runs "run_minutes": minute_runs,
} }