hat.monitor.observer.master

Observer Master

  1"""Observer Master"""
  2
  3import contextlib
  4import itertools
  5import logging
  6import typing
  7
  8from hat import aio
  9from hat.drivers import chatter
 10from hat.drivers import tcp
 11
 12from hat.monitor.observer import common
 13
 14
# Logger shared by everything in this module; named after the module itself.
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

# Signature of the callback that `Master` calls (through `aio.call`) with
# the master instance and the current global components list each time that
# list has been recomputed.
ComponentsCb: typing.TypeAlias = aio.AsyncCallable[['Master',
                                                    list[common.ComponentInfo]],  # NOQA
                                                   None]
"""Components callback"""

# Signature of the callback that `Master` calls synchronously with the master
# instance and the freshly merged components; the list it returns is stored
# as the new global components list.
BlessingCb: typing.TypeAlias = typing.Callable[['Master',
                                                list[common.ComponentInfo]],
                                               list[common.ComponentInfo]]
"""Blessing callback"""
 27
 28
 29async def listen(addr: tcp.Address,
 30                 *,
 31                 local_components: list[common.ComponentInfo] = [],
 32                 global_components_cb: ComponentsCb | None = None,
 33                 blessing_cb: BlessingCb | None = None,
 34                 **kwargs
 35                 ) -> 'Master':
 36    """Create listening inactive Observer Master
 37
 38    All slave connections are always bound to server lifetime
 39    (`bind_connections` should not be set).
 40
 41    Additional arguments are passed directly to `hat.drivers.chatter.listen`.
 42
 43    """
 44    master = Master()
 45    master._global_components_cb = global_components_cb
 46    master._blessing_cb = blessing_cb
 47    master._mid_conns = {}
 48    master._mid_components = {0: [i._replace(mid=0) for i in local_components]}
 49    master._global_components = list(
 50        _get_global_components(master._mid_components))
 51    master._next_mids = itertools.count(1)
 52    master._active_subgroup = None
 53
 54    master._srv = await chatter.listen(master._on_connection, addr,
 55                                       bind_connections=True,
 56                                       **kwargs)
 57
 58    return master
 59
 60
 61class Master(aio.Resource):
 62    """Observer Master
 63
 64    For creating new instance of this class see `listen` coroutine.
 65
 66    """
 67
 68    @property
 69    def async_group(self) -> aio.Group:
 70        """Async group"""
 71        return self._srv.async_group
 72
 73    @property
 74    def global_components(self) -> list[common.ComponentInfo]:
 75        return self._global_components
 76
 77    @property
 78    def is_active(self) -> bool:
 79        return self._active_subgroup is not None
 80
 81    def set_active(self, active: bool):
 82        if active and not self._active_subgroup:
 83            self._active_subgroup = self.async_group.create_subgroup()
 84
 85        elif not active and self._active_subgroup:
 86            self._active_subgroup.close()
 87            self._active_subgroup = None
 88
 89    async def set_local_components(self, local_components):
 90        await self._update_components(0, local_components)
 91
 92    def _on_connection(self, conn):
 93        try:
 94            self._active_subgroup.spawn(self._slave_loop, conn)
 95
 96        except Exception:
 97            conn.close()
 98
 99    async def _slave_loop(self, conn):
