hat.monitor.observer.master

Observer Master

  1"""Observer Master"""
  2
  3from collections.abc import Iterable
  4import contextlib
  5import itertools
  6import logging
  7import typing
  8
  9from hat import aio
 10from hat.drivers import chatter
 11from hat.drivers import tcp
 12
 13from hat.monitor.observer import common
 14
 15
 16mlog: logging.Logger = logging.getLogger(__name__)
 17"""Module logger"""
 18
 19ComponentsCb: typing.TypeAlias = aio.AsyncCallable[
 20    ['Master', list[common.ComponentInfo]],
 21    None]
 22"""Components callback"""
 23
 24BlessingCb: typing.TypeAlias = typing.Callable[
 25    ['Master', Iterable[common.ComponentInfo]],
 26    Iterable[tuple[common.Mid, common.Cid, common.BlessingReq]]]
 27"""Blessing callback"""
 28
 29
 30async def listen(addr: tcp.Address,
 31                 *,
 32                 global_components_cb: ComponentsCb | None = None,
 33                 blessing_cb: BlessingCb | None = None,
 34                 **kwargs
 35                 ) -> 'Master':
 36    """Create listening inactive Observer Master
 37
 38    All slave connections are always bound to server lifetime
 39    (`bind_connections` should not be set).
 40
 41    Additional arguments are passed directly to `hat.drivers.chatter.listen`.
 42
 43    """
 44    master = Master()
 45    master._global_components_cb = global_components_cb
 46    master._blessing_cb = blessing_cb
 47    master._mid_conns = {}
 48    master._mid_cid_infos = {0: {}}
 49    master._global_components = []
 50    master._next_mids = itertools.count(1)
 51    master._active_subgroup = None
 52
 53    master._srv = await chatter.listen(master._on_connection, addr,
 54                                       bind_connections=True,
 55                                       **kwargs)
 56
 57    return master
 58
 59
 60class Master(aio.Resource):
 61    """Observer Master
 62
 63    For creating new instance of this class see `listen` coroutine.
 64
 65    """
 66
 67    @property
 68    def async_group(self) -> aio.Group:
 69        """Async group"""
 70        return self._srv.async_group
 71
 72    @property
 73    def global_components(self) -> list[common.ComponentInfo]:
 74        return self._global_components
 75
 76    @property
 77    def is_active(self) -> bool:
 78        return self._active_subgroup is not None
 79
 80    def set_active(self, active: bool):
 81        if active and not self._active_subgroup:
 82            self._active_subgroup = self.async_group.create_subgroup()
 83
 84        elif not active and self._active_subgroup:
 85            self._active_subgroup.close()
 86            self._active_subgroup = None
 87
 88    async def set_local_components(self, local_components: Iterable[common.ComponentInfo]):  # NOQA
 89        await self._update_components(0, local_components)
 90
 91    async def set_local_blessing_reqs(self, blessing_reqs: Iterable[tuple[common.Cid, common.BlessingReq]]):  # NOQA
 92        change = False
 93
 94        for cid, blessing_req in blessing_reqs:
 95            info = self._mid_cid_infos[0].get(cid)
 96            if not info or info.blessing_req == blessing_req:
 97                continue
 98
 99            self._mid_cid_infos[0][cid] = info._replace(
100                blessing_req=blessing_req)
101            change = True
102
103        if change:
104            await self._update_global_components()
105
106    def _on_connection(self, conn):
107        try:
108            self._active_subgroup.spawn(self._slave_loop, conn)
109
110        except Exception:
111            conn.close()
112
113    async def _slave_loop(self, conn):
114        mid = next(self._next_mids)
115
116        mlog.debug('starting slave loop (mid: %s)', mid)
117        try:
118            while True:
119                msg_type, msg_data = await common.receive_msg(conn)
120
121                if msg_type != 'HatObserver.MsgSlave':
122                    raise Exception('unsupported message type')
123
124                mlog.debug('received msg slave (mid: %s)', mid)
125                components = (common.component_info_from_sbs(i)
126                              for i in msg_data['components'])
127                await self._update_components(mid, components)
128
129                if mid not in self._mid_conns:
130                    self._mid_conns[mid] = conn
131
132                    global_components = list(
133                        _flatten_mid_cid_infos(self._mid_cid_infos))
134                    await _send_msg_master(conn, mid, global_components)
135
136        except ConnectionError:
137            pass
138
139        except Exception as e:
140            mlog.error('slave loop error (mid: %s): %s', mid, e, exc_info=e)
141
142        finally:
143            mlog.debug('stopping slave loop (mid: %s)', mid)
144            conn.close()
145            await aio.uncancellable(self._remove_slave(mid))
146
147    async def _remove_slave(self, mid):
148        self._mid_conns.pop(mid, None)
149
150        if not self._mid_cid_infos.pop(mid, None):
151            return
152
153        await self._update_global_components()
154
155    async def _update_components(self, mid, components):
156        cid_infos = self._mid_cid_infos.get(mid, {})
157
158        self._mid_cid_infos[mid] = {}
159        for info in components:
160            info = info._replace(mid=mid)
161
162            old_info = cid_infos.get(info.cid)
163            if old_info:
164                info = info._replace(blessing_req=old_info.blessing_req)
165
166            self._mid_cid_infos[mid][info.cid] = info
167
168        if self._mid_cid_infos[mid] == cid_infos:
169            return
170
171        await self._update_global_components()
172
173    async def _update_global_components(self):
174        if self._blessing_cb:
175            infos = _flatten_mid_cid_infos(self._mid_cid_infos)
176
177            for mid, cid, blessing_req in self._blessing_cb(self, infos):
178                info = self._mid_cid_infos[mid][cid]
179                info = info._replace(blessing_req=blessing_req)
180                self._mid_cid_infos[mid][cid] = info
181
182        global_components = list(_flatten_mid_cid_infos(self._mid_cid_infos))
183        if global_components == self._global_components:
184            return
185
186        self._global_components = global_components
187
188        if self._global_components_cb:
189            await aio.call(self._global_components_cb, self, global_components)
190
191        for mid, conn in list(self._mid_conns.items()):
192            with contextlib.suppress(ConnectionError):
193                await _send_msg_master(conn, mid, global_components)
194
195
async def _send_msg_master(conn, mid, global_components):
    """Send ``HatObserver.MsgMaster`` with `mid` and encoded components"""
    encoded = list(map(common.component_info_to_sbs, global_components))
    payload = {'mid': mid,
               'components': encoded}
    await common.send_msg(conn, 'HatObserver.MsgMaster', payload)
