hat.monitor.observer.server
Observer Server
"""Observer Server

Chatter based server which assigns a component id to every connected client,
keeps track of local (directly connected) and global components and notifies
connected clients about the current monitor id and global components.

"""

import contextlib
import itertools
import logging
import typing

from hat import aio
from hat import json
from hat import util
from hat.drivers import chatter
from hat.drivers import tcp

from hat.monitor.observer import common


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

StateCb: typing.TypeAlias = aio.AsyncCallable[['Server', 'State'], None]
"""State callback"""


class State(typing.NamedTuple):
    """Observer server state (monitor id together with component lists)"""
    # monitor id (initially 0, changed through `Server.update`)
    mid: int
    # one entry per client currently connected to this server
    local_components: list[common.ComponentInfo]
    # components supplied through `Server.update`
    global_components: list[common.ComponentInfo]


async def listen(addr: tcp.Address,
                 *,
                 default_rank: int = 1,
                 close_timeout: float = 3,
                 state_cb: StateCb | None = None,
                 **kwargs
                 ) -> 'Server':
    """Create listening Observer Server

    All client connections are always bound to server lifetime regardless
    of `bind_connections` argument.

    Additional arguments are passed directly to `hat.drivers.chatter.listen`.

    Args:
        addr: address passed to `hat.drivers.chatter.listen`
        default_rank: rank assigned to a newly connected component (may be
            overridden by a previously cached rank once the component
            reports its name and group)
        close_timeout: timeout used while waiting for a client connection to
            close after the close message is sent (presumably in seconds)
        state_cb: optional callback called with the server and the new state
            after each state change

    """
    # NOTE(review): kwargs are forwarded unchanged - the lifetime binding
    # mentioned above is not enforced in this function; confirm it is
    # provided by `chatter.listen`
    server = Server()
    server._default_rank = default_rank
    server._close_timeout = close_timeout
    server._state_cb = state_cb
    server._state = State(mid=0,
                          local_components=[],
                          global_components=[])
    # component ids are allocated sequentially starting from 1
    server._next_cids = itertools.count(1)
    # cid -> client connection
    server._cid_conns = {}
    # (name, group) -> rank explicitly set through `Server.set_rank`
    server._rank_cache = {}

    server._srv = await chatter.listen(server._client_loop, addr, **kwargs)

    return server