100        mid = next(self._next_mids)
101
102        mlog.debug('starting slave loop (mid: %s)', mid)
103        try:
104            msg_type, msg_data = await common.receive_msg(conn)
105
106            if msg_type != 'HatObserver.MsgSlave':
107                raise Exception('unsupported message type')
108
109            self._mid_conns[mid] = conn
110
111            while True:
112                mlog.debug('received msg slave (mid: %s)', mid)
113                components = [common.component_info_from_sbs(i)
114                              for i in msg_data['components']]
115                await self._update_components(mid, components)
116
117                msg_type, msg_data = await common.receive_msg(conn)
118
119                if msg_type != 'HatObserver.MsgSlave':
120                    raise Exception('unsupported message type')
121
122        except ConnectionError:
123            pass
124
125        except Exception as e:
126            mlog.error('slave loop error (mid: %s): %s', mid, e, exc_info=e)
127
128        finally:
129            mlog.debug('stopping slave loop (mid: %s)', mid)
130            conn.close()
131            await aio.uncancellable(self._remove_slave(mid))
132
133    async def _remove_slave(self, mid):
134        conn = self._mid_conns.pop(mid, None)
135        if not conn:
136            return
137
138        await self._update_components(mid, None)
139
140    async def _update_components(self, mid, components):
141        if components is None:
142            if mid not in self._mid_components:
143                return
144
145            self._mid_components.pop(mid)
146
147        else:
148            blessing_reqs = {i.cid: i.blessing_req
149                             for i in self._mid_components.get(mid, [])}
150            components = [
151                i._replace(mid=mid,
152                           blessing_req=blessing_reqs.get(i.cid,
153                                                          i.blessing_req))
154                for i in components]
155
156            if self._mid_components.get(mid) == components:
157                return
158
159            self._mid_components[mid] = components
160
161        global_components = list(_get_global_components(self._mid_components))
162        self._global_components = (self._blessing_cb(self, global_components)
163                                   if self._blessing_cb else global_components)
164
165        if components is not None:
166            self._mid_components[mid] = [i for i in self._global_components
167                                         if i.mid == mid]
168
169        if self._global_components_cb:
170            await aio.call(self._global_components_cb, self,
171                           self._global_components)
172
173        for mid, conn in list(self._mid_conns.items()):
174            with contextlib.suppress(ConnectionError):
175                await _send_msg_master(conn, mid, self._global_components)
176
177
async def _send_msg_master(conn, mid, global_components):
    """Send a ``HatObserver.MsgMaster`` message over `conn`

    The message carries `mid` and every global component converted with
    `common.component_info_to_sbs`.

    """
    encoded = [common.component_info_to_sbs(info)
               for info in global_components]
    payload = {'mid': mid,
               'components': encoded}
    await common.send_msg(conn, 'HatObserver.MsgMaster', payload)
183
184
185def _get_global_components(mid_components):
186    for mid in sorted(mid_components.keys()):
187        yield from mid_components[mid]
mlog: logging.Logger = <Logger hat.monitor.observer.master (WARNING)>

Module logger

ComponentsCb: TypeAlias = Callable[[ForwardRef('Master'), list[hat.monitor.common.ComponentInfo]], Optional[Awaitable[NoneType]]]

Components callback

BlessingCb: TypeAlias = Callable[[ForwardRef('Master'), list[hat.monitor.common.ComponentInfo]], list[hat.monitor.common.ComponentInfo]]

Blessing callback

async def listen( addr: hat.drivers.tcp.Address, *, local_components: list[hat.monitor.common.ComponentInfo] = [], global_components_cb: Optional[Callable[[Master, list[hat.monitor.common.ComponentInfo]], Optional[Awaitable[NoneType]]]] = None, blessing_cb: Optional[Callable[[Master, list[hat.monitor.common.ComponentInfo]], list[hat.monitor.common.ComponentInfo]]] = None, **kwargs) -> Master:
async def listen(addr: tcp.Address,
                 *,
                 local_components: list[common.ComponentInfo] = [],
                 global_components_cb: ComponentsCb | None = None,
                 blessing_cb: BlessingCb | None = None,
                 **kwargs
                 ) -> 'Master':
    """Create a listening Observer Master that starts out inactive

    The returned master has no active subgroup, so `Master.is_active` is
    ``False`` until `Master.set_active` is called with ``True``.

    Args:
        addr: address handed to `hat.drivers.chatter.listen`
        local_components: components of the master itself; each one is
            stored with its ``mid`` replaced by ``0``
        global_components_cb: optional callback called with the master and
            the global components whenever they are recomputed
        blessing_cb: optional callback that receives the merged components
            and returns the list stored as global components
        kwargs: additional arguments passed directly to
            `hat.drivers.chatter.listen`

    All slave connections are always bound to server lifetime
    (`bind_connections` should not be set).

    """
    master = Master()

    # The master's own components always live under mid 0; slaves receive
    # mids from the counter below, which starts at 1.
    own_components = [info._replace(mid=0) for info in local_components]
    components_by_mid = {0: own_components}

    master._active_subgroup = None
    master._next_mids = itertools.count(1)
    master._mid_conns = {}
    master._mid_components = components_by_mid
    master._global_components = list(
        _get_global_components(components_by_mid))
    master._blessing_cb = blessing_cb
    master._global_components_cb = global_components_cb

    # Start listening only after the state used by `_on_connection` exists.
    master._srv = await chatter.listen(master._on_connection, addr,
                                       bind_connections=True,
                                       **kwargs)

    return master

Create a listening, inactive Observer Master

All slave connections are always bound to server lifetime (bind_connections should not be set).

Additional arguments are passed directly to hat.drivers.chatter.listen.

