Merge pull request #75 from sparky8512/protocol-robustness

Robustness against field removal in gRPC protocol

for issue #66
This commit is contained in:
sparky8512 2023-02-02 08:49:34 -08:00 committed by GitHub
commit 237cc349cf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 167 additions and 69 deletions

View file

@ -318,7 +318,7 @@ def get_history_stats(opts, gstate, add_item, add_sequence, flush_history):
timestamp = int(time.time()) timestamp = int(time.time())
history = starlink_grpc.get_history(context=gstate.context) history = starlink_grpc.get_history(context=gstate.context)
gstate.timestamp_stats = timestamp gstate.timestamp_stats = timestamp
except grpc.RpcError as e: except (AttributeError, ValueError, grpc.RpcError) as e:
conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e))) conn_error(opts, "Failure getting history: %s", str(starlink_grpc.GrpcError(e)))
history = None history = None

View file

@ -535,6 +535,8 @@ class GrpcError(Exception):
msg = e.details() msg = e.details()
elif isinstance(e, grpc.RpcError): elif isinstance(e, grpc.RpcError):
msg = "Unknown communication or service error" msg = "Unknown communication or service error"
elif isinstance(e, (AttributeError, IndexError, TypeError, ValueError)):
msg = "Protocol error"
else: else:
msg = str(e) msg = str(e)
super().__init__(msg, *args, **kwargs) super().__init__(msg, *args, **kwargs)
@ -617,8 +619,11 @@ def status_field_names(context: Optional[ChannelContext] = None):
except grpc.RpcError as e: except grpc.RpcError as e:
raise GrpcError(e) from e raise GrpcError(e) from e
alert_names = [] alert_names = []
try:
for field in dish_pb2.DishAlerts.DESCRIPTOR.fields: for field in dish_pb2.DishAlerts.DESCRIPTOR.fields:
alert_names.append("alert_" + field.name) alert_names.append("alert_" + field.name)
except AttributeError:
pass
return _field_names(StatusDict), _field_names(ObstructionDict), alert_names return _field_names(StatusDict), _field_names(ObstructionDict), alert_names
@ -646,8 +651,12 @@ def status_field_types(context: Optional[ChannelContext] = None):
call_with_channel(resolve_imports, context=context) call_with_channel(resolve_imports, context=context)
except grpc.RpcError as e: except grpc.RpcError as e:
raise GrpcError(e) from e raise GrpcError(e) from e
return (_field_types(StatusDict), _field_types(ObstructionDict), num_alerts = 0
[bool] * len(dish_pb2.DishAlerts.DESCRIPTOR.fields)) try:
num_alerts = len(dish_pb2.DishAlerts.DESCRIPTOR.fields)
except AttributeError:
pass
return (_field_types(StatusDict), _field_types(ObstructionDict), [bool] * num_alerts)
def get_status(context: Optional[ChannelContext] = None): def get_status(context: Optional[ChannelContext] = None):
@ -661,6 +670,9 @@ def get_status(context: Optional[ChannelContext] = None):
Raises: Raises:
grpc.RpcError: Communication or service error. grpc.RpcError: Communication or service error.
AttributeError, ValueError: Protocol error. Either the target is not a
Starlink user terminal or the grpc protocol has changed in a way
this module cannot handle.
""" """
def grpc_call(channel): def grpc_call(channel):
if imports_pending: if imports_pending:
@ -689,7 +701,7 @@ def get_id(context: Optional[ChannelContext] = None) -> str:
try: try:
status = get_status(context) status = get_status(context)
return status.device_info.id return status.device_info.id
except grpc.RpcError as e: except (AttributeError, ValueError, grpc.RpcError) as e:
raise GrpcError(e) from e raise GrpcError(e) from e
@ -711,62 +723,78 @@ def status_data(
""" """
try: try:
status = get_status(context) status = get_status(context)
except grpc.RpcError as e: except (AttributeError, ValueError, grpc.RpcError) as e:
raise GrpcError(e) from e raise GrpcError(e) from e
try:
if status.HasField("outage"): if status.HasField("outage"):
if status.outage.cause == dish_pb2.DishOutage.Cause.NO_SCHEDULE: if status.outage.cause == dish_pb2.DishOutage.Cause.NO_SCHEDULE:
# Special case translate this to equivalent old name # Special case translate this to equivalent old name
state = "SEARCHING" state = "SEARCHING"
else: else:
try:
state = dish_pb2.DishOutage.Cause.Name(status.outage.cause) state = dish_pb2.DishOutage.Cause.Name(status.outage.cause)
except ValueError:
# Unlikely, but possible if dish is running newer firmware
# than protocol data pulled via reflection
state = str(status.outage.cause)
else: else:
state = "CONNECTED" state = "CONNECTED"
except (AttributeError, ValueError):
state = "UNKNOWN"
# More alerts may be added in future, so in addition to listing them # More alerts may be added in future, so in addition to listing them
# individually, provide a bit field based on field numbers of the # individually, provide a bit field based on field numbers of the
# DishAlerts message. # DishAlerts message.
alerts = {} alerts = {}
alert_bits = 0 alert_bits = 0
try:
for field in status.alerts.DESCRIPTOR.fields: for field in status.alerts.DESCRIPTOR.fields:
value = getattr(status.alerts, field.name) value = getattr(status.alerts, field.name, False)
alerts["alert_" + field.name] = value alerts["alert_" + field.name] = value
if field.number < 65: if field.number < 65:
alert_bits |= (1 if value else 0) << (field.number - 1) alert_bits |= (1 if value else 0) << (field.number - 1)
except AttributeError:
pass
if (status.obstruction_stats.avg_prolonged_obstruction_duration_s > 0.0
and not math.isnan(status.obstruction_stats.avg_prolonged_obstruction_interval_s)):
obstruction_duration = status.obstruction_stats.avg_prolonged_obstruction_duration_s
obstruction_interval = status.obstruction_stats.avg_prolonged_obstruction_interval_s
else:
obstruction_duration = None obstruction_duration = None
obstruction_interval = None obstruction_interval = None
obstruction_stats = getattr(status, "obstruction_stats", None)
if obstruction_stats is not None:
try:
if (obstruction_stats.avg_prolonged_obstruction_duration_s > 0.0
and not math.isnan(obstruction_stats.avg_prolonged_obstruction_interval_s)):
obstruction_duration = obstruction_stats.avg_prolonged_obstruction_duration_s
obstruction_interval = obstruction_stats.avg_prolonged_obstruction_interval_s
except AttributeError:
pass
device_info = getattr(status, "device_info", None)
return { return {
"id": status.device_info.id, "id": getattr(device_info, "id", None),
"hardware_version": status.device_info.hardware_version, "hardware_version": getattr(device_info, "hardware_version", None),
"software_version": status.device_info.software_version, "software_version": getattr(device_info, "software_version", None),
"state": state, "state": state,
"uptime": status.device_state.uptime_s, "uptime": getattr(getattr(status, "device_state", None), "uptime_s", None),
"snr": None, # obsoleted in grpc service "snr": None, # obsoleted in grpc service
"seconds_to_first_nonempty_slot": status.seconds_to_first_nonempty_slot, "seconds_to_first_nonempty_slot": getattr(status, "seconds_to_first_nonempty_slot", None),
"pop_ping_drop_rate": status.pop_ping_drop_rate, "pop_ping_drop_rate": getattr(status, "pop_ping_drop_rate", None),
"downlink_throughput_bps": status.downlink_throughput_bps, "downlink_throughput_bps": getattr(status, "downlink_throughput_bps", None),
"uplink_throughput_bps": status.uplink_throughput_bps, "uplink_throughput_bps": getattr(status, "uplink_throughput_bps", None),
"pop_ping_latency_ms": status.pop_ping_latency_ms, "pop_ping_latency_ms": getattr(status, "pop_ping_latency_ms", None),
"alerts": alert_bits, "alerts": alert_bits,
"fraction_obstructed": status.obstruction_stats.fraction_obstructed, "fraction_obstructed": getattr(obstruction_stats, "fraction_obstructed", None),
"currently_obstructed": status.obstruction_stats.currently_obstructed, "currently_obstructed": getattr(obstruction_stats, "currently_obstructed", None),
"seconds_obstructed": None, # obsoleted in grpc service "seconds_obstructed": None, # obsoleted in grpc service
"obstruction_duration": obstruction_duration, "obstruction_duration": obstruction_duration,
"obstruction_interval": obstruction_interval, "obstruction_interval": obstruction_interval,
"direction_azimuth": status.boresight_azimuth_deg, "direction_azimuth": getattr(status, "boresight_azimuth_deg", None),
"direction_elevation": status.boresight_elevation_deg, "direction_elevation": getattr(status, "boresight_elevation_deg", None),
"is_snr_above_noise_floor": status.is_snr_above_noise_floor, "is_snr_above_noise_floor": getattr(status, "is_snr_above_noise_floor", None),
}, { }, {
"wedges_fraction_obstructed[]": [None] * 12, # obsoleted in grpc service "wedges_fraction_obstructed[]": [None] * 12, # obsoleted in grpc service
"raw_wedges_fraction_obstructed[]": [None] * 12, # obsoleted in grpc service "raw_wedges_fraction_obstructed[]": [None] * 12, # obsoleted in grpc service
"valid_s": status.obstruction_stats.valid_s, "valid_s": getattr(obstruction_stats, "valid_s", None),
}, alerts }, alerts
@ -801,6 +829,9 @@ def get_location(context: Optional[ChannelContext] = None):
Raises: Raises:
grpc.RpcError: Communication or service error. grpc.RpcError: Communication or service error.
AttributeError, ValueError: Protocol error. Either the target is not a
Starlink user terminal or the grpc protocol has changed in a way
this module cannot handle.
""" """
def grpc_call(channel): def grpc_call(channel):
if imports_pending: if imports_pending:
@ -829,7 +860,7 @@ def location_data(context: Optional[ChannelContext] = None) -> LocationDict:
""" """
try: try:
location = get_location(context) location = get_location(context)
except grpc.RpcError as e: except (AttributeError, ValueError, grpc.RpcError) as e:
if isinstance(e, grpc.Call) and e.code() is grpc.StatusCode.PERMISSION_DENIED: if isinstance(e, grpc.Call) and e.code() is grpc.StatusCode.PERMISSION_DENIED:
return { return {
"latitude": None, "latitude": None,
@ -838,11 +869,17 @@ def location_data(context: Optional[ChannelContext] = None) -> LocationDict:
} }
raise GrpcError(e) from e raise GrpcError(e) from e
try:
return { return {
"latitude": location.lla.lat, "latitude": location.lla.lat,
"longitude": location.lla.lon, "longitude": location.lla.lon,
"altitude": location.lla.alt, "altitude": getattr(location.lla, "alt", None),
} }
except AttributeError as e:
# Allow None for altitude, but since an all-None result has special
# meaning for this function, any other protocol change is flagged as
# an error.
raise GrpcError(e) from e
def history_bulk_field_names(): def history_bulk_field_names():
@ -927,6 +964,9 @@ def get_history(context: Optional[ChannelContext] = None):
Raises: Raises:
grpc.RpcError: Communication or service error. grpc.RpcError: Communication or service error.
AttributeError, ValueError: Protocol error. Either the target is not a
Starlink user terminal or the grpc protocol has changed in a way
this module cannot handle.
""" """
def grpc_call(channel: grpc.Channel): def grpc_call(channel: grpc.Channel):
if imports_pending: if imports_pending:
@ -942,8 +982,12 @@ def _compute_sample_range(history,
parse_samples: int, parse_samples: int,
start: Optional[int] = None, start: Optional[int] = None,
verbose: bool = False): verbose: bool = False):
try:
current = int(history.current) current = int(history.current)
samples = len(history.pop_ping_drop_rate) samples = len(history.pop_ping_drop_rate)
except (AttributeError, TypeError):
# Without current and pop_ping_drop_rate, history is unusable.
return range(0), 0, None
if verbose: if verbose:
print("current counter: " + str(current)) print("current counter: " + str(current))
@ -1018,8 +1062,14 @@ def concatenate_history(history1,
An object with the unwrapped history data and the same attribute An object with the unwrapped history data and the same attribute
fields as a grpc history object. fields as a grpc history object.
""" """
try:
size2 = len(history2.pop_ping_drop_rate) size2 = len(history2.pop_ping_drop_rate)
new_samples = history2.current - history1.current new_samples = history2.current - history1.current
except (AttributeError, TypeError):
# Something is wrong. Probably both history objects are bad, so no
# point in trying to combine them.
return history1
if new_samples < 0: if new_samples < 0:
if verbose: if verbose:
print("Dish reboot detected. Appending anyway.") print("Dish reboot detected. Appending anyway.")
@ -1034,6 +1084,7 @@ def concatenate_history(history1,
unwrapped = UnwrappedHistory() unwrapped = UnwrappedHistory()
for field in HISTORY_FIELDS: for field in HISTORY_FIELDS:
if hasattr(history1, field) and hasattr(history2, field):
setattr(unwrapped, field, []) setattr(unwrapped, field, [])
unwrapped.unwrapped = True unwrapped.unwrapped = True
@ -1041,12 +1092,20 @@ def concatenate_history(history1,
history1, samples1, start=start1) history1, samples1, start=start1)
for i in sample_range: for i in sample_range:
for field in HISTORY_FIELDS: for field in HISTORY_FIELDS:
if hasattr(unwrapped, field):
try:
getattr(unwrapped, field).append(getattr(history1, field)[i]) getattr(unwrapped, field).append(getattr(history1, field)[i])
except (IndexError, TypeError):
pass
sample_range, ignore1, ignore2 = _compute_sample_range(history2, new_samples) # pylint: disable=unused-variable sample_range, ignore1, ignore2 = _compute_sample_range(history2, new_samples) # pylint: disable=unused-variable
for i in sample_range: for i in sample_range:
for field in HISTORY_FIELDS: for field in HISTORY_FIELDS:
if hasattr(unwrapped, field):
try:
getattr(unwrapped, field).append(getattr(history2, field)[i]) getattr(unwrapped, field).append(getattr(history2, field)[i])
except (IndexError, TypeError):
pass
unwrapped.current = history2.current unwrapped.current = history2.current
return unwrapped return unwrapped
@ -1096,7 +1155,7 @@ def history_bulk_data(parse_samples: int,
if history is None: if history is None:
try: try:
history = get_history(context) history = get_history(context)
except grpc.RpcError as e: except (AttributeError, ValueError, grpc.RpcError) as e:
raise GrpcError(e) from e raise GrpcError(e) from e
sample_range, parsed_samples, current = _compute_sample_range(history, sample_range, parsed_samples, current = _compute_sample_range(history,
@ -1110,11 +1169,30 @@ def history_bulk_data(parse_samples: int,
uplink_throughput_bps = [] uplink_throughput_bps = []
for i in sample_range: for i in sample_range:
# pop_ping_drop_rate is checked in _compute_sample_range
pop_ping_drop_rate.append(history.pop_ping_drop_rate[i]) pop_ping_drop_rate.append(history.pop_ping_drop_rate[i])
pop_ping_latency_ms.append(
history.pop_ping_latency_ms[i] if history.pop_ping_drop_rate[i] < 1 else None) latency = None
downlink_throughput_bps.append(history.downlink_throughput_bps[i]) try:
uplink_throughput_bps.append(history.uplink_throughput_bps[i]) if history.pop_ping_drop_rate[i] < 1:
latency = history.pop_ping_latency_ms[i]
except (AttributeError, IndexError, TypeError):
pass
pop_ping_latency_ms.append(latency)
downlink = None
try:
downlink = history.downlink_throughput_bps[i]
except (AttributeError, IndexError, TypeError):
pass
downlink_throughput_bps.append(downlink)
uplink = None
try:
uplink = history.uplink_throughput_bps[i]
except (AttributeError, IndexError, TypeError):
pass
uplink_throughput_bps.append(uplink)
return { return {
"samples": parsed_samples, "samples": parsed_samples,
@ -1181,7 +1259,7 @@ def history_stats(
if history is None: if history is None:
try: try:
history = get_history(context) history = get_history(context)
except grpc.RpcError as e: except (AttributeError, ValueError, grpc.RpcError) as e:
raise GrpcError(e) from e raise GrpcError(e) from e
sample_range, parsed_samples, current = _compute_sample_range(history, sample_range, parsed_samples, current = _compute_sample_range(history,
@ -1230,12 +1308,25 @@ def history_stats(
init_run_length = 0 init_run_length = 0
tot += d tot += d
down = 0.0
try:
down = history.downlink_throughput_bps[i] down = history.downlink_throughput_bps[i]
except (AttributeError, IndexError, TypeError):
pass
usage_down += down usage_down += down
up = 0.0
try:
up = history.uplink_throughput_bps[i] up = history.uplink_throughput_bps[i]
except (AttributeError, IndexError, TypeError):
pass
usage_up += up usage_up += up
rtt = 0.0
try:
rtt = history.pop_ping_latency_ms[i] rtt = history.pop_ping_latency_ms[i]
except (AttributeError, IndexError, TypeError):
pass
# note that "full" here means the opposite of ping drop full # note that "full" here means the opposite of ping drop full
if d == 0.0: if d == 0.0:
rtt_full.append(rtt) rtt_full.append(rtt)
@ -1342,6 +1433,9 @@ def get_obstruction_map(context: Optional[ChannelContext] = None):
Raises: Raises:
grpc.RpcError: Communication or service error. grpc.RpcError: Communication or service error.
AttributeError, ValueError: Protocol error. Either the target is not a
Starlink user terminal or the grpc protocol has changed in a way
this module cannot handle.
""" """
def grpc_call(channel: grpc.Channel): def grpc_call(channel: grpc.Channel):
if imports_pending: if imports_pending:
@ -1368,15 +1462,19 @@ def obstruction_map(context: Optional[ChannelContext] = None):
representation of the SNR data instead, see `get_obstruction_map`. representation of the SNR data instead, see `get_obstruction_map`.
Raises: Raises:
GrpcError: Failed getting status info from the Starlink user terminal. GrpcError: Failed getting obstruction data from the Starlink user
terminal.
""" """
try: try:
map_data = get_obstruction_map(context) map_data = get_obstruction_map(context)
except grpc.RpcError as e: except (AttributeError, ValueError, grpc.RpcError) as e:
raise GrpcError(e) from e raise GrpcError(e) from e
try:
cols = map_data.num_cols cols = map_data.num_cols
return tuple((map_data.snr[i:i + cols]) for i in range(0, cols * map_data.num_rows, cols)) return tuple((map_data.snr[i:i + cols]) for i in range(0, cols * map_data.num_rows, cols))
except (AttributeError, IndexError, TypeError) as e:
raise GrpcError(e) from e
def reboot(context: Optional[ChannelContext] = None) -> None: def reboot(context: Optional[ChannelContext] = None) -> None:
@ -1400,7 +1498,7 @@ def reboot(context: Optional[ChannelContext] = None) -> None:
try: try:
call_with_channel(grpc_call, context=context) call_with_channel(grpc_call, context=context)
except grpc.RpcError as e: except (AttributeError, ValueError, grpc.RpcError) as e:
raise GrpcError(e) from e raise GrpcError(e) from e
@ -1427,5 +1525,5 @@ def set_stow_state(unstow: bool = False, context: Optional[ChannelContext] = Non
try: try:
call_with_channel(grpc_call, context=context) call_with_channel(grpc_call, context=context)
except grpc.RpcError as e: except (AttributeError, ValueError, grpc.RpcError) as e:
raise GrpcError(e) from e raise GrpcError(e) from e