class Server(aio.Resource):
    """Observer Server

    For creating new instance of this class see `listen` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    @property
    def state(self) -> State:
        """Server's state"""
        return self._state

    async def update(self,
                     mid: int,
                     global_components: list[common.ComponentInfo]):
        """Update server's monitor id and global components

        Does nothing if both `mid` and `global_components` are equal to
        the values in the current state. Otherwise, all local components
        receive the new `mid` and, where available, the blessing request
        of the global component with the same `cid` and matching `mid`.

        """
        if (mid == self._state.mid and
                global_components == self._state.global_components):
            return

        # blessing requests of global components reported with this mid
        blessing_reqs = {i.cid: i.blessing_req
                         for i in global_components
                         if i.mid == mid}

        # local components without a matching global entry keep their
        # existing blessing request
        local_components = [
            i._replace(mid=mid,
                       blessing_req=blessing_reqs.get(i.cid, i.blessing_req))
            for i in self._state.local_components]

        await self._change_state(mid=mid,
                                 local_components=local_components,
                                 global_components=global_components)

    async def set_rank(self,
                       cid: int,
                       rank: int):
        """Set component rank

        Does nothing if local component with `cid` is not found or if it
        already has the requested rank.

        """
        info = util.first(self._state.local_components,
                          lambda i: i.cid == cid)
        if not info or info.rank == rank:
            return

        # remember rank so that it can be reapplied when a component with the
        # same name and group registers again (see `_update_client`)
        if info.name is not None:
            self._rank_cache[info.name, info.group] = rank

        updated_info = info._replace(rank=rank)
        local_components = [(updated_info if i is info else i)
                            for i in self._state.local_components]

        await self._change_state(local_components=local_components)

    async def _client_loop(self, conn):
        # connection callback passed to `chatter.listen` - handles a single
        # client connection from registration until removal
        cid = next(self._next_cids)
        self._cid_conns[cid] = conn

        mlog.debug('starting client loop (cid: %s)', cid)
        try:
            # register client as a new local component with initial info
            local_components = [*self._state.local_components,
                                self._get_init_info(cid)]
            await self._change_state(local_components=local_components)

            while True:
                msg_type, msg_data = await common.receive_msg(conn)

                if msg_type != 'HatObserver.MsgClient':
                    raise Exception('unsupported message type')

                mlog.debug('received msg client (cid: %s)', cid)
                await self._update_client(
                    cid=cid,
                    name=msg_data['name'],
                    group=msg_data['group'],
                    data=json.decode(msg_data['data']),
                    blessing_res=common.blessing_res_from_sbs(
                        msg_data['blessingRes']))

        except ConnectionError:
            # connection errors are not logged as errors
            pass

        except Exception as e:
            mlog.error('client loop error (cid: %s): %s', cid, e, exc_info=e)

        finally:
            mlog.debug('closing client loop (cid: %s)', cid)
            # presumably shields the cleanup from cancellation - confirm
            # against `hat.aio.uncancellable`
            await aio.uncancellable(self._remove_client(cid))

    async def _change_state(self, **kwargs):
        # replace given state fields, notify all clients and call state
        # callback; the state is replaced before the first await
        self._state = self._state._replace(**kwargs)

        # iterate over a copy - presumably because connections can be added
        # or removed while sending is awaited
        for cid, conn in list(self._cid_conns.items()):
            # failure to notify one client does not prevent notifying others
            with contextlib.suppress(ConnectionError):
                await common.send_msg(conn, 'HatObserver.MsgServer', {
                    'cid': cid,
                    'mid': self._state.mid,
                    'components': [common.component_info_to_sbs(info)
                                   for info in self._state.global_components]})

        if self._state_cb:
            await aio.call(self._state_cb, self, self._state)

    async def _remove_client(self, cid):
        # unregister client, publish state without it and close connection
        conn = self._cid_conns.pop(cid)

        try:
            local_components = [i for i in self._state.local_components
                                if i.cid != cid]
            await self._change_state(local_components=local_components)

        except Exception as e:
            # state change failure must not prevent closing of connection
            mlog.error('change state error: %s', e, exc_info=e)

        # best effort: send close message and wait up to `close_timeout` for
        # the connection to close; any failure (including timeout) is ignored
        with contextlib.suppress(Exception):
            await conn.send(chatter.Data('HatObserver.MsgClose', b''))
            await aio.wait_for(conn.wait_closed(), self._close_timeout)

        await conn.async_close()

    async def _update_client(self, cid, name, group, data, blessing_res):
        # apply data reported by client to its local component info
        info = util.first(self._state.local_components,
                          lambda i: i.cid == cid)
        updated_info = info._replace(name=name,
                                     group=group,
                                     data=data,
                                     blessing_res=blessing_res)

        # name is None only until the first client message - at that point
        # apply rank previously set for the same (name, group), if any
        if info.name is None:
            rank_cache_key = name, group
            rank = self._rank_cache.get(rank_cache_key, info.rank)
            updated_info = updated_info._replace(rank=rank)

        # avoid notifications if nothing changed
        if info == updated_info:
            return

        local_components = [(updated_info if i is info else i)
                            for i in self._state.local_components]
        await self._change_state(local_components=local_components)

    def _get_init_info(self, cid):
        # component info used from connection until the first client message
        return common.ComponentInfo(
            cid=cid,
            mid=self._state.mid,
            name=None,
            group=None,
            data=None,
            rank=self._default_rank,
            blessing_req=common.BlessingReq(token=None,
                                            timestamp=None),
            blessing_res=common.BlessingRes(token=None,
                                            ready=False))
Module logger
StateCb: TypeAlias =
Callable[[ForwardRef('Server'), ForwardRef('State')], Optional[Awaitable[NoneType]]]
State callback
class
State(typing.NamedTuple):
class State(typing.NamedTuple):
    """Observer server state (monitor id together with component lists)"""
    # monitor id
    mid: int
    # components of clients connected to this server
    local_components: list[common.ComponentInfo]
    # globally known components
    global_components: list[common.ComponentInfo]