201
202
203def _flatten_mid_cid_infos(mid_cid_infos):
204    for cid_infos in mid_cid_infos.values():
205        yield from cid_infos.values()
mlog: logging.Logger = <Logger hat.monitor.observer.master (WARNING)>

Module logger

ComponentsCb: TypeAlias = Callable[[ForwardRef('Master'), list[hat.monitor.common.ComponentInfo]], None | Awaitable[None]]

Components callback

BlessingCb: TypeAlias = Callable[[ForwardRef('Master'), Iterable[hat.monitor.common.ComponentInfo]], Iterable[tuple[int, int, hat.monitor.common.BlessingReq]]]

Blessing callback

async def listen( addr: hat.drivers.tcp.Address, *, global_components_cb: Optional[Callable[[Master, list[hat.monitor.common.ComponentInfo]], None | Awaitable[None]]] = None, blessing_cb: Optional[Callable[[Master, Iterable[hat.monitor.common.ComponentInfo]], Iterable[tuple[int, int, hat.monitor.common.BlessingReq]]]] = None, **kwargs) -> Master:
async def listen(addr: tcp.Address,
                 *,
                 global_components_cb: ComponentsCb | None = None,
                 blessing_cb: BlessingCb | None = None,
                 **kwargs
                 ) -> 'Master':
    """Create listening inactive Observer Master

    Returned master starts without an active subgroup, so incoming
    connections are closed until `Master.set_active` is called with ``True``.

    Slave connections are always bound to the server lifetime -
    `bind_connections` is set by this coroutine and should not be passed
    by the caller.

    Remaining keyword arguments are forwarded unchanged to
    `hat.drivers.chatter.listen`.

    """
    instance = Master()

    # bookkeeping: mid ``0`` is reserved for local components, slave mids
    # are taken from a counter starting at ``1``
    instance._next_mids = itertools.count(1)
    instance._mid_cid_infos = {0: {}}
    instance._mid_conns = {}
    instance._global_components = []

    # master is inactive until `set_active(True)`
    instance._active_subgroup = None

    # optional user callbacks
    instance._blessing_cb = blessing_cb
    instance._global_components_cb = global_components_cb

    server = await chatter.listen(instance._on_connection, addr,
                                  bind_connections=True,
                                  **kwargs)
    instance._srv = server

    return instance

Create listening inactive Observer Master

All slave connections are always bound to server lifetime (bind_connections should not be set).

Additional arguments are passed directly to hat.drivers.chatter.listen.

class Master(hat.aio.group.Resource):
 61class Master(aio.Resource):
 62    """Observer Master
 63
 64    For creating new instance of this class see `listen` coroutine.
 65
 66    """
 67
 68    @property
 69    def async_group(self) -> aio.Group:
 70        """Async group"""
 71        return self._srv.async_group
 72
 73    @property
 74    def global_components(self) -> list[common.ComponentInfo]:
 75        return self._global_components
 76
 77    @property
 78    def is_active(self) -> bool:
 79        return self._active_subgroup is not None
 80
 81    def set_active(self, active: bool):
 82        if active and not self._active_subgroup:
 83            self._active_subgroup = self.async_group.create_subgroup()
 84
 85        elif not active and self._active_subgroup:
 86            self._active_subgroup.close()
 87            self._active_subgroup = None
 88
 89    async def set_local_components(self, local_components: Iterable[common.ComponentInfo]):  # NOQA
 90        await self._update_components(0, local_components)
 91
 92    async def set_local_blessing_reqs(self, blessing_reqs: Iterable[tuple[common.Cid, common.BlessingReq]]):  # NOQA
 93        change = False
 94
 95        for cid, blessing_req in blessing_reqs:
 96            info = self._mid_cid_infos[0].get(cid)
 97            if not info or info.blessing_req == blessing_req:
 98                continue
 99
