diff --git a/dish_common.py b/dish_common.py index eeaae5f..f78a808 100644 --- a/dish_common.py +++ b/dish_common.py @@ -117,11 +117,15 @@ def conn_error(opts, msg, *args): class GlobalState: - """A mostly empty class for keeping state across loop iterations.""" + """A class for keeping state across loop iterations.""" def __init__(self): self.counter = None self.timestamp = None self.dish_id = None + self.context = starlink_grpc.ChannelContext() + + def shutdown(self): + self.context.close() def get_data(opts, gstate, add_item, add_sequence, add_bulk=None): @@ -159,7 +163,8 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None): if opts.satus_mode: try: - status_data, obstruct_detail, alert_detail = starlink_grpc.status_data() + status_data, obstruct_detail, alert_detail = starlink_grpc.status_data( + context=gstate.context) except starlink_grpc.GrpcError as e: if "status" in opts.mode: if opts.need_id and gstate.dish_id is None: @@ -182,7 +187,7 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None): add_data(alert_detail, "status") elif opts.need_id and gstate.dish_id is None: try: - gstate.dish_id = starlink_grpc.get_id() + gstate.dish_id = starlink_grpc.get_id(context=gstate.context) except starlink_grpc.GrpcError as e: conn_error(opts, "Failure getting dish ID: %s", str(e)) return 1 @@ -191,7 +196,9 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None): if opts.ping_mode: try: - general, ping, runlen = starlink_grpc.history_ping_stats(opts.samples, opts.verbose) + general, ping, runlen = starlink_grpc.history_ping_stats(opts.samples, + opts.verbose, + context=gstate.context) except starlink_grpc.GrpcError as e: conn_error(opts, "Failure getting ping stats: %s", str(e)) return 1 @@ -209,7 +216,8 @@ def get_data(opts, gstate, add_item, add_sequence, add_bulk=None): try: general, bulk = starlink_grpc.history_bulk_data(parse_samples, start=start, - verbose=opts.verbose) + verbose=opts.verbose, + context=gstate.context) except starlink_grpc.GrpcError as e: conn_error(opts, "Failure getting history: %s", str(e)) return 1 diff --git a/dish_grpc_influx.py b/dish_grpc_influx.py index 65cf044..8aa667d 100644 --- a/dish_grpc_influx.py +++ b/dish_grpc_influx.py @@ -313,6 +313,7 @@ def main(): if gstate.points: rc = flush_points(opts, gstate) gstate.influx_client.close() + gstate.shutdown() sys.exit(rc) diff --git a/dish_grpc_mqtt.py b/dish_grpc_mqtt.py index 0ca5c0a..6448d6c 100644 --- a/dish_grpc_mqtt.py +++ b/dish_grpc_mqtt.py @@ -113,15 +113,18 @@ def main(): gstate = dish_common.GlobalState() - next_loop = time.monotonic() - while True: - rc = loop_body(opts, gstate) - if opts.loop_interval > 0.0: - now = time.monotonic() - next_loop = max(next_loop + opts.loop_interval, now) - time.sleep(next_loop - now) - else: - break + try: + next_loop = time.monotonic() + while True: + rc = loop_body(opts, gstate) + if opts.loop_interval > 0.0: + now = time.monotonic() + next_loop = max(next_loop + opts.loop_interval, now) + time.sleep(next_loop - now) + else: + break + finally: + gstate.shutdown() sys.exit(rc) diff --git a/dish_grpc_text.py b/dish_grpc_text.py index d8092c7..b35c20e 100644 --- a/dish_grpc_text.py +++ b/dish_grpc_text.py @@ -165,15 +165,18 @@ def main(): gstate = dish_common.GlobalState() - next_loop = time.monotonic() - while True: - rc = loop_body(opts, gstate) - if opts.loop_interval > 0.0: - now = time.monotonic() - next_loop = max(next_loop + opts.loop_interval, now) - time.sleep(next_loop - now) - else: - break + try: + next_loop = time.monotonic() + while True: + rc = loop_body(opts, gstate) + if opts.loop_interval > 0.0: + now = time.monotonic() + next_loop = max(next_loop + opts.loop_interval, now) + time.sleep(next_loop - now) + else: + break + finally: + gstate.shutdown() sys.exit(rc) diff --git a/starlink_grpc.py b/starlink_grpc.py index af83e4d..5879920 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -52,9 +52,9 @@ This group holds information about the current state of the user terminal. indicates the alert is active. See alert detail status data for which bits correspond with each alert, or to get individual alert flags instead of a combined bit mask. -: **fraction_obstructed** : The fraction of total area that the user terminal - has determined to be obstructed between it and the satellites with which - it communicates. +: **fraction_obstructed** : The fraction of total area (or possibly fraction + of time?) that the user terminal has determined to be obstructed between + it and the satellites with which it communicates. : **currently_obstructed** : Most recent sample value. See bulk history data for detail. : **seconds_obstructed** : The amount of time within the history buffer @@ -71,10 +71,13 @@ user terminal has determined to be obstructed. : **wedges_fraction_obstructed** : A 12 element sequence. Each element represents a 30 degree wedge of area and its value indicates the fraction - of area within that wedge that the user terminal has determined to be - obstructed between it and the satellites with which it communicates. The - values are expressed as a fraction of total area, not a fraction of the - wedge, so max value for each element should be 1/12. + of area (time?) within that wedge that the user terminal has determined to + be obstructed between it and the satellites with which it communicates. + The values are expressed as a fraction of total, not a fraction of the + wedge, so max value for each element should be 1/12. The first element in + the sequence represents the wedge that spans exactly North to 30 degrees + East of North, and subsequent wedges rotate 30 degrees further in the same + direction. (It's not clear if this will hold true at all latitudes.) See also *fraction_obstructed* in general status data, which should equal the sum of all *wedges_fraction_obstructed* elements. @@ -229,6 +232,25 @@ class GrpcError(Exception): super().__init__(msg, *args, **kwargs) +class ChannelContext: + """A wrapper for reusing an open grpc Channel across calls.""" + def __init__(self, target="192.168.100.1:9200"): + self.channel = None + self.target = target + + def get_channel(self): + reused = True + if self.channel is None: + self.channel = grpc.insecure_channel(self.target) + reused = False + return self.channel, reused + + def close(self): + if self.channel is not None: + self.channel.close() + self.channel = None + + def status_field_names(): """Return the field names of the status data. @@ -265,21 +287,43 @@ def status_field_names(): ], alert_names -def get_status(): +def get_status(context=None): """Fetch status data and return it in grpc structure format. + Args: + context (ChannelContext): Optionally provide a channel for reuse + across repeated calls. If an existing channel is reused, the RPC + call will be retried at most once, since connectivity may have + been lost and restored in the time since it was last used. + Raises: grpc.RpcError: Communication or service error. """ - with grpc.insecure_channel("192.168.100.1:9200") as channel: - stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) - response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) - return response.dish_get_status + if context is None: + with grpc.insecure_channel("192.168.100.1:9200") as channel: + stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) + return response.dish_get_status + + while True: + channel, reused = context.get_channel() + try: + stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_status={})) + return response.dish_get_status + except grpc.RpcError: + context.close() + if not reused: + raise -def get_id(): +def get_id(context=None): """Return the ID from the dish status information. + Args: + context (ChannelContext): Optionally provide a channel for reuse + across repeated calls. + Returns: A string identifying the Starlink user terminal reachable from the local network. @@ -288,15 +332,19 @@ def get_id(): GrpcError: No user terminal is currently reachable. """ try: - status = get_status() + status = get_status(context) return status.device_info.id except grpc.RpcError as e: raise GrpcError(e) -def status_data(): +def status_data(context=None): """Fetch current status data. + Args: + context (ChannelContext): Optionally provide a channel for reuse + across repeated calls. + Returns: A tuple with 3 dicts, the first mapping status data names to their values, the second mapping alert detail field names to their values, @@ -307,7 +355,7 @@ def status_data(): terminal. """ try: - status = get_status() + status = get_status(context) except grpc.RpcError as e: raise GrpcError(e) @@ -397,16 +445,34 @@ def history_ping_field_names(): ] -def get_history(): +def get_history(context=None): """Fetch history data and return it in grpc structure format. + Args: + context (ChannelContext): Optionally provide a channel for reuse + across repeated calls. If an existing channel is reused, the RPC + call will be retried at most once, since connectivity may have + been lost and restored in the time since it was last used. + Raises: grpc.RpcError: Communication or service error. """ - with grpc.insecure_channel("192.168.100.1:9200") as channel: - stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) - response = stub.Handle(spacex.api.device.device_pb2.Request(get_history={})) - return response.dish_get_history + if context is None: + with grpc.insecure_channel("192.168.100.1:9200") as channel: + stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_history={})) + return response.dish_get_history + + while True: + channel, reused = context.get_channel() + try: + stub = spacex.api.device.device_pb2_grpc.DeviceStub(channel) + response = stub.Handle(spacex.api.device.device_pb2.Request(get_history={})) + return response.dish_get_history + except grpc.RpcError: + context.close() + if not reused: + raise def _compute_sample_range(history, parse_samples, start=None, verbose=False): @@ -448,7 +514,7 @@ def _compute_sample_range(history, parse_samples, start=None, verbose=False): return sample_range, current - start, current -def history_bulk_data(parse_samples, start=None, verbose=False): +def history_bulk_data(parse_samples, start=None, verbose=False, context=None): """Fetch history data for a range of samples. Args: @@ -467,6 +533,8 @@ def history_bulk_data(parse_samples, start=None, verbose=False): samples as being later than the requested start, and thus include them (bounded by parse_samples, if it is not -1). verbose (bool): Optionally produce verbose output. + context (ChannelContext): Optionally provide a channel for reuse + across repeated calls. Returns: A tuple with 2 dicts, the first mapping general data names to their @@ -482,7 +550,7 @@ def history_bulk_data(parse_samples, start=None, verbose=False): terminal. """ try: - history = get_history() + history = get_history(context) except grpc.RpcError as e: raise GrpcError(e) @@ -523,7 +591,7 @@ def history_bulk_data(parse_samples, start=None, verbose=False): } -def history_ping_stats(parse_samples, verbose=False): +def history_ping_stats(parse_samples, verbose=False, context=None): """Fetch, parse, and compute the packet loss stats. Note: @@ -533,6 +601,8 @@ def history_ping_stats(parse_samples, verbose=False): parse_samples (int): Number of samples to process, or -1 to parse all available samples. verbose (bool): Optionally produce verbose output. + context (ChannelContext): Optionally provide a channel for reuse + across repeated calls. Returns: A tuple with 3 dicts, the first mapping general data names to their @@ -544,7 +614,7 @@ def history_ping_stats(parse_samples, verbose=False): terminal. """ try: - history = get_history() + history = get_history(context) except grpc.RpcError as e: raise GrpcError(e)