hat.monitor.observer.slave

Observer Slave

  1"""Observer Slave"""
  2
  3import logging
  4import typing
  5
  6from hat import aio
  7from hat.drivers import chatter
  8from hat.drivers import tcp
  9
 10from hat.monitor.observer import common
 11
 12
 13mlog: logging.Logger = logging.getLogger(__name__)
 14"""Module logger"""
 15
 16StateCb: typing.TypeAlias = aio.AsyncCallable[['Slave', 'State'], None]
 17"""State callback"""
 18
 19
 20class State(typing.NamedTuple):
 21    mid: int | None
 22    global_components: list[common.ComponentInfo]
 23
 24
 25async def connect(addr: tcp.Address,
 26                  *,
 27                  local_components: list[common.ComponentInfo] = [],
 28                  state_cb: StateCb | None = None,
 29                  **kwargs
 30                  ) -> 'Slave':
 31    """Connect to Observer Master
 32
 33    Additional arguments are passed directly to `hat.drivers.chatter.connect`.
 34
 35    """
 36    conn = await chatter.connect(addr, **kwargs)
 37
 38    try:
 39        return Slave(conn=conn,
 40                     local_components=local_components,
 41                     state_cb=state_cb)
 42
 43    except Exception:
 44        await aio.uncancellable(conn.async_close())
 45        raise
 46
 47
 48class Slave(aio.Resource):
 49    """Observer Slave
 50
 51    For creating new instance of this class see `connect` coroutine.
 52
 53    """
 54
 55    def __init__(self,
 56                 conn: chatter.Connection,
 57                 local_components: list[common.ComponentInfo],
 58                 state_cb: StateCb | None):
 59        self._conn = conn
 60        self._local_components = local_components
 61        self._state_cb = state_cb
 62        self._state = State(mid=None,
 63                            global_components=[])
 64
 65        self.async_group.spawn(self._slave_loop)
 66
 67    @property
 68    def async_group(self) -> aio.Group:
 69        """Async group"""
 70        return self._conn.async_group
 71
 72    @property
 73    def state(self) -> State:
 74        """Slave's state"""
 75        return self._state
 76
 77    async def update(self, local_components: list[common.ComponentInfo]):
 78        """Update slaves's local components"""
 79        if self._local_components == local_components:
 80            return
 81
 82        self._local_components = local_components
 83        await self._send_msg_slave(local_components)
 84
 85    async def _slave_loop(self):
 86        mlog.debug('starting slave loop')
 87        try:
 88            await self._send_msg_slave(self._local_components)
 89
 90            while True:
 91                msg_type, msg_data = await common.receive_msg(self._conn)
 92
 93                if msg_type != 'HatObserver.MsgMaster':
 94                    raise Exception('unsupported message type')
 95
 96                mlog.debug('received msg master')
 97                components = [common.component_info_from_sbs(i)
 98                              for i in msg_data['components']]
 99                self._state = State(mid=msg_data['mid'],
100                                    global_components=components)
101
102                if self._state_cb:
103                    await aio.call(self._state_cb, self, self._state)
104
105        except ConnectionError:
106            pass
107
108        except Exception as e:
109            mlog.error('slave loop error: %s', e, exc_info=e)
110
111        finally:
112            mlog.debug('stopping slave loop')
113            self.close()
114
115    async def _send_msg_slave(self, local_components):
116        await common.send_msg(self._conn, 'HatObserver.MsgSlave', {
117            'components': [common.component_info_to_sbs(i)
118                           for i in local_components]})
mlog: logging.Logger = <Logger hat.monitor.observer.slave (WARNING)>

Module logger

StateCb: TypeAlias = Callable[[ForwardRef('Slave'), ForwardRef('State')], Optional[Awaitable[NoneType]]]

State callback

class State(typing.NamedTuple):
class State(typing.NamedTuple):
    """Slave's view of the data announced by the Observer Master"""
    # `mid` value taken from the last received master message
    # (`None` until a master message is received)
    mid: int | None
    # components listed in the last received master message
    global_components: list[common.ComponentInfo]

State(mid, global_components)

State( mid: int | None, global_components: list[hat.monitor.common.ComponentInfo])

Create new instance of State(mid, global_components)

mid: int | None

Alias for field number 0

global_components: list[hat.monitor.common.ComponentInfo]

Alias for field number 1

