greendeck/greendeck/main.py

341 lines
11 KiB
Python

import asyncio
import functools
import logging
import pathlib
import click
# from PIL import ImageDraw
# from PIL import ImageFont
from pydantic import parse_obj_as
from pydantic_yaml import parse_yaml_raw_as
from typing_extensions import Self
from greendeck.config import Config
from greendeck.config import HomeAssistantIconImageConfig
from greendeck.config import HomeAssistantServiceActionConfig
from greendeck.config import ImageConfig
from greendeck.config import PulseAudioActionConfig
from greendeck.config import StreamDeckConfig
from greendeck.config import TileConfig
from greendeck.lib.actions.pulseaudio import handle_pulseaudio_action
from greendeck.lib.elgato import enumerate_streamdecks
from greendeck.lib.elgato.streamdeck import StreamDeck
from greendeck.lib.homeassistant import EventResponse
from greendeck.lib.homeassistant import HomeAssistant
from greendeck.lib.images.staticiconimage import render_static_icon_image
from greendeck.lib.logging import setup_logging
from greendeck.lib.util import task_done_callback
# Module-level logger named after this module; used by every definition below.
logger = logging.getLogger(__name__)
class DeckHandler:
"""Class to manage connected StreamDeck."""
deck: StreamDeck
deck_config: StreamDeckConfig
config: Config
homeassistant: HomeAssistant
homeassistant_states: list[str | None]
key_states: list[bool | None]
def __init__(
self,
deck: StreamDeck,
deck_config: StreamDeckConfig,
config: Config,
homeassistant: HomeAssistant,
):
self.deck = deck
self.deck_config = deck_config
self.config = config
self.homeassistant = homeassistant
self.current_screen = self.deck_config.screen
self.homeassistant_states = [None] * self.deck.KEY_COUNT
self.key_states = [None] * self.deck.KEY_COUNT
def convert_key_to_position(self: Self, key: int) -> tuple[int, int]:
return self.deck.convert_key_to_position(key)
def get_screen_config(self: Self) -> list[TileConfig] | None:
return self.config.screens.get(self.current_screen)
def get_tile_config_for_key(self: Self, key: int) -> TileConfig | None:
position = self.convert_key_to_position(key)
screen = self.get_screen_config()
if screen is None:
return None
for tile_config in screen:
if position == tile_config.position:
return tile_config
return None
async def start(self: Self):
await self.deck.open()
await self.deck.reset()
serial = await self.deck.get_serial_number()
fw = await self.deck.get_firmware_version()
logger.info(
f"opened '{self.deck.DECK_TYPE}' device "
f"(serial number: '{serial}', fw: '{fw}')"
)
self.deck.set_key_callback(self.key_callback)
await self.deck.set_brightness(self.deck_config.brightness)
tasks: list[asyncio.Task] = []
for key in range(self.deck.KEY_COUNT):
tasks.append(asyncio.create_task(self.initialize_key(key)))
for task in tasks:
task.add_done_callback(task_done_callback)
await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
async def initialize_key(self: Self, key: int) -> None:
await self.update_key_image(key)
await self.update_homeassistant_callbacks(key)
async def update_key_state(
self: Self, key: int, old_state: bool | None, new_state: bool
) -> None:
self.key_states[key] = new_state
if new_state is False:
t = asyncio.create_task(
self.process_key_actions(key),
name=(
f"process key actions on streamdeck "
f"{self.deck.serial_number} key {key}"
),
)
t.add_done_callback(task_done_callback)
if old_state is not None and not (old_state ^ new_state):
logger.info("no change - skipping image update")
return
t = asyncio.create_task(
self.update_key_image(key),
name=f"update image on streamdeck {self.deck.serial_number} key {key}",
)
t.add_done_callback(task_done_callback)
async def update_key_image(self: Self, key: int) -> None:
tile_config = self.get_tile_config_for_key(key)
if tile_config is None:
image = self.deck.BLANK_KEY_IMAGE
else:
match tile_config.image.type:
case "static-icon":
image_config = parse_obj_as(ImageConfig, tile_config.image)
image = await render_static_icon_image(
self.deck, self.key_states[key], image_config
)
case "homeassistant-icon":
home_assistant_icon_image_config = parse_obj_as(
HomeAssistantIconImageConfig, tile_config.image
)
image_config = home_assistant_icon_image_config.states.get(
self.homeassistant_states[key]
)
if image_config is None:
image_config = home_assistant_icon_image_config.states.get(None)
if image_config is not None:
image = await render_static_icon_image(
self.deck, self.key_states[key], image_config
)
else:
print(f"no state matched {self.homeassistant_states[key]}")
image = self.deck.BLANK_KEY_IMAGE
case _:
image = self.deck.BLANK_KEY_IMAGE
await self.deck.set_key_image(key, image)
async def key_callback(
self: Self,
deck: StreamDeck,
key: int,
old_state: bool | None,
new_state: bool,
) -> None:
assert self.deck.serial_number == deck.serial_number
position = self.convert_key_to_position(key)
logger.info(
f"deck {deck.serial_number} key {key} {position} {old_state}{new_state}",
)
await self.update_key_state(key, old_state, new_state)
async def process_key_actions(self: Self, key: int):
tile_config = self.get_tile_config_for_key(key)
if tile_config is not None:
for action_config in tile_config.actions:
match action_config.__root__.type:
case "homeassistant-service":
action_config = HomeAssistantServiceActionConfig.parse_obj(
action_config.__root__
)
await self.homeassistant.call_service(
domain=action_config.domain,
service=action_config.service,
data=action_config.data,
)
case "pulseaudio-action":
action_config = PulseAudioActionConfig.parse_obj(
action_config.__root__
)
await handle_pulseaudio_action(action_config)
case _:
logger.error(f"unimplemented {action_config.__root__.type}")
async def homeassistant_callback(
self: Self,
key: int,
event: EventResponse,
) -> None:
self.homeassistant_states[key] = event.event.data.new_state.state
t = asyncio.create_task(self.update_key_image(key))
t.add_done_callback(task_done_callback)
async def update_homeassistant_callbacks(self: Self, key: int):
tile_config = self.get_tile_config_for_key(key)
if tile_config is None:
return
match tile_config.image.type:
case "homeassistant-icon":
homeassistant_icon_image_config = parse_obj_as(
HomeAssistantIconImageConfig, tile_config.image
)
self.homeassistant.add_event_callback(
homeassistant_icon_image_config.entity_id,
functools.partial(
self.homeassistant_callback,
key,
),
)
event_response = await self.homeassistant.get_state(
homeassistant_icon_image_config.entity_id
)
if event_response is not None:
self.homeassistant_states[key] = event_response.state
await self.update_key_image(key)
async def _main(config: Config):
    """Match configured StreamDecks to connected devices and run them.

    Devices are assigned to configurations in two passes: first by serial
    number, then by index into the enumerated device list. An index that is
    out of range, or that refers to a device already claimed by serial number,
    is logged and skipped. A Home Assistant client and one ``DeckHandler`` per
    matched device are then started, after which the coroutine idles forever.

    On the way out, every matched deck that is still open is reset and closed.
    """
    decks: dict[str, tuple[StreamDeck, StreamDeckConfig]] = {}
    try:
        streamdecks = await enumerate_streamdecks()
        if len(streamdecks) == 1:
            logger.info("found 1 StreamDeck")
        else:
            logger.info(f"found {len(streamdecks)} StreamDecks")

        # Pass 1: configurations that name a device by serial number.
        for deck_config in config.streamdecks:
            if deck_config.serial_number is None:
                continue
            for deck in streamdecks:
                if deck_config.serial_number == deck.serial_number:
                    decks[deck.serial_number] = (deck, deck_config)

        # Pass 2: configurations that name a device by enumeration index.
        for deck_config in config.streamdecks:
            if deck_config.index is None:
                continue
            # Reject negative values too: they would otherwise silently
            # select a device counted from the end of the list.
            if not 0 <= deck_config.index < len(streamdecks):
                logger.error(f"index {deck_config.index} is out of bounds")
                continue
            deck = streamdecks[deck_config.index]
            if deck.serial_number in decks:
                logger.error(
                    f"deck {deck_config.index} was already configured "
                    f"by serial number {deck.serial_number}"
                )
                continue
            decks[deck.serial_number] = (deck, deck_config)

        homeassistant = HomeAssistant(
            host=config.homeassistant.hostname,
            port=config.homeassistant.port,
            secure=config.homeassistant.secure,
            token=config.homeassistant.token,
        )
        tasks = [asyncio.create_task(homeassistant.start(), name="homeassistant")]
        for deck, deck_config in decks.values():
            deck_handler = DeckHandler(deck, deck_config, config, homeassistant)
            tasks.append(asyncio.create_task(deck_handler.start()))
        for task in tasks:
            task.add_done_callback(task_done_callback)
        await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)

        # Keep the event loop alive so key and Home Assistant callbacks run.
        while True:
            await asyncio.sleep(5)
    except KeyboardInterrupt:
        logger.info("keyboard_interrupt")
    finally:
        for deck, _ in decks.values():
            # Best effort per deck: one failing device must not prevent the
            # remaining ones from being reset and closed.
            try:
                logger.info(f"closing deck {deck.serial_number}")
                if await deck.is_open():
                    await deck.reset()
                    await deck.close()
            except Exception:
                logger.exception(f"failed to close deck {deck.serial_number}")
@click.command
@click.option(
    "--config",
    "config_path",
    type=click.Path(
        exists=True,
        file_okay=True,
        dir_okay=False,
        allow_dash=False,
        path_type=pathlib.Path,
    ),
    default="./config.yaml",
    show_default=True,
    help="Path to the YAML configuration file.",
)
def main(
    config_path: pathlib.Path,
):
    """Run greendeck with the given YAML configuration file."""
    setup_logging()
    # The file is read as bytes and validated against the Config model.
    config: Config = parse_yaml_raw_as(Config, config_path.read_bytes())
    asyncio.run(_main(config))
# Script entry point: invoke the click command only when run directly.
if __name__ == "__main__":
    main()