diff --git a/dishHistoryInflux.py b/dishHistoryInflux.py index 56d1e15..ea6b3e0 100644 --- a/dishHistoryInflux.py +++ b/dishHistoryInflux.py @@ -64,6 +64,7 @@ def main(): icargs = {"host": host_default, "timeout": 5, "database": database_default} rp = None flush_limit = 6 + max_batch = 5000 # For each of these check they are both set and not empty string influxdb_host = os.environ.get("INFLUXDB_HOST") @@ -187,9 +188,15 @@ def main(): def flush_points(client): try: - client.write_points(gstate.points, retention_policy=rp) - if verbose: - print("Data points written: " + str(len(gstate.points))) + while len(gstate.points) > max_batch: + client.write_points(gstate.points[:max_batch], retention_policy=rp) + if verbose: + print("Data points written: " + str(max_batch)) + del gstate.points[:max_batch] + if gstate.points: + client.write_points(gstate.points, retention_policy=rp) + if verbose: + print("Data points written: " + str(len(gstate.points))) gstate.points.clear() except Exception as e: conn_error("Failed writing to InfluxDB database: %s", str(e)) @@ -198,18 +205,28 @@ def main(): return 0 def process_bulk_data(): - # need to pull this now in case it is needed later - now = time.time() + before = time.time() start = gstate.counter parse_samples = samples if start is None else -1 general, bulk = starlink_grpc.history_bulk_data(parse_samples, start=start, verbose=verbose) + after = time.time() parsed_samples = general["samples"] new_counter = general["current"] timestamp = gstate.timestamp - if timestamp is None or new_counter != gstate.counter + parsed_samples: - timestamp = now + # Check this first, so it doesn't report as lost time sync + if gstate.counter is not None and new_counter != gstate.counter + parsed_samples: + timestamp = None + # Allow up to 2 seconds of time drift before forcibly re-syncing, since + # +/- 1 second can happen just due to scheduler timing. + if timestamp is not None and not before - 2.0 <= timestamp + parsed_samples <= after + 2.0: + if verbose: + print("Lost sample time sync at: " + + str(datetime.fromtimestamp(timestamp + parsed_samples, tz=timezone.utc))) + timestamp = None + if timestamp is None: + timestamp = before if verbose: print("Establishing new time base: " + str(new_counter) + " -> " + str(datetime.fromtimestamp(timestamp, tz=timezone.utc)))