Inherited Members
builtins.tuple
index
count
async def connect( addr: hat.drivers.tcp.Address, *, local_components: list[hat.monitor.common.ComponentInfo] = [], state_cb: Optional[Callable[[Slave, State], Optional[Awaitable[NoneType]]]] = None, **kwargs) -> Slave:
async def connect(addr: tcp.Address,
                  *,
                  local_components: list[common.ComponentInfo] = [],
                  state_cb: StateCb | None = None,
                  **kwargs
                  ) -> 'Slave':
    """Connect to Observer Master

    Opens a chatter connection to `addr` and wraps it in a `Slave`.

    Args:
        addr: Observer Master address
        local_components: components initially reported to the master; the
            list is copied, so later in-place changes of the caller's list
            do not affect the slave
        state_cb: optional callback called with the slave and its new state
            each time a master message is received
        kwargs: additional arguments passed directly to
            `hat.drivers.chatter.connect`

    """
    conn = await chatter.connect(addr, **kwargs)

    try:
        # Pass a private copy: neither the shared default list object nor
        # the caller's list may end up referenced by the slave instance.
        return Slave(conn=conn,
                     local_components=list(local_components),
                     state_cb=state_cb)

    except Exception:
        # slave was not created - close the already opened connection
        await aio.uncancellable(conn.async_close())
        raise

Connect to Observer Master

Additional arguments are passed directly to hat.drivers.chatter.connect.

class Slave(hat.aio.group.Resource):
 49class Slave(aio.Resource):
 50    """Observer Slave
 51
 52    For creating new instance of this class see `connect` coroutine.
 53
 54    """
 55
 56    def __init__(self,
 57                 conn: chatter.Connection,
 58                 local_components: list[common.ComponentInfo],
 59                 state_cb: StateCb | None):
 60        self._conn = conn
 61        self._local_components = local_components
 62        self._state_cb = state_cb
 63        self._state = State(mid=None,
 64                            global_components=[])
 65
 66        self.async_group.spawn(self._slave_loop)
 67
 68    @property
 69    def async_group(self) -> aio.Group:
 70        """Async group"""
 71        return self._conn.async_group
 72
 73    @property
 74    def state(self) -> State:
 75        """Slave's state"""
 76        return self._state
 77
 78    async def update(self, local_components: list[common.ComponentInfo]):
 79        """Update slaves's local components"""
 80        if self._local_components == local_components:
 81            return
 82
 83        self._local_components = local_components
 84        await self._send_msg_slave(local_components)
 85
 86    async def _slave_loop(self):
 87        mlog.debug('starting slave loop')
 88        try:
 89            await self._send_msg_slave(self._local_components)
 90
 91            while True:
 92                msg_type, msg_data = await common.receive_msg(self._conn)
 93
 94                if msg_type != 'HatObserver.MsgMaster':
 95                    raise Exception('unsupported message type')
 96
 97                mlog.debug('received msg master')
 98                components = [common.component_info_from_sbs(i)
 99                              for i in msg_data['components']]
100                self._state = State(mid=msg_data['mid'],
101                                    global_components=components)
102
103                if self._state_cb:
104                    await aio.call(self._state_cb, self, self._state)
105
106        except ConnectionError:
107            pass
108
109        except Exception as e:
110            mlog.error('slave loop error: %s', e, exc_info=e)
111
112        finally:
113            mlog.debug('stopping slave loop')
114            self.close()
115
116    async def _send_msg_slave(self, local_components):
117        await common.send_msg(self._conn, 'HatObserver.MsgSlave', {
118            'components': [common.component_info_to_sbs(i)
119                           for i in local_components]})

Observer Slave

To create a new instance of this class, see the connect coroutine.

Slave( conn: hat.drivers.chatter.Connection, local_components: list[hat.monitor.common.ComponentInfo], state_cb: Optional[Callable[[Slave, State], Optional[Awaitable[NoneType]]]])
56    def __init__(self,
57                 conn: chatter.Connection,
58                 local_components: list[common.ComponentInfo],
59                 state_cb: StateCb | None):
60        self._conn = conn
61        self._local_components = local_components
62        self._state_cb = state_cb
63        self._state = State(mid=None,
64                            global_components=[])
65
66        self.async_group.spawn(self._slave_loop)
async_group: hat.aio.group.Group
68    @property
69    def async_group(self) -> aio.Group:
70        """Async group"""
71        return self._conn.async_group

Async group

state: State
73    @property
74    def state(self) -> State:
75        """Slave's state"""
76        return self._state

Slave's state

async def update(self, local_components: list[hat.monitor.common.ComponentInfo]):
78    async def update(self, local_components: list[common.ComponentInfo]):
79        """Update slaves's local components"""
80        if self._local_components == local_components:
81            return
82
83        self._local_components = local_components
84        await self._send_msg_slave(local_components)

Update slave's local components

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close