From edcf2a2ee4b81e00829b7cd567f14f2e30113150 Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Tue, 19 Jan 2021 18:49:46 -0800 Subject: [PATCH] Detect and correct dish time getting out of sync If the number of samples reported in the history varies from the number of seconds elapsed as detected on the script host's system clock by more than +/- 2 seconds, forcibly correct the time base back to current system time. This doesn't seem to trigger on hosts with NTP synced system clocks (possibly because the dish's clock is, too), so no attempt is made to make this graceful, there will just be a discontinuity in the timestamps assigned to samples if this correction is made. Also, enforce a maximum batch size for data points writes to the InfluxDB server. It's somewhat arbitrarily set at 5000 data points. A write of the full 12 hour history buffer would be 43200 data points, so this will break that up a bit. Related to issue #5 --- dishHistoryInflux.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/dishHistoryInflux.py b/dishHistoryInflux.py index 56d1e15..ea6b3e0 100644 --- a/dishHistoryInflux.py +++ b/dishHistoryInflux.py @@ -64,6 +64,7 @@ def main(): icargs = {"host": host_default, "timeout": 5, "database": database_default} rp = None flush_limit = 6 + max_batch = 5000 # For each of these check they are both set and not empty string influxdb_host = os.environ.get("INFLUXDB_HOST") @@ -187,9 +188,15 @@ def main(): def flush_points(client): try: - client.write_points(gstate.points, retention_policy=rp) - if verbose: - print("Data points written: " + str(len(gstate.points))) + while len(gstate.points) > max_batch: + client.write_points(gstate.points[:max_batch], retention_policy=rp) + if verbose: + print("Data points written: " + str(max_batch)) + del gstate.points[:max_batch] + if gstate.points: + client.write_points(gstate.points, retention_policy=rp) + if verbose: + print("Data points written: " + str(len(gstate.points))) gstate.points.clear() except Exception as e: conn_error("Failed writing to InfluxDB database: %s", str(e)) @@ -198,18 +205,28 @@ def main(): return 0 def process_bulk_data(): - # need to pull this now in case it is needed later - now = time.time() + before = time.time() start = gstate.counter parse_samples = samples if start is None else -1 general, bulk = starlink_grpc.history_bulk_data(parse_samples, start=start, verbose=verbose) + after = time.time() parsed_samples = general["samples"] new_counter = general["current"] timestamp = gstate.timestamp - if timestamp is None or new_counter != gstate.counter + parsed_samples: - timestamp = now + # Check this first, so it doesn't report as lost time sync + if gstate.counter is not None and new_counter != gstate.counter + parsed_samples: + timestamp = None + # Allow up to 2 seconds of time drift before forcibly re-syncing, since + # +/- 1 second can happen just due to scheduler timing. + if timestamp is not None and not before - 2.0 <= timestamp + parsed_samples <= after + 2.0: + if verbose: + print("Lost sample time sync at: " + + str(datetime.fromtimestamp(timestamp + parsed_samples, tz=timezone.utc))) + timestamp = None + if timestamp is None: + timestamp = before if verbose: print("Establishing new time base: " + str(new_counter) + " -> " + str(datetime.fromtimestamp(timestamp, tz=timezone.utc)))