diff --git a/README.md b/README.md index 78960f3..1c4ef4d 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,8 @@ The scripts that use [MQTT](https://mqtt.org/) for output require the `paho-mqtt The scripts that use [InfluxDB](https://www.influxdata.com/products/influxdb/) for output require the `influxdb` Python package. Information about how to install that can be found at https://github.com/influxdata/influxdb-python. Note that this is the (slightly) older version of the InfluxDB client Python module, not the InfluxDB 2.0 client. It can still be made to work with an InfluxDB 2.0 server, but doing so requires using `influx v1` [CLI commands](https://docs.influxdata.com/influxdb/v2.0/reference/cli/influx/v1/) on the server to map the 1.x username, password, and database names to their 2.0 equivalents. +Note that the Python package versions available from various Linux distributions (ie: installed via `apt-get` or similar) tend to run a bit behind those available to install via `pip`. While the distro packages should work OK as long as they aren't extremely old, they may not work as well as the later versions. + Running the scripts within a [Docker](https://www.docker.com/) container requires Docker to be installed. Information about how to install that can be found at https://docs.docker.com/engine/install/ ## Usage @@ -78,6 +80,10 @@ python3 dishStatusInflux.py -t 30 [... probably other args to specify server opt Some of the scripts (currently only the InfluxDB ones) also support specifying options through environment variables. See details in the scripts for the environment variables that map to options. +#### Bulk history data collection + +`dishStatusInflux.py` also supports a bulk mode that collects and writes the full second-by-second data to the server instead of summary stats. To select bulk mode, use the `-b` option. You'll probably also want to use the `-t` option to have it run in a loop. + ### Other scripts `dishDumpStatus.py` is a simple example of how to use the grpc modules (the ones generated by protoc, not `starlink_grpc.py`) directly. Just run it as: diff --git a/dishHistoryInflux.py b/dishHistoryInflux.py index 2ce9cbd..1312b1a 100644 --- a/dishHistoryInflux.py +++ b/dishHistoryInflux.py @@ -31,6 +31,10 @@ from influxdb import InfluxDBClient import starlink_grpc +BULK_MEASUREMENT = "spacex.starlink.user_terminal.history" +PING_MEASUREMENT = "spacex.starlink.user_terminal.ping_stats" +MAX_QUEUE_LENGTH = 864000 + class Terminated(Exception): pass @@ -45,7 +49,7 @@ def main(): arg_error = False try: - opts, args = getopt.getopt(sys.argv[1:], "abhn:p:rs:t:vC:D:IP:R:SU:") + opts, args = getopt.getopt(sys.argv[1:], "abhkn:p:rs:t:vC:D:IP:R:SU:") except getopt.GetoptError as err: print(str(err)) arg_error = True @@ -58,6 +62,7 @@ def main(): default_loop_time = 0 loop_time = default_loop_time bulk_mode = False + bulk_skip_query = False run_lengths = False host_default = "localhost" database_default = "starlinkstats" @@ -106,6 +111,8 @@ def main(): bulk_mode = True elif opt == "-h": print_usage = True + elif opt == "-k": + bulk_skip_query = True elif opt == "-n": icargs["host"] = arg elif opt == "-p": @@ -146,12 +153,13 @@ def main(): print(" -a: Parse all valid samples") print(" -b: Bulk mode: write individual sample data instead of summary stats") print(" -h: Be helpful") + print(" -k: Skip querying for prior sample write point in bulk mode") print(" -n : Hostname of InfluxDB server, default: " + host_default) print(" -p : Port number to use on InfluxDB server") print(" -r: Include ping drop run length stats") print(" -s : Number of data samples to parse; in bulk mode, applies to first") - print(" loop iteration only, default: loop interval, if set, else " + - str(samples_default)) + print(" loop iteration only, default: -1 in bulk mode, loop interval if") + print(" loop interval set, else " + str(samples_default)) print(" -t : Loop interval in seconds or 0 for no loop, default: " + str(default_loop_time)) print(" -v: Be verbose") @@ -165,7 +173,7 @@ def main(): sys.exit(1 if arg_error else 0) if samples is None: - samples = int(loop_time) if loop_time > 0 else samples_default + samples = -1 if bulk_mode else int(loop_time) if loop_time > 0 else samples_default logging.basicConfig(format="%(levelname)s: %(message)s") @@ -177,6 +185,7 @@ def main(): gstate.points = [] gstate.counter = None gstate.timestamp = None + gstate.query_done = bulk_skip_query def conn_error(msg, *args): # Connection errors that happen in an interval loop are not critical @@ -187,24 +196,62 @@ def main(): logging.error(msg, *args) def flush_points(client): + # Don't flush points to server if the counter query failed, since some + # may be discarded later. Write would probably fail, too, anyway. + if bulk_mode and not gstate.query_done: + return 1 + try: while len(gstate.points) > max_batch: - client.write_points(gstate.points[:max_batch], retention_policy=rp) + client.write_points(gstate.points[:max_batch], + time_precision="s", + retention_policy=rp) if verbose: print("Data points written: " + str(max_batch)) del gstate.points[:max_batch] if gstate.points: - client.write_points(gstate.points, retention_policy=rp) + client.write_points(gstate.points, time_precision="s", retention_policy=rp) if verbose: print("Data points written: " + str(len(gstate.points))) gstate.points.clear() except Exception as e: conn_error("Failed writing to InfluxDB database: %s", str(e)) + # If failures persist, don't just use infinite memory. Max queue + # is currently 10 days of bulk data, so something is very wrong + # if it's ever exceeded. + if len(gstate.points) > MAX_QUEUE_LENGTH: + logging.error("Max write queue exceeded, discarding data.") + del gstate.points[:-MAX_QUEUE_LENGTH] return 1 return 0 - def process_bulk_data(): + def query_counter(client, now, len_points): + try: + # fetch the latest point where counter field was recorded + result = client.query("SELECT counter FROM \"{0}\" " + "WHERE time>={1}s AND time<{2}s AND id=$id " + "ORDER by time DESC LIMIT 1;".format( + BULK_MEASUREMENT, now - len_points, now), + bind_params={"id": gstate.dish_id}, + epoch="s") + rpoints = list(result.get_points()) + if rpoints: + counter = rpoints[0].get("counter", None) + timestamp = rpoints[0].get("time", 0) + if counter and timestamp: + return int(counter), int(timestamp) + except TypeError as e: + # bind_params was added in influxdb-python v5.2.3. That would be + # easy enough to work around, but older versions had other problems + # with query(), so just skip this functionality. + logging.error( + "Failed running query, probably due to influxdb-python version too old. " + "Skipping resumption from prior counter value. Reported error was: %s", str(e)) + + return None, 0 + + def process_bulk_data(client): before = time.time() start = gstate.counter @@ -215,7 +262,7 @@ def main(): parsed_samples = general["samples"] new_counter = general["current"] timestamp = gstate.timestamp - # Check this first, so it doesn't report as lost time sync + # check this first, so it doesn't report as lost time sync if gstate.counter is not None and new_counter != gstate.counter + parsed_samples: timestamp = None # Allow up to 2 seconds of time drift before forcibly re-syncing, since @@ -226,26 +273,57 @@ def main(): str(datetime.fromtimestamp(timestamp + parsed_samples, tz=timezone.utc))) timestamp = None if timestamp is None: - timestamp = before - if verbose: - print("Establishing new time base: " + str(new_counter) + " -> " + - str(datetime.fromtimestamp(timestamp, tz=timezone.utc))) + timestamp = int(before) + if verbose and gstate.query_done: + print("Establishing new time base: {0} -> {1}".format( + new_counter, datetime.fromtimestamp(timestamp, tz=timezone.utc))) timestamp -= parsed_samples for i in range(parsed_samples): + timestamp += 1 gstate.points.append({ - "measurement": "spacex.starlink.user_terminal.history", + "measurement": BULK_MEASUREMENT, "tags": { "id": gstate.dish_id }, - "time": datetime.utcfromtimestamp(timestamp), + "time": timestamp, "fields": {k: v[i] for k, v in bulk.items() if v[i] is not None}, }) - timestamp += 1 + + # save off counter value for script restart + if parsed_samples: + gstate.points[-1]["fields"]["counter"] = new_counter gstate.counter = new_counter gstate.timestamp = timestamp + # This is here and not before the points being processed because if the + # query previously failed, there will be points that were processed in + # a prior loop. This avoids having to handle that as a special case. + if not gstate.query_done: + try: + db_counter, db_timestamp = query_counter(client, timestamp, len(gstate.points)) + except Exception as e: + # could be temporary outage, so try again next time + conn_error("Failed querying InfluxDB for prior count: %s", str(e)) + return + gstate.query_done = True + start_counter = new_counter - len(gstate.points) + if db_counter and start_counter <= db_counter < new_counter: + del gstate.points[:db_counter - start_counter] + if before - 2.0 <= db_timestamp + len(gstate.points) <= after + 2.0: + if verbose: + print("Using existing time base: {0} -> {1}".format( + db_counter, datetime.fromtimestamp(db_timestamp, tz=timezone.utc))) + for point in gstate.points: + db_timestamp += 1 + point["time"] = db_timestamp + gstate.timestamp = db_timestamp + return + if verbose: + print("Establishing new time base: {0} -> {1}".format( + new_counter, datetime.fromtimestamp(timestamp, tz=timezone.utc))) + def process_ping_stats(): timestamp = time.time() @@ -262,11 +340,11 @@ def main(): all_stats[k] = v gstate.points.append({ - "measurement": "spacex.starlink.user_terminal.ping_stats", + "measurement": PING_MEASUREMENT, "tags": { "id": gstate.dish_id }, - "time": datetime.utcfromtimestamp(timestamp), + "time": int(timestamp), "fields": all_stats, }) @@ -282,7 +360,7 @@ def main(): if bulk_mode: try: - process_bulk_data() + process_bulk_data(client) except starlink_grpc.GrpcError as e: conn_error("Failure getting history: %s", str(e)) return 1 @@ -306,7 +384,12 @@ def main(): warnings.filterwarnings("ignore", message="Unverified HTTPS request") signal.signal(signal.SIGTERM, handle_sigterm) - influx_client = InfluxDBClient(**icargs) + try: + # attempt to hack around breakage between influxdb-python client and 2.0 server: + influx_client = InfluxDBClient(**icargs, headers={"Accept": "application/json"}) + except TypeError: + # ...unless influxdb-python package version is too old + influx_client = InfluxDBClient(**icargs) try: next_loop = time.monotonic() while True: