Merge pull request #62 from sparky8512/return-type-hints-2

Type hints for status and history return values
This commit is contained in:
sparky8512 2022-09-09 11:13:34 -07:00 committed by GitHub
commit f759e4cc3b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -346,7 +346,18 @@ period.
from itertools import chain
import math
import statistics
from typing import Optional, Tuple
from typing import Any, Dict, Optional, Sequence, Tuple, get_type_hints
try:
from typing import TypedDict, get_args
except ImportError:
# Python 3.7 does not have TypedDict, so fake it so the run time still
# works, even though static type checker probably will not.
def TypedDict(name, types): # pylint: disable=invalid-name
# Runtime stand-in: a dict subclass that carries the field annotations,
# which is all this module reads from it (via get_type_hints).
return type(name, (dict,), {"__annotations__": types})
# Fallback for get_args as well: read the raw __args__ attribute directly,
# which suffices for the typing constructs used in this module.
def get_args(tp: Any) -> Tuple[Any, ...]:
return tp.__args__
import grpc
@ -368,6 +379,126 @@ REQUEST_TIMEOUT = 10
# The 4 core per-sample history fields; these also appear in HistBulkDict.
# NOTE(review): presumably used when iterating raw history samples — confirm
# against the (not shown here) history parsing code.
HISTORY_FIELDS = ("pop_ping_drop_rate", "pop_ping_latency_ms", "downlink_throughput_bps",
"uplink_throughput_bps")
# General status fields; first element of the status_data() return tuple.
StatusDict = TypedDict(
"StatusDict", {
"id": str,
"hardware_version": str,
"software_version": str,
"state": str,
"uptime": int,
"snr": Optional[float],
"seconds_to_first_nonempty_slot": float,
"pop_ping_drop_rate": float,
"downlink_throughput_bps": float,
"uplink_throughput_bps": float,
"pop_ping_latency_ms": float,
"alerts": int,
"fraction_obstructed": float,
"currently_obstructed": bool,
"seconds_obstructed": Optional[float],
"obstruction_duration": Optional[float],
"obstruction_interval": Optional[float],
"direction_azimuth": float,
"direction_elevation": float,
"is_snr_above_noise_floor": bool,
})
# Obstruction detail fields; second element of the status_data() return tuple.
# The "[]" key suffix marks sequence-valued fields (see _FIELD_NAME_MAP).
ObstructionDict = TypedDict(
"ObstructionDict", {
"wedges_fraction_obstructed[]": Sequence[float],
"raw_wedges_fraction_obstructed[]": Sequence[float],
"valid_s": float,
})
# Alert flags keyed by alert name; third element of the status_data() return
# tuple. A plain Dict because the alert set comes from the protobuf descriptor.
AlertDict = Dict[str, bool]
# General history sample metadata; first element of both the
# history_bulk_data() and history_stats() return tuples.
HistGeneralDict = TypedDict("HistGeneralDict", {"samples": int, "end_counter": int})
# Per-sample bulk history sequences; second element of history_bulk_data().
HistBulkDict = TypedDict(
"HistBulkDict", {
"pop_ping_drop_rate": Sequence[float],
"pop_ping_latency_ms": Sequence[float],
"downlink_throughput_bps": Sequence[float],
"uplink_throughput_bps": Sequence[float],
"snr": Sequence[Optional[float]],
"scheduled": Sequence[Optional[bool]],
"obstructed": Sequence[Optional[bool]],
})
# Ping drop statistics; part of the history_stats() return tuple.
PingDropDict = TypedDict(
"PingDropDict", {
"total_ping_drop": float,
"count_full_ping_drop": int,
"count_obstructed": int,
"total_obstructed_ping_drop": float,
"count_full_obstructed_ping_drop": int,
"count_unscheduled": int,
"total_unscheduled_ping_drop": float,
"count_full_unscheduled_ping_drop": int,
})
# Ping drop run-length statistics; part of the history_stats() return tuple.
PingDropRlDict = TypedDict(
"PingDropRlDict", {
"init_run_fragment": int,
"final_run_fragment": int,
"run_seconds[1,]": Sequence[int],
"run_minutes[1,]": Sequence[int],
})
# Ping latency statistics; part of the history_stats() return tuple.
PingLatencyDict = TypedDict(
"PingLatencyDict", {
"mean_all_ping_latency": float,
"deciles_all_ping_latency[]": Sequence[float],
"mean_full_ping_latency": float,
"deciles_full_ping_latency[]": Sequence[float],
"stdev_full_ping_latency": float,
})
# Load bucket latency statistics; part of the history_stats() return tuple.
LoadedLatencyDict = TypedDict(
"LoadedLatencyDict", {
"load_bucket_samples[]": Sequence[int],
"load_bucket_min_latency[]": Sequence[Optional[float]],
"load_bucket_median_latency[]": Sequence[Optional[float]],
"load_bucket_max_latency[]": Sequence[Optional[float]],
})
# Usage totals; final element of the history_stats() return tuple.
UsageDict = TypedDict("UsageDict", {"download_usage": int, "upload_usage": int})
# For legacy reasons, there is a slight difference between the field names
# returned in the actual data vs the *_field_names functions. This is a map of
# the differences. Bulk data fields are handled separately because the field
# "snr" overlaps with a status field and needs to map differently.
# The bracketed suffix on the mapped name spells out the fixed element count
# (or index range) of the sequence, as exposed by the *_field_names functions.
_FIELD_NAME_MAP = {
"wedges_fraction_obstructed[]": "wedges_fraction_obstructed[12]",
"raw_wedges_fraction_obstructed[]": "raw_wedges_fraction_obstructed[12]",
"run_seconds[1,]": "run_seconds[1,61]",
"run_minutes[1,]": "run_minutes[1,61]",
"deciles_all_ping_latency[]": "deciles_all_ping_latency[11]",
"deciles_full_ping_latency[]": "deciles_full_ping_latency[11]",
"load_bucket_samples[]": "load_bucket_samples[15]",
"load_bucket_min_latency[]": "load_bucket_min_latency[15]",
"load_bucket_median_latency[]": "load_bucket_median_latency[15]",
"load_bucket_max_latency[]": "load_bucket_max_latency[15]",
}
def _field_names(hint_type):
    """Return the public field names for *hint_type* in declaration order,
    applying the legacy renaming from _FIELD_NAME_MAP where one exists."""
    names = []
    for key in get_type_hints(hint_type):
        names.append(_FIELD_NAME_MAP.get(key, key))
    return names
