From 0663008be708b8cc0bfef05952c0a5e3fd0666ea Mon Sep 17 00:00:00 2001
From: sparky8512 <76499194+sparky8512@users.noreply.github.com>
Date: Sun, 17 Jan 2021 16:29:56 -0800
Subject: [PATCH 1/6] Add first pass of bulk data mode

Adds a new option, -b, that will write the individual sample data to
the InfluxDB server instead of summary data.
---
 dishHistoryInflux.py |  65 +++++++++++++++++++-------
 starlink_grpc.py     | 106 ++++++++++++++++++++++++++++-----------
 2 files changed, 127 insertions(+), 44 deletions(-)

diff --git a/dishHistoryInflux.py b/dishHistoryInflux.py
index fcecaec..0e1380a 100644
--- a/dishHistoryInflux.py
+++ b/dishHistoryInflux.py
@@ -37,7 +37,7 @@ def main():
     arg_error = False
 
     try:
-        opts, args = getopt.getopt(sys.argv[1:], "ahn:p:rs:t:vC:D:IP:R:SU:")
+        opts, args = getopt.getopt(sys.argv[1:], "abhn:p:rs:t:vC:D:IP:R:SU:")
     except getopt.GetoptError as err:
         print(str(err))
         arg_error = True
@@ -49,6 +49,7 @@ def main():
     verbose = False
     default_loop_time = 0
     loop_time = default_loop_time
+    bulk_mode = False
     run_lengths = False
     host_default = "localhost"
     database_default = "starlinkstats"
@@ -92,6 +93,8 @@ def main():
     for opt, arg in opts:
         if opt == "-a":
             samples = -1
+        elif opt == "-b":
+            bulk_mode = True
         elif opt == "-h":
             print_usage = True
@@ -132,6 +135,7 @@ def main():
         print("Usage: " + sys.argv[0] + " [options...]")
         print("Options:")
         print("    -a: Parse all valid samples")
+        print("    -b: Bulk mode: write individual sample data instead of summary stats")
         print("    -h: Be helpful")
         print("    -n <name>: Hostname of InfluxDB server, default: " + host_default)
         print("    -p <num>: Port number to use on InfluxDB server")
@@ -182,25 +186,28 @@ def main():
 
         return 0
 
-    def loop_body(client):
-        if gstate.dish_id is None:
-            try:
-                gstate.dish_id = starlink_grpc.get_id()
-                if verbose:
-                    print("Using dish ID: " + gstate.dish_id)
-            except starlink_grpc.GrpcError as e:
-                conn_error("Failure getting dish ID: %s", str(e))
-                return 1
-
+    def process_bulk_data():
         timestamp = datetime.datetime.utcnow()
-        try:
-            g_stats, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose)
-        except starlink_grpc.GrpcError as e:
-            conn_error("Failure getting ping stats: %s", str(e))
-            return 1
 
-        all_stats = g_stats.copy()
+        general, bulk = starlink_grpc.history_bulk_data(samples, verbose)
+
+        parsed_samples = general["samples"]
+        for i in range(parsed_samples):
+            gstate.points.append({
+                "measurement": "spacex.starlink.user_terminal.history",
+                "tags": {
+                    "id": gstate.dish_id
+                },
+                "time": timestamp + datetime.timedelta(seconds=i - parsed_samples),
+                "fields": {k: v[i] for k, v in bulk.items()},
+            })
+
+    def process_ping_stats():
+        timestamp = datetime.datetime.utcnow()
+
+        general, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose)
+
+        all_stats = general.copy()
         all_stats.update(pd_stats)
         if run_lengths:
             for k, v in rl_stats.items():
@@ -218,6 +225,30 @@ def main():
             "time": timestamp,
             "fields": all_stats,
         })
+
+    def loop_body(client):
+        if gstate.dish_id is None:
+            try:
+                gstate.dish_id = starlink_grpc.get_id()
+                if verbose:
+                    print("Using dish ID: " + gstate.dish_id)
+            except starlink_grpc.GrpcError as e:
+                conn_error("Failure getting dish ID: %s", str(e))
+                return 1
+
+        if bulk_mode:
+            try:
+                process_bulk_data()
+            except starlink_grpc.GrpcError as e:
+                conn_error("Failure getting history: %s", str(e))
+                return 1
+        else:
+            try:
+                process_ping_stats()
+            except starlink_grpc.GrpcError as e:
+                conn_error("Failure getting ping stats: %s", str(e))
+                return 1
+
         if verbose:
             print("Data points queued: " + str(len(gstate.points)))

diff --git a/starlink_grpc.py b/starlink_grpc.py
index 40e3572..3586529 100644
--- a/starlink_grpc.py
+++ b/starlink_grpc.py
@@ -10,6 +10,11 @@ General statistics:
     The sample interval is currently 1 second.
 
     samples: The number of valid samples analyzed.
+    current: XXX explain
+
+Bulk history data:
+    XXX to be written, but it'll be same as some of the items in status info,
+    just as lists for each.
 
 General ping drop (packet loss) statistics:
     This group of statistics characterize the packet loss (labeled "ping drop"
@@ -136,6 +141,7 @@ def history_ping_field_names():
     """
     return [
         "samples",
+        "current",
     ], [
         "total_ping_drop",
         "count_full_ping_drop",
@@ -165,6 +171,77 @@ def get_history():
     return response.dish_get_history
 
 
+def compute_sample_range(history, parse_samples, verbose=False):
+    # 'current' is the count of data samples written to the ring buffer,
+    # irrespective of buffer wrap.
+    current = int(history.current)
+    samples = len(history.pop_ping_drop_rate)
+
+    if verbose:
+        print("current counter: " + str(current))
+        print("All samples: " + str(samples))
+
+    samples = min(samples, current)
+
+    if verbose:
+        print("Valid samples: " + str(samples))
+
+    # This is ring buffer offset, so both index to oldest data sample and
+    # index to next data sample after the newest one.
+    offset = current % samples
+
+    if parse_samples < 0 or samples < parse_samples:
+        parse_samples = samples
+
+    # Parse the most recent parse_samples-sized set of samples. This will
+    # iterate samples in order from oldest to newest.
+    if parse_samples <= offset:
+        sample_range = range(offset - parse_samples, offset)
+    else:
+        sample_range = chain(range(samples + offset - parse_samples, samples), range(0, offset))
+
+    return sample_range, parse_samples, current
+
+
+def history_bulk_data(parse_samples, verbose=False):
+    try:
+        history = get_history()
+    except grpc.RpcError as e:
+        raise GrpcError(e)
+
+    sample_range, parse_samples, current = compute_sample_range(history, parse_samples, verbose)
+
+    pop_ping_drop_rate = []
+    pop_ping_latency_ms = []
+    downlink_throughput_bps = []
+    uplink_throughput_bps = []
+    snr = []
+    scheduled = []
+    obstructed = []
+
+    for i in sample_range:
+        pop_ping_drop_rate.append(history.pop_ping_drop_rate[i])
+        pop_ping_latency_ms.append(history.pop_ping_latency_ms[i])
+        downlink_throughput_bps.append(history.downlink_throughput_bps[i])
+        uplink_throughput_bps.append(history.uplink_throughput_bps[i])
+        snr.append(history.snr[i])
+        scheduled.append(history.scheduled[i])
+        obstructed.append(history.obstructed[i])
+
+    return {
+        "samples": parse_samples,
+        "current": current,
+    }, {
+        "pop_ping_drop_rate": pop_ping_drop_rate,
+        "pop_ping_latency_ms": pop_ping_latency_ms,
+        "downlink_throughput_bps": downlink_throughput_bps,
+        "uplink_throughput_bps": uplink_throughput_bps,
+        "snr": snr,
+        "scheduled": scheduled,
+        "obstructed": obstructed,
+    }
+
+
 def history_ping_stats(parse_samples, verbose=False):
     """Fetch, parse, and compute the packet loss stats.
 
@@ -187,23 +264,7 @@ def history_ping_stats(parse_samples, verbose=False):
     except grpc.RpcError as e:
         raise GrpcError(e)
 
-    # 'current' is the count of data samples written to the ring buffer,
-    # irrespective of buffer wrap.
-    current = int(history.current)
-    samples = len(history.pop_ping_drop_rate)
-
-    if verbose:
-        print("current counter: " + str(current))
-        print("All samples: " + str(samples))
-
-    samples = min(samples, current)
-
-    if verbose:
-        print("Valid samples: " + str(samples))
-
-    # This is ring buffer offset, so both index to oldest data sample and
-    # index to next data sample after the newest one.
-    offset = current % samples
+    sample_range, parse_samples, current = compute_sample_range(history, parse_samples, verbose)
 
     tot = 0.0
     count_full_drop = 0
@@ -219,16 +280,6 @@ def history_ping_stats(parse_samples, verbose=False):
     run_length = 0
     init_run_length = None
 
-    if parse_samples < 0 or samples < parse_samples:
-        parse_samples = samples
-
-    # Parse the most recent parse_samples-sized set of samples. This will
-    # iterate samples in order from oldest to newest.
-    if parse_samples <= offset:
-        sample_range = range(offset - parse_samples, offset)
-    else:
-        sample_range = chain(range(samples + offset - parse_samples, samples), range(0, offset))
-
     for i in sample_range:
         d = history.pop_ping_drop_rate[i]
         if d >= 1:
@@ -272,6 +323,7 @@ def history_ping_stats(parse_samples, verbose=False):
 
     return {
         "samples": parse_samples,
+        "current": current,
     }, {
         "total_ping_drop": tot,
        "count_full_ping_drop": count_full_drop,
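The ring-buffer unwinding that compute_sample_range() performs in patch 1 is
easier to see with concrete numbers. A minimal standalone sketch of the same
index arithmetic (the values are made up for illustration and are not part of
the patch):

    from itertools import chain

    # Buffer of 12 slots, 30 samples ever written, and we want the 5 most
    # recent. The write offset is 30 % 12 = 6, so the newest sample sits at
    # index 5 and the oldest retained one at index 6.
    samples, current, parse_samples = 12, 30, 5
    offset = current % samples

    if parse_samples <= offset:
        sample_range = range(offset - parse_samples, offset)
    else:
        # Wrapped case: take the tail of the buffer first, then the head.
        sample_range = chain(range(samples + offset - parse_samples, samples), range(0, offset))

    print(list(sample_range))  # [1, 2, 3, 4, 5], iterated oldest to newest
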
From 9b96c5dcc6a59957c14461ea6d8c5511e014fe6c Mon Sep 17 00:00:00 2001
From: sparky8512 <76499194+sparky8512@users.noreply.github.com>
Date: Mon, 18 Jan 2021 13:30:34 -0800
Subject: [PATCH 2/6] Implement sample counter tracking in bulk mode

Add tracking of exactly which samples have already been sent off to
InfluxDB so that samples are neither missed nor repeated due to minor
time deltas in OS task scheduling. For now, this is only being applied
to bulk mode.

Make the -s option only apply to the first loop iteration for bulk
mode, since subsequent loops will want to pick up all samples since
the prior iteration.

Also, omit the latency field from the data point sent to InfluxDB for
samples where the ping drop is 100%. The raw history data apparently
just repeats the prior value in this case, probably because it cannot
just leave a hole in the data array and there is no good way to
indicate an invalid value.

Related to issue #5
---
 dishHistoryInflux.py |  53 +++++++++++----
 starlink_grpc.py     | 150 ++++++++++++++++++++++++++++++++-----------
 2 files changed, 154 insertions(+), 49 deletions(-)

diff --git a/dishHistoryInflux.py b/dishHistoryInflux.py
index 0e1380a..56d1e15 100644
--- a/dishHistoryInflux.py
+++ b/dishHistoryInflux.py
@@ -1,17 +1,25 @@
 #!/usr/bin/python3
 ######################################################################
 #
-# Write Starlink user terminal packet loss statistics to an InfluxDB
-# database.
+# Write Starlink user terminal packet loss, latency, and usage data
+# to an InfluxDB database.
 #
 # This script examines the most recent samples from the history data,
-# computes several different metrics related to packet loss, and
-# writes those to the specified InfluxDB database.
+# and either writes them in whole or computes several different
+# metrics related to packet loss and writes those to the specified
+# InfluxDB database.
+#
+# NOTE: The Starlink user terminal does not include time values with
+# its history or status data, so this script uses current system time
+# to compute the timestamps it sends to InfluxDB. It is recommended
+# to run this script on a host that has its system clock synced via
+# NTP. Otherwise, the timestamps may get out of sync with real time.
 #
 ######################################################################
 
 import getopt
-import datetime
+from datetime import datetime
+from datetime import timezone
 import logging
 import os
 import signal
@@ -140,8 +148,9 @@ def main():
         print("    -n <name>: Hostname of InfluxDB server, default: " + host_default)
         print("    -p <num>: Port number to use on InfluxDB server")
         print("    -r: Include ping drop run length stats")
-        print("    -s <num>: Number of data samples to parse, default: loop interval,")
-        print("        if set, else " + str(samples_default))
+        print("    -s <num>: Number of data samples to parse; in bulk mode, applies to first")
+        print("        loop iteration only, default: loop interval, if set, else " +
+              str(samples_default))
         print("    -t <num>: Loop interval in seconds or 0 for no loop, default: " +
               str(default_loop_time))
         print("    -v: Be verbose")
@@ -165,6 +174,8 @@ def main():
     gstate = GlobalState()
     gstate.dish_id = None
     gstate.points = []
+    gstate.counter = None
+    gstate.timestamp = None
 
     def conn_error(msg, *args):
         # Connection errors that happen in an interval loop are not critical
@@ -187,23 +198,39 @@ def main():
         return 0
 
     def process_bulk_data():
-        timestamp = datetime.datetime.utcnow()
+        # need to pull this now in case it is needed later
+        now = time.time()
 
-        general, bulk = starlink_grpc.history_bulk_data(samples, verbose)
+        start = gstate.counter
+        parse_samples = samples if start is None else -1
+        general, bulk = starlink_grpc.history_bulk_data(parse_samples, start=start, verbose=verbose)
 
         parsed_samples = general["samples"]
+        new_counter = general["current"]
+        timestamp = gstate.timestamp
+        if timestamp is None or new_counter != gstate.counter + parsed_samples:
+            timestamp = now
+            if verbose:
+                print("Establishing new time base: " + str(new_counter) + " -> " +
+                      str(datetime.fromtimestamp(timestamp, tz=timezone.utc)))
+                timestamp -= parsed_samples
+
         for i in range(parsed_samples):
             gstate.points.append({
                 "measurement": "spacex.starlink.user_terminal.history",
                 "tags": {
                     "id": gstate.dish_id
                 },
-                "time": timestamp + datetime.timedelta(seconds=i - parsed_samples),
-                "fields": {k: v[i] for k, v in bulk.items()},
+                "time": datetime.utcfromtimestamp(timestamp),
+                "fields": {k: v[i] for k, v in bulk.items() if v[i] is not None},
             })
+            timestamp += 1
+
+        gstate.counter = new_counter
+        gstate.timestamp = timestamp
 
     def process_ping_stats():
-        timestamp = datetime.datetime.utcnow()
+        timestamp = time.time()
 
         general, pd_stats, rl_stats = starlink_grpc.history_ping_stats(samples, verbose)
@@ -222,7 +249,7 @@ def main():
             "tags": {
                 "id": gstate.dish_id
             },
-            "time": timestamp,
+            "time": datetime.utcfromtimestamp(timestamp),
             "fields": all_stats,
         })

diff --git a/starlink_grpc.py b/starlink_grpc.py
index 3586529..a60abd4 100644
--- a/starlink_grpc.py
+++ b/starlink_grpc.py
@@ -1,20 +1,55 @@
 """Helpers for grpc communication with a Starlink user terminal.
 
 This module may eventually contain more expansive parsing logic, but for now
-it contains functions to parse the history data for some specific packet loss
-statistics.
+it contains functions to either get the history data as-is or parse it for
+some specific packet loss statistics.
 
-General statistics:
-    This group of statistics contains data relevant to all the other groups.
+Those functions return data grouped into sets, as follows:
+
+General data:
+    This set of fields contains data relevant to all the other groups.
 
     The sample interval is currently 1 second.
 
-    samples: The number of valid samples analyzed.
-    current: XXX explain
+    samples: The number of samples analyzed (for statistics) or returned
+        (for bulk data).
+    current: The total number of data samples that have been written to
+        the history buffer since dish reboot, irrespective of buffer wrap.
+        This can be used to keep track of how many samples are new in
+        comparison to a prior query of the history data.
 
 Bulk history data:
-    XXX to be written, but it'll be same as some of the items in status info,
-    just as lists for each.
+    This group holds the history data as-is for the requested range of
+    samples, just unwound from the circular buffers that the raw data holds.
+    It contains some of the same fields as the status info, but instead of
+    representing the current values, each field contains a sequence of values
+    representing the value over time, ending at the current time.
+
+    pop_ping_drop_rate: Fraction of lost ping replies per sample.
+    pop_ping_latency_ms: Round trip time, in milliseconds, during the
+        sample period, or None if a sample experienced 100% ping drop.
+    downlink_throughput_bps: Download usage during the sample period
+        (actual, not max available), in bits per second.
+    uplink_throughput_bps: Upload usage during the sample period, in bits
+        per second.
+    snr: Signal to noise ratio during the sample period.
+    scheduled: Boolean indicating whether or not a satellite was scheduled
+        to be available for transmit/receive during the sample period.
+        When false, ping drop shows as "No satellites" in Starlink app.
+    obstructed: Boolean indicating whether or not the dish determined the
+        signal between it and the satellite was obstructed during the
+        sample period. When true, ping drop shows as "Obstructed" in the
+        Starlink app.
+
+    There is no specific data field in the raw history data that directly
+    correlates with "Other" or "Beta downtime" in the Starlink app (or
+    whatever it gets renamed to after beta), but empirical evidence suggests
+    any sample where pop_ping_drop_rate is 1, scheduled is true, and
+    obstructed is false is counted as "Beta downtime".
+
+    Note that neither scheduled=false nor obstructed=true necessarily means
+    packet loss occurred. Those need to be examined in combination with
+    pop_ping_drop_rate to be meaningful.
 
 General ping drop (packet loss) statistics:
     This group of statistics characterize the packet loss (labeled "ping drop"
@@ -55,18 +90,18 @@ Ping drop run length statistics:
         end of the sample set that experienced 100% ping drop. This period
         may continue as a run beyond the end of the sample set, so is not
         counted in the following stats.
-    run_seconds: A 60 element list. Each element records the total amount
-        of time, in sample intervals, that experienced 100% ping drop in
-        a consecutive run that lasted for (list index + 1) sample
+    run_seconds: A 60 element sequence. Each element records the total
+        amount of time, in sample intervals, that experienced 100% ping
+        drop in a consecutive run that lasted for (index + 1) sample
         intervals (seconds). That is, the first element contains time
         spent in 1 sample runs, the second element contains time spent
         in 2 sample runs, etc.
-    run_minutes: A 60 element list. Each element records the total amount
-        of time, in sample intervals, that experienced 100% ping drop in
-        a consecutive run that lasted for more that (list index + 1)
+    run_minutes: A 60 element sequence. Each element records the total
+        amount of time, in sample intervals, that experienced 100% ping
+        drop in a consecutive run that lasted for more than (index + 1)
         multiples of 60 sample intervals (minutes), but less than or equal
-        to (list index + 2) multiples of 60 sample intervals. Except for
-        the last element in the list, which records the total amount of
+        to (index + 2) multiples of 60 sample intervals. Except for the
+        last element in the sequence, which records the total amount of
         time in runs of more than 60*60 samples.
 
     No sample should be counted in more than one of the run length stats or
@@ -135,7 +170,7 @@ def history_ping_field_names():
     """Return the field names of the packet loss stats.
 
     Returns:
-        A tuple with 3 lists, the first with general stat names, the second
+        A tuple with 3 lists, the first with general data names, the second
         with ping drop stat names, and the third with ping drop run length
         stat names.
     """
@@ -165,9 +206,7 @@ def get_history():
     return response.dish_get_history
 
 
-def compute_sample_range(history, parse_samples, verbose=False):
-    # 'current' is the count of data samples written to the ring buffer,
-    # irrespective of buffer wrap.
+def _compute_sample_range(history, parse_samples, start=None, verbose=False):
     current = int(history.current)
     samples = len(history.pop_ping_drop_rate)
 
@@ -186,30 +219,72 @@ def compute_sample_range(history, parse_samples, verbose=False):
     if verbose:
         print("Valid samples: " + str(samples))
 
-    # This is ring buffer offset, so both index to oldest data sample and
-    # index to next data sample after the newest one.
-    offset = current % samples
-
     if parse_samples < 0 or samples < parse_samples:
         parse_samples = samples
 
+    if start is not None and start > current:
+        if verbose:
+            print("Counter reset detected, ignoring requested start count")
+        start = None
+
+    if start is None or start < current - parse_samples:
+        start = current - parse_samples
+
+    # This is ring buffer offset, so both index to oldest data sample and
+    # index to next data sample after the newest one.
+    end_offset = current % samples
+    start_offset = start % samples
+
+    # Set the range for the requested set of samples. This will iterate
+    # sample index in order from oldest to newest.
+    if start_offset < end_offset:
+        sample_range = range(start_offset, end_offset)
     else:
-        sample_range = chain(range(samples + offset - parse_samples, samples), range(0, offset))
+        sample_range = chain(range(start_offset, samples), range(0, end_offset))
 
-    return sample_range, parse_samples, current
+    return sample_range, current - start, current
 
 
-def history_bulk_data(parse_samples, verbose=False):
+def history_bulk_data(parse_samples, start=None, verbose=False):
+    """Fetch history data for a range of samples.
+
+    Args:
+        parse_samples (int): Number of samples to process, or -1 to parse all
+            available samples (bounded by start, if it is set).
+        start (int): Optional. If set, the samples returned will be limited to
+            the ones that have a counter value greater than or equal to this
+            value. The "current" field in the general data dict returned by
+            this function represents the counter value of the next data sample
+            after the returned data, so if that value is passed as start in a
+            subsequent call to this function, only new samples will be
+            returned.
+            NOTE: The sample counter will reset to 0 when the dish reboots. If
+            the requested start value is greater than the current "current"
+            value, this function will assume that happened and treat all
+            samples as being later than the requested start, and thus include
+            them (bounded by parse_samples, if it is not -1).
+            Combining parse_samples=-1 and setting start to other than None is
+            not recommended, as doing so will not guarantee that all new
+            samples are included in the results.
+        verbose (bool): Optionally produce verbose output.
+
+    Returns:
+        A tuple with 2 dicts, the first mapping general data names to their
+        values and the second mapping bulk history data names to their values.
+
+    Raises:
+        GrpcError: Failed getting history info from the Starlink user
+            terminal.
+    """
     try:
         history = get_history()
     except grpc.RpcError as e:
         raise GrpcError(e)
 
-    sample_range, parse_samples, current = compute_sample_range(history, parse_samples, verbose)
+    sample_range, parsed_samples, current = _compute_sample_range(history,
+                                                                  parse_samples,
+                                                                  start=start,
+                                                                  verbose=verbose)
 
     pop_ping_drop_rate = []
     pop_ping_latency_ms = []
@@ -221,7 +296,8 @@ def history_bulk_data(parse_samples, verbose=False):
     for i in sample_range:
         pop_ping_drop_rate.append(history.pop_ping_drop_rate[i])
-        pop_ping_latency_ms.append(history.pop_ping_latency_ms[i])
+        pop_ping_latency_ms.append(
+            history.pop_ping_latency_ms[i] if history.pop_ping_drop_rate[i] < 1 else None)
         downlink_throughput_bps.append(history.downlink_throughput_bps[i])
         uplink_throughput_bps.append(history.uplink_throughput_bps[i])
         snr.append(history.snr[i])
@@ -229,7 +305,7 @@ def history_bulk_data(parse_samples, verbose=False):
         obstructed.append(history.obstructed[i])
 
     return {
-        "samples": parse_samples,
+        "samples": parsed_samples,
         "current": current,
     }, {
         "pop_ping_drop_rate": pop_ping_drop_rate,
@@ -251,7 +327,7 @@ def history_ping_stats(parse_samples, verbose=False):
         verbose (bool): Optionally produce verbose output.
 
     Returns:
-        A tuple with 3 dicts, the first mapping general stat names to their
+        A tuple with 3 dicts, the first mapping general data names to their
         values, the second mapping ping drop stat names to their values and
         the third mapping ping drop run length stat names to their values.
 
@@ -264,7 +340,9 @@ def history_ping_stats(parse_samples, verbose=False):
     except grpc.RpcError as e:
         raise GrpcError(e)
 
-    sample_range, parse_samples, current = compute_sample_range(history, parse_samples, verbose)
+    sample_range, parse_samples, current = _compute_sample_range(history,
+                                                                 parse_samples,
+                                                                 verbose=verbose)
 
     tot = 0.0
     count_full_drop = 0
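The start parameter introduced above supports a simple tail-following
consumer. A sketch of that usage (polling interval and first-pass depth are
arbitrary choices; error handling elided; note that a later patch in this
series renames the "current" field to "end_counter"):

    import time

    import starlink_grpc

    counter = None
    while True:
        # First pass: parse a fixed number of samples. Later passes: parse
        # everything newer than the counter from the previous call.
        parse_samples = 900 if counter is None else -1
        general, bulk = starlink_grpc.history_bulk_data(parse_samples, start=counter)
        counter = general["current"]
        print("Got {0} new samples, counter now {1}".format(general["samples"], counter))
        time.sleep(60)
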
From edcf2a2ee4b81e00829b7cd567f14f2e30113150 Mon Sep 17 00:00:00 2001
From: sparky8512 <76499194+sparky8512@users.noreply.github.com>
Date: Tue, 19 Jan 2021 18:49:46 -0800
Subject: [PATCH 3/6] Detect and correct dish time getting out of sync

If the number of samples reported in the history differs from the
number of seconds elapsed as detected on the script host's system
clock by more than +/- 2 seconds, forcibly correct the time base back
to current system time. This doesn't seem to trigger on hosts with
NTP synced system clocks (possibly because the dish's clock is, too),
so no attempt is made to make this graceful; there will just be a
discontinuity in the timestamps assigned to samples if this
correction is made.

Also, enforce a maximum batch size for data point writes to the
InfluxDB server. It's somewhat arbitrarily set at 5000 data points.
A write of the full 12 hour history buffer would be 43200 data
points, so this will break that up a bit.

Related to issue #5
---
 dishHistoryInflux.py | 31 ++++++++++++++++++++++++-------
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git a/dishHistoryInflux.py b/dishHistoryInflux.py
index 56d1e15..ea6b3e0 100644
--- a/dishHistoryInflux.py
+++ b/dishHistoryInflux.py
@@ -64,6 +64,7 @@ def main():
     icargs = {"host": host_default, "timeout": 5, "database": database_default}
     rp = None
     flush_limit = 6
+    max_batch = 5000
 
     # For each of these check they are both set and not empty string
     influxdb_host = os.environ.get("INFLUXDB_HOST")
@@ -187,9 +188,15 @@ def main():
 
     def flush_points(client):
         try:
-            client.write_points(gstate.points, retention_policy=rp)
-            if verbose:
-                print("Data points written: " + str(len(gstate.points)))
+            while len(gstate.points) > max_batch:
+                client.write_points(gstate.points[:max_batch], retention_policy=rp)
+                if verbose:
+                    print("Data points written: " + str(max_batch))
+                del gstate.points[:max_batch]
+            if gstate.points:
+                client.write_points(gstate.points, retention_policy=rp)
+                if verbose:
+                    print("Data points written: " + str(len(gstate.points)))
             gstate.points.clear()
         except Exception as e:
             conn_error("Failed writing to InfluxDB database: %s", str(e))
@@ -198,18 +205,28 @@ def main():
         return 0
 
     def process_bulk_data():
-        # need to pull this now in case it is needed later
-        now = time.time()
+        before = time.time()
 
         start = gstate.counter
         parse_samples = samples if start is None else -1
         general, bulk = starlink_grpc.history_bulk_data(parse_samples, start=start, verbose=verbose)
+        after = time.time()
 
         parsed_samples = general["samples"]
         new_counter = general["current"]
         timestamp = gstate.timestamp
-        if timestamp is None or new_counter != gstate.counter + parsed_samples:
-            timestamp = now
+        # Check this first, so it doesn't report as lost time sync
+        if gstate.counter is not None and new_counter != gstate.counter + parsed_samples:
+            timestamp = None
+        # Allow up to 2 seconds of time drift before forcibly re-syncing, since
+        # +/- 1 second can happen just due to scheduler timing.
+        if timestamp is not None and not before - 2.0 <= timestamp + parsed_samples <= after + 2.0:
+            if verbose:
+                print("Lost sample time sync at: " +
+                      str(datetime.fromtimestamp(timestamp + parsed_samples, tz=timezone.utc)))
+            timestamp = None
+        if timestamp is None:
+            timestamp = before
             if verbose:
                 print("Establishing new time base: " + str(new_counter) + " -> " +
                       str(datetime.fromtimestamp(timestamp, tz=timezone.utc)))
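The re-sync rule above boils down to one predicate: advancing the time base by
the number of samples received should land on "now", within some slack. Stated
standalone (function and parameter names are hypothetical, not from the
patch):

    def time_base_ok(timestamp, parsed_samples, before, after, slack=2.0):
        # timestamp is the expected time of the first sample in this batch;
        # advancing it by the batch size should land between the system clock
        # readings taken just before and just after the history was fetched.
        return before - slack <= timestamp + parsed_samples <= after + slack

When this check fails, the script simply discards the old time base and
restarts from system time, which is where the timestamp discontinuity
described in the commit message comes from.
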
From ab335e92275af6751d79717369d201678a91741e Mon Sep 17 00:00:00 2001
From: sparky8512 <76499194+sparky8512@users.noreply.github.com>
Date: Tue, 19 Jan 2021 19:05:41 -0800
Subject: [PATCH 4/6] Fix time base setting when not verbose

---
 dishHistoryInflux.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dishHistoryInflux.py b/dishHistoryInflux.py
index ea6b3e0..2ce9cbd 100644
--- a/dishHistoryInflux.py
+++ b/dishHistoryInflux.py
@@ -230,7 +230,7 @@ def main():
             if verbose:
                 print("Establishing new time base: " + str(new_counter) + " -> " +
                       str(datetime.fromtimestamp(timestamp, tz=timezone.utc)))
-                timestamp -= parsed_samples
+            timestamp -= parsed_samples
 
         for i in range(parsed_samples):
             gstate.points.append({

From 2e045ade1687c3e54ba6402cbdbb02fd8f18f1db Mon Sep 17 00:00:00 2001
From: sparky8512 <76499194+sparky8512@users.noreply.github.com>
Date: Thu, 21 Jan 2021 20:39:37 -0800
Subject: [PATCH 5/6] Add tracking of counter across script invocations

Write the sample counter value corresponding to the last recorded
data point into the database along with the rest of the sample data
so that it can be read out on the next invocation of the script and
data collection resumed where it left off.

Switch the default sample count to all samples when in bulk mode,
which now really means all samples since the last one already
recorded.

Switch the time precision to 1 second. Data points are only written
one per second anyway, and this way, if there is any overlap due to
counter tracking failure, the existing data will just get overwritten
instead of creating duplicates.

Add a maximum queue length, so the script doesn't just keep using
more memory if it persistently (>10 days) fails writing to the
InfluxDB server.

Hack around some issues I ran into with the influxdb-python client
library, especially with respect to running queries against InfluxDB
2.0 servers.

This concludes the functionality related to bulk collection of
history data discussed on issue #5
---
 README.md            |   6 +++
 dishHistoryInflux.py | 121 ++++++++++++++++++++++++++++++++++-------
 2 files changed, 108 insertions(+), 19 deletions(-)

diff --git a/README.md b/README.md
index 78960f3..1c4ef4d 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,8 @@ The scripts that use [MQTT](https://mqtt.org/) for output require the `paho-mqtt
 
 The scripts that use [InfluxDB](https://www.influxdata.com/products/influxdb/) for output require the `influxdb` Python package. Information about how to install that can be found at https://github.com/influxdata/influxdb-python. Note that this is the (slightly) older version of the InfluxDB client Python module, not the InfluxDB 2.0 client. It can still be made to work with an InfluxDB 2.0 server, but doing so requires using `influx v1` [CLI commands](https://docs.influxdata.com/influxdb/v2.0/reference/cli/influx/v1/) on the server to map the 1.x username, password, and database names to their 2.0 equivalents.
 
+Note that the Python package versions available from various Linux distributions (i.e., installed via `apt-get` or similar) tend to run a bit behind those available to install via `pip`. While the distro packages should work OK as long as they aren't extremely old, they may not work as well as the later versions.
+
 Running the scripts within a [Docker](https://www.docker.com/) container requires Docker to be installed. Information about how to install that can be found at https://docs.docker.com/engine/install/
 
 ## Usage
@@ -78,6 +80,10 @@ python3 dishStatusInflux.py -t 30 [... probably other args to specify server opt
 
 Some of the scripts (currently only the InfluxDB ones) also support specifying options through environment variables. See details in the scripts for the environment variables that map to options.
 
+#### Bulk history data collection
+
+`dishHistoryInflux.py` also supports a bulk mode that collects and writes the full second-by-second data to the server instead of summary stats. To select bulk mode, use the `-b` option. You'll probably also want to use the `-t` option to have it run in a loop.
+
 ### Other scripts
 
 `dishDumpStatus.py` is a simple example of how to use the grpc modules (the ones generated by protoc, not `starlink_grpc.py`) directly. Just run it as:

diff --git a/dishHistoryInflux.py b/dishHistoryInflux.py
index 2ce9cbd..1312b1a 100644
--- a/dishHistoryInflux.py
+++ b/dishHistoryInflux.py
@@ -31,6 +31,10 @@
 from influxdb import InfluxDBClient
 
 import starlink_grpc
 
+BULK_MEASUREMENT = "spacex.starlink.user_terminal.history"
+PING_MEASUREMENT = "spacex.starlink.user_terminal.ping_stats"
+MAX_QUEUE_LENGTH = 864000
+
 class Terminated(Exception):
     pass
@@ -45,7 +49,7 @@ def main():
     arg_error = False
 
     try:
-        opts, args = getopt.getopt(sys.argv[1:], "abhn:p:rs:t:vC:D:IP:R:SU:")
+        opts, args = getopt.getopt(sys.argv[1:], "abhkn:p:rs:t:vC:D:IP:R:SU:")
     except getopt.GetoptError as err:
         print(str(err))
         arg_error = True
@@ -58,6 +62,7 @@ def main():
     default_loop_time = 0
     loop_time = default_loop_time
     bulk_mode = False
+    bulk_skip_query = False
     run_lengths = False
     host_default = "localhost"
     database_default = "starlinkstats"
@@ -106,6 +111,8 @@ def main():
             bulk_mode = True
         elif opt == "-h":
             print_usage = True
+        elif opt == "-k":
+            bulk_skip_query = True
         elif opt == "-n":
             icargs["host"] = arg
         elif opt == "-p":
@@ -146,12 +153,13 @@ def main():
         print("    -a: Parse all valid samples")
         print("    -b: Bulk mode: write individual sample data instead of summary stats")
         print("    -h: Be helpful")
+        print("    -k: Skip querying for prior sample write point in bulk mode")
         print("    -n <name>: Hostname of InfluxDB server, default: " + host_default)
         print("    -p <num>: Port number to use on InfluxDB server")
         print("    -r: Include ping drop run length stats")
-        print("    -s <num>: Number of data samples to parse; in bulk mode, applies to first")
-        print("        loop iteration only, default: loop interval, if set, else " +
-              str(samples_default))
+        print("    -s <num>: Number of data samples to parse; in bulk mode, applies to first")
+        print("        loop iteration only, default: -1 in bulk mode, loop interval if")
+        print("        loop interval set, else " + str(samples_default))
         print("    -t <num>: Loop interval in seconds or 0 for no loop, default: " +
               str(default_loop_time))
         print("    -v: Be verbose")
@@ -165,7 +173,7 @@ def main():
     sys.exit(1 if arg_error else 0)
 
     if samples is None:
-        samples = int(loop_time) if loop_time > 0 else samples_default
+        samples = -1 if bulk_mode else int(loop_time) if loop_time > 0 else samples_default
 
     logging.basicConfig(format="%(levelname)s: %(message)s")
@@ -177,6 +185,7 @@ def main():
     gstate.points = []
     gstate.counter = None
     gstate.timestamp = None
+    gstate.query_done = bulk_skip_query
 
     def conn_error(msg, *args):
         # Connection errors that happen in an interval loop are not critical
@@ -187,24 +196,62 @@ def main():
         logging.error(msg, *args)
 
     def flush_points(client):
+        # Don't flush points to server if the counter query failed, since some
+        # may be discarded later. Write would probably fail, too, anyway.
+        if bulk_mode and not gstate.query_done:
+            return 1
+
         try:
             while len(gstate.points) > max_batch:
-                client.write_points(gstate.points[:max_batch], retention_policy=rp)
+                client.write_points(gstate.points[:max_batch],
+                                    time_precision="s",
+                                    retention_policy=rp)
                 if verbose:
                     print("Data points written: " + str(max_batch))
                 del gstate.points[:max_batch]
             if gstate.points:
-                client.write_points(gstate.points, retention_policy=rp)
+                client.write_points(gstate.points, time_precision="s", retention_policy=rp)
                 if verbose:
                     print("Data points written: " + str(len(gstate.points)))
             gstate.points.clear()
         except Exception as e:
             conn_error("Failed writing to InfluxDB database: %s", str(e))
+            # If failures persist, don't just use infinite memory. Max queue
+            # is currently 10 days of bulk data, so something is very wrong
+            # if it's ever exceeded.
+            if len(gstate.points) > MAX_QUEUE_LENGTH:
+                logging.error("Max write queue exceeded, discarding data.")
+                del gstate.points[:-MAX_QUEUE_LENGTH]
             return 1
 
         return 0
 
-    def process_bulk_data():
+    def query_counter(client, now, len_points):
+        try:
+            # fetch the latest point where counter field was recorded
+            result = client.query("SELECT counter FROM \"{0}\" "
+                                  "WHERE time>={1}s AND time<{2}s AND id=$id "
+                                  "ORDER by time DESC LIMIT 1;".format(
+                                      BULK_MEASUREMENT, now - len_points, now),
+                                  bind_params={"id": gstate.dish_id},
+                                  epoch="s")
+            rpoints = list(result.get_points())
+            if rpoints:
+                counter = rpoints[0].get("counter", None)
+                timestamp = rpoints[0].get("time", 0)
+                if counter and timestamp:
+                    return int(counter), int(timestamp)
+        except TypeError as e:
+            # bind_params was added in influxdb-python v5.2.3. That would be
+            # easy enough to work around, but older versions had other problems
+            # with query(), so just skip this functionality.
+            logging.error(
+                "Failed running query, probably due to influxdb-python version too old. "
+                "Skipping resumption from prior counter value. Reported error was: %s", str(e))
+
+        return None, 0
+
+    def process_bulk_data(client):
         before = time.time()
 
         start = gstate.counter
@@ -215,7 +262,7 @@ def main():
         parsed_samples = general["samples"]
         new_counter = general["current"]
         timestamp = gstate.timestamp
-        # Check this first, so it doesn't report as lost time sync
+        # check this first, so it doesn't report as lost time sync
         if gstate.counter is not None and new_counter != gstate.counter + parsed_samples:
             timestamp = None
         # Allow up to 2 seconds of time drift before forcibly re-syncing, since
         # +/- 1 second can happen just due to scheduler timing.
@@ -226,26 +273,57 @@ def main():
                 str(datetime.fromtimestamp(timestamp + parsed_samples, tz=timezone.utc)))
             timestamp = None
         if timestamp is None:
-            timestamp = before
-            if verbose:
-                print("Establishing new time base: " + str(new_counter) + " -> " +
-                      str(datetime.fromtimestamp(timestamp, tz=timezone.utc)))
+            timestamp = int(before)
+            if verbose and gstate.query_done:
+                print("Establishing new time base: {0} -> {1}".format(
+                    new_counter, datetime.fromtimestamp(timestamp, tz=timezone.utc)))
             timestamp -= parsed_samples
 
         for i in range(parsed_samples):
+            timestamp += 1
             gstate.points.append({
-                "measurement": "spacex.starlink.user_terminal.history",
+                "measurement": BULK_MEASUREMENT,
                 "tags": {
                     "id": gstate.dish_id
                 },
-                "time": datetime.utcfromtimestamp(timestamp),
+                "time": timestamp,
                 "fields": {k: v[i] for k, v in bulk.items() if v[i] is not None},
             })
-            timestamp += 1
+
+        # save off counter value for script restart
+        if parsed_samples:
+            gstate.points[-1]["fields"]["counter"] = new_counter
 
         gstate.counter = new_counter
         gstate.timestamp = timestamp
+        # This is here and not before the points being processed because if the
+        # query previously failed, there will be points that were processed in
+        # a prior loop. This avoids having to handle that as a special case.
+        if not gstate.query_done:
+            try:
+                db_counter, db_timestamp = query_counter(client, timestamp, len(gstate.points))
+            except Exception as e:
+                # could be temporary outage, so try again next time
+                conn_error("Failed querying InfluxDB for prior count: %s", str(e))
+                return
+            gstate.query_done = True
+            start_counter = new_counter - len(gstate.points)
+            if db_counter and start_counter <= db_counter < new_counter:
+                del gstate.points[:db_counter - start_counter]
+                if before - 2.0 <= db_timestamp + len(gstate.points) <= after + 2.0:
+                    if verbose:
+                        print("Using existing time base: {0} -> {1}".format(
+                            db_counter, datetime.fromtimestamp(db_timestamp, tz=timezone.utc)))
+                    for point in gstate.points:
+                        db_timestamp += 1
+                        point["time"] = db_timestamp
+                    gstate.timestamp = db_timestamp
+                    return
+            if verbose:
+                print("Establishing new time base: {0} -> {1}".format(
+                    new_counter, datetime.fromtimestamp(timestamp, tz=timezone.utc)))
 
     def process_ping_stats():
         timestamp = time.time()
@@ -262,11 +340,11 @@ def main():
                 all_stats[k] = v
 
         gstate.points.append({
-            "measurement": "spacex.starlink.user_terminal.ping_stats",
+            "measurement": PING_MEASUREMENT,
             "tags": {
                 "id": gstate.dish_id
             },
-            "time": datetime.utcfromtimestamp(timestamp),
+            "time": int(timestamp),
             "fields": all_stats,
         })
@@ -282,7 +360,7 @@ def main():
 
         if bulk_mode:
             try:
-                process_bulk_data()
+                process_bulk_data(client)
             except starlink_grpc.GrpcError as e:
                 conn_error("Failure getting history: %s", str(e))
                 return 1
@@ -306,7 +384,12 @@ def main():
     warnings.filterwarnings("ignore", message="Unverified HTTPS request")
     signal.signal(signal.SIGTERM, handle_sigterm)
 
-    influx_client = InfluxDBClient(**icargs)
+    try:
+        # attempt to hack around breakage between influxdb-python client and 2.0 server:
+        influx_client = InfluxDBClient(**icargs, headers={"Accept": "application/json"})
+    except TypeError:
+        # ...unless influxdb-python package version is too old
+        influx_client = InfluxDBClient(**icargs)
     try:
         next_loop = time.monotonic()
         while True:
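The time-precision change relies on an InfluxDB property that is easy to
demonstrate: a point with the same measurement, tag set, and timestamp
replaces the previously stored field values instead of creating a duplicate.
A sketch (assumes a reachable InfluxDB 1.x server and the influxdb Python
package; the dish ID and timestamp are made up):

    from influxdb import InfluxDBClient

    client = InfluxDBClient(host="localhost", port=8086, database="starlinkstats")
    point = {
        "measurement": "spacex.starlink.user_terminal.history",
        "tags": {"id": "example-dish-id"},
        "time": 1611300000,  # epoch seconds
        "fields": {"pop_ping_drop_rate": 0.0},
    }
    client.write_points([point], time_precision="s")

    # Re-writing the same series and second overwrites rather than duplicates,
    # which is what makes overlap from counter tracking failures harmless.
    point["fields"] = {"pop_ping_drop_rate": 1.0}
    client.write_points([point], time_precision="s")
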
From e16649fbf1d4263cac7ff79f511415bb6b67b59d Mon Sep 17 00:00:00 2001
From: sparky8512 <76499194+sparky8512@users.noreply.github.com>
Date: Fri, 22 Jan 2021 18:43:51 -0800
Subject: [PATCH 6/6] Change name of "current" to "end_counter"

Since "current" got added to the general data group returned from
getting the history stats in non-bulk mode, it was being output by
all 3 of the history scripts, and the name "current" was a little
confusing when looking at prior output, since old values would no
longer be current. The description of it in the start param of
history_bulk_data was confusing, too.
---
 dishHistoryInflux.py |  2 +-
 starlink_grpc.py     | 24 ++++++++++--------------
 2 files changed, 11 insertions(+), 15 deletions(-)

diff --git a/dishHistoryInflux.py b/dishHistoryInflux.py
index 1312b1a..96bfa0d 100644
--- a/dishHistoryInflux.py
+++ b/dishHistoryInflux.py
@@ -260,7 +260,7 @@ def main():
         after = time.time()
 
         parsed_samples = general["samples"]
-        new_counter = general["current"]
+        new_counter = general["end_counter"]
         timestamp = gstate.timestamp
         # check this first, so it doesn't report as lost time sync
         if gstate.counter is not None and new_counter != gstate.counter + parsed_samples:

diff --git a/starlink_grpc.py b/starlink_grpc.py
index a60abd4..41eefa7 100644
--- a/starlink_grpc.py
+++ b/starlink_grpc.py
@@ -13,7 +13,7 @@ General data:
 
     samples: The number of samples analyzed (for statistics) or returned
         (for bulk data).
-    current: The total number of data samples that have been written to
+    end_counter: The total number of data samples that have been written to
         the history buffer since dish reboot, irrespective of buffer wrap.
         This can be used to keep track of how many samples are new in
         comparison to a prior query of the history data.
@@ -176,7 +176,7 @@ def history_ping_field_names():
     """
     return [
         "samples",
-        "current",
+        "end_counter",
     ], [
         "total_ping_drop",
         "count_full_ping_drop",
@@ -252,20 +252,16 @@ def history_bulk_data(parse_samples, start=None, verbose=False):
         parse_samples (int): Number of samples to process, or -1 to parse all
             available samples (bounded by start, if it is set).
         start (int): Optional. If set, the samples returned will be limited to
-            the ones that have a counter value greater than or equal to this
-            value. The "current" field in the general data dict returned by
-            this function represents the counter value of the next data sample
-            after the returned data, so if that value is passed as start in a
-            subsequent call to this function, only new samples will be
-            returned.
+            the ones that have a counter value greater than this value. The
+            "end_counter" field in the general data dict returned by this
+            function represents the counter value of the last data sample
+            returned, so if that value is passed as start in a subsequent call
+            to this function, only new samples will be returned.
             NOTE: The sample counter will reset to 0 when the dish reboots. If
-            the requested start value is greater than the current "current"
+            the requested start value is greater than the new "end_counter"
             value, this function will assume that happened and treat all
             samples as being later than the requested start, and thus include
             them (bounded by parse_samples, if it is not -1).
-            Combining parse_samples=-1 and setting start to other than None is
-            not recommended, as doing so will not guarantee that all new
-            samples are included in the results.
         verbose (bool): Optionally produce verbose output.
 
     Returns:
@@ -306,7 +302,7 @@ def history_bulk_data(parse_samples, start=None, verbose=False):
 
     return {
         "samples": parsed_samples,
-        "current": current,
+        "end_counter": current,
     }, {
         "pop_ping_drop_rate": pop_ping_drop_rate,
         "pop_ping_latency_ms": pop_ping_latency_ms,
@@ -401,7 +397,7 @@ def history_ping_stats(parse_samples, verbose=False):
 
     return {
         "samples": parse_samples,
-        "current": current,
+        "end_counter": current,
     }, {
         "total_ping_drop": tot,
         "count_full_ping_drop": count_full_drop,
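With the rename applied, the whole bulk collection loop reduces to a small
amount of code. A condensed sketch of what dishHistoryInflux.py now does each
iteration (persistence of the counter and all error handling elided; the
polling interval is arbitrary):

    import time

    import starlink_grpc

    counter = None  # could be seeded from the "counter" field stored in InfluxDB

    while True:
        general, bulk = starlink_grpc.history_bulk_data(-1, start=counter)
        counter = general["end_counter"]
        # bulk maps field names to parallel per-second sequences; turn them
        # into one dict per sample for writing.
        rows = [{k: v[i] for k, v in bulk.items()} for i in range(general["samples"])]
        print("Collected {0} samples, ending at counter {1}".format(len(rows), counter))
        time.sleep(60)
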