diff --git a/dishHistoryInflux.py b/dishHistoryInflux.py index 9b8fe73..d757901 100644 --- a/dishHistoryInflux.py +++ b/dishHistoryInflux.py @@ -14,6 +14,7 @@ import datetime import os import sys import getopt +import logging import warnings from influxdb import InfluxDBClient @@ -128,19 +129,20 @@ if print_usage or arg_error: print(" -U : Set username for authentication") sys.exit(1 if arg_error else 0) -dish_id = starlink_grpc.get_id() +logging.basicConfig(format="%(levelname)s: %(message)s") -if dish_id is None: - if verbose: - print("Unable to connect to Starlink user terminal") +try: + dish_id = starlink_grpc.get_id() +except starlink_grpc.GrpcError as e: + logging.error("Failure getting dish ID: " + str(e)) sys.exit(1) timestamp = datetime.datetime.utcnow() -g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose) - -if g_stats is None: - # verbose output already happened, so just bail. +try: + g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose) +except starlink_grpc.GrpcError as e: + logging.error("Failure getting ping stats: " + str(e)) sys.exit(1) all_stats = g_stats.copy() @@ -169,7 +171,7 @@ try: influx_client.write_points(points, retention_policy=rp) rc = 0 except Exception as e: - print("Failed writing to InfluxDB database: " + str(e)) + logging.error("Failed writing to InfluxDB database: " + str(e)) rc = 1 finally: influx_client.close() diff --git a/dishHistoryMqtt.py b/dishHistoryMqtt.py index 2291150..e9267cc 100644 --- a/dishHistoryMqtt.py +++ b/dishHistoryMqtt.py @@ -12,6 +12,7 @@ import sys import getopt +import logging try: import ssl @@ -97,17 +98,18 @@ if print_usage or arg_error: print(" -U: Set username for authentication") sys.exit(1 if arg_error else 0) -dish_id = starlink_grpc.get_id() +logging.basicConfig(format="%(levelname)s: %(message)s") -if dish_id is None: - if verbose: - print("Unable to connect to Starlink user terminal") +try: + dish_id = starlink_grpc.get_id() +except starlink_grpc.GrpcError as e: + logging.error("Failure getting dish ID: " + str(e)) sys.exit(1) -g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose) - -if g_stats is None: - # verbose output already happened, so just bail. +try: + g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose) +except starlink_grpc.GrpcError as e: + logging.error("Failure getting ping stats: " + str(e)) sys.exit(1) topic_prefix = "starlink/dish_ping_stats/" + dish_id + "/" @@ -128,5 +130,5 @@ if username is not None: try: paho.mqtt.publish.multiple(msgs, client_id=dish_id, **mqargs) except Exception as e: - print("Failed publishing to MQTT broker: " + str(e)) + logging.error("Failed publishing to MQTT broker: " + str(e)) sys.exit(1) diff --git a/dishHistoryStats.py b/dishHistoryStats.py index ab2d9bb..683f490 100644 --- a/dishHistoryStats.py +++ b/dishHistoryStats.py @@ -13,6 +13,7 @@ import datetime import sys import getopt +import logging import starlink_grpc @@ -61,6 +62,8 @@ if print_usage or arg_error: print(" -H: print CSV header instead of parsing file") sys.exit(1 if arg_error else 0) +logging.basicConfig(format="%(levelname)s: %(message)s") + g_fields, pd_fields, rl_fields = starlink_grpc.history_ping_field_names() if print_header: @@ -78,10 +81,10 @@ if print_header: timestamp = datetime.datetime.utcnow() -g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose) - -if g_stats is None: - # verbose output already happened, so just bail. +try: + g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose) +except starlink_grpc.GrpcError as e: + logging.error("Failure getting ping stats: " + str(e)) sys.exit(1) if verbose: diff --git a/dishStatusCsv.py b/dishStatusCsv.py index 0c96587..c8b7968 100644 --- a/dishStatusCsv.py +++ b/dishStatusCsv.py @@ -10,6 +10,7 @@ import datetime import sys import getopt +import logging import grpc @@ -44,6 +45,8 @@ if print_usage or arg_error: print(" -H: print CSV header instead of parsing file") sys.exit(1 if arg_error else 0) +logging.basicConfig(format="%(levelname)s: %(message)s") + if print_header: header = [ "datetimestamp_utc", @@ -71,7 +74,7 @@ try: stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) except grpc.RpcError: - print("Failed getting status info") + logging.error("Failed getting status info") sys.exit(1) timestamp = datetime.datetime.utcnow() diff --git a/dishStatusInflux.py b/dishStatusInflux.py index 2a4987d..98c5d01 100644 --- a/dishStatusInflux.py +++ b/dishStatusInflux.py @@ -12,8 +12,9 @@ import time import os import sys import getopt - +import logging import warnings + from influxdb import InfluxDBClient from influxdb import SeriesHelper @@ -123,6 +124,18 @@ if print_usage or arg_error: print(" -U : Set username for authentication") sys.exit(1 if arg_error else 0) +logging.basicConfig(format="%(levelname)s: %(message)s") + +def conn_error(msg): + # Connection errors that happen while running in an interval loop are + # not critical failures, because they can (usually) be retried, or + # because they will be recorded as dish state unavailable. They're still + # interesting, though, so print them even in non-verbose mode. + if sleep_time > 0: + print(msg) + else: + logging.error(msg) + class DeviceStatusSeries(SeriesHelper): class Meta: series_name = "spacex.starlink.user_terminal.status" @@ -151,6 +164,7 @@ if "verify_ssl" in icargs and not icargs["verify_ssl"]: influx_client = InfluxDBClient(**icargs) +rc = 0 try: dish_channel = None last_id = None @@ -182,6 +196,7 @@ try: pop_ping_latency_ms=status.pop_ping_latency_ms, currently_obstructed=status.obstruction_stats.currently_obstructed, fraction_obstructed=status.obstruction_stats.fraction_obstructed) + pending += 1 last_id = status.device_info.id last_failed = False except grpc.RpcError: @@ -189,25 +204,36 @@ try: dish_channel.close() dish_channel = None if last_failed: - if last_id is not None: + if last_id is None: + conn_error("Dish unreachable and ID unknown, so not recording state") + # When not looping, report this as failure exit status + rc = 1 + else: + if verbose: + print("Dish unreachable") DeviceStatusSeries(id=last_id, state="DISH_UNREACHABLE") + pending += 1 else: + if verbose: + print("Dish RPC channel error") # Retry once, because the connection may have been lost while # we were sleeping last_failed = True continue - pending = pending + 1 if verbose: - print("Samples: " + str(pending)) - count = count + 1 + print("Samples queued: " + str(pending)) + count += 1 if count > 5: try: - DeviceStatusSeries.commit(influx_client) + if pending: + DeviceStatusSeries.commit(influx_client) + rc = 0 if verbose: - print("Wrote " + str(pending)) + print("Samples written: " + str(pending)) pending = 0 except Exception as e: - print("Failed to write: " + str(e)) + conn_error("Failed to write: " + str(e)) + rc = 1 count = 0 if sleep_time > 0: time.sleep(sleep_time) @@ -216,12 +242,13 @@ try: finally: # Flush on error/exit try: - DeviceStatusSeries.commit(influx_client) + if pending: + DeviceStatusSeries.commit(influx_client) + rc = 0 if verbose: - print("Wrote " + str(pending)) - rc = 0 + print("Samples written: " + str(pending)) except Exception as e: - print("Failed to write: " + str(e)) + conn_error("Failed to write: " + str(e)) rc = 1 influx_client.close() if dish_channel is not None: diff --git a/dishStatusMqtt.py b/dishStatusMqtt.py index ee4c667..e91763f 100644 --- a/dishStatusMqtt.py +++ b/dishStatusMqtt.py @@ -10,6 +10,7 @@ import sys import getopt +import logging try: import ssl @@ -81,12 +82,14 @@ if print_usage or arg_error: print(" -U: Set username for authentication") sys.exit(1 if arg_error else 0) +logging.basicConfig(format="%(levelname)s: %(message)s") + try: with grpc.insecure_channel("192.168.100.1:9200") as channel: stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) except grpc.RpcError: - print("Failed getting status info") + logging.error("Failed getting status info") sys.exit(1) status = response.dish_get_status @@ -126,5 +129,5 @@ if username is not None: try: paho.mqtt.publish.multiple(msgs, client_id=status.device_info.id, **mqargs) except Exception as e: - print("Failed publishing to MQTT broker: " + str(e)) + logging.error("Failed publishing to MQTT broker: " + str(e)) sys.exit(1) diff --git a/parseJsonHistory.py b/parseJsonHistory.py index 3e8d1aa..50fe1ff 100644 --- a/parseJsonHistory.py +++ b/parseJsonHistory.py @@ -16,6 +16,7 @@ import datetime import sys import getopt +import logging import starlink_json @@ -65,6 +66,8 @@ if print_usage or arg_error: print(" -H: print CSV header instead of parsing file") sys.exit(1 if arg_error else 0) +logging.basicConfig(format="%(levelname)s: %(message)s") + g_fields, pd_fields, rl_fields = starlink_json.history_ping_field_names() if print_header: @@ -82,11 +85,11 @@ if print_header: timestamp = datetime.datetime.utcnow() -g_stats, pd_stats, rl_stats = starlink_json.history_ping_stats(args[0] if args else "-", - samples, verbose) - -if g_stats is None: - # verbose output already happened, so just bail. +try: + g_stats, pd_stats, rl_stats = starlink_json.history_ping_stats(args[0] if args else "-", + samples, verbose) +except starlink_json.JsonError as e: + logging.error("Failure getting ping stats: " + str(e)) sys.exit(1) if verbose: diff --git a/starlink_grpc.py b/starlink_grpc.py index 09ec8d0..ec65b14 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -82,6 +82,21 @@ import grpc import spacex.api.device.device_pb2 import spacex.api.device.device_pb2_grpc + +class GrpcError(Exception): + """Provides error info when something went wrong with a gRPC call.""" + def __init__(self, e, *args, **kwargs): + # grpc.RpcError is too verbose to print in whole, but it may also be + # a Call object, and that class has some minimally useful info. + if isinstance(e, grpc.Call): + msg = e.details() + elif isinstance(e, grpc.RpcError): + msg = "Unknown communication or service error" + else: + msg = str(e) + super().__init__(msg, *args, **kwargs) + + def get_status(): """Fetch status data and return it in grpc structure format. @@ -98,13 +113,16 @@ def get_id(): Returns: A string identifying the Starlink user terminal reachable from the - local network, or None if no user terminal is currently reachable. + local network. + + Raises: + GrpcError: No user terminal is currently reachable. """ try: status = get_status() return status.device_info.id - except grpc.RpcError: - return None + except grpc.RpcError as e: + raise GrpcError(e) def history_ping_field_names(): """Return the field names of the packet loss stats. @@ -152,20 +170,18 @@ def history_ping_stats(parse_samples, verbose=False): verbose (bool): Optionally produce verbose output. Returns: - On success, a tuple with 3 dicts, the first mapping general stat names - to their values, the second mapping ping drop stat names to their - values and the third mapping ping drop run length stat names to their - values. + A tuple with 3 dicts, the first mapping general stat names to their + values, the second mapping ping drop stat names to their values and + the third mapping ping drop run length stat names to their values. - On failure, the tuple (None, None, None). + Raises: + GrpcError: Failed getting history info from the Starlink user + terminal. """ try: history = get_history() - except grpc.RpcError: - if verbose: - # RpcError is too verbose to print the details. - print("Failed getting history") - return None, None, None + except grpc.RpcError as e: + raise GrpcError(e) # 'current' is the count of data samples written to the ring buffer, # irrespective of buffer wrap. @@ -231,7 +247,7 @@ def history_ping_stats(parse_samples, verbose=False): count_unsched += 1 total_unsched_drop += d if d >= 1: - count_full_unsched += d + count_full_unsched += 1 # scheduled=false and obstructed=true do not ever appear to overlap, # but in case they do in the future, treat that as just unscheduled # in order to avoid double-counting it. diff --git a/starlink_json.py b/starlink_json.py index 383e875..7396c5a 100644 --- a/starlink_json.py +++ b/starlink_json.py @@ -14,6 +14,11 @@ import sys from itertools import chain + +class JsonError(Exception): + """Provides error info when something went wrong with JSON parsing.""" + + def history_ping_field_names(): """Return the field names of the packet loss stats. @@ -46,15 +51,16 @@ def get_history(filename): Args: filename (str): Filename from which to read JSON data, or "-" to read from standard input. + + Raises: + Various exceptions depending on Python version: Failure to open or + read input or invalid JSON read on input. """ if filename == "-": json_data = json.load(sys.stdin) else: - json_file = open(filename) - try: + with open(filename) as json_file: json_data = json.load(json_file) - finally: - json_file.close() return json_data["dishGetHistory"] def history_ping_stats(filename, parse_samples, verbose=False): @@ -68,19 +74,19 @@ def history_ping_stats(filename, parse_samples, verbose=False): verbose (bool): Optionally produce verbose output. Returns: - On success, a tuple with 3 dicts, the first mapping general stat names - to their values, the second mapping ping drop stat names to their - values and the third mapping ping drop run length stat names to their - values. + A tuple with 3 dicts, the first mapping general stat names to their + values, the second mapping ping drop stat names to their values and + the third mapping ping drop run length stat names to their values. - On failure, the tuple (None, None, None). + Raises: + JsonError: Failure to open, read, or parse JSON on input. """ try: history = get_history(filename) + except ValueError as e: + raise JsonError("Failed to parse JSON: " + str(e)) except Exception as e: - if verbose: - print("Failed getting history: " + str(e)) - return None, None, None + raise JsonError(e) # "current" is the count of data samples written to the ring buffer, # irrespective of buffer wrap. @@ -100,13 +106,13 @@ def history_ping_stats(filename, parse_samples, verbose=False): # index to next data sample after the newest one. offset = current % samples - tot = 0 + tot = 0.0 count_full_drop = 0 count_unsched = 0 - total_unsched_drop = 0 + total_unsched_drop = 0.0 count_full_unsched = 0 count_obstruct = 0 - total_obstruct_drop = 0 + total_obstruct_drop = 0.0 count_full_obstruct = 0 second_runs = [0] * 60 @@ -126,9 +132,10 @@ def history_ping_stats(filename, parse_samples, verbose=False): for i in sample_range: d = history["popPingDropRate"][i] - tot += d if d >= 1: - count_full_drop += d + # just in case... + d = 1 + count_full_drop += 1 run_length += 1 elif run_length > 0: if init_run_length is None: @@ -145,7 +152,7 @@ def history_ping_stats(filename, parse_samples, verbose=False): count_unsched += 1 total_unsched_drop += d if d >= 1: - count_full_unsched += d + count_full_unsched += 1 # scheduled=false and obstructed=true do not ever appear to overlap, # but in case they do in the future, treat that as just unscheduled # in order to avoid double-counting it. @@ -153,7 +160,8 @@ def history_ping_stats(filename, parse_samples, verbose=False): count_obstruct += 1 total_obstruct_drop += d if d >= 1: - count_full_obstruct += d + count_full_obstruct += 1 + tot += d # If the entire sample set is one big drop run, it will be both initial # fragment (continued from prior sample range) and final one (continued