diff --git a/starlink_grpc.py b/starlink_grpc.py index c3a24ab..46950e3 100644 --- a/starlink_grpc.py +++ b/starlink_grpc.py @@ -343,6 +343,7 @@ period. from itertools import chain import math import statistics +from typing import Optional, Tuple import grpc @@ -365,7 +366,7 @@ HISTORY_FIELDS = ("pop_ping_drop_rate", "pop_ping_latency_ms", "downlink_through "uplink_throughput_bps") -def resolve_imports(channel): +def resolve_imports(channel: grpc.Channel): importer.resolve_lazy_imports(channel) global imports_pending imports_pending = False @@ -395,24 +396,24 @@ class ChannelContext: `close()` should be called on the object when it is no longer in use. """ - def __init__(self, target=None): + def __init__(self, target: Optional[str] = None) -> None: self.channel = None self.target = "192.168.100.1:9200" if target is None else target - def get_channel(self): + def get_channel(self) -> Tuple[grpc.Channel, bool]: reused = True if self.channel is None: self.channel = grpc.insecure_channel(self.target) reused = False return self.channel, reused - def close(self): + def close(self) -> None: if self.channel is not None: self.channel.close() self.channel = None -def call_with_channel(function, *args, context=None, **kwargs): +def call_with_channel(function, *args, context: Optional[ChannelContext] = None, **kwargs): """Call a function with a channel object. Args: @@ -436,7 +437,7 @@ def call_with_channel(function, *args, context=None, **kwargs): raise -def status_field_names(context=None): +def status_field_names(context: Optional[ChannelContext] = None): """Return the field names of the status data. Note: @@ -490,7 +491,7 @@ def status_field_names(context=None): ], alert_names -def status_field_types(context=None): +def status_field_types(context: Optional[ChannelContext] = None): """Return the field types of the status data. Return the type classes for each field. For sequence types, the type of @@ -540,7 +541,7 @@ def status_field_types(context=None): ], [bool] * len(dish_pb2.DishAlerts.DESCRIPTOR.fields) -def get_status(context=None): +def get_status(context: Optional[ChannelContext] = None): """Fetch status data and return it in grpc structure format. Args: @@ -562,7 +563,7 @@ def get_status(context=None): return call_with_channel(grpc_call, context=context) -def get_id(context=None): +def get_id(context: Optional[ChannelContext] = None): """Return the ID from the dish status information. Args: @@ -583,7 +584,7 @@ def get_id(context=None): raise GrpcError(e) -def status_data(context=None): +def status_data(context: Optional[ChannelContext] = None): """Fetch current status data. Args: @@ -811,7 +812,7 @@ def history_stats_field_types(): ] -def get_history(context=None): +def get_history(context: Optional[ChannelContext] = None): """Fetch history data and return it in grpc structure format. Args: @@ -823,7 +824,7 @@ def get_history(context=None): Raises: grpc.RpcError: Communication or service error. """ - def grpc_call(channel): + def grpc_call(channel: grpc.Channel): if imports_pending: resolve_imports(channel) stub = device_pb2_grpc.DeviceStub(channel) @@ -833,7 +834,7 @@ def get_history(context=None): return call_with_channel(grpc_call, context=context) -def _compute_sample_range(history, parse_samples, start=None, verbose=False): +def _compute_sample_range(history, parse_samples: int, start: Optional[int] = None, verbose: bool = False): current = int(history.current) samples = len(history.pop_ping_drop_rate) @@ -880,7 +881,7 @@ def _compute_sample_range(history, parse_samples, start=None, verbose=False): return sample_range, current - start, current -def concatenate_history(history1, history2, samples1=-1, start1=None, verbose=False): +def concatenate_history(history1, history2, samples1: int = -1, start1: Optional[int] = None, verbose: bool = False): """Append the sample-dependent fields of one history object to another. Note: @@ -939,7 +940,7 @@ def concatenate_history(history1, history2, samples1=-1, start1=None, verbose=Fa return unwrapped -def history_bulk_data(parse_samples, start=None, verbose=False, context=None, history=None): +def history_bulk_data(parse_samples: int, start: Optional[int] = None, verbose: bool = False, context: Optional[ChannelContext] = None, history=None): """Fetch history data for a range of samples. Args: @@ -1013,12 +1014,12 @@ def history_bulk_data(parse_samples, start=None, verbose=False, context=None, hi } -def history_ping_stats(parse_samples, verbose=False, context=None): +def history_ping_stats(parse_samples: int, verbose: bool = False, context: Optional[ChannelContext] = None): """Deprecated. Use history_stats instead.""" return history_stats(parse_samples, verbose=verbose, context=context)[0:3] -def history_stats(parse_samples, start=None, verbose=False, context=None, history=None): +def history_stats(parse_samples: int, start: Optional[int] = None, verbose: bool = False, context: Optional[ChannelContext] = None, history=None): """Fetch, parse, and compute ping and usage stats. Note: @@ -1204,7 +1205,7 @@ def history_stats(parse_samples, start=None, verbose=False, context=None, histor } -def get_obstruction_map(context=None): +def get_obstruction_map(context: Optional[ChannelContext] = None): """Fetch obstruction map data and return it in grpc structure format. Args: @@ -1216,7 +1217,7 @@ def get_obstruction_map(context=None): Raises: grpc.RpcError: Communication or service error. """ - def grpc_call(channel): + def grpc_call(channel: grpc.Channel): if imports_pending: resolve_imports(channel) stub = device_pb2_grpc.DeviceStub(channel) @@ -1227,7 +1228,7 @@ def get_obstruction_map(context=None): return call_with_channel(grpc_call, context=context) -def obstruction_map(context=None): +def obstruction_map(context: Optional[ChannelContext] = None): """Fetch current obstruction map data. Args: @@ -1252,7 +1253,7 @@ def obstruction_map(context=None): return tuple((map_data.snr[i:i + cols]) for i in range(0, cols * map_data.num_rows, cols)) -def reboot(context=None): +def reboot(context: Optional[ChannelContext] = None) -> None: """Request dish reboot operation. Args: @@ -1264,13 +1265,13 @@ def reboot(context=None): Raises: GrpcError: Communication or service error. """ - def grpc_call(channel): + def grpc_call(channel: grpc.Channel) -> None: if imports_pending: resolve_imports(channel) stub = device_pb2_grpc.DeviceStub(channel) stub.Handle(device_pb2.Request(reboot={}), timeout=REQUEST_TIMEOUT) # response is empty message in this case, so just ignore it - return 0 + return None try: call_with_channel(grpc_call, context=context) @@ -1278,7 +1279,7 @@ def reboot(context=None): raise GrpcError(e) -def set_stow_state(unstow=False, context=None): +def set_stow_state(unstow: bool = False, context: Optional[ChannelContext] = None) -> None: """Request dish stow or unstow operation. Args: @@ -1292,13 +1293,13 @@ def set_stow_state(unstow=False, context=None): Raises: GrpcError: Communication or service error. """ - def grpc_call(channel): + def grpc_call(channel: grpc.Channel) -> None: if imports_pending: resolve_imports(channel) stub = device_pb2_grpc.DeviceStub(channel) stub.Handle(device_pb2.Request(dish_stow={"unstow": unstow}), timeout=REQUEST_TIMEOUT) # response is empty message in this case, so just ignore it - return 0 + return None try: call_with_channel(grpc_call, context=context)