SSL/TLS support for InfluxDB and MQTT scripts

SSL/TLS support for InfluxDB and MQTT scripts

Copy the command line option handling into the status scripts to facilitate this. Also copy the setting from env from dishStatusInflux_cron.py.

Better error handling for failures while writing to the data backend. Error printing verbosity is now a bit inconsistent, but I'll address that separately.

Still to be done is dishStatusInflux_cron.py, pending a decision on what to do with that script, given that dishStatusInflux.py can now be run in one-shot mode.

This is related to issue #2.
This commit is contained in:
sparky8512 2021-01-10 21:36:44 -08:00
parent f067f08952
commit ce44f3c021
6 changed files with 350 additions and 29 deletions

View file

@ -17,7 +17,7 @@ The scripts that use [InfluxDB](https://www.influxdata.com/products/influxdb/) f
## Usage
For `parseJsonHistory.py`, the easiest way to use it is to pipe the `grpcurl` command directly into it. For example:
`parseJsonHistory.py` takes input from a file and writes its output to standard output. The easiest way to use it is to pipe the `grpcurl` command directly into it. For example:
```
grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle | python parseJsonHistory.py
```
@ -41,7 +41,7 @@ python3 -m grpc_tools.protoc --descriptor_set_in=../dish.protoset --python_out=.
python3 -m grpc_tools.protoc --descriptor_set_in=../dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/wifi.proto
python3 -m grpc_tools.protoc --descriptor_set_in=../dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/wifi_config.proto
```
Then move the resulting files to where the Python scripts can find them, such as in the same directory as the scripts themselves.
Then move the resulting files to where the Python scripts can find them.
Once those are available, the `dishHistoryStats.py` script can be used in place of the `grpcurl | parseJsonHistory.py` pipeline, with most of the same command line options.
@ -50,13 +50,15 @@ To collect and record summary stats every hour, you can put something like the f
00 * * * * [ -e ~/dishStats.csv ] || ~/bin/dishHistoryStats.py -H >~/dishStats.csv; ~/bin/dishHistoryStats.py >>~/dishStats.csv
```
`dishHistoryInflux.py` and `dishHistoryMqtt.py` are similar, but they send their output to an InfluxDB server and a MQTT broker, respectively. Run them with `-h` command line option for details on how to specify server and/or database options.
`dishDumpStatus.py` is even simpler. Just run it as:
```
python3 dishDumpStatus.py
```
and revel in copious amounts of dish status information. OK, maybe it's not as impressive as all that. This one is really just meant to be a starting point for real functionality to be added to it. The information this script pulls is mostly what appears related to the dish in the Debug Data section of the Starlink app.
`dishStatusCsv.py`, `dishStatusInflux.py`, and `dishStatusMqtt.py` output the same status data, but to various data backends. These scripts currently lack any way to configure them, such as setting server host or authentication credentials, other than by changing the hard-coded values in the scripts.
`dishStatusCsv.py`, `dishStatusInflux.py`, and `dishStatusMqtt.py` output the same status data, but to various data backends. As with the corresponding history scripts, run them with `-h` command line option for usage details.
## To Be Done (Maybe)

View file

@ -11,9 +11,11 @@
######################################################################
import datetime
import os
import sys
import getopt
import warnings
from influxdb import InfluxDBClient
import starlink_grpc
@ -21,7 +23,7 @@ import starlink_grpc
arg_error = False
try:
opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:vD:P:R:U:")
opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:vC:D:IP:R:SU:")
except getopt.GetoptError as err:
print(str(err))
arg_error = True
@ -37,6 +39,35 @@ database_default = "dishstats"
icargs = {"host": host_default, "timeout": 5, "database": database_default}
rp = None
# For each of these check they are both set and not empty string
influxdb_host = os.environ.get("INFLUXDB_HOST")
if influxdb_host:
icargs["host"] = influxdb_host
influxdb_port = os.environ.get("INFLUXDB_PORT")
if influxdb_port:
icargs["port"] = int(influxdb_port)
influxdb_user = os.environ.get("INFLUXDB_USER")
if influxdb_user:
icargs["username"] = influxdb_user
influxdb_pwd = os.environ.get("INFLUXDB_PWD")
if influxdb_pwd:
icargs["password"] = influxdb_pwd
influxdb_db = os.environ.get("INFLUXDB_DB")
if influxdb_db:
icargs["database"] = influxdb_db
influxdb_rp = os.environ.get("INFLUXDB_RP")
if influxdb_rp:
rp = influxdb_rp
influxdb_ssl = os.environ.get("INFLUXDB_SSL")
if influxdb_ssl:
icargs["ssl"] = True
if influxdb_ssl.lower() == "secure":
icargs["verify_ssl"] = True
elif influxdb_ssl.lower() == "insecure":
icargs["verify_ssl"] = False
else:
icargs["verify_ssl"] = influxdb_ssl
if not arg_error:
if len(args) > 0:
arg_error = True
@ -56,12 +87,21 @@ if not arg_error:
samples = int(arg)
elif opt == "-v":
verbose = True
elif opt == "-C":
icargs["ssl"] = True
icargs["verify_ssl"] = arg
elif opt == "-D":
icargs["database"] = arg
elif opt == "-I":
icargs["ssl"] = True
icargs["verify_ssl"] = False
elif opt == "-P":
icargs["password"] = arg
elif opt == "-R":
rp = arg
elif opt == "-S":
icargs["ssl"] = True
icargs["verify_ssl"] = True
elif opt == "-U":
icargs["username"] = arg
@ -79,9 +119,12 @@ if print_usage or arg_error:
print(" -r: Include ping drop run length stats")
print(" -s <num>: Number of data samples to parse, default: " + str(samples_default))
print(" -v: Be verbose")
print(" -C <filename>: Enable SSL/TLS using specified CA cert to verify server")
print(" -D <name>: Database name to use, default: " + database_default)
print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)")
print(" -P <word>: Set password for authentication")
print(" -R <name>: Retention policy name to use")
print(" -S: Enable SSL/TLS using default CA cert")
print(" -U <name>: Set username for authentication")
sys.exit(1 if arg_error else 0)
@ -117,8 +160,17 @@ points = [{
"fields": all_stats,
}]
if "verify_ssl" in icargs and not icargs["verify_ssl"]:
# user has explicitly said be insecure, so don't warn about it
warnings.filterwarnings("ignore", message="Unverified HTTPS request")
influx_client = InfluxDBClient(**icargs)
try:
influx_client.write_points(points, retention_policy=rp)
rc = 0
except Exception as e:
print("Failed writing to InfluxDB database: " + str(e))
rc = 1
finally:
influx_client.close()
sys.exit(rc)

