diff --git a/dish_common.py b/dish_common.py index 2f667a7..6701a7a 100644 --- a/dish_common.py +++ b/dish_common.py @@ -318,7 +318,7 @@ def get_history_stats(opts, gstate, add_item, add_sequence, flush_history): timestamp = int(time.time()) history = starlink_grpc.get_history(context=gstate.context) gstate.timestamp_stats = timestamp - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e))) history = None diff --git a/starlink_grpc.py b/starlink_grpc.py index c3e62a3..6e4a13b 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -535,6 +535,8 @@ class GrpcError(Exception): msg = e.details() elif isinstance(e, grpc.RpcError): msg = "Unknown communication or service error" + elif isinstance(e, (AttributeError, IndexError, TypeError, ValueError)): + msg = "Protocol error" else: msg = str(e) super().__init__(msg, *args, **kwargs) @@ -617,8 +619,11 @@ def status_field_names(context: Optional[ChannelContext] = None): except grpc.RpcError as e: raise GrpcError(e) from e alert_names = [] - for field in dish_pb2.DishAlerts.DESCRIPTOR.fields: - alert_names.append("alert_" + field.name) + try: + for field in dish_pb2.DishAlerts.DESCRIPTOR.fields: + alert_names.append("alert_" + field.name) + except AttributeError: + pass return _field_names(StatusDict), _field_names(ObstructionDict), alert_names @@ -646,8 +651,12 @@ def status_field_types(context: Optional[ChannelContext] = None): call_with_channel(resolve_imports, context=context) except grpc.RpcError as e: raise GrpcError(e) from e - return (_field_types(StatusDict), _field_types(ObstructionDict), - [bool] * len(dish_pb2.DishAlerts.DESCRIPTOR.fields)) + num_alerts = 0 + try: + num_alerts = len(dish_pb2.DishAlerts.DESCRIPTOR.fields) + except AttributeError: + pass + return (_field_types(StatusDict), _field_types(ObstructionDict), [bool] * num_alerts) def get_status(context: Optional[ChannelContext] = None): @@ -661,6 +670,9 @@ def get_status(context: Optional[ChannelContext] = None): Raises: grpc.RpcError: Communication or service error. + AttributeError, ValueError: Protocol error. Either the target is not a + Starlink user terminal or the grpc protocol has changed in a way + this module cannot handle. """ def grpc_call(channel): if imports_pending: @@ -689,7 +701,7 @@ def get_id(context: Optional[ChannelContext] = None) -> str: try: status = get_status(context) return status.device_info.id - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: raise GrpcError(e) from e @@ -711,62 +723,78 @@ def status_data( """ try: status = get_status(context) - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: raise GrpcError(e) from e - if status.HasField("outage"): - if status.outage.cause == dish_pb2.DishOutage.Cause.NO_SCHEDULE: - # Special case translate this to equivalent old name - state = "SEARCHING" + try: + if status.HasField("outage"): + if status.outage.cause == dish_pb2.DishOutage.Cause.NO_SCHEDULE: + # Special case translate this to equivalent old name + state = "SEARCHING" + else: + try: + state = dish_pb2.DishOutage.Cause.Name(status.outage.cause) + except ValueError: + # Unlikely, but possible if dish is running newer firmware + # than protocol data pulled via reflection + state = str(status.outage.cause) else: - state = dish_pb2.DishOutage.Cause.Name(status.outage.cause) - else: - state = "CONNECTED" + state = "CONNECTED" + except (AttributeError, ValueError): + state = "UNKNOWN" # More alerts may be added in future, so in addition to listing them # individually, provide a bit field based on field numbers of the # DishAlerts message. alerts = {} alert_bits = 0 - for field in status.alerts.DESCRIPTOR.fields: - value = getattr(status.alerts, field.name) - alerts["alert_" + field.name] = value - if field.number < 65: - alert_bits |= (1 if value else 0) << (field.number - 1) + try: + for field in status.alerts.DESCRIPTOR.fields: + value = getattr(status.alerts, field.name, False) + alerts["alert_" + field.name] = value + if field.number < 65: + alert_bits |= (1 if value else 0) << (field.number - 1) + except AttributeError: + pass - if (status.obstruction_stats.avg_prolonged_obstruction_duration_s > 0.0 - and not math.isnan(status.obstruction_stats.avg_prolonged_obstruction_interval_s)): - obstruction_duration = status.obstruction_stats.avg_prolonged_obstruction_duration_s - obstruction_interval = status.obstruction_stats.avg_prolonged_obstruction_interval_s - else: - obstruction_duration = None - obstruction_interval = None + obstruction_duration = None + obstruction_interval = None + obstruction_stats = getattr(status, "obstruction_stats", None) + if obstruction_stats is not None: + try: + if (obstruction_stats.avg_prolonged_obstruction_duration_s > 0.0 + and not math.isnan(obstruction_stats.avg_prolonged_obstruction_interval_s)): + obstruction_duration = obstruction_stats.avg_prolonged_obstruction_duration_s + obstruction_interval = obstruction_stats.avg_prolonged_obstruction_interval_s + except AttributeError: + pass + device_info = getattr(status, "device_info", None) return { - "id": status.device_info.id, - "hardware_version": status.device_info.hardware_version, - "software_version": status.device_info.software_version, + "id": getattr(device_info, "id", None), + "hardware_version": getattr(device_info, "hardware_version", None), + "software_version": getattr(device_info, "software_version", None), "state": state, - "uptime": status.device_state.uptime_s, + "uptime": getattr(getattr(status, "device_state", None), "uptime_s", None), "snr": None, # obsoleted in grpc service - "seconds_to_first_nonempty_slot": status.seconds_to_first_nonempty_slot, - "pop_ping_drop_rate": status.pop_ping_drop_rate, - "downlink_throughput_bps": status.downlink_throughput_bps, - "uplink_throughput_bps": status.uplink_throughput_bps, - "pop_ping_latency_ms": status.pop_ping_latency_ms, + "seconds_to_first_nonempty_slot": getattr(status, "seconds_to_first_nonempty_slot", None), + "pop_ping_drop_rate": getattr(status, "pop_ping_drop_rate", None), + "downlink_throughput_bps": getattr(status, "downlink_throughput_bps", None), + "uplink_throughput_bps": getattr(status, "uplink_throughput_bps", None), + "pop_ping_latency_ms": getattr(status, "pop_ping_latency_ms", None), "alerts": alert_bits, - "fraction_obstructed": status.obstruction_stats.fraction_obstructed, - "currently_obstructed": status.obstruction_stats.currently_obstructed, + "fraction_obstructed": getattr(obstruction_stats, "fraction_obstructed", None), + "currently_obstructed": getattr(obstruction_stats, "currently_obstructed", None), "seconds_obstructed": None, # obsoleted in grpc service "obstruction_duration": obstruction_duration, "obstruction_interval": obstruction_interval, - "direction_azimuth": status.boresight_azimuth_deg, - "direction_elevation": status.boresight_elevation_deg, - "is_snr_above_noise_floor": status.is_snr_above_noise_floor, + "direction_azimuth": getattr(status, "boresight_azimuth_deg", None), + "direction_elevation": getattr(status, "boresight_elevation_deg", None), + "is_snr_above_noise_floor": getattr(status, "is_snr_above_noise_floor", None), }, { "wedges_fraction_obstructed[]": [None] * 12, # obsoleted in grpc service "raw_wedges_fraction_obstructed[]": [None] * 12, # obsoleted in grpc service - "valid_s": status.obstruction_stats.valid_s, + "valid_s": getattr(obstruction_stats, "valid_s", None), }, alerts @@ -801,6 +829,9 @@ def get_location(context: Optional[ChannelContext] = None): Raises: grpc.RpcError: Communication or service error. + AttributeError, ValueError: Protocol error. Either the target is not a + Starlink user terminal or the grpc protocol has changed in a way + this module cannot handle. """ def grpc_call(channel): if imports_pending: @@ -829,7 +860,7 @@ def location_data(context: Optional[ChannelContext] = None) -> LocationDict: """ try: location = get_location(context) - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: if isinstance(e, grpc.Call) and e.code() is grpc.StatusCode.PERMISSION_DENIED: return { "latitude": None, @@ -838,11 +869,17 @@ def location_data(context: Optional[ChannelContext] = None) -> LocationDict: } raise GrpcError(e) from e - return { - "latitude": location.lla.lat, - "longitude": location.lla.lon, - "altitude": location.lla.alt, - } + try: + return { + "latitude": location.lla.lat, + "longitude": location.lla.lon, + "altitude": getattr(location.lla, "alt", None), + } + except AttributeError as e: + # Allow None for altitude, but since all None values has special + # meaning for this function, any other protocol change is flagged as + # an error. + raise GrpcError(e) from e def history_bulk_field_names(): @@ -927,6 +964,9 @@ def get_history(context: Optional[ChannelContext] = None): Raises: grpc.RpcError: Communication or service error. + AttributeError, ValueError: Protocol error. Either the target is not a + Starlink user terminal or the grpc protocol has changed in a way + this module cannot handle. """ def grpc_call(channel: grpc.Channel): if imports_pending: @@ -942,8 +982,12 @@ def _compute_sample_range(history, parse_samples: int, start: Optional[int] = None, verbose: bool = False): - current = int(history.current) - samples = len(history.pop_ping_drop_rate) + try: + current = int(history.current) + samples = len(history.pop_ping_drop_rate) + except (AttributeError, TypeError): + # Without current and pop_ping_drop_rate, history is unusable. + return range(0), 0, None if verbose: print("current counter: " + str(current)) @@ -1018,8 +1062,14 @@ def concatenate_history(history1, An object with the unwrapped history data and the same attribute fields as a grpc history object. """ - size2 = len(history2.pop_ping_drop_rate) - new_samples = history2.current - history1.current + try: + size2 = len(history2.pop_ping_drop_rate) + new_samples = history2.current - history1.current + except (AttributeError, TypeError): + # Something is wrong. Probably both history objects are bad, so no + # point in trying to combine them. + return history1 + if new_samples < 0: if verbose: print("Dish reboot detected. Appending anyway.") @@ -1034,19 +1084,28 @@ def concatenate_history(history1, unwrapped = UnwrappedHistory() for field in HISTORY_FIELDS: - setattr(unwrapped, field, []) + if hasattr(history1, field) and hasattr(history2, field): + setattr(unwrapped, field, []) unwrapped.unwrapped = True sample_range, ignore1, ignore2 = _compute_sample_range( # pylint: disable=unused-variable history1, samples1, start=start1) for i in sample_range: for field in HISTORY_FIELDS: - getattr(unwrapped, field).append(getattr(history1, field)[i]) + if hasattr(unwrapped, field): + try: + getattr(unwrapped, field).append(getattr(history1, field)[i]) + except (IndexError, TypeError): + pass sample_range, ignore1, ignore2 = _compute_sample_range(history2, new_samples) # pylint: disable=unused-variable for i in sample_range: for field in HISTORY_FIELDS: - getattr(unwrapped, field).append(getattr(history2, field)[i]) + if hasattr(unwrapped, field): + try: + getattr(unwrapped, field).append(getattr(history2, field)[i]) + except (IndexError, TypeError): + pass unwrapped.current = history2.current return unwrapped @@ -1096,7 +1155,7 @@ def history_bulk_data(parse_samples: int, if history is None: try: history = get_history(context) - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: raise GrpcError(e) from e sample_range, parsed_samples, current = _compute_sample_range(history, @@ -1110,11 +1169,30 @@ def history_bulk_data(parse_samples: int, uplink_throughput_bps = [] for i in sample_range: + # pop_ping_drop_rate is checked in _compute_sample_range pop_ping_drop_rate.append(history.pop_ping_drop_rate[i]) - pop_ping_latency_ms.append( - history.pop_ping_latency_ms[i] if history.pop_ping_drop_rate[i] < 1 else None) - downlink_throughput_bps.append(history.downlink_throughput_bps[i]) - uplink_throughput_bps.append(history.uplink_throughput_bps[i]) + + latency = None + try: + if history.pop_ping_drop_rate[i] < 1: + latency = history.pop_ping_latency_ms[i] + except (AttributeError, IndexError, TypeError): + pass + pop_ping_latency_ms.append(latency) + + downlink = None + try: + downlink = history.downlink_throughput_bps[i] + except (AttributeError, IndexError, TypeError): + pass + downlink_throughput_bps.append(downlink) + + uplink = None + try: + uplink = history.uplink_throughput_bps[i] + except (AttributeError, IndexError, TypeError): + pass + uplink_throughput_bps.append(uplink) return { "samples": parsed_samples, @@ -1181,7 +1259,7 @@ def history_stats( if history is None: try: history = get_history(context) - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: raise GrpcError(e) from e sample_range, parsed_samples, current = _compute_sample_range(history, @@ -1230,12 +1308,25 @@ def history_stats( init_run_length = 0 tot += d - down = history.downlink_throughput_bps[i] + down = 0.0 + try: + down = history.downlink_throughput_bps[i] + except (AttributeError, IndexError, TypeError): + pass usage_down += down - up = history.uplink_throughput_bps[i] + + up = 0.0 + try: + up = history.uplink_throughput_bps[i] + except (AttributeError, IndexError, TypeError): + pass usage_up += up - rtt = history.pop_ping_latency_ms[i] + rtt = 0.0 + try: + rtt = history.pop_ping_latency_ms[i] + except (AttributeError, IndexError, TypeError): + pass # note that "full" here means the opposite of ping drop full if d == 0.0: rtt_full.append(rtt) @@ -1342,6 +1433,9 @@ def get_obstruction_map(context: Optional[ChannelContext] = None): Raises: grpc.RpcError: Communication or service error. + AttributeError, ValueError: Protocol error. Either the target is not a + Starlink user terminal or the grpc protocol has changed in a way + this module cannot handle. """ def grpc_call(channel: grpc.Channel): if imports_pending: @@ -1368,15 +1462,19 @@ def obstruction_map(context: Optional[ChannelContext] = None): representation the SNR data instead, see `get_obstruction_map`. Raises: - GrpcError: Failed getting status info from the Starlink user terminal. + GrpcError: Failed getting obstruction data from the Starlink user + terminal. """ try: map_data = get_obstruction_map(context) - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: raise GrpcError(e) from e - cols = map_data.num_cols - return tuple((map_data.snr[i:i + cols]) for i in range(0, cols * map_data.num_rows, cols)) + try: + cols = map_data.num_cols + return tuple((map_data.snr[i:i + cols]) for i in range(0, cols * map_data.num_rows, cols)) + except (AttributeError, IndexError, TypeError) as e: + raise GrpcError(e) from e def reboot(context: Optional[ChannelContext] = None) -> None: @@ -1400,7 +1498,7 @@ def reboot(context: Optional[ChannelContext] = None) -> None: try: call_with_channel(grpc_call, context=context) - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: raise GrpcError(e) from e @@ -1427,5 +1525,5 @@ def set_stow_state(unstow: bool = False, context: Optional[ChannelContext] = Non try: call_with_channel(grpc_call, context=context) - except grpc.RpcError as e: + except (AttributeError, ValueError, grpc.RpcError) as e: raise GrpcError(e) from e