State(mid, local_components, global_components)
State( mid: int, local_components: list[hat.monitor.common.ComponentInfo], global_components: list[hat.monitor.common.ComponentInfo])
Create new instance of State(mid, local_components, global_components)
Inherited Members
- builtins.tuple
- index
- count
async def
listen( addr: hat.drivers.tcp.Address, *, default_rank: int = 1, close_timeout: float = 3, state_cb: Optional[Callable[[Server, State], Optional[Awaitable[NoneType]]]] = None, **kwargs) -> Server:
async def listen(addr: tcp.Address,
                 *,
                 default_rank: int = 1,
                 close_timeout: float = 3,
                 state_cb: StateCb | None = None,
                 **kwargs
                 ) -> 'Server':
    """Create listening Observer Server

    All client connections are always bound to server lifetime regardless
    of `bind_connections` argument.

    Additional arguments are passed directly to `hat.drivers.chatter.listen`.

    Args:
        addr: address passed to `hat.drivers.chatter.listen`
        default_rank: rank given to newly connected components
        close_timeout: timeout for waiting on a client connection to close
        state_cb: optional callback called on each state change

    """
    server = Server()

    # private attributes are initialized here instead of in `__init__`
    initial_attrs = {
        '_default_rank': default_rank,
        '_close_timeout': close_timeout,
        '_state_cb': state_cb,
        '_state': State(mid=0,
                        local_components=[],
                        global_components=[]),
        '_next_cids': itertools.count(1),
        '_cid_conns': {},
        '_rank_cache': {}}
    for attr_name, attr_value in initial_attrs.items():
        setattr(server, attr_name, attr_value)

    # listening starts only after the server is fully initialized because
    # the client loop relies on the attributes set above
    server._srv = await chatter.listen(server._client_loop, addr, **kwargs)
    return server
Create listening Observer Server
All client connections are always bound to server lifetime regardless
of bind_connections
argument.
Additional arguments are passed directly to hat.drivers.chatter.listen
.
class
Server(hat.aio.group.Resource):
class Server(aio.Resource):
    """Observer Server

    Keeps track of components belonging to connected clients (local
    components) together with global components and notifies every
    connected client of the current monitor id and global components.

    For creating new instance of this class see `listen` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        srv = self._srv
        return srv.async_group

    @property
    def state(self) -> State:
        """Server's state"""
        return self._state

    async def update(self,
                     mid: int,
                     global_components: list[common.ComponentInfo]):
        """Update server's monitor id and global components

        Nothing is done when both arguments match the current state.

        """
        current = self._state
        if mid == current.mid:
            if global_components == current.global_components:
                return

        # blessing requests of global components reported with this mid
        requested = {}
        for component in global_components:
            if component.mid == mid:
                requested[component.cid] = component.blessing_req

        # every local component takes the new mid; blessing request is
        # replaced only if a matching global component exists
        refreshed = []
        for component in current.local_components:
            blessing_req = requested.get(component.cid,
                                         component.blessing_req)
            refreshed.append(component._replace(mid=mid,
                                                blessing_req=blessing_req))

        await self._change_state(mid=mid,
                                 local_components=refreshed,
                                 global_components=global_components)

    async def set_rank(self,
                       cid: int,
                       rank: int):
        """Set component rank

        Unknown `cid` or unchanged rank results in no action.

        """
        def has_cid(component):
            return component.cid == cid

        target = util.first(self._state.local_components, has_cid)
        if not target:
            return
        if target.rank == rank:
            return

        # remembered rank is reapplied when a component with the same name
        # and group registers again
        if target.name is not None:
            self._rank_cache[target.name, target.group] = rank

        reranked = target._replace(rank=rank)
        components = []
        for component in self._state.local_components:
            components.append(reranked if component is target else component)

        await self._change_state(local_components=components)

    async def _client_loop(self, conn):
        # handles a single client connection for its whole lifetime
        cid = next(self._next_cids)
        self._cid_conns[cid] = conn

        mlog.debug('starting client loop (cid: %s)', cid)
        try:
            # register the client as a local component with initial info
            initial_info = self._get_init_info(cid)
            await self._change_state(
                local_components=[*self._state.local_components,
                                  initial_info])

            while True:
                received = await common.receive_msg(conn)
                msg_type, msg_data = received

                if msg_type != 'HatObserver.MsgClient':
                    raise Exception('unsupported message type')

                mlog.debug('received msg client (cid: %s)', cid)
                name = msg_data['name']
                group = msg_data['group']
                data = json.decode(msg_data['data'])
                blessing_res = common.blessing_res_from_sbs(
                    msg_data['blessingRes'])
                await self._update_client(cid=cid,
                                          name=name,
                                          group=group,
                                          data=data,
                                          blessing_res=blessing_res)

        except ConnectionError:
            # connection errors are not logged as errors
            pass

        except Exception as e:
            mlog.error('client loop error (cid: %s): %s', cid, e, exc_info=e)

        finally:
            mlog.debug('closing client loop (cid: %s)', cid)
            await aio.uncancellable(self._remove_client(cid))

    async def _change_state(self, **kwargs):
        # state is replaced before anything is awaited
        self._state = self._state._replace(**kwargs)

        # notify all clients - iteration is done over a snapshot of
        # connections and the state is read anew for each of them
        connections = tuple(self._cid_conns.items())
        for cid, conn in connections:
            with contextlib.suppress(ConnectionError):
                state = self._state
                components = [common.component_info_to_sbs(info)
                              for info in state.global_components]
                msg = {'cid': cid,
                       'mid': state.mid,
                       'components': components}
                await common.send_msg(conn, 'HatObserver.MsgServer', msg)

        if not self._state_cb:
            return

        await aio.call(self._state_cb, self, self._state)

    async def _remove_client(self, cid):
        # unregister client, publish state without it and close connection
        conn = self._cid_conns.pop(cid)

        try:
            remaining = []
            for component in self._state.local_components:
                if component.cid != cid:
                    remaining.append(component)
            await self._change_state(local_components=remaining)

        except Exception as e:
            mlog.error('change state error: %s', e, exc_info=e)

        # best effort close notification - failures and timeout are ignored
        with contextlib.suppress(Exception):
            close_msg = chatter.Data('HatObserver.MsgClose', b'')
            await conn.send(close_msg)
            await aio.wait_for(conn.wait_closed(), self._close_timeout)

        await conn.async_close()

    async def _update_client(self, cid, name, group, data, blessing_res):
        # apply data reported by client to its local component info
        def has_cid(component):
            return component.cid == cid

        previous = util.first(self._state.local_components, has_cid)
        changed = previous._replace(name=name,
                                    group=group,
                                    data=data,
                                    blessing_res=blessing_res)

        # on first registration reuse rank remembered for this name and group
        if previous.name is None:
            cached_rank = self._rank_cache.get((name, group), previous.rank)
            changed = changed._replace(rank=cached_rank)

        # no notification when nothing changed
        if previous == changed:
            return

        components = []
        for component in self._state.local_components:
            components.append(changed if component is previous else component)
        await self._change_state(local_components=components)

    def _get_init_info(self, cid):
        # info describing a client which has not yet sent any message
        empty_req = common.BlessingReq(token=None,
                                       timestamp=None)
        empty_res = common.BlessingRes(token=None,
                                       ready=False)
        return common.ComponentInfo(cid=cid,
                                    mid=self._state.mid,
                                    name=None,
                                    group=None,
                                    data=None,
                                    rank=self._default_rank,
                                    blessing_req=empty_req,
                                    blessing_res=empty_res)
