hat.monitor.server.ui
Implementation of web server UI
"""Implementation of web server UI"""

from pathlib import Path
import contextlib
import importlib.resources
import logging
import typing

from hat import aio
from hat import json
from hat import juggler

import hat.monitor.observer.server


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

SetRankCb: typing.TypeAlias = aio.AsyncCallable[['UiServer', int, int], None]
"""Set rank callback (server, cid, rank)"""


async def create(host: str,
                 port: int,
                 state: hat.monitor.observer.server.State,
                 *,
                 set_rank_cb: SetRankCb | None = None,
                 htpasswd: Path | None = None,
                 autoflush_delay: float | None = 0.2,
                 shutdown_timeout: float = 0.1
                 ) -> 'UiServer':
    """Create UI server

    The initial `state` is converted to JSON data and shared with the
    juggler server as its state. Static files are served from the `ui`
    resource directory of this package.

    `autoflush_delay` and `shutdown_timeout` are passed directly to
    `hat.juggler.listen`.

    """
    ui_server = UiServer()
    ui_server._set_rank_cb = set_rank_cb
    ui_server._state = json.Storage(_state_to_json(state))

    # keeps the (possibly temporary) static files directory available
    resources = contextlib.ExitStack()
    try:
        ui_resource = importlib.resources.files(__package__) / 'ui'
        static_dir = resources.enter_context(
            importlib.resources.as_file(ui_resource))

        ui_server._srv = await juggler.listen(
            host, port,
            request_cb=ui_server._on_request,
            static_dir=static_dir,
            htpasswd_file=htpasswd,
            autoflush_delay=autoflush_delay,
            shutdown_timeout=shutdown_timeout,
            state=ui_server._state)

        try:
            # hand ownership of the resources over to the server's group
            ui_server.async_group.spawn(aio.call_on_cancel, resources.close)

        except BaseException:
            # listening server already exists - close it before propagating
            await aio.uncancellable(ui_server.async_close())
            raise

    except BaseException:
        resources.close()
        raise

    return ui_server


class UiServer(aio.Resource):
    """UiServer

    For creating new instance of this class see `create` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        group = self._srv.async_group
        return group

    def set_state(self, state: hat.monitor.observer.server.State):
        """Replace the whole shared state with JSON data based on `state`"""
        state_json = _state_to_json(state)
        self._state.set([], state_json)

    async def _on_request(self, conn, name, data):
        # `set_rank` is the only supported request
        if name != 'set_rank':
            raise Exception('received invalid message type')

        mlog.debug("received set_rank request")
        if not self._set_rank_cb:
            return

        await aio.call(self._set_rank_cb, self, data['cid'], data['rank'])


def _state_to_json(state):
    # JSON representation of the observer server state
    result = {'mid': state.mid}
    result['local_components'] = list(_get_local_components(state))
    result['global_components'] = list(_get_global_components(state))
    return result


def _get_local_components(state):
    # yields JSON representation of each local component
    for info in state.local_components:
        component = {'cid': info.cid,
                     'name': info.name,
                     'group': info.group,
                     'data': info.data,
                     'rank': info.rank}
        yield component


def _get_global_components(state):
    # yields JSON representation of each global component
    for info in state.global_components:
        component = {
            'cid': info.cid,
            'mid': info.mid,
            'name': info.name,
            'group': info.group,
            'data': info.data,
            'rank': info.rank,
            'blessing_req': {'token': info.blessing_req.token,
                             'timestamp': info.blessing_req.timestamp},
            'blessing_res': {'token': info.blessing_res.token,
                             'ready': info.blessing_res.ready}}
        yield component
Module logger
SetRankCb: TypeAlias =
Callable[[ForwardRef('UiServer'), int, int], None | Awaitable[None]]
Set rank callback (server, cid, rank)
async def
create( host: str, port: int, state: hat.monitor.observer.server.State, *, set_rank_cb: Optional[Callable[[UiServer, int, int], None | Awaitable[None]]] = None, htpasswd: pathlib._local.Path | None = None, autoflush_delay: float | None = 0.2, shutdown_timeout: float = 0.1) -> UiServer:
async def create(host: str,
                 port: int,
                 state: hat.monitor.observer.server.State,
                 *,
                 set_rank_cb: SetRankCb | None = None,
                 htpasswd: Path | None = None,
                 autoflush_delay: float | None = 0.2,
                 shutdown_timeout: float = 0.1
                 ) -> 'UiServer':
    """Create UI server

    The initial `state` is converted to JSON data and shared with the
    juggler server as its state. Static files are served from the `ui`
    resource directory of this package.

    `autoflush_delay` and `shutdown_timeout` are passed directly to
    `hat.juggler.listen`.

    """
    ui_server = UiServer()
    ui_server._set_rank_cb = set_rank_cb
    ui_server._state = json.Storage(_state_to_json(state))

    # keeps the (possibly temporary) static files directory available
    resources = contextlib.ExitStack()
    try:
        ui_resource = importlib.resources.files(__package__) / 'ui'
        static_dir = resources.enter_context(
            importlib.resources.as_file(ui_resource))

        ui_server._srv = await juggler.listen(
            host, port,
            request_cb=ui_server._on_request,
            static_dir=static_dir,
            htpasswd_file=htpasswd,
            autoflush_delay=autoflush_delay,
            shutdown_timeout=shutdown_timeout,
            state=ui_server._state)

        try:
            # hand ownership of the resources over to the server's group
            ui_server.async_group.spawn(aio.call_on_cancel, resources.close)

        except BaseException:
            # listening server already exists - close it before propagating
            await aio.uncancellable(ui_server.async_close())
            raise

    except BaseException:
        resources.close()
        raise

    return ui_server
Create UI server
autoflush_delay
and shutdown_timeout
are passed directly to
hat.juggler.listen
.
class
UiServer(hat.aio.group.Resource):
class UiServer(aio.Resource):
    """UiServer

    For creating new instance of this class see `create` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        group = self._srv.async_group
        return group

    def set_state(self, state: hat.monitor.observer.server.State):
        """Replace the whole shared state with JSON data based on `state`"""
        state_json = _state_to_json(state)
        self._state.set([], state_json)

    async def _on_request(self, conn, name, data):
        # `set_rank` is the only supported request
        if name != 'set_rank':
            raise Exception('received invalid message type')

        mlog.debug("received set_rank request")
        if not self._set_rank_cb:
            return

        await aio.call(self._set_rank_cb, self, data['cid'], data['rank'])
UiServer
To create a new instance of this class, see the create
coroutine.
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group

    Same group as the one exposed by the underlying juggler server
    stored in `_srv`.

    """
    group = self._srv.async_group
    return group
Async group