View file

@ -13,6 +13,12 @@
import sys
import getopt
try:
import ssl
ssl_ok = True
except ImportError:
ssl_ok = False
import paho.mqtt.publish
import starlink_grpc
@ -20,7 +26,7 @@ import starlink_grpc
arg_error = False
try:
opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:vU:P:")
opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:vC:ISP:U:")
except getopt.GetoptError as err:
print(str(err))
arg_error = True
@ -32,8 +38,7 @@ print_usage = False
verbose = False
run_lengths = False
host_default = "localhost"
host = host_default
port = None
mqargs = {"hostname": host_default}
username = None
password = None
@ -47,17 +52,27 @@ if not arg_error:
elif opt == "-h":
print_usage = True
elif opt == "-n":
host = arg
mqargs["hostname"] = arg
elif opt == "-p":
port = int(arg)
mqargs["port"] = int(arg)
elif opt == "-r":
run_lengths = True
elif opt == "-s":
samples = int(arg)
elif opt == "-v":
verbose = True
elif opt == "-C":
mqargs["tls"] = {"ca_certs": arg}
elif opt == "-I":
if ssl_ok:
mqargs["tls"] = {"cert_reqs": ssl.CERT_NONE}
else:
print("No SSL support found")
sys.exit(1)
elif opt == "-P":
password = arg
elif opt == "-S":
mqargs["tls"] = {}
elif opt == "-U":
username = arg
@ -75,7 +90,10 @@ if print_usage or arg_error:
print(" -r: Include ping drop run length stats")
print(" -s <num>: Number of data samples to parse, default: " + str(samples_default))
print(" -v: Be verbose")
print(" -C <filename>: Enable SSL/TLS using specified CA cert to verify broker")
print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)")
print(" -P: Set password for username/password authentication")
print(" -S: Enable SSL/TLS using default CA cert")
print(" -U: Set username for authentication")
sys.exit(1 if arg_error else 0)
@ -102,12 +120,13 @@ if run_lengths:
else:
msgs.append((topic_prefix + k, v, 0, False))
optargs = {}
if username is not None:
auth = {"username": username}
mqargs["auth"] = {"username": username}
if password is not None:
auth["password"] = password
optargs["auth"] = auth
if port is not None:
optargs["port"] = port
paho.mqtt.publish.multiple(msgs, hostname=host, client_id=dish_id, **optargs)
mqargs["auth"]["password"] = password
try:
paho.mqtt.publish.multiple(msgs, client_id=dish_id, **mqargs)
except Exception as e:
print("Failed publishing to MQTT broker: " + str(e))
sys.exit(1)

View file

