From 5ff207ed8c56d2a2e6b9b3bc1771098d1b881e25 Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Wed, 7 Sep 2022 13:44:54 -0700 Subject: [PATCH 1/2] Type hints for status and history return values Implement type hints for the data returned from status_data, history_stats, and history_bulk_data, and modify the *_field_names and *_field_types functions to make use of the same data. This is complicated a little by the fact that the field names returned for sequences in the returned data are slightly different from those in *_field_names, for reasons that I'm sure made perfect sense to me at the time. Should work with static type checkers on Python 3.8 and later. Python 3.7 is still supported for the run time, but possibly in a way that static type checking will not be able to understand. This is for issue #57 --- starlink_grpc.py | 305 +++++++++++++++++++++++++---------------------- 1 file changed, 160 insertions(+), 145 deletions(-) diff --git a/starlink_grpc.py b/starlink_grpc.py index 3379e01..2a167ba 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -346,7 +346,18 @@ period. from itertools import chain import math import statistics -from typing import Optional, Tuple +from typing import Any, Dict, Optional, Sequence, Tuple, get_type_hints +try: + from typing import TypedDict, get_args +except ImportError: + # Python 3.7 does not have TypedDict, so fake it so the run time still + # works, even though static type checker probably will not. + def TypedDict(name, types): + return type(name, (dict,), {"__annotations__": types}) + + def get_args(tp: Any) -> tuple[Any, ...]: + return tp.__args__ + import grpc @@ -368,6 +379,126 @@ REQUEST_TIMEOUT = 10 HISTORY_FIELDS = ("pop_ping_drop_rate", "pop_ping_latency_ms", "downlink_throughput_bps", "uplink_throughput_bps") +StatusDict = TypedDict( + "StatusDict", { + "id": str, + "hardware_version": str, + "software_version": str, + "state": str, + "uptime": int, + "snr": Optional[float], + "seconds_to_first_nonempty_slot": float, + "pop_ping_drop_rate": float, + "downlink_throughput_bps": float, + "uplink_throughput_bps": float, + "pop_ping_latency_ms": float, + "alerts": int, + "fraction_obstructed": float, + "currently_obstructed": bool, + "seconds_obstructed": Optional[float], + "obstruction_duration": Optional[float], + "obstruction_interval": Optional[float], + "direction_azimuth": float, + "direction_elevation": float, + "is_snr_above_noise_floor": bool, + }) + +ObstructionDict = TypedDict( + "ObstructionDict", { + "wedges_fraction_obstructed[]": Sequence[float], + "raw_wedges_fraction_obstructed[]": Sequence[float], + "valid_s": float, + }) + +AlertDict = Dict[str, bool] + +HistGeneralDict = TypedDict("HistGeneralDict", {"samples": int, "end_counter": int}) + +HistBulkDict = TypedDict( + "HistBulkDict", { + "pop_ping_drop_rate": Sequence[float], + "pop_ping_latency_ms": Sequence[float], + "downlink_throughput_bps": Sequence[float], + "uplink_throughput_bps": Sequence[float], + "snr": Sequence[Optional[float]], + "scheduled": Sequence[Optional[bool]], + "obstructed": Sequence[Optional[bool]], + }) + +PingDropDict = TypedDict( + "PingDropDict", { + "total_ping_drop": float, + "count_full_ping_drop": int, + "count_obstructed": int, + "total_obstructed_ping_drop": float, + "count_full_obstructed_ping_drop": int, + "count_unscheduled": int, + "total_unscheduled_ping_drop": float, + "count_full_unscheduled_ping_drop": int, + }) + +PingDropRlDict = TypedDict( + "PingDropRlDict", { + "init_run_fragment": int, + "final_run_fragment": int, + "run_seconds[1,]": Sequence[int], + "run_minutes[1,]": Sequence[int], + }) + +PingLatencyDict = TypedDict( + "PingLatencyDict", { + "mean_all_ping_latency": float, + "deciles_all_ping_latency[]": Sequence[float], + "mean_full_ping_latency": float, + "deciles_full_ping_latency[]": Sequence[float], + "stdev_full_ping_latency": float, + }) + +LoadedLatencyDict = TypedDict( + "LoadedLatencyDict", { + "load_bucket_samples[]": Sequence[int], + "load_bucket_min_latency[]": Sequence[Optional[float]], + "load_bucket_median_latency[]": Sequence[Optional[float]], + "load_bucket_max_latency[]": Sequence[Optional[float]], + }) + +UsageDict = TypedDict("UsageDict", {"download_usage": int, "upload_usage": int}) + +# For legacy reasons, there is a slight difference between the field names +# returned in the actual data vs the *_field_names functions. This is a map of +# the differences. Bulk data fields are handled separately because the field +# "snr" overlaps with a status field and needs to map differently. +_FIELD_NAME_MAP = { + "wedges_fraction_obstructed[]": "wedges_fraction_obstructed[12]", + "raw_wedges_fraction_obstructed[]": "raw_wedges_fraction_obstructed[12]", + "run_seconds[1,]": "run_seconds[1,61]", + "run_minutes[1,]": "run_minutes[1,61]", + "deciles_all_ping_latency[]": "deciles_all_ping_latency[11]", + "deciles_full_ping_latency[]": "deciles_full_ping_latency[11]", + "load_bucket_samples[]": "load_bucket_samples[15]", + "load_bucket_min_latency[]": "load_bucket_min_latency[15]", + "load_bucket_median_latency[]": "load_bucket_median_latency[15]", + "load_bucket_max_latency[]": "load_bucket_max_latency[15]", +} + + +def _field_names(hint_type): + return list(_FIELD_NAME_MAP.get(key, key) for key in get_type_hints(hint_type).keys()) + + +def _field_names_bulk(hint_type): + return list(key + "[]" for key in get_type_hints(hint_type).keys()) + + +def _field_types(hint_type): + def xlate(value): + while not isinstance(value, type): + args = get_args(value) + value = args[0] if args[0] is not type(None) else args[1] + return value + + return list(xlate(val) for val in get_type_hints(hint_type).values()) + def resolve_imports(channel: grpc.Channel): importer.resolve_lazy_imports(channel) @@ -467,32 +598,7 @@ def status_field_names(context: Optional[ChannelContext] = None): for field in dish_pb2.DishAlerts.DESCRIPTOR.fields: alert_names.append("alert_" + field.name) - return [ - "id", - "hardware_version", - "software_version", - "state", - "uptime", - "snr", - "seconds_to_first_nonempty_slot", - "pop_ping_drop_rate", - "downlink_throughput_bps", - "uplink_throughput_bps", - "pop_ping_latency_ms", - "alerts", - "fraction_obstructed", - "currently_obstructed", - "seconds_obstructed", - "obstruction_duration", - "obstruction_interval", - "direction_azimuth", - "direction_elevation", - "is_snr_above_noise_floor", - ], [ - "wedges_fraction_obstructed[12]", - "raw_wedges_fraction_obstructed[12]", - "valid_s", - ], alert_names + return _field_names(StatusDict), _field_names(ObstructionDict), alert_names def status_field_types(context: Optional[ChannelContext] = None): @@ -518,32 +624,8 @@ def status_field_types(context: Optional[ChannelContext] = None): call_with_channel(resolve_imports, context=context) except grpc.RpcError as e: raise GrpcError(e) - return [ - str, # id - str, # hardware_version - str, # software_version - str, # state - int, # uptime - float, # snr - float, # seconds_to_first_nonempty_slot - float, # pop_ping_drop_rate - float, # downlink_throughput_bps - float, # uplink_throughput_bps - float, # pop_ping_latency_ms - int, # alerts - float, # fraction_obstructed - bool, # currently_obstructed - float, # seconds_obstructed - float, # obstruction_duration - float, # obstruction_interval - float, # direction_azimuth - float, # direction_elevation - bool, # is_snr_above_noise_floor - ], [ - float, # wedges_fraction_obstructed[] - float, # raw_wedges_fraction_obstructed[] - float, # valid_s - ], [bool] * len(dish_pb2.DishAlerts.DESCRIPTOR.fields) + return (_field_types(StatusDict), _field_types(ObstructionDict), + [bool] * len(dish_pb2.DishAlerts.DESCRIPTOR.fields)) def get_status(context: Optional[ChannelContext] = None): @@ -589,7 +671,8 @@ def get_id(context: Optional[ChannelContext] = None) -> str: raise GrpcError(e) -def status_data(context: Optional[ChannelContext] = None): +def status_data( + context: Optional[ChannelContext] = None) -> Tuple[StatusDict, ObstructionDict, AlertDict]: """Fetch current status data. Args: @@ -675,18 +758,7 @@ def history_bulk_field_names(): A tuple with 2 lists, the first with general data names, the second with bulk history data names. """ - return [ - "samples", - "end_counter", - ], [ - "pop_ping_drop_rate[]", - "pop_ping_latency_ms[]", - "downlink_throughput_bps[]", - "uplink_throughput_bps[]", - "snr[]", - "scheduled[]", - "obstructed[]", - ] + return _field_names(HistGeneralDict), _field_names_bulk(HistBulkDict) def history_bulk_field_types(): @@ -699,18 +771,7 @@ def history_bulk_field_types(): A tuple with 2 lists, the first with general data types, the second with bulk history data types. """ - return [ - int, # samples - int, # end_counter - ], [ - float, # pop_ping_drop_rate[] - float, # pop_ping_latency_ms[] - float, # downlink_throughput_bps[] - float, # uplink_throughput_bps[] - float, # snr[] - bool, # scheduled[] - bool, # obstructed[] - ] + return _field_types(HistGeneralDict), _field_types(HistBulkDict) def history_ping_field_names(): @@ -734,38 +795,8 @@ def history_stats_field_names(): additional data groups, so it not recommended for the caller to assume exactly 6 elements. """ - return [ - "samples", - "end_counter", - ], [ - "total_ping_drop", - "count_full_ping_drop", - "count_obstructed", - "total_obstructed_ping_drop", - "count_full_obstructed_ping_drop", - "count_unscheduled", - "total_unscheduled_ping_drop", - "count_full_unscheduled_ping_drop", - ], [ - "init_run_fragment", - "final_run_fragment", - "run_seconds[1,61]", - "run_minutes[1,61]", - ], [ - "mean_all_ping_latency", - "deciles_all_ping_latency[11]", - "mean_full_ping_latency", - "deciles_full_ping_latency[11]", - "stdev_full_ping_latency", - ], [ - "load_bucket_samples[15]", - "load_bucket_min_latency[15]", - "load_bucket_median_latency[15]", - "load_bucket_max_latency[15]", - ], [ - "download_usage", - "upload_usage", - ] + return (_field_names(HistGeneralDict), _field_names(PingDropDict), _field_names(PingDropRlDict), + _field_names(PingLatencyDict), _field_names(LoadedLatencyDict), _field_names(UsageDict)) def history_stats_field_types(): @@ -784,38 +815,8 @@ def history_stats_field_types(): additional data groups, so it not recommended for the caller to assume exactly 6 elements. """ - return [ - int, # samples - int, # end_counter - ], [ - float, # total_ping_drop - int, # count_full_ping_drop - int, # count_obstructed - float, # total_obstructed_ping_drop - int, # count_full_obstructed_ping_drop - int, # count_unscheduled - float, # total_unscheduled_ping_drop - int, # count_full_unscheduled_ping_drop - ], [ - int, # init_run_fragment - int, # final_run_fragment - int, # run_seconds[] - int, # run_minutes[] - ], [ - float, # mean_all_ping_latency - float, # deciles_all_ping_latency[] - float, # mean_full_ping_latency - float, # deciles_full_ping_latency[] - float, # stdev_full_ping_latency - ], [ - int, # load_bucket_samples[] - float, # load_bucket_min_latency[] - float, # load_bucket_median_latency[] - float, # load_bucket_max_latency[] - ], [ - int, # download_usage - int, # upload_usage - ] + return (_field_types(HistGeneralDict), _field_types(PingDropDict), _field_types(PingDropRlDict), + _field_types(PingLatencyDict), _field_types(LoadedLatencyDict), _field_types(UsageDict)) def get_history(context: Optional[ChannelContext] = None): @@ -946,7 +947,11 @@ def concatenate_history(history1, history2, samples1: int = -1, start1: Optional return unwrapped -def history_bulk_data(parse_samples: int, start: Optional[int] = None, verbose: bool = False, context: Optional[ChannelContext] = None, history=None): +def history_bulk_data(parse_samples: int, + start: Optional[int] = None, + verbose: bool = False, + context: Optional[ChannelContext] = None, + history=None) -> Tuple[HistGeneralDict, HistBulkDict]: """Fetch history data for a range of samples. Args: @@ -1020,12 +1025,22 @@ def history_bulk_data(parse_samples: int, start: Optional[int] = None, verbose: } -def history_ping_stats(parse_samples: int, verbose: bool = False, context: Optional[ChannelContext] = None): +def history_ping_stats(parse_samples: int, + verbose: bool = False, + context: Optional[ChannelContext] = None + ) -> Tuple[HistGeneralDict, PingDropDict, PingDropRlDict]: """Deprecated. Use history_stats instead.""" return history_stats(parse_samples, verbose=verbose, context=context)[0:3] -def history_stats(parse_samples: int, start: Optional[int] = None, verbose: bool = False, context: Optional[ChannelContext] = None, history=None): +def history_stats( + parse_samples: int, + start: Optional[int] = None, + verbose: bool = False, + context: Optional[ChannelContext] = None, + history=None +) -> Tuple[HistGeneralDict, PingDropDict, PingDropRlDict, PingLatencyDict, LoadedLatencyDict, + UsageDict]: """Fetch, parse, and compute ping and usage stats. Note: From 1167742f75fa454362a5d48125b2c972d2a132df Mon Sep 17 00:00:00 2001 From: sparky8512 <76499194+sparky8512@users.noreply.github.com> Date: Fri, 9 Sep 2022 11:01:53 -0700 Subject: [PATCH 2/2] Correct Python 3.7 compatibility hack mypy complained about the signature of conditional def of get_args not matching the one from typing module, so I made it match, but apparently hadn't actually tested it with Python 3.7. It was just the return type hint, but it sill needs to parse at run time. Also, fix up a few other (non-error) gripes from pylint. --- starlink_grpc.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/starlink_grpc.py b/starlink_grpc.py index 2a167ba..7ce9e9c 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -352,10 +352,10 @@ try: except ImportError: # Python 3.7 does not have TypedDict, so fake it so the run time still # works, even though static type checker probably will not. - def TypedDict(name, types): + def TypedDict(name, types): # pylint: disable=invalid-name return type(name, (dict,), {"__annotations__": types}) - def get_args(tp: Any) -> tuple[Any, ...]: + def get_args(tp: Any) -> Tuple[Any, ...]: return tp.__args__ @@ -483,11 +483,11 @@ _FIELD_NAME_MAP = { def _field_names(hint_type): - return list(_FIELD_NAME_MAP.get(key, key) for key in get_type_hints(hint_type).keys()) + return list(_FIELD_NAME_MAP.get(key, key) for key in get_type_hints(hint_type)) def _field_names_bulk(hint_type): - return list(key + "[]" for key in get_type_hints(hint_type).keys()) + return list(key + "[]" for key in get_type_hints(hint_type)) def _field_types(hint_type):