Detect and correct dish time getting out of sync

If the number of samples reported in the history varies from the number of seconds elapsed as detected on the script host's system clock by more than +/- 2 seconds, forcibly correct the time base back to current system time. This doesn't seem to trigger on hosts with NTP synced system clocks (possibly because the dish's clock is, too), so no attempt is made to make this graceful, there will just be a discontinuity in the timestamps assigned to samples if this correction is made.

Also, enforce a maximum batch size for data point writes to the InfluxDB server. It is somewhat arbitrarily set at 5000 data points. A write of the full 12-hour history buffer would be 43200 data points, so this will break that up a bit.

Related to issue #5
This commit is contained in:
sparky8512 2021-01-19 18:49:46 -08:00
parent 9b96c5dcc6
commit edcf2a2ee4

View file

@@ -64,6 +64,7 @@ def main():
     icargs = {"host": host_default, "timeout": 5, "database": database_default}
     rp = None
     flush_limit = 6
+    max_batch = 5000

     # For each of these check they are both set and not empty string
     influxdb_host = os.environ.get("INFLUXDB_HOST")
@@ -187,9 +188,15 @@ def main():
     def flush_points(client):
         try:
-            client.write_points(gstate.points, retention_policy=rp)
-            if verbose:
-                print("Data points written: " + str(len(gstate.points)))
-            gstate.points.clear()
+            while len(gstate.points) > max_batch:
+                client.write_points(gstate.points[:max_batch], retention_policy=rp)
+                if verbose:
+                    print("Data points written: " + str(max_batch))
+                del gstate.points[:max_batch]
+            if gstate.points:
+                client.write_points(gstate.points, retention_policy=rp)
+                if verbose:
+                    print("Data points written: " + str(len(gstate.points)))
+                gstate.points.clear()
         except Exception as e:
             conn_error("Failed writing to InfluxDB database: %s", str(e))
@@ -198,18 +205,28 @@ def main():
         return 0

     def process_bulk_data():
-        # need to pull this now in case it is needed later
-        now = time.time()
+        before = time.time()
         start = gstate.counter
         parse_samples = samples if start is None else -1
         general, bulk = starlink_grpc.history_bulk_data(parse_samples, start=start, verbose=verbose)
+        after = time.time()
         parsed_samples = general["samples"]
         new_counter = general["current"]
         timestamp = gstate.timestamp
-        if timestamp is None or new_counter != gstate.counter + parsed_samples:
-            timestamp = now
+        # Check this first, so it doesn't report as lost time sync
+        if gstate.counter is not None and new_counter != gstate.counter + parsed_samples:
+            timestamp = None
+        # Allow up to 2 seconds of time drift before forcibly re-syncing, since
+        # +/- 1 second can happen just due to scheduler timing.
+        if timestamp is not None and not before - 2.0 <= timestamp + parsed_samples <= after + 2.0:
+            if verbose:
+                print("Lost sample time sync at: " +
+                      str(datetime.fromtimestamp(timestamp + parsed_samples, tz=timezone.utc)))
+            timestamp = None
+        if timestamp is None:
+            timestamp = before
             if verbose:
                 print("Establishing new time base: " + str(new_counter) + " -> " +
                       str(datetime.fromtimestamp(timestamp, tz=timezone.utc)))