@ -6,16 +6,73 @@
# This script pulls the current status once and prints to stdout.
#
######################################################################
import datetime
import sys
import getopt
import grpc
import spacex.api.device.device_pb2
import spacex.api.device.device_pb2_grpc
with grpc.insecure_channel("192.168.100.1:9200") as channel:
arg_error = False
try:
opts, args = getopt.getopt(sys.argv[1:], "hH")
except getopt.GetoptError as err:
print(str(err))
arg_error = True
print_usage = False
print_header = False
if not arg_error:
if len(args) > 0:
arg_error = True
else:
for opt, arg in opts:
if opt == "-h":
print_usage = True
elif opt == "-H":
print_header = True
if print_usage or arg_error:
print("Usage: " + sys.argv[0] + " [options...]")
print("Options:")
print(" -h: Be helpful")
print(" -H: print CSV header instead of parsing file")
sys.exit(1 if arg_error else 0)
if print_header:
header = [
"datetimestamp_utc",
"hardware_version",
"software_version",
"state",
"uptime",
"snr",
"seconds_to_first_nonempty_slot",
"pop_ping_drop_rate",
"downlink_throughput_bps",
"uplink_throughput_bps",
"pop_ping_latency_ms",
"alerts",
"fraction_obstructed",
"currently_obstructed",
"seconds_obstructed"
]
header.extend("wedges_fraction_obstructed_" + str(x) for x in range(12))
print(",".join(header))
sys.exit(0)
try:
with grpc.insecure_channel("192.168.100.1:9200") as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))
except grpc.RpcError:
print("Failed getting status info")
sys.exit(1)
timestamp = datetime.datetime.utcnow()

View file

@ -3,12 +3,17 @@
#
# Write Starlink user terminal status info to an InfluxDB database.
#
# This script will periodically poll current status and write it to
# the specified InfluxDB database in a loop.
# This script will poll current status and write it to the specified
# InfluxDB database either once or in a periodic loop.
#
######################################################################
import time
import time
import os
import sys
import getopt
import warnings
from influxdb import InfluxDBClient
from influxdb import SeriesHelper
@ -17,8 +22,106 @@ import grpc
import spacex.api.device.device_pb2
import spacex.api.device.device_pb2_grpc
verbose = True
sleep_time = 30
arg_error = False
try:
opts, args = getopt.getopt(sys.argv[1:], "hn:p:t:vC:D:IP:R:SU:")
except getopt.GetoptError as err:
print(str(err))
arg_error = True
print_usage = False
verbose = False
host_default = "localhost"
database_default = "dishstats"
icargs = {"host": host_default, "timeout": 5, "database": database_default}
rp = None
default_sleep_time = 30
sleep_time = default_sleep_time
# For each of these check they are both set and not empty string
influxdb_host = os.environ.get("INFLUXDB_HOST")
if influxdb_host:
icargs["host"] = influxdb_host
influxdb_port = os.environ.get("INFLUXDB_PORT")
if influxdb_port:
icargs["port"] = int(influxdb_port)
influxdb_user = os.environ.get("INFLUXDB_USER")
if influxdb_user:
icargs["username"] = influxdb_user
influxdb_pwd = os.environ.get("INFLUXDB_PWD")
if influxdb_pwd:
icargs["password"] = influxdb_pwd
influxdb_db = os.environ.get("INFLUXDB_DB")
if influxdb_db:
icargs["database"] = influxdb_db
influxdb_rp = os.environ.get("INFLUXDB_RP")
if influxdb_rp:
rp = influxdb_rp
influxdb_ssl = os.environ.get("INFLUXDB_SSL")
if influxdb_ssl:
icargs["ssl"] = True
if influxdb_ssl.lower() == "secure":
icargs["verify_ssl"] = True
elif influxdb_ssl.lower() == "insecure":
icargs["verify_ssl"] = False
else:
icargs["verify_ssl"] = influxdb_ssl
if not arg_error:
if len(args) > 0:
arg_error = True
else:
for opt, arg in opts:
if opt == "-h":
print_usage = True
elif opt == "-n":
icargs["host"] = arg
elif opt == "-p":
icargs["port"] = int(arg)
elif opt == "-t":
sleep_time = int(arg)
elif opt == "-v":
verbose = True
elif opt == "-C":
icargs["ssl"] = True
icargs["verify_ssl"] = arg
elif opt == "-D":
icargs["database"] = arg
elif opt == "-I":
icargs["ssl"] = True
icargs["verify_ssl"] = False
elif opt == "-P":
icargs["password"] = arg
elif opt == "-R":
rp = arg
elif opt == "-S":
icargs["ssl"] = True
icargs["verify_ssl"] = True
elif opt == "-U":
icargs["username"] = arg
if "password" in icargs and "username" not in icargs:
print("Password authentication requires username to be set")
arg_error = True
if print_usage or arg_error:
print("Usage: " + sys.argv[0] + " [options...]")
print("Options:")
print(" -h: Be helpful")
print(" -n <name>: Hostname of InfluxDB server, default: " + host_default)
print(" -p <num>: Port number to use on InfluxDB server")
print(" -t <num>: Loop interval in seconds or 0 for no loop, default: " +
str(default_sleep_time))
print(" -v: Be verbose")
print(" -C <filename>: Enable SSL/TLS using specified CA cert to verify server")
print(" -D <name>: Database name to use, default: " + database_default)
print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)")
print(" -P <word>: Set password for authentication")
print(" -R <name>: Retention policy name to use")
print(" -S: Enable SSL/TLS using default CA cert")
print(" -U <name>: Set username for authentication")
sys.exit(1 if arg_error else 0)
class DeviceStatusSeries(SeriesHelper):
class Meta:
@ -40,8 +143,13 @@ class DeviceStatusSeries(SeriesHelper):
"currently_obstructed",
"fraction_obstructed"]
tags = ["id"]
retention_policy = rp
influx_client = InfluxDBClient(host="localhost", port=8086, username="script-user", password="password", database="dishstats", ssl=False, retries=1, timeout=15)
if "verify_ssl" in icargs and not icargs["verify_ssl"]:
# user has explicitly said be insecure, so don't warn about it
warnings.filterwarnings("ignore", message="Unverified HTTPS request")
influx_client = InfluxDBClient(**icargs)
try:
dish_channel = None
@ -111,8 +219,11 @@ finally:
DeviceStatusSeries.commit(influx_client)
if verbose:
print("Wrote " + str(pending))
rc = 0
except Exception as e:
print("Failed to write: " + str(e))
rc = 1
influx_client.close()
if dish_channel is not None:
dish_channel.close()
sys.exit(rc)

