Merge pull request #55 from boswelja/channel-context-type-hints
Introduce some type hints
This commit is contained in:
commit
c961ec90d0
1 changed files with 27 additions and 26 deletions
|
@ -343,6 +343,7 @@ period.
|
|||
from itertools import chain
|
||||
import math
|
||||
import statistics
|
||||
from typing import Optional, Tuple
|
||||
|
||||
import grpc
|
||||
|
||||
|
@ -365,7 +366,7 @@ HISTORY_FIELDS = ("pop_ping_drop_rate", "pop_ping_latency_ms", "downlink_through
|
|||
"uplink_throughput_bps")
|
||||
|
||||
|
||||
def resolve_imports(channel):
|
||||
def resolve_imports(channel: grpc.Channel):
|
||||
importer.resolve_lazy_imports(channel)
|
||||
global imports_pending
|
||||
imports_pending = False
|
||||
|
@ -395,24 +396,24 @@ class ChannelContext:
|
|||
`close()` should be called on the object when it is no longer
|
||||
in use.
|
||||
"""
|
||||
def __init__(self, target=None):
|
||||
def __init__(self, target: Optional[str] = None) -> None:
|
||||
self.channel = None
|
||||
self.target = "192.168.100.1:9200" if target is None else target
|
||||
|
||||
def get_channel(self):
|
||||
def get_channel(self) -> Tuple[grpc.Channel, bool]:
|
||||
reused = True
|
||||
if self.channel is None:
|
||||
self.channel = grpc.insecure_channel(self.target)
|
||||
reused = False
|
||||
return self.channel, reused
|
||||
|
||||
def close(self):
|
||||
def close(self) -> None:
|
||||
if self.channel is not None:
|
||||
self.channel.close()
|
||||
self.channel = None
|
||||
|
||||
|
||||
def call_with_channel(function, *args, context=None, **kwargs):
|
||||
def call_with_channel(function, *args, context: Optional[ChannelContext] = None, **kwargs):
|
||||
"""Call a function with a channel object.
|
||||
|
||||
Args:
|
||||
|
@ -436,7 +437,7 @@ def call_with_channel(function, *args, context=None, **kwargs):
|
|||
raise
|
||||
|
||||
|
||||
def status_field_names(context=None):
|
||||
def status_field_names(context: Optional[ChannelContext] = None):
|
||||
"""Return the field names of the status data.
|
||||
|
||||
Note:
|
||||
|
@ -490,7 +491,7 @@ def status_field_names(context=None):
|
|||
], alert_names
|
||||
|
||||
|
||||
def status_field_types(context=None):
|
||||
def status_field_types(context: Optional[ChannelContext] = None):
|
||||
"""Return the field types of the status data.
|
||||
|
||||
Return the type classes for each field. For sequence types, the type of
|
||||
|
@ -540,7 +541,7 @@ def status_field_types(context=None):
|
|||
], [bool] * len(dish_pb2.DishAlerts.DESCRIPTOR.fields)
|
||||
|
||||
|
||||
def get_status(context=None):
|
||||
def get_status(context: Optional[ChannelContext] = None):
|
||||
"""Fetch status data and return it in grpc structure format.
|
||||
|
||||
Args:
|
||||
|
@ -562,7 +563,7 @@ def get_status(context=None):
|
|||
return call_with_channel(grpc_call, context=context)
|
||||
|
||||
|
||||
def get_id(context=None):
|
||||
def get_id(context: Optional[ChannelContext] = None):
|
||||
"""Return the ID from the dish status information.
|
||||
|
||||
Args:
|
||||
|
@ -583,7 +584,7 @@ def get_id(context=None):
|
|||
raise GrpcError(e)
|
||||
|
||||
|
||||
def status_data(context=None):
|
||||
def status_data(context: Optional[ChannelContext] = None):
|
||||
"""Fetch current status data.
|
||||
|
||||
Args:
|
||||
|
@ -811,7 +812,7 @@ def history_stats_field_types():
|
|||
]
|
||||
|
||||
|
||||
def get_history(context=None):
|
||||
def get_history(context: Optional[ChannelContext] = None):
|
||||
"""Fetch history data and return it in grpc structure format.
|
||||
|
||||
Args:
|
||||
|
@ -823,7 +824,7 @@ def get_history(context=None):
|
|||
Raises:
|
||||
grpc.RpcError: Communication or service error.
|
||||
"""
|
||||
def grpc_call(channel):
|
||||
def grpc_call(channel: grpc.Channel):
|
||||
if imports_pending:
|
||||
resolve_imports(channel)
|
||||
stub = device_pb2_grpc.DeviceStub(channel)
|
||||
|
@ -833,7 +834,7 @@ def get_history(context=None):
|
|||
return call_with_channel(grpc_call, context=context)
|
||||
|
||||
|
||||
def _compute_sample_range(history, parse_samples, start=None, verbose=False):
|
||||
def _compute_sample_range(history, parse_samples: int, start: Optional[int] = None, verbose: bool = False):
|
||||
current = int(history.current)
|
||||
samples = len(history.pop_ping_drop_rate)
|
||||
|
||||
|
@ -880,7 +881,7 @@ def _compute_sample_range(history, parse_samples, start=None, verbose=False):
|
|||
return sample_range, current - start, current
|
||||
|
||||
|
||||
def concatenate_history(history1, history2, samples1=-1, start1=None, verbose=False):
|
||||
def concatenate_history(history1, history2, samples1: int = -1, start1: Optional[int] = None, verbose: bool = False):
|
||||
"""Append the sample-dependent fields of one history object to another.
|
||||
|
||||
Note:
|
||||
|
@ -939,7 +940,7 @@ def concatenate_history(history1, history2, samples1=-1, start1=None, verbose=Fa
|
|||
return unwrapped
|
||||
|
||||
|
||||
def history_bulk_data(parse_samples, start=None, verbose=False, context=None, history=None):
|
||||
def history_bulk_data(parse_samples: int, start: Optional[int] = None, verbose: bool = False, context: Optional[ChannelContext] = None, history=None):
|
||||
"""Fetch history data for a range of samples.
|
||||
|
||||
Args:
|
||||
|
@ -1013,12 +1014,12 @@ def history_bulk_data(parse_samples, start=None, verbose=False, context=None, hi
|
|||
}
|
||||
|
||||
|
||||
def history_ping_stats(parse_samples, verbose=False, context=None):
|
||||
def history_ping_stats(parse_samples: int, verbose: bool = False, context: Optional[ChannelContext] = None):
|
||||
"""Deprecated. Use history_stats instead."""
|
||||
return history_stats(parse_samples, verbose=verbose, context=context)[0:3]
|
||||
|
||||
|
||||
def history_stats(parse_samples, start=None, verbose=False, context=None, history=None):
|
||||
def history_stats(parse_samples: int, start: Optional[int] = None, verbose: bool = False, context: Optional[ChannelContext] = None, history=None):
|
||||
"""Fetch, parse, and compute ping and usage stats.
|
||||
|
||||
Note:
|
||||
|
@ -1204,7 +1205,7 @@ def history_stats(parse_samples, start=None, verbose=False, context=None, histor
|
|||
}
|
||||
|
||||
|
||||
def get_obstruction_map(context=None):
|
||||
def get_obstruction_map(context: Optional[ChannelContext] = None):
|
||||
"""Fetch obstruction map data and return it in grpc structure format.
|
||||
|
||||
Args:
|
||||
|
@ -1216,7 +1217,7 @@ def get_obstruction_map(context=None):
|
|||
Raises:
|
||||
grpc.RpcError: Communication or service error.
|
||||
"""
|
||||
def grpc_call(channel):
|
||||
def grpc_call(channel: grpc.Channel):
|
||||
if imports_pending:
|
||||
resolve_imports(channel)
|
||||
stub = device_pb2_grpc.DeviceStub(channel)
|
||||
|
@ -1227,7 +1228,7 @@ def get_obstruction_map(context=None):
|
|||
return call_with_channel(grpc_call, context=context)
|
||||
|
||||
|
||||
def obstruction_map(context=None):
|
||||
def obstruction_map(context: Optional[ChannelContext] = None):
|
||||
"""Fetch current obstruction map data.
|
||||
|
||||
Args:
|
||||
|
@ -1252,7 +1253,7 @@ def obstruction_map(context=None):
|
|||
return tuple((map_data.snr[i:i + cols]) for i in range(0, cols * map_data.num_rows, cols))
|
||||
|
||||
|
||||
def reboot(context=None):
|
||||
def reboot(context: Optional[ChannelContext] = None) -> None:
|
||||
"""Request dish reboot operation.
|
||||
|
||||
Args:
|
||||
|
@ -1264,13 +1265,13 @@ def reboot(context=None):
|
|||
Raises:
|
||||
GrpcError: Communication or service error.
|
||||
"""
|
||||
def grpc_call(channel):
|
||||
def grpc_call(channel: grpc.Channel) -> None:
|
||||
if imports_pending:
|
||||
resolve_imports(channel)
|
||||
stub = device_pb2_grpc.DeviceStub(channel)
|
||||
stub.Handle(device_pb2.Request(reboot={}), timeout=REQUEST_TIMEOUT)
|
||||
# response is empty message in this case, so just ignore it
|
||||
return 0
|
||||
return None
|
||||
|
||||
try:
|
||||
call_with_channel(grpc_call, context=context)
|
||||
|
@ -1278,7 +1279,7 @@ def reboot(context=None):
|
|||
raise GrpcError(e)
|
||||
|
||||
|
||||
def set_stow_state(unstow=False, context=None):
|
||||
def set_stow_state(unstow: bool = False, context: Optional[ChannelContext] = None) -> None:
|
||||
"""Request dish stow or unstow operation.
|
||||
|
||||
Args:
|
||||
|
@ -1292,13 +1293,13 @@ def set_stow_state(unstow=False, context=None):
|
|||
Raises:
|
||||
GrpcError: Communication or service error.
|
||||
"""
|
||||
def grpc_call(channel):
|
||||
def grpc_call(channel: grpc.Channel) -> None:
|
||||
if imports_pending:
|
||||
resolve_imports(channel)
|
||||
stub = device_pb2_grpc.DeviceStub(channel)
|
||||
stub.Handle(device_pb2.Request(dish_stow={"unstow": unstow}), timeout=REQUEST_TIMEOUT)
|
||||
# response is empty message in this case, so just ignore it
|
||||
return 0
|
||||
return None
|
||||
|
||||
try:
|
||||
call_with_channel(grpc_call, context=context)
|
||||
|
|
Loading…
Reference in a new issue