hat.monitor.observer.master
Observer Master
"""Observer Master"""

from collections.abc import Iterable
import contextlib
import itertools
import logging
import typing

from hat import aio
from hat.drivers import chatter
from hat.drivers import tcp

from hat.monitor.observer import common


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

ComponentsCb: typing.TypeAlias = aio.AsyncCallable[
    ['Master', list[common.ComponentInfo]],
    None]
"""Components callback"""

BlessingCb: typing.TypeAlias = typing.Callable[
    ['Master', Iterable[common.ComponentInfo]],
    Iterable[tuple[common.Mid, common.Cid, common.BlessingReq]]]
"""Blessing callback"""


async def listen(addr: tcp.Address,
                 *,
                 global_components_cb: ComponentsCb | None = None,
                 blessing_cb: BlessingCb | None = None,
                 **kwargs
                 ) -> 'Master':
    """Create listening inactive Observer Master

    All slave connections are always bound to server lifetime
    (`bind_connections` should not be set).

    Additional arguments are passed directly to `hat.drivers.chatter.listen`.

    """
    master = Master()
    master._global_components_cb = global_components_cb
    master._blessing_cb = blessing_cb
    master._mid_conns = {}
    # mid 0 holds local components (see `Master.set_local_components`)
    master._mid_cid_infos = {0: {}}
    master._global_components = []
    # slave mids are assigned sequentially starting from 1
    master._next_mids = itertools.count(1)
    # `None` marks inactive master
    master._active_subgroup = None

    master._srv = await chatter.listen(master._on_connection, addr,
                                       bind_connections=True,
                                       **kwargs)

    return master


