From ce44f3c021d7afa63a5cfcac78fb042f9e89ba1b Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Sun, 10 Jan 2021 21:36:44 -0800 Subject: [PATCH] SSL/TLS support for InfluxDB and MQTT scripts SSL/TLS support for InfluxDB and MQTT scripts Copy the command line option handling into the status scripts to facilitate this. Also copy the setting from env from dishStatusInflux_cron.py. Better error handling for failures while writing to the data backend. Error printing verbosity is now a bit inconsistent, but I'll address that separately. Still to be done is dishStatusInflux_cron.py, pending a decision on what to do with that script, given that dishStatusInflux.py can now be run in one-shot mode. This is related to issue #2. --- README.md | 8 +-- dishHistoryInflux.py | 54 ++++++++++++++++++- dishHistoryMqtt.py | 43 ++++++++++----- dishStatusCsv.py | 63 ++++++++++++++++++++-- dishStatusInflux.py | 123 ++++++++++++++++++++++++++++++++++++++++--- dishStatusMqtt.py | 88 +++++++++++++++++++++++++++++-- 6 files changed, 350 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 6d79491..cc071c5 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ The scripts that use [InfluxDB](https://www.influxdata.com/products/influxdb/) f ## Usage -For `parseJsonHistory.py`, the easiest way to use it is to pipe the `grpcurl` command directly into it. For example: +`parseJsonHistory.py` takes input from a file and writes its output to standard output. The easiest way to use it is to pipe the `grpcurl` command directly into it. For example: ``` grpcurl -plaintext -d {\"get_history\":{}} 192.168.100.1:9200 SpaceX.API.Device.Device/Handle | python parseJsonHistory.py ``` @@ -41,7 +41,7 @@ python3 -m grpc_tools.protoc --descriptor_set_in=../dish.protoset --python_out=. python3 -m grpc_tools.protoc --descriptor_set_in=../dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/wifi.proto python3 -m grpc_tools.protoc --descriptor_set_in=../dish.protoset --python_out=. --grpc_python_out=. spacex/api/device/wifi_config.proto ``` -Then move the resulting files to where the Python scripts can find them, such as in the same directory as the scripts themselves. +Then move the resulting files to where the Python scripts can find them. Once those are available, the `dishHistoryStats.py` script can be used in place of the `grpcurl | parseJsonHistory.py` pipeline, with most of the same command line options. @@ -50,13 +50,15 @@ To collect and record summary stats every hour, you can put something like the f 00 * * * * [ -e ~/dishStats.csv ] || ~/bin/dishHistoryStats.py -H >~/dishStats.csv; ~/bin/dishHistoryStats.py >>~/dishStats.csv ``` +`dishHistoryInflux.py` and `dishHistoryMqtt.py` are similar, but they send their output to an InfluxDB server and a MQTT broker, respectively. Run them with `-h` command line option for details on how to specify server and/or database options. + `dishDumpStatus.py` is even simpler. Just run it as: ``` python3 dishDumpStatus.py ``` and revel in copious amounts of dish status information. OK, maybe it's not as impressive as all that. This one is really just meant to be a starting point for real functionality to be added to it. The information this script pulls is mostly what appears related to the dish in the Debug Data section of the Starlink app. -`dishStatusCsv.py`, `dishStatusInflux.py`, and `dishStatusMqtt.py` output the same status data, but to various data backends. These scripts currently lack any way to configure them, such as setting server host or authentication credentials, other than by changing the hard-coded values in the scripts. +`dishStatusCsv.py`, `dishStatusInflux.py`, and `dishStatusMqtt.py` output the same status data, but to various data backends. As with the corresponding history scripts, run them with `-h` command line option for usage details. ## To Be Done (Maybe) diff --git a/dishHistoryInflux.py b/dishHistoryInflux.py index 966eca2..bdbb6e8 100644 --- a/dishHistoryInflux.py +++ b/dishHistoryInflux.py @@ -11,9 +11,11 @@ ###################################################################### import datetime +import os import sys import getopt +import warnings from influxdb import InfluxDBClient import starlink_grpc @@ -21,7 +23,7 @@ import starlink_grpc arg_error = False try: - opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:vD:P:R:U:") + opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:vC:D:IP:R:SU:") except getopt.GetoptError as err: print(str(err)) arg_error = True @@ -37,6 +39,35 @@ database_default = "dishstats" icargs = {"host": host_default, "timeout": 5, "database": database_default} rp = None +# For each of these check they are both set and not empty string +influxdb_host = os.environ.get("INFLUXDB_HOST") +if influxdb_host: + icargs["host"] = influxdb_host +influxdb_port = os.environ.get("INFLUXDB_PORT") +if influxdb_port: + icargs["port"] = int(influxdb_port) +influxdb_user = os.environ.get("INFLUXDB_USER") +if influxdb_user: + icargs["username"] = influxdb_user +influxdb_pwd = os.environ.get("INFLUXDB_PWD") +if influxdb_pwd: + icargs["password"] = influxdb_pwd +influxdb_db = os.environ.get("INFLUXDB_DB") +if influxdb_db: + icargs["database"] = influxdb_db +influxdb_rp = os.environ.get("INFLUXDB_RP") +if influxdb_rp: + rp = influxdb_rp +influxdb_ssl = os.environ.get("INFLUXDB_SSL") +if influxdb_ssl: + icargs["ssl"] = True + if influxdb_ssl.lower() == "secure": + icargs["verify_ssl"] = True + elif influxdb_ssl.lower() == "insecure": + icargs["verify_ssl"] = False + else: + icargs["verify_ssl"] = influxdb_ssl + if not arg_error: if len(args) > 0: arg_error = True @@ -56,12 +87,21 @@ if not arg_error: samples = int(arg) elif opt == "-v": verbose = True + elif opt == "-C": + icargs["ssl"] = True + icargs["verify_ssl"] = arg elif opt == "-D": icargs["database"] = arg + elif opt == "-I": + icargs["ssl"] = True + icargs["verify_ssl"] = False elif opt == "-P": icargs["password"] = arg elif opt == "-R": rp = arg + elif opt == "-S": + icargs["ssl"] = True + icargs["verify_ssl"] = True elif opt == "-U": icargs["username"] = arg @@ -79,9 +119,12 @@ if print_usage or arg_error: print(" -r: Include ping drop run length stats") print(" -s : Number of data samples to parse, default: " + str(samples_default)) print(" -v: Be verbose") + print(" -C : Enable SSL/TLS using specified CA cert to verify server") print(" -D : Database name to use, default: " + database_default) + print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)") print(" -P : Set password for authentication") print(" -R : Retention policy name to use") + print(" -S: Enable SSL/TLS using default CA cert") print(" -U : Set username for authentication") sys.exit(1 if arg_error else 0) @@ -117,8 +160,17 @@ points = [{ "fields": all_stats, }] +if "verify_ssl" in icargs and not icargs["verify_ssl"]: + # user has explicitly said be insecure, so don't warn about it + warnings.filterwarnings("ignore", message="Unverified HTTPS request") + influx_client = InfluxDBClient(**icargs) try: influx_client.write_points(points, retention_policy=rp) + rc = 0 +except Exception as e: + print("Failed writing to InfluxDB database: " + str(e)) + rc = 1 finally: influx_client.close() +sys.exit(rc) diff --git a/dishHistoryMqtt.py b/dishHistoryMqtt.py index 0f819e3..2291150 100644 --- a/dishHistoryMqtt.py +++ b/dishHistoryMqtt.py @@ -13,6 +13,12 @@ import sys import getopt +try: + import ssl + ssl_ok = True +except ImportError: + ssl_ok = False + import paho.mqtt.publish import starlink_grpc @@ -20,7 +26,7 @@ import starlink_grpc arg_error = False try: - opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:vU:P:") + opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:vC:ISP:U:") except getopt.GetoptError as err: print(str(err)) arg_error = True @@ -32,8 +38,7 @@ print_usage = False verbose = False run_lengths = False host_default = "localhost" -host = host_default -port = None +mqargs = {"hostname": host_default} username = None password = None @@ -47,17 +52,27 @@ if not arg_error: elif opt == "-h": print_usage = True elif opt == "-n": - host = arg + mqargs["hostname"] = arg elif opt == "-p": - port = int(arg) + mqargs["port"] = int(arg) elif opt == "-r": run_lengths = True elif opt == "-s": samples = int(arg) elif opt == "-v": verbose = True + elif opt == "-C": + mqargs["tls"] = {"ca_certs": arg} + elif opt == "-I": + if ssl_ok: + mqargs["tls"] = {"cert_reqs": ssl.CERT_NONE} + else: + print("No SSL support found") + sys.exit(1) elif opt == "-P": password = arg + elif opt == "-S": + mqargs["tls"] = {} elif opt == "-U": username = arg @@ -75,7 +90,10 @@ if print_usage or arg_error: print(" -r: Include ping drop run length stats") print(" -s : Number of data samples to parse, default: " + str(samples_default)) print(" -v: Be verbose") + print(" -C : Enable SSL/TLS using specified CA cert to verify broker") + print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)") print(" -P: Set password for username/password authentication") + print(" -S: Enable SSL/TLS using default CA cert") print(" -U: Set username for authentication") sys.exit(1 if arg_error else 0) @@ -102,12 +120,13 @@ if run_lengths: else: msgs.append((topic_prefix + k, v, 0, False)) -optargs = {} if username is not None: - auth = {"username": username} + mqargs["auth"] = {"username": username} if password is not None: - auth["password"] = password - optargs["auth"] = auth -if port is not None: - optargs["port"] = port -paho.mqtt.publish.multiple(msgs, hostname=host, client_id=dish_id, **optargs) + mqargs["auth"]["password"] = password + +try: + paho.mqtt.publish.multiple(msgs, client_id=dish_id, **mqargs) +except Exception as e: + print("Failed publishing to MQTT broker: " + str(e)) + sys.exit(1) diff --git a/dishStatusCsv.py b/dishStatusCsv.py index 98269e8..0c96587 100644 --- a/dishStatusCsv.py +++ b/dishStatusCsv.py @@ -6,16 +6,73 @@ # This script pulls the current status once and prints to stdout. # ###################################################################### + import datetime +import sys +import getopt import grpc import spacex.api.device.device_pb2 import spacex.api.device.device_pb2_grpc -with grpc.insecure_channel("192.168.100.1:9200") as channel: - stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) - response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) +arg_error = False + +try: + opts, args = getopt.getopt(sys.argv[1:], "hH") +except getopt.GetoptError as err: + print(str(err)) + arg_error = True + +print_usage = False +print_header = False + +if not arg_error: + if len(args) > 0: + arg_error = True + else: + for opt, arg in opts: + if opt == "-h": + print_usage = True + elif opt == "-H": + print_header = True + +if print_usage or arg_error: + print("Usage: " + sys.argv[0] + " [options...]") + print("Options:") + print(" -h: Be helpful") + print(" -H: print CSV header instead of parsing file") + sys.exit(1 if arg_error else 0) + +if print_header: + header = [ + "datetimestamp_utc", + "hardware_version", + "software_version", + "state", + "uptime", + "snr", + "seconds_to_first_nonempty_slot", + "pop_ping_drop_rate", + "downlink_throughput_bps", + "uplink_throughput_bps", + "pop_ping_latency_ms", + "alerts", + "fraction_obstructed", + "currently_obstructed", + "seconds_obstructed" + ] + header.extend("wedges_fraction_obstructed_" + str(x) for x in range(12)) + print(",".join(header)) + sys.exit(0) + +try: + with grpc.insecure_channel("192.168.100.1:9200") as channel: + stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) +except grpc.RpcError: + print("Failed getting status info") + sys.exit(1) timestamp = datetime.datetime.utcnow() diff --git a/dishStatusInflux.py b/dishStatusInflux.py index f297c9b..79c809c 100644 --- a/dishStatusInflux.py +++ b/dishStatusInflux.py @@ -3,12 +3,17 @@ # # Write Starlink user terminal status info to an InfluxDB database. # -# This script will periodically poll current status and write it to -# the specified InfluxDB database in a loop. +# This script will poll current status and write it to the specified +# InfluxDB database either once or in a periodic loop. # ###################################################################### -import time +import time +import os +import sys +import getopt + +import warnings from influxdb import InfluxDBClient from influxdb import SeriesHelper @@ -17,8 +22,106 @@ import grpc import spacex.api.device.device_pb2 import spacex.api.device.device_pb2_grpc -verbose = True -sleep_time = 30 +arg_error = False + +try: + opts, args = getopt.getopt(sys.argv[1:], "hn:p:t:vC:D:IP:R:SU:") +except getopt.GetoptError as err: + print(str(err)) + arg_error = True + +print_usage = False +verbose = False +host_default = "localhost" +database_default = "dishstats" +icargs = {"host": host_default, "timeout": 5, "database": database_default} +rp = None +default_sleep_time = 30 +sleep_time = default_sleep_time + +# For each of these check they are both set and not empty string +influxdb_host = os.environ.get("INFLUXDB_HOST") +if influxdb_host: + icargs["host"] = influxdb_host +influxdb_port = os.environ.get("INFLUXDB_PORT") +if influxdb_port: + icargs["port"] = int(influxdb_port) +influxdb_user = os.environ.get("INFLUXDB_USER") +if influxdb_user: + icargs["username"] = influxdb_user +influxdb_pwd = os.environ.get("INFLUXDB_PWD") +if influxdb_pwd: + icargs["password"] = influxdb_pwd +influxdb_db = os.environ.get("INFLUXDB_DB") +if influxdb_db: + icargs["database"] = influxdb_db +influxdb_rp = os.environ.get("INFLUXDB_RP") +if influxdb_rp: + rp = influxdb_rp +influxdb_ssl = os.environ.get("INFLUXDB_SSL") +if influxdb_ssl: + icargs["ssl"] = True + if influxdb_ssl.lower() == "secure": + icargs["verify_ssl"] = True + elif influxdb_ssl.lower() == "insecure": + icargs["verify_ssl"] = False + else: + icargs["verify_ssl"] = influxdb_ssl + +if not arg_error: + if len(args) > 0: + arg_error = True + else: + for opt, arg in opts: + if opt == "-h": + print_usage = True + elif opt == "-n": + icargs["host"] = arg + elif opt == "-p": + icargs["port"] = int(arg) + elif opt == "-t": + sleep_time = int(arg) + elif opt == "-v": + verbose = True + elif opt == "-C": + icargs["ssl"] = True + icargs["verify_ssl"] = arg + elif opt == "-D": + icargs["database"] = arg + elif opt == "-I": + icargs["ssl"] = True + icargs["verify_ssl"] = False + elif opt == "-P": + icargs["password"] = arg + elif opt == "-R": + rp = arg + elif opt == "-S": + icargs["ssl"] = True + icargs["verify_ssl"] = True + elif opt == "-U": + icargs["username"] = arg + +if "password" in icargs and "username" not in icargs: + print("Password authentication requires username to be set") + arg_error = True + +if print_usage or arg_error: + print("Usage: " + sys.argv[0] + " [options...]") + print("Options:") + print(" -h: Be helpful") + print(" -n : Hostname of InfluxDB server, default: " + host_default) + print(" -p : Port number to use on InfluxDB server") + print(" -t : Loop interval in seconds or 0 for no loop, default: " + + str(default_sleep_time)) + print(" -v: Be verbose") + print(" -C : Enable SSL/TLS using specified CA cert to verify server") + print(" -D : Database name to use, default: " + database_default) + print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)") + print(" -P : Set password for authentication") + print(" -R : Retention policy name to use") + print(" -S: Enable SSL/TLS using default CA cert") + print(" -U : Set username for authentication") + sys.exit(1 if arg_error else 0) class DeviceStatusSeries(SeriesHelper): class Meta: @@ -40,8 +143,13 @@ class DeviceStatusSeries(SeriesHelper): "currently_obstructed", "fraction_obstructed"] tags = ["id"] + retention_policy = rp -influx_client = InfluxDBClient(host="localhost", port=8086, username="script-user", password="password", database="dishstats", ssl=False, retries=1, timeout=15) +if "verify_ssl" in icargs and not icargs["verify_ssl"]: + # user has explicitly said be insecure, so don't warn about it + warnings.filterwarnings("ignore", message="Unverified HTTPS request") + +influx_client = InfluxDBClient(**icargs) try: dish_channel = None @@ -111,8 +219,11 @@ finally: DeviceStatusSeries.commit(influx_client) if verbose: print("Wrote " + str(pending)) + rc = 0 except Exception as e: print("Failed to write: " + str(e)) + rc = 1 influx_client.close() if dish_channel is not None: dish_channel.close() + sys.exit(rc) diff --git a/dishStatusMqtt.py b/dishStatusMqtt.py index fff7101..ee4c667 100644 --- a/dishStatusMqtt.py +++ b/dishStatusMqtt.py @@ -7,6 +7,16 @@ # specified MQTT broker. # ###################################################################### + +import sys +import getopt + +try: + import ssl + ssl_ok = True +except ImportError: + ssl_ok = False + import paho.mqtt.publish import grpc @@ -14,9 +24,70 @@ import grpc import spacex.api.device.device_pb2 import spacex.api.device.device_pb2_grpc -with grpc.insecure_channel("192.168.100.1:9200") as channel: - stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) - response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) +arg_error = False + +try: + opts, args = getopt.getopt(sys.argv[1:], "hn:p:C:ISP:U:") +except getopt.GetoptError as err: + print(str(err)) + arg_error = True + +print_usage = False +host_default = "localhost" +mqargs = {"hostname": host_default} +username = None +password = None + +if not arg_error: + if len(args) > 0: + arg_error = True + else: + for opt, arg in opts: + if opt == "-h": + print_usage = True + elif opt == "-n": + mqargs["hostname"] = arg + elif opt == "-p": + mqargs["port"] = int(arg) + elif opt == "-C": + mqargs["tls"] = {"ca_certs": arg} + elif opt == "-I": + if ssl_ok: + mqargs["tls"] = {"cert_reqs": ssl.CERT_NONE} + else: + print("No SSL support found") + sys.exit(1) + elif opt == "-P": + password = arg + elif opt == "-S": + mqargs["tls"] = {} + elif opt == "-U": + username = arg + +if username is None and password is not None: + print("Password authentication requires username to be set") + arg_error = True + +if print_usage or arg_error: + print("Usage: " + sys.argv[0] + " [options...]") + print("Options:") + print(" -h: Be helpful") + print(" -n : Hostname of MQTT broker, default: " + host_default) + print(" -p : Port number to use on MQTT broker") + print(" -C : Enable SSL/TLS using specified CA cert to verify broker") + print(" -I: Enable SSL/TLS but disable certificate verification (INSECURE!)") + print(" -P: Set password for username/password authentication") + print(" -S: Enable SSL/TLS using default CA cert") + print(" -U: Set username for authentication") + sys.exit(1 if arg_error else 0) + +try: + with grpc.insecure_channel("192.168.100.1:9200") as channel: + stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) +except grpc.RpcError: + print("Failed getting status info") + sys.exit(1) status = response.dish_get_status @@ -47,4 +118,13 @@ msgs = [(topic_prefix + "hardware_version", status.device_info.hardware_version, (topic_prefix + "seconds_obstructed", status.obstruction_stats.last_24h_obstructed_s, 0, False), (topic_prefix + "wedges_fraction_obstructed", ",".join(str(x) for x in status.obstruction_stats.wedge_abs_fraction_obstructed), 0, False)] -paho.mqtt.publish.multiple(msgs, hostname="localhost", client_id=status.device_info.id) +if username is not None: + mqargs["auth"] = {"username": username} + if password is not None: + mqargs["auth"]["password"] = password + +try: + paho.mqtt.publish.multiple(msgs, client_id=status.device_info.id, **mqargs) +except Exception as e: + print("Failed publishing to MQTT broker: " + str(e)) + sys.exit(1)