class Master(hat.aio.group.Resource):
 62class Master(aio.Resource):
 63    """Observer Master
 64
 65    For creating new instance of this class see `listen` coroutine.
 66
 67    """
 68
 69    @property
 70    def async_group(self) -> aio.Group:
 71        """Async group"""
 72        return self._srv.async_group
 73
 74    @property
 75    def global_components(self) -> list[common.ComponentInfo]:
 76        return self._global_components
 77
 78    @property
 79    def is_active(self) -> bool:
 80        return self._active_subgroup is not None
 81
 82    def set_active(self, active: bool):
 83        if active and not self._active_subgroup:
 84            self._active_subgroup = self.async_group.create_subgroup()
 85
 86        elif not active and self._active_subgroup:
 87            self._active_subgroup.close()
 88            self._active_subgroup = None
 89
 90    async def set_local_components(self, local_components):
 91        await self._update_components(0, local_components)
 92
 93    def _on_connection(self, conn):
 94        try:
 95            self._active_subgroup.spawn(self._slave_loop, conn)
 96
 97        except Exception:
 98            conn.close()
 99
100    async def _slave_loop(self, conn):
101        mid = next(self._next_mids)
102
103        mlog.debug('starting slave loop (mid: %s)', mid)
104        try:
105            msg_type, msg_data = await common.receive_msg(conn)
106
107            if msg_type != 'HatObserver.MsgSlave':
108                raise Exception('unsupported message type')
109
110            self._mid_conns[mid] = conn
111
112            while True:
113                mlog.debug('received msg slave (mid: %s)', mid)
114                components = [common.component_info_from_sbs(i)
115                              for i in msg_data['components']]
116                await self._update_components(mid, components)
117
118                msg_type, msg_data = await common.receive_msg(conn)
119
120                if msg_type != 'HatObserver.MsgSlave':
121                    raise Exception('unsupported message type')
122
123        except ConnectionError:
124            pass
125
126        except Exception as e:
127            mlog.error('slave loop error (mid: %s): %s', mid, e, exc_info=e)
128
129        finally:
130            mlog.debug('stopping slave loop (mid: %s)', mid)
131            conn.close()
132            await aio.uncancellable(self._remove_slave(mid))
133
134    async def _remove_slave(self, mid):
135        conn = self._mid_conns.pop(mid, None)
136        if not conn:
137            return
138
139        await self._update_components(mid, None)
140
141    async def _update_components(self, mid, components):
142        if components is None:
143            if mid not in self._mid_components:
144                return
145
146            self._mid_components.pop(mid)
147
148        else:
149            blessing_reqs = {i.cid: i.blessing_req
150                             for i in self._mid_components.get(mid, [])}
151            components = [
152                i._replace(mid=mid,
153                           blessing_req=blessing_reqs.get(i.cid,
154                                                          i.blessing_req))
155                for i in components]
156
157            if self._mid_components.get(mid) == components:
158                return
159
160            self._mid_components[mid] = components
161
162        global_components = list(_get_global_components(self._mid_components))
163        self._global_components = (self._blessing_cb(self, global_components)
164                                   if self._blessing_cb else global_components)
165
166        if components is not None:
167            self._mid_components[mid] = [i for i in self._global_components
168                                         if i.mid == mid]
169
170        if self._global_components_cb:
171            await aio.call(self._global_components_cb, self,
172                           self._global_components)
173
174        for mid, conn in list(self._mid_conns.items()):
175            with contextlib.suppress(ConnectionError):
176                await _send_msg_master(conn, mid, self._global_components)

Observer Master

To create a new instance of this class, use the listen coroutine.

async_group: hat.aio.group.Group
69    @property
70    def async_group(self) -> aio.Group:
71        """Async group"""
72        return self._srv.async_group

Async group

global_components: list[hat.monitor.common.ComponentInfo]
74    @property
75    def global_components(self) -> list[common.ComponentInfo]:
76        return self._global_components
is_active: bool
78    @property
79    def is_active(self) -> bool:
80        return self._active_subgroup is not None
def set_active(self, active: bool):
82    def set_active(self, active: bool):
83        if active and not self._active_subgroup:
84            self._active_subgroup = self.async_group.create_subgroup()
85
86        elif not active and self._active_subgroup:
87            self._active_subgroup.close()
88            self._active_subgroup = None
async def set_local_components(self, local_components):
90    async def set_local_components(self, local_components):
91        await self._update_components(0, local_components)
Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close