hat.monitor.observer.slave
Observer Slave
"""Observer Slave"""

import logging
import typing

from hat import aio
from hat.drivers import chatter
from hat.drivers import tcp

from hat.monitor.observer import common


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

StateCb: typing.TypeAlias = aio.AsyncCallable[['Slave', 'State'], None]
"""State callback"""


class State(typing.NamedTuple):
    """Slave's view of the data received from Observer Master

    New instance is created for each received `HatObserver.MsgMaster`
    message. Until the first message is received, slave's state has
    ``mid=None`` and empty `global_components`.

    """
    # value of the message's 'mid' field
    # NOTE(review): presumably identifier assigned by master - confirm
    mid: int | None
    # components decoded from the message's 'components' field
    global_components: list[common.ComponentInfo]


async def connect(addr: tcp.Address,
                  *,
                  local_components: list[common.ComponentInfo] | None = None,
                  state_cb: StateCb | None = None,
                  **kwargs
                  ) -> 'Slave':
    """Connect to Observer Master

    If `local_components` is ``None``, slave starts without local components.
    If `state_cb` is set, it is called each time new state is received.

    Additional arguments are passed directly to `hat.drivers.chatter.connect`.

    """
    # create new list for each call - list used as default argument value
    # would be single object shared between all slaves
    components = [] if local_components is None else list(local_components)

    conn = await chatter.connect(addr, **kwargs)

    try:
        return Slave(conn=conn,
                     local_components=components,
                     state_cb=state_cb)

    except Exception:
        # slave could not be created - don't leak opened connection
        await aio.uncancellable(conn.async_close())
        raise


