hat.monitor.server.ui

Implementation of web server UI

  1"""Implementation of web server UI"""
  2
  3import contextlib
  4import importlib.resources
  5import logging
  6import typing
  7
  8from hat import aio
  9from hat import json
 10from hat import juggler
 11
 12import hat.monitor.observer.server
 13
 14
 15mlog: logging.Logger = logging.getLogger(__name__)
 16"""Module logger"""
 17
 18SetRankCb: typing.TypeAlias = aio.AsyncCallable[['UiServer', int, int], None]
 19"""Set rank callback (server, cid, rank)"""
 20
 21
 22async def create(host: str,
 23                 port: int,
 24                 state: hat.monitor.observer.server.State,
 25                 *,
 26                 set_rank_cb: SetRankCb | None = None,
 27                 autoflush_delay: float | None = 0.2,
 28                 shutdown_timeout: float = 0.1
 29                 ) -> 'UiServer':
 30    """Create UI server
 31
 32    `autoflush_delay` and `shutdown_timeout` are passed directy to
 33    `hat.juggler.listen`.
 34
 35    """
 36    server = UiServer()
 37    server._set_rank_cb = set_rank_cb
 38    server._state = json.Storage(_state_to_json(state))
 39
 40    exit_stack = contextlib.ExitStack()
 41    try:
 42        ui_path = exit_stack.enter_context(
 43            importlib.resources.as_file(
 44                importlib.resources.files(__package__) / 'ui'))
 45
 46        server._srv = await juggler.listen(host, port,
 47                                           request_cb=server._on_request,
 48                                           static_dir=ui_path,
 49                                           autoflush_delay=autoflush_delay,
 50                                           shutdown_timeout=shutdown_timeout,
 51                                           state=server._state)
 52
 53        try:
 54            server.async_group.spawn(aio.call_on_cancel, exit_stack.close)
 55
 56        except BaseException:
 57            await aio.uncancellable(server.async_close())
 58            raise
 59
 60    except BaseException:
 61        exit_stack.close()
 62        raise
 63
 64    return server
 65
 66
 67class UiServer(aio.Resource):
 68    """UiServer
 69
 70    For creating new instance of this class see `create` coroutine.
 71
 72    """
 73
 74    @property
 75    def async_group(self) -> aio.Group:
 76        """Async group"""
 77        return self._srv.async_group
 78
 79    def set_state(self, state: hat.monitor.observer.server.State):
 80        self._state.set([], _state_to_json(state))
 81
 82    async def _on_request(self, conn, name, data):
 83        if name == 'set_rank':
 84            mlog.debug("received set_rank request")
 85            if self._set_rank_cb:
 86                await aio.call(self._set_rank_cb, self, data['cid'],
 87                               data['rank'])
 88
 89        else:
 90            raise Exception('received invalid message type')
 91
 92
 93def _state_to_json(state):
 94    return {'mid': state.mid,
 95            'local_components': list(_get_local_components(state)),
 96            'global_components': list(_get_global_components(state))}
 97
 98
 99def _get_local_components(state):
100    for i in state.local_components:
101        yield {'cid': i.cid,
102               'name': i.name,
103               'group': i.group,
104               'data': i.data,
105               'rank': i.rank}
106
107
def _get_global_components(state):
    """Yield dictionary representation of each global component in `state`

    Blessing request and response are represented as nested dictionaries
    under ``blessing_req`` and ``blessing_res`` keys.

    """
    fields = ('cid', 'mid', 'name', 'group', 'data', 'rank')
    for component in state.global_components:
        entry = {field: getattr(component, field) for field in fields}

        request = component.blessing_req
        entry['blessing_req'] = {'token': request.token,
                                 'timestamp': request.timestamp}

        response = component.blessing_res
        entry['blessing_res'] = {'token': response.token,
                                 'ready': response.ready}

        yield entry
mlog: logging.Logger = <Logger hat.monitor.server.ui (WARNING)>

Module logger

SetRankCb: TypeAlias = Callable[[ForwardRef('UiServer'), int, int], Optional[Awaitable[NoneType]]]

Set rank callback (server, cid, rank)

async def create( host: str, port: int, state: hat.monitor.observer.server.State, *, set_rank_cb: Optional[Callable[[UiServer, int, int], Optional[Awaitable[NoneType]]]] = None, autoflush_delay: float | None = 0.2, shutdown_timeout: float = 0.1) -> UiServer:
async def create(host: str,
                 port: int,
                 state: hat.monitor.observer.server.State,
                 *,
                 set_rank_cb: SetRankCb | None = None,
                 autoflush_delay: float | None = 0.2,
                 shutdown_timeout: float = 0.1
                 ) -> 'UiServer':
    """Create UI server

    Server listens on `host` and `port`, serves static files from the
    package's ``ui`` resource directory and publishes JSON representation
    of `state`. If `set_rank_cb` is set, it is called for each received
    ``set_rank`` request.

    `autoflush_delay` and `shutdown_timeout` are passed directly to
    `hat.juggler.listen`.

    """
    server = UiServer()
    server._set_rank_cb = set_rank_cb
    server._state = json.Storage(_state_to_json(state))

    # owns the (possibly temporary) on-disk copy of UI resources
    resources = contextlib.ExitStack()
    try:
        ui_resource = importlib.resources.files(__package__) / 'ui'
        static_dir = resources.enter_context(
            importlib.resources.as_file(ui_resource))

        server._srv = await juggler.listen(
            host, port,
            request_cb=server._on_request,
            static_dir=static_dir,
            autoflush_delay=autoflush_delay,
            shutdown_timeout=shutdown_timeout,
            state=server._state)

        try:
            # tie resource cleanup to the server's async group through
            # `aio.call_on_cancel`
            server.async_group.spawn(aio.call_on_cancel, resources.close)

        except BaseException:
            # listening server already exists - close it before propagating
            await aio.uncancellable(server.async_close())
            raise

    except BaseException:
        resources.close()
        raise

    return server

Create UI server

autoflush_delay and shutdown_timeout are passed directly to hat.juggler.listen.

class UiServer(hat.aio.group.Resource):
class UiServer(aio.Resource):
    """UI server

    Instances should not be constructed directly - use the `create`
    coroutine, which initializes state storage and the listening server.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group of the underlying listening server"""
        srv = self._srv
        return srv.async_group

    def set_state(self, state: hat.monitor.observer.server.State):
        """Replace published state with JSON representation of `state`"""
        storage = self._state
        storage.set([], _state_to_json(state))

    async def _on_request(self, conn, name, data):
        # `set_rank` is the only recognized request name
        if name != 'set_rank':
            raise Exception('received invalid message type')

        mlog.debug("received set_rank request")

        # without registered callback, request is accepted and ignored
        if not self._set_rank_cb:
            return

        await aio.call(self._set_rank_cb, self, data['cid'], data['rank'])

UiServer

For creating new instance of this class see create coroutine.

async_group: hat.aio.group.Group
75    @property
76    def async_group(self) -> aio.Group:
77        """Async group"""
78        return self._srv.async_group

Async group

def set_state(self, state: hat.monitor.observer.server.State):
80    def set_state(self, state: hat.monitor.observer.server.State):
81        self._state.set([], _state_to_json(state))
Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close