100            self._mid_cid_infos[0][cid] = info._replace(
101                blessing_req=blessing_req)
102            change = True
103
104        if change:
105            await self._update_global_components()
106
107    def _on_connection(self, conn):
108        try:
109            self._active_subgroup.spawn(self._slave_loop, conn)
110
111        except Exception:
112            conn.close()
113
114    async def _slave_loop(self, conn):
115        mid = next(self._next_mids)
116
117        mlog.debug('starting slave loop (mid: %s)', mid)
118        try:
119            while True:
120                msg_type, msg_data = await common.receive_msg(conn)
121
122                if msg_type != 'HatObserver.MsgSlave':
123                    raise Exception('unsupported message type')
124
125                mlog.debug('received msg slave (mid: %s)', mid)
126                components = (common.component_info_from_sbs(i)
127                              for i in msg_data['components'])
128                await self._update_components(mid, components)
129
130                if mid not in self._mid_conns:
131                    self._mid_conns[mid] = conn
132
133                    global_components = list(
134                        _flatten_mid_cid_infos(self._mid_cid_infos))
135                    await _send_msg_master(conn, mid, global_components)
136
137        except ConnectionError:
138            pass
139
140        except Exception as e:
141            mlog.error('slave loop error (mid: %s): %s', mid, e, exc_info=e)
142
143        finally:
144            mlog.debug('stopping slave loop (mid: %s)', mid)
145            conn.close()
146            await aio.uncancellable(self._remove_slave(mid))
147
148    async def _remove_slave(self, mid):
149        self._mid_conns.pop(mid, None)
150
151        if not self._mid_cid_infos.pop(mid, None):
152            return
153
154        await self._update_global_components()
155
156    async def _update_components(self, mid, components):
157        cid_infos = self._mid_cid_infos.get(mid, {})
158
159        self._mid_cid_infos[mid] = {}
160        for info in components:
161            info = info._replace(mid=mid)
162
163            old_info = cid_infos.get(info.cid)
164            if old_info:
165                info = info._replace(blessing_req=old_info.blessing_req)
166
167            self._mid_cid_infos[mid][info.cid] = info
168
169        if self._mid_cid_infos[mid] == cid_infos:
170            return
171
172        await self._update_global_components()
173
174    async def _update_global_components(self):
175        if self._blessing_cb:
176            infos = _flatten_mid_cid_infos(self._mid_cid_infos)
177
178            for mid, cid, blessing_req in self._blessing_cb(self, infos):
179                info = self._mid_cid_infos[mid][cid]
180                info = info._replace(blessing_req=blessing_req)
181                self._mid_cid_infos[mid][cid] = info
182
183        global_components = list(_flatten_mid_cid_infos(self._mid_cid_infos))
184        if global_components == self._global_components:
185            return
186
187        self._global_components = global_components
188
189        if self._global_components_cb:
190            await aio.call(self._global_components_cb, self, global_components)
191
192        for mid, conn in list(self._mid_conns.items()):
193            with contextlib.suppress(ConnectionError):
194                await _send_msg_master(conn, mid, global_components)

Observer Master

To create a new instance of this class, use the listen coroutine.

async_group: hat.aio.group.Group
68    @property
69    def async_group(self) -> aio.Group:
70        """Async group"""
71        return self._srv.async_group

Async group

global_components: list[hat.monitor.common.ComponentInfo]
73    @property
74    def global_components(self) -> list[common.ComponentInfo]:
75        return self._global_components
is_active: bool
77    @property
78    def is_active(self) -> bool:
79        return self._active_subgroup is not None
def set_active(self, active: bool):
81    def set_active(self, active: bool):
82        if active and not self._active_subgroup:
83            self._active_subgroup = self.async_group.create_subgroup()
84
85        elif not active and self._active_subgroup:
86            self._active_subgroup.close()
87            self._active_subgroup = None
async def set_local_components(self, local_components: Iterable[hat.monitor.common.ComponentInfo]):
89    async def set_local_components(self, local_components: Iterable[common.ComponentInfo]):  # NOQA
90        await self._update_components(0, local_components)
async def set_local_blessing_reqs( self, blessing_reqs: Iterable[tuple[int, hat.monitor.common.BlessingReq]]):
 92    async def set_local_blessing_reqs(self, blessing_reqs: Iterable[tuple[common.Cid, common.BlessingReq]]):  # NOQA
 93        change = False
 94
 95        for cid, blessing_req in blessing_reqs:
 96            info = self._mid_cid_infos[0].get(cid)
 97            if not info or info.blessing_req == blessing_req:
 98                continue
 99
100            self._mid_cid_infos[0][cid] = info._replace(
101                blessing_req=blessing_req)
102            change = True
103
104        if change:
105            await self._update_global_components()