Add configuration of sleep mode settings

Add ability to show, set, or disable the power save (sleep) configuration in the dish_control.py script and similar function in the core module.
This commit is contained in:
sparky8512 2023-02-02 13:43:25 -08:00
parent 955e2e0b5d
commit 5cbc5b73dd
2 changed files with 104 additions and 7 deletions

View file

@ -11,13 +11,37 @@ from yagrc import reflector as yagrc_reflector
def parse_args(): def parse_args():
parser = argparse.ArgumentParser(description="Starlink user terminal state control") parser = argparse.ArgumentParser(description="Starlink user terminal state control")
parser.add_argument("command", choices=["reboot", "stow", "unstow"])
parser.add_argument("-e", parser.add_argument("-e",
"--target", "--target",
default="192.168.100.1:9200", default="192.168.100.1:9200",
help="host:port of dish to query, default is the standard IP address " help="host:port of dish to query, default is the standard IP address "
"and port (192.168.100.1:9200)") "and port (192.168.100.1:9200)")
return parser.parse_args() subs = parser.add_subparsers(dest="command", required=True)
subs.add_parser("reboot", help="Reboot the user terminal")
subs.add_parser("stow", help="Set user terminal to stow position")
subs.add_parser("unstow", help="Restore user terminal from stow position")
sleep_parser = subs.add_parser(
"set_sleep",
help="Show, set, or disable power save configuration",
description="Run without arguments to show current configuration")
sleep_parser.add_argument("start",
nargs="?",
type=int,
help="Start time in minutes past midnight UTC")
sleep_parser.add_argument("duration",
nargs="?",
type=int,
help="Duration in minutes, or 0 to disable")
opts = parser.parse_args()
if opts.command == "set_sleep" and opts.start is not None:
if opts.duration is None:
sleep_parser.error("Must specify duration if start time is specified")
if opts.start < 0 or opts.start >= 1440:
sleep_parser.error("Invalid start time, must be >= 0 and < 1440")
if opts.duration < 0 or opts.duration > 1440:
sleep_parser.error("Invalid duration, must be >= 0 and <= 1440")
return opts
def main(): def main():
@ -29,19 +53,47 @@ def main():
try: try:
with grpc.insecure_channel(opts.target) as channel: with grpc.insecure_channel(opts.target) as channel:
reflector.load_protocols(channel, symbols=["SpaceX.API.Device.Device"]) reflector.load_protocols(channel, symbols=["SpaceX.API.Device.Device"])
stub = reflector.service_stub_class("SpaceX.API.Device.Device")(channel)
request_class = reflector.message_class("SpaceX.API.Device.Request") request_class = reflector.message_class("SpaceX.API.Device.Request")
if opts.command == "reboot": if opts.command == "reboot":
request = request_class(reboot={}) request = request_class(reboot={})
elif opts.command == "stow": elif opts.command == "stow":
request = request_class(dish_stow={}) request = request_class(dish_stow={})
else: # unstow elif opts.command == "unstow":
request = request_class(dish_stow={"unstow": True}) request = request_class(dish_stow={"unstow": True})
stub = reflector.service_stub_class("SpaceX.API.Device.Device")(channel) else: # set_sleep
stub.Handle(request, timeout=10) if opts.start is None and opts.duration is None:
# response is just empty message, so ignore it request = request_class(dish_get_config={})
except grpc.RpcError as e: else:
if opts.duration:
request = request_class(
dish_power_save={
"power_save_start_minutes": opts.start,
"power_save_duration_minutes": opts.duration,
"enable_power_save": True
})
else:
# duration of 0 not allowed, even when disabled
request = request_class(dish_power_save={
"power_save_duration_minutes": 1,
"enable_power_save": False
})
response = stub.Handle(request, timeout=10)
if opts.command == "set_sleep" and opts.start is None and opts.duration is None:
config = response.dish_get_config.dish_config
if config.power_save_mode:
print("Sleep start:", config.power_save_start_minutes,
"minutes past midnight UTC")
print("Sleep duration:", config.power_save_duration_minutes, "minutes")
else:
print("Sleep disabled")
except (AttributeError, ValueError, grpc.RpcError) as e:
if isinstance(e, grpc.Call): if isinstance(e, grpc.Call):
msg = e.details() msg = e.details()
elif isinstance(e, (AttributeError, ValueError)):
msg = "Protocol error"
else: else:
msg = "Unknown communication or service error" msg = "Unknown communication or service error"
logging.error(msg) logging.error(msg)

View file

@ -1533,3 +1533,48 @@ def set_stow_state(unstow: bool = False, context: Optional[ChannelContext] = Non
call_with_channel(grpc_call, context=context) call_with_channel(grpc_call, context=context)
except (AttributeError, ValueError, grpc.RpcError) as e: except (AttributeError, ValueError, grpc.RpcError) as e:
raise GrpcError(e) from e raise GrpcError(e) from e
def set_sleep_config(start: int,
duration: int,
enable: bool = True,
context: Optional[ChannelContext] = None) -> None:
"""Set sleep mode configuration.
Args:
start (int): Time, in minutes past midnight UTC, to start sleep mode
each day. Ignored if enable is set to False.
duration (int): Duration of sleep mode, in minutes. Ignored if enable
is set to False.
enable (bool): Whether or not to enable sleep mode.
context (ChannelContext): Optionally provide a channel for reuse
across repeated calls. If an existing channel is reused, the RPC
call will be retried at most once, since connectivity may have
been lost and restored in the time since it was last used.
Raises:
GrpcError: Communication or service error, including invalid start or
duration.
"""
if not enable:
start = 0
# duration of 0 not allowed, even when disabled
duration = 1
def grpc_call(channel: grpc.Channel) -> None:
if imports_pending:
resolve_imports(channel)
stub = device_pb2_grpc.DeviceStub(channel)
stub.Handle(device_pb2.Request(
dish_power_save={
"power_save_start_minutes": start,
"power_save_duration_minutes": duration,
"enable_power_save": enable
}),
timeout=REQUEST_TIMEOUT)
# response is empty message in this case, so just ignore it
try:
call_with_channel(grpc_call, context=context)
except (AttributeError, ValueError, grpc.RpcError) as e:
raise GrpcError(e) from e