class Master(aio.Resource):
    """Observer Master

    For creating new instance of this class see `listen` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    @property
    def global_components(self) -> list[common.ComponentInfo]:
        """Most recently published list of all (local and slave) components"""
        return self._global_components

    @property
    def is_active(self) -> bool:
        """``True`` while active subgroup exists (see `set_active`)"""
        return self._active_subgroup is not None

    def set_active(self, active: bool):
        """Activate or deactivate master

        Activation creates new subgroup of `async_group` in which slave
        loops are spawned. Deactivation closes that subgroup. Calls which
        don't change current state have no effect.

        """
        if active and self._active_subgroup is None:
            self._active_subgroup = self.async_group.create_subgroup()

        elif not active and self._active_subgroup is not None:
            self._active_subgroup.close()
            self._active_subgroup = None

    async def set_local_components(self, local_components: Iterable[common.ComponentInfo]):  # NOQA
        """Replace local components (components associated with mid 0)

        Blessing requests of components with already known cid are preserved.

        """
        await self._update_components(0, local_components)

    async def set_local_blessing_reqs(self, blessing_reqs: Iterable[tuple[common.Cid, common.BlessingReq]]):  # NOQA
        """Set blessing requests of local components

        Entries referencing unknown cid and entries which don't change
        existing blessing request are ignored. Global components are
        recalculated only if at least one blessing request is changed.

        """
        change = False

        for cid, blessing_req in blessing_reqs:
            info = self._mid_cid_infos[0].get(cid)
            if not info or info.blessing_req == blessing_req:
                continue

            self._mid_cid_infos[0][cid] = info._replace(
                blessing_req=blessing_req)
            change = True

        if change:
            await self._update_global_components()

    def _on_connection(self, conn):
        # inactive master doesn't serve slaves
        if self._active_subgroup is None:
            conn.close()
            return

        try:
            self._active_subgroup.spawn(self._slave_loop, conn)

        except Exception as e:
            # NOTE(review): presumably raised when subgroup is already
            # closing - connection is dropped as best effort
            mlog.debug('error spawning slave loop: %s', e, exc_info=e)
            conn.close()

    async def _slave_loop(self, conn):
        mid = next(self._next_mids)

        mlog.debug('starting slave loop (mid: %s)', mid)
        try:
            while True:
                msg_type, msg_data = await common.receive_msg(conn)

                if msg_type != 'HatObserver.MsgSlave':
                    raise Exception('unsupported message type')

                mlog.debug('received msg slave (mid: %s)', mid)
                # lazily decoded - consumed by `_update_components`
                components = (common.component_info_from_sbs(i)
                              for i in msg_data['components'])
                await self._update_components(mid, components)

                if mid not in self._mid_conns:
                    # register connection so that it receives future
                    # global components changes
                    self._mid_conns[mid] = conn

                    # newly registered slave didn't receive notification
                    # possibly sent during previous update
                    global_components = list(
                        _flatten_mid_cid_infos(self._mid_cid_infos))
                    await _send_msg_master(conn, mid, global_components)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error('slave loop error (mid: %s): %s', mid, e, exc_info=e)

        finally:
            mlog.debug('stopping slave loop (mid: %s)', mid)
            conn.close()
            await aio.uncancellable(self._remove_slave(mid))

    async def _remove_slave(self, mid):
        self._mid_conns.pop(mid, None)

        # slave without components doesn't influence global components
        if not self._mid_cid_infos.pop(mid, None):
            return

        await self._update_global_components()

    async def _update_components(self, mid, components):
        old_cid_infos = self._mid_cid_infos.get(mid, {})

        # new mapping is fully built before it is published so that
        # exception raised while iterating `components` (which can be
        # lazy generator) doesn't leave partially updated state
        new_cid_infos = {}
        for info in components:
            info = info._replace(mid=mid)

            # blessing request is controlled by master - keep existing one
            old_info = old_cid_infos.get(info.cid)
            if old_info:
                info = info._replace(blessing_req=old_info.blessing_req)

            new_cid_infos[info.cid] = info

        self._mid_cid_infos[mid] = new_cid_infos

        if new_cid_infos == old_cid_infos:
            return

        await self._update_global_components()

    async def _update_global_components(self):
        if self._blessing_cb:
            infos = _flatten_mid_cid_infos(self._mid_cid_infos)

            # callback result referencing unknown mid/cid raises KeyError
            for mid, cid, blessing_req in self._blessing_cb(self, infos):
                info = self._mid_cid_infos[mid][cid]
                info = info._replace(blessing_req=blessing_req)
                self._mid_cid_infos[mid][cid] = info

        global_components = list(_flatten_mid_cid_infos(self._mid_cid_infos))
        if global_components == self._global_components:
            return

        self._global_components = global_components

        if self._global_components_cb:
            await aio.call(self._global_components_cb, self, global_components)

        # iterate over copy - registered connections can change during await
        for mid, conn in list(self._mid_conns.items()):
            with contextlib.suppress(ConnectionError):
                await _send_msg_master(conn, mid, global_components)


async def _send_msg_master(conn, mid, global_components):
    """Send `HatObserver.MsgMaster` containing `mid` and encoded components"""
    components = [common.component_info_to_sbs(i) for i in global_components]
    await common.send_msg(conn, 'HatObserver.MsgMaster', {
        'mid': mid,
        'components': components})


def _flatten_mid_cid_infos(mid_cid_infos):
    """Yield all component infos from `{mid: {cid: info}}` mapping"""
    for cid_infos in mid_cid_infos.values():
        yield from cid_infos.values()
Module logger
ComponentsCb: TypeAlias =
Callable[[ForwardRef('Master'), list[hat.monitor.common.ComponentInfo]], None | Awaitable[None]]
Components callback
BlessingCb: TypeAlias =
Callable[[ForwardRef('Master'), Iterable[hat.monitor.common.ComponentInfo]], Iterable[tuple[int, int, hat.monitor.common.BlessingReq]]]
Blessing callback
async def
listen( addr: hat.drivers.tcp.Address, *, global_components_cb: Optional[Callable[[Master, list[hat.monitor.common.ComponentInfo]], None | Awaitable[None]]] = None, blessing_cb: Optional[Callable[[Master, Iterable[hat.monitor.common.ComponentInfo]], Iterable[tuple[int, int, hat.monitor.common.BlessingReq]]]] = None, **kwargs) -> Master:
async def listen(addr: tcp.Address,
                 *,
                 global_components_cb: ComponentsCb | None = None,
                 blessing_cb: BlessingCb | None = None,
                 **kwargs
                 ) -> 'Master':
    """Create listening inactive Observer Master

    Returned master is bound to a chatter server listening on `addr`. It
    stays inactive until `Master.set_active` is called.

    All slave connections are always bound to server lifetime
    (`bind_connections` should not be set).

    Additional arguments are passed directly to `hat.drivers.chatter.listen`.

    """
    master = Master()

    # user provided callbacks
    master._blessing_cb = blessing_cb
    master._global_components_cb = global_components_cb

    # components state - mid 0 is initially the only known mid
    master._global_components = []
    master._mid_cid_infos = {0: {}}
    master._mid_conns = {}
    master._next_mids = itertools.count(1)

    # no active subgroup - master is created inactive
    master._active_subgroup = None

    server = await chatter.listen(master._on_connection, addr,
                                  bind_connections=True, **kwargs)
    master._srv = server

    return master
Create listening inactive Observer Master
All slave connections are always bound to the server lifetime (`bind_connections` should not be set).
Additional arguments are passed directly to `hat.drivers.chatter.listen`.
class
Master(hat.aio.group.Resource):
class Master(aio.Resource):
    """Observer Master

    For creating new instance of this class see `listen` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._srv.async_group

    @property
    def global_components(self) -> list[common.ComponentInfo]:
        """Most recently published list of all (local and slave) components"""
        return self._global_components

    @property
    def is_active(self) -> bool:
        """``True`` while active subgroup exists (see `set_active`)"""
        return self._active_subgroup is not None

    def set_active(self, active: bool):
        """Activate or deactivate master

        Activation creates new subgroup of `async_group` in which slave
        loops are spawned. Deactivation closes that subgroup. Calls which
        don't change current state have no effect.

        """
        if active and self._active_subgroup is None:
            self._active_subgroup = self.async_group.create_subgroup()

        elif not active and self._active_subgroup is not None:
            self._active_subgroup.close()
            self._active_subgroup = None

    async def set_local_components(self, local_components: Iterable[common.ComponentInfo]):  # NOQA
        """Replace local components (components associated with mid 0)

        Blessing requests of components with already known cid are preserved.

        """
        await self._update_components(0, local_components)

    async def set_local_blessing_reqs(self, blessing_reqs: Iterable[tuple[common.Cid, common.BlessingReq]]):  # NOQA
        """Set blessing requests of local components

        Entries referencing unknown cid and entries which don't change
        existing blessing request are ignored. Global components are
        recalculated only if at least one blessing request is changed.

        """
        change = False

        for cid, blessing_req in blessing_reqs:
            info = self._mid_cid_infos[0].get(cid)
            if not info or info.blessing_req == blessing_req:
                continue

            self._mid_cid_infos[0][cid] = info._replace(
                blessing_req=blessing_req)
            change = True

        if change:
            await self._update_global_components()

    def _on_connection(self, conn):
        # inactive master doesn't serve slaves
        if self._active_subgroup is None:
            conn.close()
            return

        try:
            self._active_subgroup.spawn(self._slave_loop, conn)

        except Exception as e:
            # NOTE(review): presumably raised when subgroup is already
            # closing - connection is dropped as best effort
            mlog.debug('error spawning slave loop: %s', e, exc_info=e)
            conn.close()

    async def _slave_loop(self, conn):
        mid = next(self._next_mids)

        mlog.debug('starting slave loop (mid: %s)', mid)
        try:
            while True:
                msg_type, msg_data = await common.receive_msg(conn)

                if msg_type != 'HatObserver.MsgSlave':
                    raise Exception('unsupported message type')

                mlog.debug('received msg slave (mid: %s)', mid)
                # lazily decoded - consumed by `_update_components`
                components = (common.component_info_from_sbs(i)
                              for i in msg_data['components'])
                await self._update_components(mid, components)

                if mid not in self._mid_conns:
                    # register connection so that it receives future
                    # global components changes
                    self._mid_conns[mid] = conn

                    # newly registered slave didn't receive notification
                    # possibly sent during previous update
                    global_components = list(
                        _flatten_mid_cid_infos(self._mid_cid_infos))
                    await _send_msg_master(conn, mid, global_components)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error('slave loop error (mid: %s): %s', mid, e, exc_info=e)

        finally:
            mlog.debug('stopping slave loop (mid: %s)', mid)
            conn.close()
            await aio.uncancellable(self._remove_slave(mid))

    async def _remove_slave(self, mid):
        self._mid_conns.pop(mid, None)

        # slave without components doesn't influence global components
        if not self._mid_cid_infos.pop(mid, None):
            return

        await self._update_global_components()

    async def _update_components(self, mid, components):
        old_cid_infos = self._mid_cid_infos.get(mid, {})

        # new mapping is fully built before it is published so that
        # exception raised while iterating `components` (which can be
        # lazy generator) doesn't leave partially updated state
        new_cid_infos = {}
        for info in components:
            info = info._replace(mid=mid)

            # blessing request is controlled by master - keep existing one
            old_info = old_cid_infos.get(info.cid)
            if old_info:
                info = info._replace(blessing_req=old_info.blessing_req)

            new_cid_infos[info.cid] = info

        self._mid_cid_infos[mid] = new_cid_infos

        if new_cid_infos == old_cid_infos:
            return

        await self._update_global_components()

    async def _update_global_components(self):
        if self._blessing_cb:
            infos = _flatten_mid_cid_infos(self._mid_cid_infos)

            # callback result referencing unknown mid/cid raises KeyError
            for mid, cid, blessing_req in self._blessing_cb(self, infos):
                info = self._mid_cid_infos[mid][cid]
                info = info._replace(blessing_req=blessing_req)
                self._mid_cid_infos[mid][cid] = info

        global_components = list(_flatten_mid_cid_infos(self._mid_cid_infos))
        if global_components == self._global_components:
            return

        self._global_components = global_components

        if self._global_components_cb:
            await aio.call(self._global_components_cb, self, global_components)

        # iterate over copy - registered connections can change during await
        for mid, conn in list(self._mid_conns.items()):
            with contextlib.suppress(ConnectionError):
                await _send_msg_master(conn, mid, global_components)
Observer Master
To create a new instance of this class, see the `listen` coroutine.
async_group: hat.aio.group.Group
@property
def async_group(self) -> aio.Group:
    """Async group of the underlying server"""
    server = self._srv
    return server.async_group
Async group
global_components: list[hat.monitor.common.ComponentInfo]
async def
set_local_blessing_reqs( self, blessing_reqs: Iterable[tuple[int, hat.monitor.common.BlessingReq]]):
async def set_local_blessing_reqs(self, blessing_reqs: Iterable[tuple[common.Cid, common.BlessingReq]]):  # NOQA
    """Apply blessing requests to local components (mid 0)

    Each `(cid, blessing_req)` pair is applied only if a local component
    with that cid exists and its current blessing request differs. Global
    components are recalculated once, after all pairs are processed, and
    only if at least one component was modified.

    """
    modified = False

    for cid, requested in blessing_reqs:
        local_infos = self._mid_cid_infos[0]
        current = local_infos.get(cid)

        if current and current.blessing_req != requested:
            local_infos[cid] = current._replace(blessing_req=requested)
            modified = True

    if not modified:
        return

    await self._update_global_components()