View file

@ -7,6 +7,16 @@
# specified MQTT broker.
#
######################################################################
import sys
import getopt
try:
import ssl
ssl_ok = True
except ImportError:
ssl_ok = False
import paho.mqtt.publish
import grpc
@ -14,9 +24,70 @@ import grpc
import spacex.api.device.device_pb2
import spacex.api.device.device_pb2_grpc
with grpc.insecure_channel("192.168.100.1:9200") as channel:
arg_error = False
try:
opts, args = getopt.getopt(sys.argv[1:], "hn:p:C:ISP:U:")
except getopt.GetoptError as err:
print(str(err))
arg_error = True
print_usage = False
host_default = "localhost"
mqargs = {"hostname": host_default}
username = None
password = None
if not arg_error:
if len(args) > 0:
arg_error = True
else:
for opt, arg in opts:
if opt == "-h":
print_usage = True
elif opt == "-n":
mqargs["hostname"] = arg
elif opt == "-p":
mqargs["port"] = int(arg)
elif opt == "-C":
mqargs["tls"] = {"ca_certs": arg}
elif opt == "-I":
if ssl_ok:
mqargs["tls"] = {"cert_reqs": ssl.CERT_NONE}
else:
print("No SSL support found")
sys.exit(1)
elif opt == "-P":
password = arg
elif opt == "-S":
mqargs["tls"] = {}
elif opt == "-U":
username = arg
if username is None and password is not None:
print("Password authentication requires username to be set")
arg_error = True
if print_usage or arg_error:
print("Usage: " + sys.argv[0] + " [options...]")
print("Options:")
print(" -h: Be helpful")
print(" -n <name>: Hostname of MQTT broker, default: " + host_default)
print(" -p <num>: Port number to use on MQTT broker")
print(" -C <filename>: Enable SSL/TLS using specified CA cert to verify broker")
print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)")
print(" -P: Set password for username/password authentication")
print(" -S: Enable SSL/TLS using default CA cert")
print(" -U: Set username for authentication")
sys.exit(1 if arg_error else 0)
try:
with grpc.insecure_channel("192.168.100.1:9200") as channel:
stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel)
response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={}))
except grpc.RpcError:
print("Failed getting status info")
sys.exit(1)
status = response.dish_get_status
@ -47,4 +118,13 @@ msgs = [(topic_prefix + "hardware_version", status.device_info.hardware_version,
(topic_prefix + "seconds_obstructed", status.obstruction_stats.last_24h_obstructed_s, 0, False),
(topic_prefix + "wedges_fraction_obstructed", ",".join(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed), 0, False)]
paho.mqtt.publish.multiple(msgs, hostname="localhost", client_id=status.device_info.id)
if username is not None:
mqargs["auth"] = {"username": username}
if password is not None:
mqargs["auth"]["password"] = password
try:
paho.mqtt.publish.multiple(msgs, client_id=status.device_info.id, **mqargs)
except Exception as e:
print("Failed publishing to MQTT broker: " + str(e))
sys.exit(1)