Observer Server
For creating new instance of this class see listen
coroutine.
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group"""
    # server's lifetime is delegated to the underlying chatter server
    srv = self._srv
    return srv.async_group
Async group
async def update(self,
                 mid: int,
                 global_components: list[common.ComponentInfo]):
    """Update server's monitor id and global components

    Nothing is done when both arguments match the current state.
    Otherwise, local components take the new `mid` and, if a global
    component with the same `cid` and matching `mid` exists, its
    blessing request.

    """
    current = self._state
    if mid == current.mid:
        if global_components == current.global_components:
            return

    # blessing requests of global components reported with this mid
    requested = {}
    for component in global_components:
        if component.mid == mid:
            requested[component.cid] = component.blessing_req

    # local components without a matching global entry keep their
    # existing blessing request
    refreshed = []
    for component in current.local_components:
        blessing_req = requested.get(component.cid, component.blessing_req)
        refreshed.append(component._replace(mid=mid,
                                            blessing_req=blessing_req))

    await self._change_state(mid=mid,
                             local_components=refreshed,
                             global_components=global_components)
Update server's monitor id and global components
async def
set_rank(self, cid: int, rank: int):
async def set_rank(self,
                   cid: int,
                   rank: int):
    """Set component rank

    Unknown `cid` or unchanged rank results in no action.

    """
    def has_cid(component):
        return component.cid == cid

    target = util.first(self._state.local_components, has_cid)
    if not target:
        return
    if target.rank == rank:
        return

    # rank of a named component is remembered under its name and group
    if target.name is not None:
        self._rank_cache[target.name, target.group] = rank

    reranked = target._replace(rank=rank)
    components = []
    for component in self._state.local_components:
        components.append(reranked if component is target else component)

    await self._change_state(local_components=components)
Set component rank
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close