hat.monitor.server.ui

Implementation of web server UI

  1"""Implementation of web server UI"""
  2
  3from pathlib import Path
  4import contextlib
  5import importlib.resources
  6import logging
  7import typing
  8
  9from hat import aio
 10from hat import json
 11from hat import juggler
 12
 13import hat.monitor.observer.server
 14
 15
 16mlog: logging.Logger = logging.getLogger(__name__)
 17"""Module logger"""
 18
 19SetRankCb: typing.TypeAlias = aio.AsyncCallable[['UiServer', int, int], None]
 20"""Set rank callback (server, cid, rank)"""
 21
 22
 23async def create(host: str,
 24                 port: int,
 25                 state: hat.monitor.observer.server.State,
 26                 *,
 27                 set_rank_cb: SetRankCb | None = None,
 28                 htpasswd: Path | None = None,
 29                 autoflush_delay: float | None = 0.2,
 30                 shutdown_timeout: float = 0.1
 31                 ) -> 'UiServer':
 32    """Create UI server
 33
 34    `autoflush_delay` and `shutdown_timeout` are passed directy to
 35    `hat.juggler.listen`.
 36
 37    """
 38    server = UiServer()
 39    server._set_rank_cb = set_rank_cb
 40    server._state = json.Storage(_state_to_json(state))
 41
 42    exit_stack = contextlib.ExitStack()
 43    try:
 44        ui_path = exit_stack.enter_context(
 45            importlib.resources.as_file(
 46                importlib.resources.files(__package__) / 'ui'))
 47
 48        server._srv = await juggler.listen(host, port,
 49                                           request_cb=server._on_request,
 50                                           static_dir=ui_path,
 51                                           htpasswd_file=htpasswd,
 52                                           autoflush_delay=autoflush_delay,
 53                                           shutdown_timeout=shutdown_timeout,
 54                                           state=server._state)
 55
 56        try:
 57            server.async_group.spawn(aio.call_on_cancel, exit_stack.close)
 58
 59        except BaseException:
 60            await aio.uncancellable(server.async_close())
 61            raise
 62
 63    except BaseException:
 64        exit_stack.close()
 65        raise
 66
 67    return server
 68
 69
 70class UiServer(aio.Resource):
 71    """UiServer
 72
 73    For creating new instance of this class see `create` coroutine.
 74
 75    """
 76
 77    @property
 78    def async_group(self) -> aio.Group:
 79        """Async group"""
 80        return self._srv.async_group
 81
 82    def set_state(self, state: hat.monitor.observer.server.State):
 83        self._state.set([], _state_to_json(state))
 84
 85    async def _on_request(self, conn, name, data):
 86        if name == 'set_rank':
 87            mlog.debug("received set_rank request")
 88            if self._set_rank_cb:
 89                await aio.call(self._set_rank_cb, self, data['cid'],
 90                               data['rank'])
 91
 92        else:
 93            raise Exception('received invalid message type')
 94
 95
 96def _state_to_json(state):
 97    return {'mid': state.mid,
 98            'local_components': list(_get_local_components(state)),
 99            'global_components': list(_get_global_components(state))}
100
101
102def _get_local_components(state):
103    for i in state.local_components:
104        yield {'cid': i.cid,
105               'name': i.name,
106               'group': i.group,
107               'data': i.data,
108               'rank': i.rank}
109
110
111def _get_global_components(state):
112    for i in state.global_components:
113        yield {'cid': i.cid,
114               'mid': i.mid,
115               'name': i.name,
116               'group': i.group,
117               'data': i.data,
118               'rank': i.rank,
119               'blessing_req': {'token': i.blessing_req.token,
120                                'timestamp': i.blessing_req.timestamp},
121               'blessing_res': {'token': i.blessing_res.token,
122                                'ready': i.blessing_res.ready}}
mlog: logging.Logger = <Logger hat.monitor.server.ui (WARNING)>

Module logger

SetRankCb: TypeAlias = Callable[[ForwardRef('UiServer'), int, int], None | Awaitable[None]]

Set rank callback (server, cid, rank)

async def create( host: str, port: int, state: hat.monitor.observer.server.State, *, set_rank_cb: Optional[Callable[[UiServer, int, int], None | Awaitable[None]]] = None, htpasswd: pathlib._local.Path | None = None, autoflush_delay: float | None = 0.2, shutdown_timeout: float = 0.1) -> UiServer:
24async def create(host: str,
25                 port: int,
26                 state: hat.monitor.observer.server.State,
27                 *,
28                 set_rank_cb: SetRankCb | None = None,
29                 htpasswd: Path | None = None,
30                 autoflush_delay: float | None = 0.2,
31                 shutdown_timeout: float = 0.1
32                 ) -> 'UiServer':
33    """Create UI server
34
35    `autoflush_delay` and `shutdown_timeout` are passed directy to
36    `hat.juggler.listen`.
37
38    """
39    server = UiServer()
40    server._set_rank_cb = set_rank_cb
41    server._state = json.Storage(_state_to_json(state))
42
43    exit_stack = contextlib.ExitStack()
44    try:
45        ui_path = exit_stack.enter_context(
46            importlib.resources.as_file(
47                importlib.resources.files(__package__) / 'ui'))
48
49        server._srv = await juggler.listen(host, port,
50                                           request_cb=server._on_request,
51                                           static_dir=ui_path,
52                                           htpasswd_file=htpasswd,
53                                           autoflush_delay=autoflush_delay,
54                                           shutdown_timeout=shutdown_timeout,
55                                           state=server._state)
56
57        try:
58            server.async_group.spawn(aio.call_on_cancel, exit_stack.close)
59
60        except BaseException:
61            await aio.uncancellable(server.async_close())
62            raise
63
64    except BaseException:
65        exit_stack.close()
66        raise
67
68    return server

Create UI server

autoflush_delay and shutdown_timeout are passed directy to hat.juggler.listen.

class UiServer(hat.aio.group.Resource):
71class UiServer(aio.Resource):
72    """UiServer
73
74    For creating new instance of this class see `create` coroutine.
75
76    """
77
78    @property
79    def async_group(self) -> aio.Group:
80        """Async group"""
81        return self._srv.async_group
82
83    def set_state(self, state: hat.monitor.observer.server.State):
84        self._state.set([], _state_to_json(state))
85
86    async def _on_request(self, conn, name, data):
87        if name == 'set_rank':
88            mlog.debug("received set_rank request")
89            if self._set_rank_cb:
90                await aio.call(self._set_rank_cb, self, data['cid'],
91                               data['rank'])
92
93        else:
94            raise Exception('received invalid message type')

UiServer

For creating new instance of this class see create coroutine.

async_group: hat.aio.group.Group
78    @property
79    def async_group(self) -> aio.Group:
80        """Async group"""
81        return self._srv.async_group

Async group

def set_state(self, state: hat.monitor.observer.server.State):
83    def set_state(self, state: hat.monitor.observer.server.State):
84        self._state.set([], _state_to_json(state))