From 8b1d81b2bb109aabc2d6aee90ad8fbbe4c3327b0 Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Wed, 1 Feb 2023 10:56:41 -0800 Subject: [PATCH 1/4] Protect status usage vs grpc protocol changes Change all uses of status data protobuf messages so that they will not crash the calling script in the case where the grpc protocol removes the message, field, or enum value being accessed. Instead, they will return None for the affected field (which most calling scripts interpret as "no data") or raise the same error as when the dish is not reachable. This makes the code a little less readable, but it's better than breaking every time the protocol obsoletes fields. mypy now complains about the return types for some fields that are now technically optional but are not marked as such in the type data, because the type data reflects what will currently be returned, not what may turn to None in the future if the protocol changes. This is for issue #66. --- starlink_grpc.py | 108 +++++++++++++++++++++++++++++------------------ 1 file changed, 68 insertions(+), 40 deletions(-) diff --git a/starlink_grpc.py b/starlink_grpc.py index c3e62a3..b106d37 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -535,6 +535,8 @@ class GrpcError(Exception): msg = e.details() elif isinstance(e, grpc.RpcError): msg = "Unknown communication or service error" + elif isinstance(e, (AttributeError, ValueError)): + msg = "Protocol error" else: msg = str(e) super().__init__(msg, *args, **kwargs) @@ -617,8 +619,11 @@ def status_field_names(context: Optional[ChannelContext] = None): except grpc.RpcError as e: raise GrpcError(e) from e alert_names = [] - for field in dish_pb2.DishAlerts.DESCRIPTOR.fields: - alert_names.append("alert_" + field.name) + try: + for field in dish_pb2.DishAlerts.DESCRIPTOR.fields: + alert_names.append("alert_" + field.name) + except AttributeError: + pass return _field_names(StatusDict), _field_names(ObstructionDict), alert_names 
@@ -646,8 +651,12 @@ def status_field_types(context: Optional[ChannelContext] = None): call_with_channel(resolve_imports, context=context) except grpc.RpcError as e: raise GrpcError(e) from e - return (_field_types(StatusDict), _field_types(ObstructionDict), - [bool] * len(dish_pb2.DishAlerts.DESCRIPTOR.fields)) + num_alerts = 0 + try: + num_alerts = len(dish_pb2.DishAlerts.DESCRIPTOR.fields) + except AttributeError: + pass + return (_field_types(StatusDict), _field_types(ObstructionDict), [bool] * num_alerts) def get_status(context: Optional[ChannelContext] = None): @@ -661,6 +670,9 @@ def get_status(context: Optional[ChannelContext] = None): Raises: grpc.RpcError: Communication or service error. + AttributeError, ValueError: Protocol error. Either the target is not a + Starlink user terminal or the grpc protocol has changed in a way + this module cannot handle. """ def grpc_call(channel): if imports_pending: @@ -689,7 +701,7 @@ def get_id(context: Optional[ChannelContext] = None) -> str: try: status = get_status(context) return status.device_info.id - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: raise GrpcError(e) from e @@ -711,62 +723,78 @@ def status_data( """ try: status = get_status(context) - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: raise GrpcError(e) from e - if status.HasField("outage"): - if status.outage.cause == dish_pb2.DishOutage.Cause.NO_SCHEDULE: - # Special case translate this to equivalent old name - state = "SEARCHING" + try: + if status.HasField("outage"): + if status.outage.cause == dish_pb2.DishOutage.Cause.NO_SCHEDULE: + # Special case translate this to equivalent old name + state = "SEARCHING" + else: + try: + state = dish_pb2.DishOutage.Cause.Name(status.outage.cause) + except ValueError: + # Unlikely, but possible if dish is running newer firmware + # than protocol data pulled via reflection + state = str(status.outage.cause) else: - state = 
dish_pb2.DishOutage.Cause.Name(status.outage.cause) - else: - state = "CONNECTED" + state = "CONNECTED" + except (AttributeError, ValueError): + state = "UNKNOWN" # More alerts may be added in future, so in addition to listing them # individually, provide a bit field based on field numbers of the # DishAlerts message. alerts = {} alert_bits = 0 - for field in status.alerts.DESCRIPTOR.fields: - value = getattr(status.alerts, field.name) - alerts["alert_" + field.name] = value - if field.number < 65: - alert_bits |= (1 if value else 0) << (field.number - 1) + try: + for field in status.alerts.DESCRIPTOR.fields: + value = getattr(status.alerts, field.name, False) + alerts["alert_" + field.name] = value + if field.number < 65: + alert_bits |= (1 if value else 0) << (field.number - 1) + except AttributeError: + pass - if (status.obstruction_stats.avg_prolonged_obstruction_duration_s > 0.0 - and not math.isnan(status.obstruction_stats.avg_prolonged_obstruction_interval_s)): - obstruction_duration = status.obstruction_stats.avg_prolonged_obstruction_duration_s - obstruction_interval = status.obstruction_stats.avg_prolonged_obstruction_interval_s - else: - obstruction_duration = None - obstruction_interval = None + obstruction_duration = None + obstruction_interval = None + obstruction_stats = getattr(status, "obstruction_stats", None) + if obstruction_stats is not None: + try: + if (obstruction_stats.avg_prolonged_obstruction_duration_s > 0.0 + and not math.isnan(obstruction_stats.avg_prolonged_obstruction_interval_s)): + obstruction_duration = obstruction_stats.avg_prolonged_obstruction_duration_s + obstruction_interval = obstruction_stats.avg_prolonged_obstruction_interval_s + except AttributeError: + pass + device_info = getattr(status, "device_info", None) return { - "id": status.device_info.id, - "hardware_version": status.device_info.hardware_version, - "software_version": status.device_info.software_version, + "id": getattr(device_info, "id", None), + 
"hardware_version": getattr(device_info, "hardware_version", None), + "software_version": getattr(device_info, "software_version", None), "state": state, - "uptime": status.device_state.uptime_s, + "uptime": getattr(getattr(status, "device_state", None), "uptime_s", None), "snr": None, # obsoleted in grpc service - "seconds_to_first_nonempty_slot": status.seconds_to_first_nonempty_slot, - "pop_ping_drop_rate": status.pop_ping_drop_rate, - "downlink_throughput_bps": status.downlink_throughput_bps, - "uplink_throughput_bps": status.uplink_throughput_bps, - "pop_ping_latency_ms": status.pop_ping_latency_ms, + "seconds_to_first_nonempty_slot": getattr(status, "seconds_to_first_nonempty_slot", None), + "pop_ping_drop_rate": getattr(status, "pop_ping_drop_rate", None), + "downlink_throughput_bps": getattr(status, "downlink_throughput_bps", None), + "uplink_throughput_bps": getattr(status, "uplink_throughput_bps", None), + "pop_ping_latency_ms": getattr(status, "pop_ping_latency_ms", None), "alerts": alert_bits, - "fraction_obstructed": status.obstruction_stats.fraction_obstructed, - "currently_obstructed": status.obstruction_stats.currently_obstructed, + "fraction_obstructed": getattr(obstruction_stats, "fraction_obstructed", None), + "currently_obstructed": getattr(obstruction_stats, "currently_obstructed", None), "seconds_obstructed": None, # obsoleted in grpc service "obstruction_duration": obstruction_duration, "obstruction_interval": obstruction_interval, - "direction_azimuth": status.boresight_azimuth_deg, - "direction_elevation": status.boresight_elevation_deg, - "is_snr_above_noise_floor": status.is_snr_above_noise_floor, + "direction_azimuth": getattr(status, "boresight_azimuth_deg", None), + "direction_elevation": getattr(status, "boresight_elevation_deg", None), + "is_snr_above_noise_floor": getattr(status, "is_snr_above_noise_floor", None), }, { "wedges_fraction_obstructed[]": [None] * 12, # obsoleted in grpc service "raw_wedges_fraction_obstructed[]": [None] 
* 12, # obsoleted in grpc service - "valid_s": status.obstruction_stats.valid_s, + "valid_s": getattr(obstruction_stats, "valid_s", None), }, alerts From ab2bce59cad717712ea5281e1b8d63fc183208b9 Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Wed, 1 Feb 2023 14:19:17 -0800 Subject: [PATCH 2/4] Protect history usage vs grpc protocol changes Change all uses of history data protobuf messages so that they will not crash the calling script in the case where the grpc protocol removes the message or field being accessed. Instead, they will return None for the affected field (which most calling scripts interpret as "no data") or raise the same error as when the dish is not reachable. Same caveats about code readability and adherence to the advertised type data as the change for protecting status usage. This is for issue #66. --- starlink_grpc.py | 86 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 70 insertions(+), 16 deletions(-) diff --git a/starlink_grpc.py b/starlink_grpc.py index b106d37..aa36009 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -955,6 +955,9 @@ def get_history(context: Optional[ChannelContext] = None): Raises: grpc.RpcError: Communication or service error. + AttributeError, ValueError: Protocol error. Either the target is not a + Starlink user terminal or the grpc protocol has changed in a way + this module cannot handle. """ def grpc_call(channel: grpc.Channel): if imports_pending: @@ -970,8 +973,12 @@ def _compute_sample_range(history, parse_samples: int, start: Optional[int] = None, verbose: bool = False): - current = int(history.current) - samples = len(history.pop_ping_drop_rate) + try: + current = int(history.current) + samples = len(history.pop_ping_drop_rate) + except (AttributeError, TypeError): + # Without current and pop_ping_drop_rate, history is unusable. 
+ return range(0), 0, None if verbose: print("current counter: " + str(current)) @@ -1046,8 +1053,14 @@ def concatenate_history(history1, An object with the unwrapped history data and the same attribute fields as a grpc history object. """ - size2 = len(history2.pop_ping_drop_rate) - new_samples = history2.current - history1.current + try: + size2 = len(history2.pop_ping_drop_rate) + new_samples = history2.current - history1.current + except (AttributeError, TypeError): + # Something is wrong. Probably both history objects are bad, so no + # point in trying to combine them. + return history1 + if new_samples < 0: if verbose: print("Dish reboot detected. Appending anyway.") @@ -1062,19 +1075,28 @@ def concatenate_history(history1, unwrapped = UnwrappedHistory() for field in HISTORY_FIELDS: - setattr(unwrapped, field, []) + if hasattr(history1, field) and hasattr(history2, field): + setattr(unwrapped, field, []) unwrapped.unwrapped = True sample_range, ignore1, ignore2 = _compute_sample_range( # pylint: disable=unused-variable history1, samples1, start=start1) for i in sample_range: for field in HISTORY_FIELDS: - getattr(unwrapped, field).append(getattr(history1, field)[i]) + if hasattr(unwrapped, field): + try: + getattr(unwrapped, field).append(getattr(history1, field)[i]) + except (IndexError, TypeError): + pass sample_range, ignore1, ignore2 = _compute_sample_range(history2, new_samples) # pylint: disable=unused-variable for i in sample_range: for field in HISTORY_FIELDS: - getattr(unwrapped, field).append(getattr(history2, field)[i]) + if hasattr(unwrapped, field): + try: + getattr(unwrapped, field).append(getattr(history2, field)[i]) + except (IndexError, TypeError): + pass unwrapped.current = history2.current return unwrapped @@ -1124,7 +1146,7 @@ def history_bulk_data(parse_samples: int, if history is None: try: history = get_history(context) - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: raise GrpcError(e) from e 
sample_range, parsed_samples, current = _compute_sample_range(history, @@ -1138,11 +1160,30 @@ def history_bulk_data(parse_samples: int, uplink_throughput_bps = [] for i in sample_range: + # pop_ping_drop_rate is checked in _compute_sample_range pop_ping_drop_rate.append(history.pop_ping_drop_rate[i]) - pop_ping_latency_ms.append( - history.pop_ping_latency_ms[i] if history.pop_ping_drop_rate[i] < 1 else None) - downlink_throughput_bps.append(history.downlink_throughput_bps[i]) - uplink_throughput_bps.append(history.uplink_throughput_bps[i]) + + latency = None + try: + if history.pop_ping_drop_rate[i] < 1: + latency = history.pop_ping_latency_ms[i] + except (AttributeError, IndexError, TypeError): + pass + pop_ping_latency_ms.append(latency) + + downlink = None + try: + downlink = history.downlink_throughput_bps[i] + except (AttributeError, IndexError, TypeError): + pass + downlink_throughput_bps.append(downlink) + + uplink = None + try: + uplink = history.uplink_throughput_bps[i] + except (AttributeError, IndexError, TypeError): + pass + uplink_throughput_bps.append(uplink) return { "samples": parsed_samples, @@ -1209,7 +1250,7 @@ def history_stats( if history is None: try: history = get_history(context) - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: raise GrpcError(e) from e sample_range, parsed_samples, current = _compute_sample_range(history, @@ -1258,12 +1299,25 @@ def history_stats( init_run_length = 0 tot += d - down = history.downlink_throughput_bps[i] + down = 0.0 + try: + down = history.downlink_throughput_bps[i] + except (AttributeError, IndexError, TypeError): + pass usage_down += down - up = history.uplink_throughput_bps[i] + + up = 0.0 + try: + up = history.uplink_throughput_bps[i] + except (AttributeError, IndexError, TypeError): + pass usage_up += up - rtt = history.pop_ping_latency_ms[i] + rtt = 0.0 + try: + rtt = history.pop_ping_latency_ms[i] + except (AttributeError, IndexError, TypeError): + pass # note 
that "full" here means the opposite of ping drop full if d == 0.0: rtt_full.append(rtt) From 2eebbfc36663f8004c9ffa7dff8a842bade8214a Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Wed, 1 Feb 2023 14:26:24 -0800 Subject: [PATCH 3/4] Protect one more case of history usage --- dish_common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dish_common.py b/dish_common.py index 2f667a7..6701a7a 100644 --- a/dish_common.py +++ b/dish_common.py @@ -318,7 +318,7 @@ def get_history_stats(opts, gstate, add_item, add_sequence, flush_history): timestamp = int(time.time()) history = starlink_grpc.get_history(context=gstate.context) gstate.timestamp_stats = timestamp - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e))) history = None From a8717d9549f7da0260884a9faa93a9845c241f9b Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Wed, 1 Feb 2023 14:45:37 -0800 Subject: [PATCH 4/4] Protect other usage vs grpc protocol changes Change all uses of protobuf messages other than status and history data so that they will not crash the calling script in the case where the grpc protocol removes the message or field being accessed. Instead, they will return None for the affected field (which most calling scripts interpret as "no data") or raise the same error as when the dish is not reachable. This is for issue #66. 
--- starlink_grpc.py | 42 +++++++++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/starlink_grpc.py b/starlink_grpc.py index aa36009..6e4a13b 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -535,7 +535,7 @@ class GrpcError(Exception): msg = e.details() elif isinstance(e, grpc.RpcError): msg = "Unknown communication or service error" - elif isinstance(e, (AttributeError, ValueError)): + elif isinstance(e, (AttributeError, IndexError, TypeError, ValueError)): msg = "Protocol error" else: msg = str(e) @@ -829,6 +829,9 @@ def get_location(context: Optional[ChannelContext] = None): Raises: grpc.RpcError: Communication or service error. + AttributeError, ValueError: Protocol error. Either the target is not a + Starlink user terminal or the grpc protocol has changed in a way + this module cannot handle. """ def grpc_call(channel): if imports_pending: @@ -857,7 +860,7 @@ def location_data(context: Optional[ChannelContext] = None) -> LocationDict: """ try: location = get_location(context) - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: if isinstance(e, grpc.Call) and e.code() is grpc.StatusCode.PERMISSION_DENIED: return { "latitude": None, @@ -866,11 +869,17 @@ def location_data(context: Optional[ChannelContext] = None) -> LocationDict: } raise GrpcError(e) from e - return { - "latitude": location.lla.lat, - "longitude": location.lla.lon, - "altitude": location.lla.alt, - } + try: + return { + "latitude": location.lla.lat, + "longitude": location.lla.lon, + "altitude": getattr(location.lla, "alt", None), + } + except AttributeError as e: + # Allow None for altitude, but since an all-None result has special + # meaning for this function, any other protocol change is flagged as + # an error. 
+ raise GrpcError(e) from e def history_bulk_field_names(): @@ -1424,6 +1433,9 @@ def get_obstruction_map(context: Optional[ChannelContext] = None): Raises: grpc.RpcError: Communication or service error. + AttributeError, ValueError: Protocol error. Either the target is not a + Starlink user terminal or the grpc protocol has changed in a way + this module cannot handle. """ def grpc_call(channel: grpc.Channel): if imports_pending: @@ -1450,15 +1462,19 @@ def obstruction_map(context: Optional[ChannelContext] = None): representation the SNR data instead, see `get_obstruction_map`. Raises: - GrpcError: Failed getting status info from the Starlink user terminal. + GrpcError: Failed getting obstruction data from the Starlink user + terminal. """ try: map_data = get_obstruction_map(context) - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: raise GrpcError(e) from e - cols = map_data.num_cols - return tuple((map_data.snr[i:i + cols]) for i in range(0, cols * map_data.num_rows, cols)) + try: + cols = map_data.num_cols + return tuple((map_data.snr[i:i + cols]) for i in range(0, cols * map_data.num_rows, cols)) + except (AttributeError, IndexError, TypeError) as e: + raise GrpcError(e) from e def reboot(context: Optional[ChannelContext] = None) -> None: @@ -1482,7 +1498,7 @@ def reboot(context: Optional[ChannelContext] = None) -> None: try: call_with_channel(grpc_call, context=context) - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: raise GrpcError(e) from e @@ -1509,5 +1525,5 @@ def set_stow_state(unstow: bool = False, context: Optional[ChannelContext] = Non try: call_with_channel(grpc_call, context=context) - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: raise GrpcError(e) from e