def _field_names_bulk(hint_type):
return list(key + "[]" for key in get_type_hints(hint_type))
def _field_types(hint_type):
def xlate(value):
while not isinstance(value, type):
args = get_args(value)
value = args[0] if args[0] is not type(None) else args[1]
return value
return list(xlate(val) for val in get_type_hints(hint_type).values())
def resolve_imports(channel: grpc.Channel):
"""Ensure lazily imported gRPC modules are resolved, using *channel*.

See importer.resolve_lazy_imports for details of what gets resolved.
"""
importer.resolve_lazy_imports(channel)
@ -467,32 +598,7 @@ def status_field_names(context: Optional[ChannelContext] = None):
for field in dish_pb2.DishAlerts.DESCRIPTOR.fields:
alert_names.append("alert_" + field.name)
return [
"id",
"hardware_version",
"software_version",
"state",
"uptime",
"snr",
"seconds_to_first_nonempty_slot",
"pop_ping_drop_rate",
"downlink_throughput_bps",
"uplink_throughput_bps",
"pop_ping_latency_ms",
"alerts",
"fraction_obstructed",
"currently_obstructed",
"seconds_obstructed",
"obstruction_duration",
"obstruction_interval",
"direction_azimuth",
"direction_elevation",
"is_snr_above_noise_floor",
], [
"wedges_fraction_obstructed[12]",
"raw_wedges_fraction_obstructed[12]",
"valid_s",
], alert_names
return _field_names(StatusDict), _field_names(ObstructionDict), alert_names
def status_field_types(context: Optional[ChannelContext] = None):
@ -518,32 +624,8 @@ def status_field_types(context: Optional[ChannelContext] = None):
call_with_channel(resolve_imports, context=context)
except grpc.RpcError as e:
raise GrpcError(e)
return [
str, # id
str, # hardware_version
str, # software_version
str, # state
int, # uptime
float, # snr
float, # seconds_to_first_nonempty_slot
float, # pop_ping_drop_rate
float, # downlink_throughput_bps
float, # uplink_throughput_bps
float, # pop_ping_latency_ms
int, # alerts
float, # fraction_obstructed
bool, # currently_obstructed
float, # seconds_obstructed
float, # obstruction_duration
float, # obstruction_interval
float, # direction_azimuth
float, # direction_elevation
bool, # is_snr_above_noise_floor
], [
float, # wedges_fraction_obstructed[]
float, # raw_wedges_fraction_obstructed[]
float, # valid_s
], [bool] * len(dish_pb2.DishAlerts.DESCRIPTOR.fields)
return (_field_types(StatusDict), _field_types(ObstructionDict),
[bool] * len(dish_pb2.DishAlerts.DESCRIPTOR.fields))
def get_status(context: Optional[ChannelContext] = None):
@ -589,7 +671,8 @@ def get_id(context: Optional[ChannelContext] = None) -> str:
raise GrpcError(e)
def status_data(context: Optional[ChannelContext] = None):
def status_data(
context: Optional[ChannelContext] = None) -> Tuple[StatusDict, ObstructionDict, AlertDict]:
"""Fetch current status data.
Args:
@ -675,18 +758,7 @@ def history_bulk_field_names():
A tuple with 2 lists, the first with general data names, the second
with bulk history data names.
"""
return [
"samples",
"end_counter",
], [
"pop_ping_drop_rate[]",
"pop_ping_latency_ms[]",
"downlink_throughput_bps[]",
"uplink_throughput_bps[]",
"snr[]",
"scheduled[]",
"obstructed[]",
]
return _field_names(HistGeneralDict), _field_names_bulk(HistBulkDict)
def history_bulk_field_types():
@ -699,18 +771,7 @@ def history_bulk_field_types():
A tuple with 2 lists, the first with general data types, the second
with bulk history data types.
"""
return [
int, # samples
int, # end_counter
], [
float, # pop_ping_drop_rate[]
float, # pop_ping_latency_ms[]
float, # downlink_throughput_bps[]
float, # uplink_throughput_bps[]
float, # snr[]
bool, # scheduled[]
bool, # obstructed[]
]
return _field_types(HistGeneralDict), _field_types(HistBulkDict)
def history_ping_field_names():
@ -734,38 +795,8 @@ def history_stats_field_names():
additional data groups, so it is not recommended for the caller to
assume exactly 6 elements.
"""
return [
"samples",
"end_counter",
], [
"total_ping_drop",
"count_full_ping_drop",
"count_obstructed",
"total_obstructed_ping_drop",
"count_full_obstructed_ping_drop",
"count_unscheduled",
"total_unscheduled_ping_drop",
"count_full_unscheduled_ping_drop",
], [
"init_run_fragment",
"final_run_fragment",
"run_seconds[1,61]",
"run_minutes[1,61]",
], [
"mean_all_ping_latency",
"deciles_all_ping_latency[11]",
"mean_full_ping_latency",
"deciles_full_ping_latency[11]",
"stdev_full_ping_latency",
], [
"load_bucket_samples[15]",
"load_bucket_min_latency[15]",
"load_bucket_median_latency[15]",
"load_bucket_max_latency[15]",
], [
"download_usage",
"upload_usage",
]
return (_field_names(HistGeneralDict), _field_names(PingDropDict), _field_names(PingDropRlDict),
_field_names(PingLatencyDict), _field_names(LoadedLatencyDict), _field_names(UsageDict))
def history_stats_field_types():
@ -784,38 +815,8 @@ def history_stats_field_types():
additional data groups, so it is not recommended for the caller to
assume exactly 6 elements.
"""
return [
int, # samples
int, # end_counter
], [
float, # total_ping_drop
int, # count_full_ping_drop
int, # count_obstructed
float, # total_obstructed_ping_drop
int, # count_full_obstructed_ping_drop
int, # count_unscheduled
float, # total_unscheduled_ping_drop
int, # count_full_unscheduled_ping_drop
], [
int, # init_run_fragment
int, # final_run_fragment
int, # run_seconds[]
int, # run_minutes[]
], [
float, # mean_all_ping_latency
float, # deciles_all_ping_latency[]
float, # mean_full_ping_latency
float, # deciles_full_ping_latency[]
float, # stdev_full_ping_latency
], [
int, # load_bucket_samples[]
float, # load_bucket_min_latency[]
float, # load_bucket_median_latency[]
float, # load_bucket_max_latency[]
], [
int, # download_usage
int, # upload_usage
]
return (_field_types(HistGeneralDict), _field_types(PingDropDict), _field_types(PingDropRlDict),
_field_types(PingLatencyDict), _field_types(LoadedLatencyDict), _field_types(UsageDict))
def get_history(context: Optional[ChannelContext] = None):
@ -946,7 +947,11 @@ def concatenate_history(history1, history2, samples1: int = -1, start1: Optional
return unwrapped
def history_bulk_data(parse_samples: int, start: Optional[int] = None, verbose: bool = False, context: Optional[ChannelContext] = None, history=None):
def history_bulk_data(parse_samples: int,
start: Optional[int] = None,
verbose: bool = False,
context: Optional[ChannelContext] = None,
history=None) -> Tuple[HistGeneralDict, HistBulkDict]:
"""Fetch history data for a range of samples.
Args:
@ -1020,12 +1025,22 @@ def history_bulk_data(parse_samples: int, start: Optional[int] = None, verbose:
}
def history_ping_stats(parse_samples: int, verbose: bool = False, context: Optional[ChannelContext] = None):
def history_ping_stats(parse_samples: int,
                       verbose: bool = False,
                       context: Optional[ChannelContext] = None
                       ) -> Tuple[HistGeneralDict, PingDropDict, PingDropRlDict]:
    """Deprecated. Use history_stats instead."""
    # Backward-compatible shim: expose only the first 3 groups of the full
    # history_stats() result.
    full_stats = history_stats(parse_samples, verbose=verbose, context=context)
    return full_stats[0], full_stats[1], full_stats[2]
def history_stats(parse_samples: int, start: Optional[int] = None, verbose: bool = False, context: Optional[ChannelContext] = None, history=None):
def history_stats(
parse_samples: int,
start: Optional[int] = None,
verbose: bool = False,
context: Optional[ChannelContext] = None,
history=None
) -> Tuple[HistGeneralDict, PingDropDict, PingDropRlDict, PingLatencyDict, LoadedLatencyDict,
UsageDict]:
"""Fetch, parse, and compute ping and usage stats.
Note: