diff --git a/starlink_grpc.py b/starlink_grpc.py index 3379e01..7ce9e9c 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -346,7 +346,18 @@ period. from itertools import chain import math import statistics -from typing import Optional, Tuple +from typing import Any, Dict, Optional, Sequence, Tuple, get_type_hints +try: + from typing import TypedDict, get_args +except ImportError: + # Python 3.7 does not have TypedDict, so fake it so the run time still + # works, even though static type checker probably will not. + def TypedDict(name, types): # pylint: disable=invalid-name + return type(name, (dict,), {"__annotations__": types}) + + def get_args(tp: Any) -> Tuple[Any, ...]: + return tp.__args__ + import grpc @@ -368,6 +379,126 @@ REQUEST_TIMEOUT = 10 HISTORY_FIELDS = ("pop_ping_drop_rate", "pop_ping_latency_ms", "downlink_throughput_bps", "uplink_throughput_bps") +StatusDict = TypedDict( + "StatusDict", { + "id": str, + "hardware_version": str, + "software_version": str, + "state": str, + "uptime": int, + "snr": Optional[float], + "seconds_to_first_nonempty_slot": float, + "pop_ping_drop_rate": float, + "downlink_throughput_bps": float, + "uplink_throughput_bps": float, + "pop_ping_latency_ms": float, + "alerts": int, + "fraction_obstructed": float, + "currently_obstructed": bool, + "seconds_obstructed": Optional[float], + "obstruction_duration": Optional[float], + "obstruction_interval": Optional[float], + "direction_azimuth": float, + "direction_elevation": float, + "is_snr_above_noise_floor": bool, + }) + +ObstructionDict = TypedDict( + "ObstructionDict", { + "wedges_fraction_obstructed[]": Sequence[float], + "raw_wedges_fraction_obstructed[]": Sequence[float], + "valid_s": float, + }) + +AlertDict = Dict[str, bool] + +HistGeneralDict = TypedDict("HistGeneralDict", {"samples": int, "end_counter": int}) + +HistBulkDict = TypedDict( + "HistBulkDict", { + "pop_ping_drop_rate": Sequence[float], + "pop_ping_latency_ms": Sequence[float], + "downlink_throughput_bps": Sequence[float], + "uplink_throughput_bps": Sequence[float], + "snr": Sequence[Optional[float]], + "scheduled": Sequence[Optional[bool]], + "obstructed": Sequence[Optional[bool]], + }) + +PingDropDict = TypedDict( + "PingDropDict", { + "total_ping_drop": float, + "count_full_ping_drop": int, + "count_obstructed": int, + "total_obstructed_ping_drop": float, + "count_full_obstructed_ping_drop": int, + "count_unscheduled": int, + "total_unscheduled_ping_drop": float, + "count_full_unscheduled_ping_drop": int, + }) + +PingDropRlDict = TypedDict( + "PingDropRlDict", { + "init_run_fragment": int, + "final_run_fragment": int, + "run_seconds[1,]": Sequence[int], + "run_minutes[1,]": Sequence[int], + }) + +PingLatencyDict = TypedDict( + "PingLatencyDict", { + "mean_all_ping_latency": float, + "deciles_all_ping_latency[]": Sequence[float], + "mean_full_ping_latency": float, + "deciles_full_ping_latency[]": Sequence[float], + "stdev_full_ping_latency": float, + }) + +LoadedLatencyDict = TypedDict( + "LoadedLatencyDict", { + "load_bucket_samples[]": Sequence[int], + "load_bucket_min_latency[]": Sequence[Optional[float]], + "load_bucket_median_latency[]": Sequence[Optional[float]], + "load_bucket_max_latency[]": Sequence[Optional[float]], + }) + +UsageDict = TypedDict("UsageDict", {"download_usage": int, "upload_usage": int}) + +# For legacy reasons, there is a slight difference between the field names +# returned in the actual data vs the *_field_names functions. This is a map of +# the differences. Bulk data fields are handled separately because the field +# "snr" overlaps with a status field and needs to map differently. +_FIELD_NAME_MAP = { + "wedges_fraction_obstructed[]": "wedges_fraction_obstructed[12]", + "raw_wedges_fraction_obstructed[]": "raw_wedges_fraction_obstructed[12]", + "run_seconds[1,]": "run_seconds[1,61]", + "run_minutes[1,]": "run_minutes[1,61]", + "deciles_all_ping_latency[]": "deciles_all_ping_latency[11]", + "deciles_full_ping_latency[]": "deciles_full_ping_latency[11]", + "load_bucket_samples[]": "load_bucket_samples[15]", + "load_bucket_min_latency[]": "load_bucket_min_latency[15]", + "load_bucket_median_latency[]": "load_bucket_median_latency[15]", + "load_bucket_max_latency[]": "load_bucket_max_latency[15]", +} + + +def _field_names(hint_type): + return list(_FIELD_NAME_MAP.get(key, key) for key in get_type_hints(hint_type)) + + +def _field_names_bulk(hint_type): + return list(key + "[]" for key in get_type_hints(hint_type)) + + +def _field_types(hint_type): + def xlate(value): + while not isinstance(value, type): + args = get_args(value) + value = args[0] if args[0] is not type(None) else args[1] + return value + + return list(xlate(val) for val in get_type_hints(hint_type).values()) + def resolve_imports(channel: grpc.Channel): importer.resolve_lazy_imports(channel) @@ -467,32 +598,7 @@ def status_field_names(context: Optional[ChannelContext] = None): for field in dish_pb2.DishAlerts.DESCRIPTOR.fields: alert_names.append("alert_" + field.name) - return [ - "id", - "hardware_version", - "software_version", - "state", - "uptime", - "snr", - "seconds_to_first_nonempty_slot", - "pop_ping_drop_rate", - "downlink_throughput_bps", - "uplink_throughput_bps", - "pop_ping_latency_ms", - "alerts", - "fraction_obstructed", - "currently_obstructed", - "seconds_obstructed", - "obstruction_duration", - "obstruction_interval", - "direction_azimuth", - "direction_elevation", - "is_snr_above_noise_floor", - ], [ - "wedges_fraction_obstructed[12]", - "raw_wedges_fraction_obstructed[12]", - "valid_s", - ], alert_names + return _field_names(StatusDict), _field_names(ObstructionDict), alert_names def status_field_types(context: Optional[ChannelContext] = None): @@ -518,32 +624,8 @@ def status_field_types(context: Optional[ChannelContext] = None): call_with_channel(resolve_imports, context=context) except grpc.RpcError as e: raise GrpcError(e) - return [ - str, # id - str, # hardware_version - str, # software_version - str, # state - int, # uptime - float, # snr - float, # seconds_to_first_nonempty_slot - float, # pop_ping_drop_rate - float, # downlink_throughput_bps - float, # uplink_throughput_bps - float, # pop_ping_latency_ms - int, # alerts - float, # fraction_obstructed - bool, # currently_obstructed - float, # seconds_obstructed - float, # obstruction_duration - float, # obstruction_interval - float, # direction_azimuth - float, # direction_elevation - bool, # is_snr_above_noise_floor - ], [ - float, # wedges_fraction_obstructed[] - float, # raw_wedges_fraction_obstructed[] - float, # valid_s - ], [bool] * len(dish_pb2.DishAlerts.DESCRIPTOR.fields) + return (_field_types(StatusDict), _field_types(ObstructionDict), + [bool] * len(dish_pb2.DishAlerts.DESCRIPTOR.fields)) def get_status(context: Optional[ChannelContext] = None): @@ -589,7 +671,8 @@ def get_id(context: Optional[ChannelContext] = None) -> str: raise GrpcError(e) -def status_data(context: Optional[ChannelContext] = None): +def status_data( + context: Optional[ChannelContext] = None) -> Tuple[StatusDict, ObstructionDict, AlertDict]: """Fetch current status data. Args: @@ -675,18 +758,7 @@ def history_bulk_field_names(): A tuple with 2 lists, the first with general data names, the second with bulk history data names. """ - return [ - "samples", - "end_counter", - ], [ - "pop_ping_drop_rate[]", - "pop_ping_latency_ms[]", - "downlink_throughput_bps[]", - "uplink_throughput_bps[]", - "snr[]", - "scheduled[]", - "obstructed[]", - ] + return _field_names(HistGeneralDict), _field_names_bulk(HistBulkDict) def history_bulk_field_types(): @@ -699,18 +771,7 @@ def history_bulk_field_types(): A tuple with 2 lists, the first with general data types, the second with bulk history data types. """ - return [ - int, # samples - int, # end_counter - ], [ - float, # pop_ping_drop_rate[] - float, # pop_ping_latency_ms[] - float, # downlink_throughput_bps[] - float, # uplink_throughput_bps[] - float, # snr[] - bool, # scheduled[] - bool, # obstructed[] - ] + return _field_types(HistGeneralDict), _field_types(HistBulkDict) def history_ping_field_names(): @@ -734,38 +795,8 @@ def history_stats_field_names(): additional data groups, so it not recommended for the caller to assume exactly 6 elements. """ - return [ - "samples", - "end_counter", - ], [ - "total_ping_drop", - "count_full_ping_drop", - "count_obstructed", - "total_obstructed_ping_drop", - "count_full_obstructed_ping_drop", - "count_unscheduled", - "total_unscheduled_ping_drop", - "count_full_unscheduled_ping_drop", - ], [ - "init_run_fragment", - "final_run_fragment", - "run_seconds[1,61]", - "run_minutes[1,61]", - ], [ - "mean_all_ping_latency", - "deciles_all_ping_latency[11]", - "mean_full_ping_latency", - "deciles_full_ping_latency[11]", - "stdev_full_ping_latency", - ], [ - "load_bucket_samples[15]", - "load_bucket_min_latency[15]", - "load_bucket_median_latency[15]", - "load_bucket_max_latency[15]", - ], [ - "download_usage", - "upload_usage", - ] + return (_field_names(HistGeneralDict), _field_names(PingDropDict), _field_names(PingDropRlDict), + _field_names(PingLatencyDict), _field_names(LoadedLatencyDict), _field_names(UsageDict)) def history_stats_field_types(): @@ -784,38 +815,8 @@ def history_stats_field_types(): additional data groups, so it not recommended for the caller to assume exactly 6 elements. """ - return [ - int, # samples - int, # end_counter - ], [ - float, # total_ping_drop - int, # count_full_ping_drop - int, # count_obstructed - float, # total_obstructed_ping_drop - int, # count_full_obstructed_ping_drop - int, # count_unscheduled - float, # total_unscheduled_ping_drop - int, # count_full_unscheduled_ping_drop - ], [ - int, # init_run_fragment - int, # final_run_fragment - int, # run_seconds[] - int, # run_minutes[] - ], [ - float, # mean_all_ping_latency - float, # deciles_all_ping_latency[] - float, # mean_full_ping_latency - float, # deciles_full_ping_latency[] - float, # stdev_full_ping_latency - ], [ - int, # load_bucket_samples[] - float, # load_bucket_min_latency[] - float, # load_bucket_median_latency[] - float, # load_bucket_max_latency[] - ], [ - int, # download_usage - int, # upload_usage - ] + return (_field_types(HistGeneralDict), _field_types(PingDropDict), _field_types(PingDropRlDict), + _field_types(PingLatencyDict), _field_types(LoadedLatencyDict), _field_types(UsageDict)) def get_history(context: Optional[ChannelContext] = None): @@ -946,7 +947,11 @@ def concatenate_history(history1, history2, samples1: int = -1, start1: Optional return unwrapped -def history_bulk_data(parse_samples: int, start: Optional[int] = None, verbose: bool = False, context: Optional[ChannelContext] = None, history=None): +def history_bulk_data(parse_samples: int, + start: Optional[int] = None, + verbose: bool = False, + context: Optional[ChannelContext] = None, + history=None) -> Tuple[HistGeneralDict, HistBulkDict]: """Fetch history data for a range of samples. Args: @@ -1020,12 +1025,22 @@ def history_bulk_data(parse_samples: int, start: Optional[int] = None, verbose: } -def history_ping_stats(parse_samples: int, verbose: bool = False, context: Optional[ChannelContext] = None): +def history_ping_stats(parse_samples: int, + verbose: bool = False, + context: Optional[ChannelContext] = None + ) -> Tuple[HistGeneralDict, PingDropDict, PingDropRlDict]: """Deprecated. Use history_stats instead.""" return history_stats(parse_samples, verbose=verbose, context=context)[0:3] -def history_stats(parse_samples: int, start: Optional[int] = None, verbose: bool = False, context: Optional[ChannelContext] = None, history=None): +def history_stats( + parse_samples: int, + start: Optional[int] = None, + verbose: bool = False, + context: Optional[ChannelContext] = None, + history=None +) -> Tuple[HistGeneralDict, PingDropDict, PingDropRlDict, PingLatencyDict, LoadedLatencyDict, + UsageDict]: """Fetch, parse, and compute ping and usage stats. Note: