hat.monitor.observer.master
Observer Master
1"""Observer Master""" 2 3import contextlib 4import itertools 5import logging 6import typing 7 8from hat import aio 9from hat.drivers import chatter 10from hat.drivers import tcp 11 12from hat.monitor.observer import common 13 14 15mlog: logging.Logger = logging.getLogger(__name__) 16"""Module logger""" 17 18ComponentsCb: typing.TypeAlias = aio.AsyncCallable[['Master', 19 list[common.ComponentInfo]], # NOQA 20 None] 21"""Components callback""" 22 23BlessingCb: typing.TypeAlias = typing.Callable[['Master', 24 list[common.ComponentInfo]], 25 list[common.ComponentInfo]] 26"""Blessing callback""" 27 28 29async def listen(addr: tcp.Address, 30 *, 31 local_components: list[common.ComponentInfo] = [], 32 global_components_cb: ComponentsCb | None = None, 33 blessing_cb: BlessingCb | None = None, 34 **kwargs 35 ) -> 'Master': 36 """Create listening inactive Observer Master 37 38 All slave connections are always bound to server lifetime 39 (`bind_connections` should not be set). 40 41 Additional arguments are passed directly to `hat.drivers.chatter.listen`. 42 43 """ 44 master = Master() 45 master._global_components_cb = global_components_cb 46 master._blessing_cb = blessing_cb 47 master._mid_conns = {} 48 master._mid_components = {0: [i._replace(mid=0) for i in local_components]} 49 master._global_components = list( 50 _get_global_components(master._mid_components)) 51 master._next_mids = itertools.count(1) 52 master._active_subgroup = None 53 54 master._srv = await chatter.listen(master._on_connection, addr, 55 bind_connections=True, 56 **kwargs) 57 58 return master 59 60 61class Master(aio.Resource): 62 """Observer Master 63 64 For creating new instance of this class see `listen` coroutine. 65 66 """ 67 68 @property 69 def async_group(self) -> aio.Group: 70 """Async group""" 71 return self._srv.async_group 72 73 @property 74 def global_components(self) -> list[common.ComponentInfo]: 75 return self._global_components 76 77 @property 78 def is_active(self) -> bool: 79 return self._active_subgroup is not None 80 81 def set_active(self, active: bool): 82 if active and not self._active_subgroup: 83 self._active_subgroup = self.async_group.create_subgroup() 84 85 elif not active and self._active_subgroup: 86 self._active_subgroup.close() 87 self._active_subgroup = None 88 89 async def set_local_components(self, local_components): 90 await self._update_components(0, local_components) 91 92 def _on_connection(self, conn): 93 try: 94 self._active_subgroup.spawn(self._slave_loop, conn) 95 96 except Exception: 97 conn.close() 98 99 async def _slave_loop(self, conn): 100 mid = next(self._next_mids) 101 102 mlog.debug('starting slave loop (mid: %s)', mid) 103 try: 104 msg_type, msg_data = await common.receive_msg(conn) 105 106 if msg_type != 'HatObserver.MsgSlave': 107 raise Exception('unsupported message type') 108 109 self._mid_conns[mid] = conn 110 111 while True: 112 mlog.debug('received msg slave (mid: %s)', mid) 113 components = [common.component_info_from_sbs(i) 114 for i in msg_data['components']] 115 await self._update_components(mid, components) 116 117 msg_type, msg_data = await common.receive_msg(conn) 118 119 if msg_type != 'HatObserver.MsgSlave': 120 raise Exception('unsupported message type') 121 122 except ConnectionError: 123 pass 124 125 except Exception as e: 126 mlog.error('slave loop error (mid: %s): %s', mid, e, exc_info=e) 127 128 finally: 129 mlog.debug('stopping slave loop (mid: %s)', mid) 130 conn.close() 131 await aio.uncancellable(self._remove_slave(mid)) 132 133 async def _remove_slave(self, mid): 134 conn = self._mid_conns.pop(mid, None) 135 if not conn: 136 return 137 138 await self._update_components(mid, None) 139 140 async def _update_components(self, mid, components): 141 if components is None: 142 if mid not in self._mid_components: 143 return 144 145 self._mid_components.pop(mid) 146 147 else: 148 blessing_reqs = {i.cid: i.blessing_req 149 for i in self._mid_components.get(mid, [])} 150 components = [ 151 i._replace(mid=mid, 152 blessing_req=blessing_reqs.get(i.cid, 153 i.blessing_req)) 154 for i in components] 155 156 if self._mid_components.get(mid) == components: 157 return 158 159 self._mid_components[mid] = components 160 161 global_components = list(_get_global_components(self._mid_components)) 162 self._global_components = (self._blessing_cb(self, global_components) 163 if self._blessing_cb else global_components) 164 165 if components is not None: 166 self._mid_components[mid] = [i for i in self._global_components 167 if i.mid == mid] 168 169 if self._global_components_cb: 170 await aio.call(self._global_components_cb, self, 171 self._global_components) 172 173 for mid, conn in list(self._mid_conns.items()): 174 with contextlib.suppress(ConnectionError): 175 await _send_msg_master(conn, mid, self._global_components) 176 177 178async def _send_msg_master(conn, mid, global_components): 179 components = [common.component_info_to_sbs(i) for i in global_components] 180 await common.send_msg(conn, 'HatObserver.MsgMaster', { 181 'mid': mid, 182 'components': components}) 183 184 185def _get_global_components(mid_components): 186 for mid in sorted(mid_components.keys()): 187 yield from mid_components[mid]
Module logger
ComponentsCb: TypeAlias =
Callable[[ForwardRef('Master'), list[hat.monitor.common.ComponentInfo]], Optional[Awaitable[NoneType]]]
Components callback
BlessingCb: TypeAlias =
Callable[[ForwardRef('Master'), list[hat.monitor.common.ComponentInfo]], list[hat.monitor.common.ComponentInfo]]
Blessing callback
async def
listen( addr: hat.drivers.tcp.Address, *, local_components: list[hat.monitor.common.ComponentInfo] = [], global_components_cb: Optional[Callable[[Master, list[hat.monitor.common.ComponentInfo]], Optional[Awaitable[NoneType]]]] = None, blessing_cb: Optional[Callable[[Master, list[hat.monitor.common.ComponentInfo]], list[hat.monitor.common.ComponentInfo]]] = None, **kwargs) -> Master:
30async def listen(addr: tcp.Address, 31 *, 32 local_components: list[common.ComponentInfo] = [], 33 global_components_cb: ComponentsCb | None = None, 34 blessing_cb: BlessingCb | None = None, 35 **kwargs 36 ) -> 'Master': 37 """Create listening inactive Observer Master 38 39 All slave connections are always bound to server lifetime 40 (`bind_connections` should not be set). 41 42 Additional arguments are passed directly to `hat.drivers.chatter.listen`. 43 44 """ 45 master = Master() 46 master._global_components_cb = global_components_cb 47 master._blessing_cb = blessing_cb 48 master._mid_conns = {} 49 master._mid_components = {0: [i._replace(mid=0) for i in local_components]} 50 master._global_components = list( 51 _get_global_components(master._mid_components)) 52 master._next_mids = itertools.count(1) 53 master._active_subgroup = None 54 55 master._srv = await chatter.listen(master._on_connection, addr, 56 bind_connections=True, 57 **kwargs) 58 59 return master
Create listening inactive Observer Master
All slave connections are always bound to server lifetime
(bind_connections
should not be set).
Additional arguments are passed directly to hat.drivers.chatter.listen
.
class
Master(hat.aio.group.Resource):
62class Master(aio.Resource): 63 """Observer Master 64 65 For creating new instance of this class see `listen` coroutine. 66 67 """ 68 69 @property 70 def async_group(self) -> aio.Group: 71 """Async group""" 72 return self._srv.async_group 73 74 @property 75 def global_components(self) -> list[common.ComponentInfo]: 76 return self._global_components 77 78 @property 79 def is_active(self) -> bool: 80 return self._active_subgroup is not None 81 82 def set_active(self, active: bool): 83 if active and not self._active_subgroup: 84 self._active_subgroup = self.async_group.create_subgroup() 85 86 elif not active and self._active_subgroup: 87 self._active_subgroup.close() 88 self._active_subgroup = None 89 90 async def set_local_components(self, local_components): 91 await self._update_components(0, local_components) 92 93 def _on_connection(self, conn): 94 try: 95 self._active_subgroup.spawn(self._slave_loop, conn) 96 97 except Exception: 98 conn.close() 99 100 async def _slave_loop(self, conn): 101 mid = next(self._next_mids) 102 103 mlog.debug('starting slave loop (mid: %s)', mid) 104 try: 105 msg_type, msg_data = await common.receive_msg(conn) 106 107 if msg_type != 'HatObserver.MsgSlave': 108 raise Exception('unsupported message type') 109 110 self._mid_conns[mid] = conn 111 112 while True: 113 mlog.debug('received msg slave (mid: %s)', mid) 114 components = [common.component_info_from_sbs(i) 115 for i in msg_data['components']] 116 await self._update_components(mid, components) 117 118 msg_type, msg_data = await common.receive_msg(conn) 119 120 if msg_type != 'HatObserver.MsgSlave': 121 raise Exception('unsupported message type') 122 123 except ConnectionError: 124 pass 125 126 except Exception as e: 127 mlog.error('slave loop error (mid: %s): %s', mid, e, exc_info=e) 128 129 finally: 130 mlog.debug('stopping slave loop (mid: %s)', mid) 131 conn.close() 132 await aio.uncancellable(self._remove_slave(mid)) 133 134 async def _remove_slave(self, mid): 135 conn = self._mid_conns.pop(mid, None) 136 if not conn: 137 return 138 139 await self._update_components(mid, None) 140 141 async def _update_components(self, mid, components): 142 if components is None: 143 if mid not in self._mid_components: 144 return 145 146 self._mid_components.pop(mid) 147 148 else: 149 blessing_reqs = {i.cid: i.blessing_req 150 for i in self._mid_components.get(mid, [])} 151 components = [ 152 i._replace(mid=mid, 153 blessing_req=blessing_reqs.get(i.cid, 154 i.blessing_req)) 155 for i in components] 156 157 if self._mid_components.get(mid) == components: 158 return 159 160 self._mid_components[mid] = components 161 162 global_components = list(_get_global_components(self._mid_components)) 163 self._global_components = (self._blessing_cb(self, global_components) 164 if self._blessing_cb else global_components) 165 166 if components is not None: 167 self._mid_components[mid] = [i for i in self._global_components 168 if i.mid == mid] 169 170 if self._global_components_cb: 171 await aio.call(self._global_components_cb, self, 172 self._global_components) 173 174 for mid, conn in list(self._mid_conns.items()): 175 with contextlib.suppress(ConnectionError): 176 await _send_msg_master(conn, mid, self._global_components)
Observer Master
For creating new instance of this class see listen
coroutine.
async_group: hat.aio.group.Group
69 @property 70 def async_group(self) -> aio.Group: 71 """Async group""" 72 return self._srv.async_group
Async group
global_components: list[hat.monitor.common.ComponentInfo]
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close