hat.monitor.server.ui
Implementation of web server UI
"""Implementation of web server UI"""

import contextlib
import importlib.resources
import logging
import typing

from hat import aio
from hat import json
from hat import juggler

import hat.monitor.observer.server


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

SetRankCb: typing.TypeAlias = aio.AsyncCallable[['UiServer', int, int], None]
"""Set rank callback (server, cid, rank)"""


async def create(host: str,
                 port: int,
                 state: hat.monitor.observer.server.State,
                 *,
                 set_rank_cb: SetRankCb | None = None,
                 autoflush_delay: float | None = 0.2,
                 shutdown_timeout: float = 0.1
                 ) -> 'UiServer':
    """Create UI server

    `autoflush_delay` and `shutdown_timeout` are passed directly to
    `hat.juggler.listen`.

    JSON representation of `state` is used as the initial content of the
    state storage handed to the juggler server. If `set_rank_cb` is
    provided, it is called as ``set_rank_cb(server, cid, rank)`` for each
    received ``set_rank`` request.

    Static content is served from the ``ui`` resource directory of this
    package.

    """
    ui_server = UiServer()
    ui_server._set_rank_cb = set_rank_cb
    ui_server._state = json.Storage(_state_to_json(state))

    # owns the file system view of the packaged `ui` resources
    resources = contextlib.ExitStack()
    try:
        ui_resource = importlib.resources.files(__package__) / 'ui'
        static_dir = resources.enter_context(
            importlib.resources.as_file(ui_resource))

        ui_server._srv = await juggler.listen(
            host, port,
            request_cb=ui_server._on_request,
            static_dir=static_dir,
            autoflush_delay=autoflush_delay,
            shutdown_timeout=shutdown_timeout,
            state=ui_server._state)

        try:
            # from here on, closing of resources is delegated to the
            # server's async group
            ui_server.async_group.spawn(aio.call_on_cancel, resources.close)

        except BaseException:
            await aio.uncancellable(ui_server.async_close())
            raise

    except BaseException:
        # creation failed - release resources before propagating the error
        resources.close()
        raise

    return ui_server


class UiServer(aio.Resource):
    """UiServer

    To create a new instance of this class, use the `create` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        juggler_server = self._srv
        return juggler_server.async_group

    def set_state(self, state: hat.monitor.observer.server.State):
        """Store JSON representation of `state` at the empty storage path"""
        new_state = _state_to_json(state)
        self._state.set([], new_state)

    async def _on_request(self, conn, name, data):
        # `set_rank` is the only recognized request name
        if name != 'set_rank':
            raise Exception('received invalid message type')

        mlog.debug("received set_rank request")

        set_rank_cb = self._set_rank_cb
        if not set_rank_cb:
            # request is accepted but has no effect without a callback
            return

        await aio.call(set_rank_cb, self, data['cid'], data['rank'])


def _state_to_json(state):
    # generators are materialized so the result contains only lists and dicts
    mid = state.mid
    local_components = list(_get_local_components(state))
    global_components = list(_get_global_components(state))

    return {'mid': mid,
            'local_components': local_components,
            'global_components': global_components}


def _get_local_components(state):
    # lazily yields one JSON object per local component
    for info in state.local_components:
        yield _local_component_to_json(info)


def _get_global_components(state):
    # lazily yields one JSON object per global component
    for info in state.global_components:
        yield _global_component_to_json(info)


def _local_component_to_json(info):
    # JSON object describing a single local component
    return {
        'cid': info.cid,
        'name': info.name,
        'group': info.group,
        'data': info.data,
        'rank': info.rank,
    }


def _global_component_to_json(info):
    # JSON object describing a single global component, including its
    # blessing request and response
    return {
        'cid': info.cid,
        'mid': info.mid,
        'name': info.name,
        'group': info.group,
        'data': info.data,
        'rank': info.rank,
        'blessing_req': {
            'token': info.blessing_req.token,
            'timestamp': info.blessing_req.timestamp,
        },
        'blessing_res': {
            'token': info.blessing_res.token,
            'ready': info.blessing_res.ready,
        },
    }
Module logger
SetRankCb: TypeAlias =
Callable[[ForwardRef('UiServer'), int, int], Optional[Awaitable[NoneType]]]
Set rank callback (server, cid, rank)
async def
create( host: str, port: int, state: hat.monitor.observer.server.State, *, set_rank_cb: Optional[Callable[[UiServer, int, int], Optional[Awaitable[NoneType]]]] = None, autoflush_delay: float | None = 0.2, shutdown_timeout: float = 0.1) -> UiServer:
async def create(host: str,
                 port: int,
                 state: hat.monitor.observer.server.State,
                 *,
                 set_rank_cb: SetRankCb | None = None,
                 autoflush_delay: float | None = 0.2,
                 shutdown_timeout: float = 0.1
                 ) -> 'UiServer':
    """Create UI server

    `autoflush_delay` and `shutdown_timeout` are passed directly to
    `hat.juggler.listen`.

    JSON representation of `state` is used as the initial content of the
    state storage handed to the juggler server. If `set_rank_cb` is
    provided, it is called as ``set_rank_cb(server, cid, rank)`` for each
    received ``set_rank`` request.

    Static content is served from the ``ui`` resource directory of this
    package.

    """
    ui_server = UiServer()
    ui_server._set_rank_cb = set_rank_cb
    ui_server._state = json.Storage(_state_to_json(state))

    # owns the file system view of the packaged `ui` resources
    resources = contextlib.ExitStack()
    try:
        ui_resource = importlib.resources.files(__package__) / 'ui'
        static_dir = resources.enter_context(
            importlib.resources.as_file(ui_resource))

        ui_server._srv = await juggler.listen(
            host, port,
            request_cb=ui_server._on_request,
            static_dir=static_dir,
            autoflush_delay=autoflush_delay,
            shutdown_timeout=shutdown_timeout,
            state=ui_server._state)

        try:
            # from here on, closing of resources is delegated to the
            # server's async group
            ui_server.async_group.spawn(aio.call_on_cancel, resources.close)

        except BaseException:
            await aio.uncancellable(ui_server.async_close())
            raise

    except BaseException:
        # creation failed - release resources before propagating the error
        resources.close()
        raise

    return ui_server
Create UI server
autoflush_delay
and shutdown_timeout
are passed directly to
hat.juggler.listen
.
class
UiServer(hat.aio.group.Resource):
class UiServer(aio.Resource):
    """UiServer

    To create a new instance of this class, use the `create` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        juggler_server = self._srv
        return juggler_server.async_group

    def set_state(self, state: hat.monitor.observer.server.State):
        """Store JSON representation of `state` at the empty storage path"""
        new_state = _state_to_json(state)
        self._state.set([], new_state)

    async def _on_request(self, conn, name, data):
        # `set_rank` is the only recognized request name
        if name != 'set_rank':
            raise Exception('received invalid message type')

        mlog.debug("received set_rank request")

        set_rank_cb = self._set_rank_cb
        if not set_rank_cb:
            # request is accepted but has no effect without a callback
            return

        await aio.call(set_rank_cb, self, data['cid'], data['rank'])
UiServer
To create a new instance of this class, use the create
coroutine.
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group"""
    # the group is the one owned by the underlying juggler server
    juggler_server = self._srv
    return juggler_server.async_group
Async group
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close