class Slave(aio.Resource):
    """Observer Slave

    For creating new instance of this class see `connect` coroutine.

    Slave uses async group of the provided connection as its own async group.

    """

    def __init__(self,
                 conn: chatter.Connection,
                 local_components: list[common.ComponentInfo],
                 state_cb: StateCb | None):
        self._conn = conn
        # private copy - if caller's list was kept by reference, in-place
        # changes of that list could not be detected by `update`
        self._local_components = list(local_components)
        self._state_cb = state_cb
        self._state = State(mid=None,
                            global_components=[])

        self.async_group.spawn(self._slave_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._conn.async_group

    @property
    def state(self) -> State:
        """Slave's state"""
        return self._state

    async def update(self, local_components: list[common.ComponentInfo]):
        """Update slave's local components

        `HatObserver.MsgSlave` message is sent only if `local_components`
        are not equal to the components previously set on this slave.

        """
        if self._local_components == local_components:
            return

        # private copy - see `__init__`
        components = list(local_components)
        self._local_components = components
        await self._send_msg_slave(components)

    async def _slave_loop(self):
        """Send local components and process messages received from master

        Loop stops on connection error or on any other exception (which is
        logged). Slave is closed once the loop stops.

        """
        mlog.debug('starting slave loop')
        try:
            await self._send_msg_slave(self._local_components)

            while True:
                msg_type, msg_data = await common.receive_msg(self._conn)

                if msg_type != 'HatObserver.MsgMaster':
                    raise Exception('unsupported message type')

                mlog.debug('received msg master')
                components = [common.component_info_from_sbs(i)
                              for i in msg_data['components']]
                self._state = State(mid=msg_data['mid'],
                                    global_components=components)

                if self._state_cb:
                    await aio.call(self._state_cb, self, self._state)

        except ConnectionError:
            # connection errors are not logged - loop just stops
            pass

        except Exception as e:
            mlog.error('slave loop error: %s', e, exc_info=e)

        finally:
            mlog.debug('stopping slave loop')
            self.close()

    async def _send_msg_slave(self, local_components):
        """Send `HatObserver.MsgSlave` message with provided components"""
        await common.send_msg(self._conn, 'HatObserver.MsgSlave', {
            'components': [common.component_info_to_sbs(i)
                           for i in local_components]})
Module logger
StateCb: TypeAlias =
Callable[[ForwardRef('Slave'), ForwardRef('State')], Optional[Awaitable[NoneType]]]
State callback
class
State(typing.NamedTuple):
class State(typing.NamedTuple):
    """Slave's view of the data received from Observer Master

    New instance is created for each received `HatObserver.MsgMaster`
    message. Until the first message is received, slave's state has
    ``mid=None`` and empty `global_components`.

    """
    # value of the message's 'mid' field
    # NOTE(review): presumably identifier assigned by master - confirm
    mid: int | None
    # components decoded from the message's 'components' field
    global_components: list[common.ComponentInfo]
State(mid, global_components)
State( mid: int | None, global_components: list[hat.monitor.common.ComponentInfo])
Create new instance of State(mid, global_components)
Inherited Members
- builtins.tuple
- index
- count
async def
connect( addr: hat.drivers.tcp.Address, *, local_components: list[hat.monitor.common.ComponentInfo] = [], state_cb: Optional[Callable[[Slave, State], Optional[Awaitable[NoneType]]]] = None, **kwargs) -> Slave:
async def connect(addr: tcp.Address,
                  *,
                  local_components: list[common.ComponentInfo] | None = None,
                  state_cb: StateCb | None = None,
                  **kwargs
                  ) -> 'Slave':
    """Connect to Observer Master

    If `local_components` is ``None``, slave starts without local components.
    `state_cb` is passed to the created `Slave` together with a copy of
    `local_components`.

    Additional arguments are passed directly to `hat.drivers.chatter.connect`.

    """
    # create new list for each call - list used as default argument value
    # would be single object shared between all slaves, and caller's list
    # should not be shared with the slave by reference
    components = [] if local_components is None else list(local_components)

    conn = await chatter.connect(addr, **kwargs)

    try:
        return Slave(conn=conn,
                     local_components=components,
                     state_cb=state_cb)

    except Exception:
        # slave could not be created - don't leak opened connection
        await aio.uncancellable(conn.async_close())
        raise
Connect to Observer Master
Additional arguments are passed directly to hat.drivers.chatter.connect.
class
Slave(hat.aio.group.Resource):
class Slave(aio.Resource):
    """Observer Slave

    For creating new instance of this class see `connect` coroutine.

    Slave uses async group of the provided connection as its own async group.

    """

    def __init__(self,
                 conn: chatter.Connection,
                 local_components: list[common.ComponentInfo],
                 state_cb: StateCb | None):
        self._conn = conn
        # private copy - if caller's list was kept by reference, in-place
        # changes of that list could not be detected by `update`
        self._local_components = list(local_components)
        self._state_cb = state_cb
        self._state = State(mid=None,
                            global_components=[])

        self.async_group.spawn(self._slave_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._conn.async_group

    @property
    def state(self) -> State:
        """Slave's state"""
        return self._state

    async def update(self, local_components: list[common.ComponentInfo]):
        """Update slave's local components

        `HatObserver.MsgSlave` message is sent only if `local_components`
        are not equal to the components previously set on this slave.

        """
        if self._local_components == local_components:
            return

        # private copy - see `__init__`
        components = list(local_components)
        self._local_components = components
        await self._send_msg_slave(components)

    async def _slave_loop(self):
        """Send local components and process messages received from master

        Loop stops on connection error or on any other exception (which is
        logged). Slave is closed once the loop stops.

        """
        mlog.debug('starting slave loop')
        try:
            await self._send_msg_slave(self._local_components)

            while True:
                msg_type, msg_data = await common.receive_msg(self._conn)

                if msg_type != 'HatObserver.MsgMaster':
                    raise Exception('unsupported message type')

                mlog.debug('received msg master')
                components = [common.component_info_from_sbs(i)
                              for i in msg_data['components']]
                self._state = State(mid=msg_data['mid'],
                                    global_components=components)

                if self._state_cb:
                    await aio.call(self._state_cb, self, self._state)

        except ConnectionError:
            # connection errors are not logged - loop just stops
            pass

        except Exception as e:
            mlog.error('slave loop error: %s', e, exc_info=e)

        finally:
            mlog.debug('stopping slave loop')
            self.close()

    async def _send_msg_slave(self, local_components):
        """Send `HatObserver.MsgSlave` message with provided components"""
        await common.send_msg(self._conn, 'HatObserver.MsgSlave', {
            'components': [common.component_info_to_sbs(i)
                           for i in local_components]})
Observer Slave
To create a new instance of this class, see the connect coroutine.
Slave( conn: hat.drivers.chatter.Connection, local_components: list[hat.monitor.common.ComponentInfo], state_cb: Optional[Callable[[Slave, State], Optional[Awaitable[NoneType]]]])
56 def __init__(self, 57 conn: chatter.Connection, 58 local_components: list[common.ComponentInfo], 59 state_cb: StateCb | None): 60 self._conn = conn 61 self._local_components = local_components 62 self._state_cb = state_cb 63 self._state = State(mid=None, 64 global_components=[]) 65 66 self.async_group.spawn(self._slave_loop)
async_group: hat.aio.group.Group
68 @property 69 def async_group(self) -> aio.Group: 70 """Async group""" 71 return self._conn.async_group
Async group
78 async def update(self, local_components: list[common.ComponentInfo]): 79 """Update slaves's local components""" 80 if self._local_components == local_components: 81 return 82 83 self._local_components = local_components 84 await self._send_msg_slave(local_components)
Update slave's local components
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close