hat.monitor.observer.server

Observer Server

  1"""Observer Server"""
  2
  3import contextlib
  4import itertools
  5import logging
  6import typing
  7
  8from hat import aio
  9from hat import json
 10from hat import util
 11from hat.drivers import chatter
 12from hat.drivers import tcp
 13
 14from hat.monitor.observer import common
 15
 16
 17mlog: logging.Logger = logging.getLogger(__name__)
 18"""Module logger"""
 19
 20StateCb: typing.TypeAlias = aio.AsyncCallable[['Server', 'State'], None]
 21"""State callback"""
 22
 23
 24class State(typing.NamedTuple):
 25    mid: int
 26    local_components: list[common.ComponentInfo]
 27    global_components: list[common.ComponentInfo]
 28
 29
 30async def listen(addr: tcp.Address,
 31                 *,
 32                 default_rank: int = 1,
 33                 close_timeout: float = 3,
 34                 state_cb: StateCb | None = None,
 35                 **kwargs
 36                 ) -> 'Server':
 37    """Create listening Observer Server
 38
 39    All client connections are always bound to server lifetime regardles
 40    of `bind_connections` argument.
 41
 42    Additional arguments are passed directly to `hat.drivers.chatter.listen`.
 43
 44    """
 45    server = Server()
 46    server._default_rank = default_rank
 47    server._close_timeout = close_timeout
 48    server._state_cb = state_cb
 49    server._state = State(mid=0,
 50                          local_components=[],
 51                          global_components=[])
 52    server._next_cids = itertools.count(1)
 53    server._cid_conns = {}
 54    server._rank_cache = {}
 55
 56    server._srv = await chatter.listen(server._client_loop, addr, **kwargs)
 57
 58    return server
 59
 60
 61class Server(aio.Resource):
 62    """Observer Server
 63
 64    For creating new instance of this class see `listen` coroutine.
 65
 66    """
 67
 68    @property
 69    def async_group(self) -> aio.Group:
 70        """Async group"""
 71        return self._srv.async_group
 72
 73    @property
 74    def state(self) -> State:
 75        """Server's state"""
 76        return self._state
 77
 78    async def update(self,
 79                     mid: int,
 80                     global_components: list[common.ComponentInfo]):
 81        """Update server's monitor id and global components"""
 82        if (mid == self._state.mid and
 83                global_components == self._state.global_components):
 84            return
 85
 86        blessing_reqs = {i.cid: i.blessing_req
 87                         for i in global_components
 88                         if i.mid == mid}
 89
 90        local_components = [
 91            i._replace(mid=mid,
 92                       blessing_req=blessing_reqs.get(i.cid, i.blessing_req))
 93            for i in self._state.local_components]
 94
 95        await self._change_state(mid=mid,
 96                                 local_components=local_components,
 97                                 global_components=global_components)
 98
 99    async def set_rank(self,
100                       cid: int,
101                       rank: int):
102        """Set component rank"""
103        info = util.first(self._state.local_components,
104                          lambda i: i.cid == cid)
105        if not info or info.rank == rank:
106            return
107
108        if info.name is not None:
109            self._rank_cache[info.name, info.group] = rank
110
111        updated_info = info._replace(rank=rank)
112        local_components = [(updated_info if i is info else i)
113                            for i in self._state.local_components]
114
115        await self._change_state(local_components=local_components)
116
117    async def _client_loop(self, conn):
118        cid = next(self._next_cids)
119        self._cid_conns[cid] = conn
120
121        mlog.debug('starting client loop (cid: %s)', cid)
122        try:
123            local_components = [*self._state.local_components,
124                                self._get_init_info(cid)]
125            await self._change_state(local_components=local_components)
126
127            while True:
128                msg_type, msg_data = await common.receive_msg(conn)
129
130                if msg_type != 'HatObserver.MsgClient':
131                    raise Exception('unsupported message type')
132
133                mlog.debug('received msg client (cid: %s)', cid)
134                await self._update_client(
135                    cid=cid,
136                    name=msg_data['name'],
137                    group=msg_data['group'],
138                    data=json.decode(msg_data['data']),
139                    blessing_res=common.blessing_res_from_sbs(
140                        msg_data['blessingRes']))
141
142        except ConnectionError:
143            pass
144
145        except Exception as e:
146            mlog.error('client loop error (cid: %s): %s', cid, e, exc_info=e)
147
148        finally:
149            mlog.debug('closing client loop (cid: %s)', cid)
150            await aio.uncancellable(self._remove_client(cid))
151
152    async def _change_state(self, **kwargs):
153        self._state = self._state._replace(**kwargs)
154
155        for cid, conn in list(self._cid_conns.items()):
156            with contextlib.suppress(ConnectionError):
157                await common.send_msg(conn, 'HatObserver.MsgServer', {
158                    'cid': cid,
159                    'mid': self._state.mid,
160                    'components': [common.component_info_to_sbs(info)
161                                   for info in self._state.global_components]})
162
163        if self._state_cb:
164            await aio.call(self._state_cb, self, self._state)
165
166    async def _remove_client(self, cid):
167        conn = self._cid_conns.pop(cid)
168
169        try:
170            local_components = [i for i in self._state.local_components
171                                if i.cid != cid]
172            await self._change_state(local_components=local_components)
173
174        except Exception as e:
175            mlog.error('change state error: %s', e, exc_info=e)
176
177        with contextlib.suppress(Exception):
178            await conn.send(chatter.Data('HatObserver.MsgClose', b''))
179            await aio.wait_for(conn.wait_closed(), self._close_timeout)
180
181        await conn.async_close()
182
183    async def _update_client(self, cid, name, group, data, blessing_res):
184        info = util.first(self._state.local_components,
185                          lambda i: i.cid == cid)
186        updated_info = info._replace(name=name,
187                                     group=group,
188                                     data=data,
189                                     blessing_res=blessing_res)
190
191        if info.name is None:
192            rank_cache_key = name, group
193            rank = self._rank_cache.get(rank_cache_key, info.rank)
194            updated_info = updated_info._replace(rank=rank)
195
196        if info == updated_info:
197            return
198
199        local_components = [(updated_info if i is info else i)
200                            for i in self._state.local_components]
201        await self._change_state(local_components=local_components)
202
203    def _get_init_info(self, cid):
204        return common.ComponentInfo(
205            cid=cid,
206            mid=self._state.mid,
207            name=None,
208            group=None,
209            data=None,
210            rank=self._default_rank,
211            blessing_req=common.BlessingReq(token=None,
212                                            timestamp=None),
213            blessing_res=common.BlessingRes(token=None,
214                                            ready=False))
mlog: logging.Logger = <Logger hat.monitor.observer.server (WARNING)>

Module logger

StateCb: TypeAlias = Callable[[ForwardRef('Server'), ForwardRef('State')], Optional[Awaitable[NoneType]]]

State callback

class State(typing.NamedTuple):
class State(typing.NamedTuple):
    """Observer Server state

    Attributes:
        mid: monitor id (``0`` until changed with `Server.update`)
        local_components: one entry per client currently connected to this
            server
        global_components: components most recently provided to
            `Server.update`; this list is sent to connected clients

    """
    mid: int
    local_components: list[common.ComponentInfo]
    global_components: list[common.ComponentInfo]

State(mid, local_components, global_components)

State( mid: int, local_components: list[hat.monitor.common.ComponentInfo], global_components: list[hat.monitor.common.ComponentInfo])

Create new instance of State(mid, local_components, global_components)

mid: int

Alias for field number 0

local_components: list[hat.monitor.common.ComponentInfo]

Alias for field number 1

global_components: list[hat.monitor.common.ComponentInfo]

Alias for field number 2

Inherited Members
builtins.tuple
index
count
async def listen( addr: hat.drivers.tcp.Address, *, default_rank: int = 1, close_timeout: float = 3, state_cb: Optional[Callable[[Server, State], Optional[Awaitable[NoneType]]]] = None, **kwargs) -> Server:
async def listen(addr: tcp.Address,
                 *,
                 default_rank: int = 1,
                 close_timeout: float = 3,
                 state_cb: StateCb | None = None,
                 **kwargs
                 ) -> 'Server':
    """Create listening Observer Server

    All client connections are always bound to server lifetime regardless
    of `bind_connections` argument.

    Additional arguments are passed directly to `hat.drivers.chatter.listen`.

    Args:
        addr: listening address
        default_rank: rank assigned to a component when its client connects
        close_timeout: how long to wait for a client to close its connection
            after the close message is sent (presumably seconds - confirm
            against `hat.aio.wait_for`)
        state_cb: optional callback called on each server state change

    """
    # server starts with monitor id 0 and without any components
    initial_state = State(mid=0,
                          local_components=[],
                          global_components=[])

    srv = Server()
    srv._state = initial_state
    srv._state_cb = state_cb
    srv._default_rank = default_rank
    srv._close_timeout = close_timeout
    srv._rank_cache = {}
    srv._cid_conns = {}
    srv._next_cids = itertools.count(1)

    # each accepted connection is handled by the server's client loop
    srv._srv = await chatter.listen(srv._client_loop, addr, **kwargs)

    return srv

Create listening Observer Server

All client connections are always bound to server lifetime regardless of bind_connections argument.

Additional arguments are passed directly to hat.drivers.chatter.listen.

class Server(hat.aio.group.Resource):
 62class Server(aio.Resource):
 63    """Observer Server
 64
 65    For creating new instance of this class see `listen` coroutine.
 66
 67    """
 68
 69    @property
 70    def async_group(self) -> aio.Group:
 71        """Async group"""
 72        return self._srv.async_group
 73
 74    @property
 75    def state(self) -> State:
 76        """Server's state"""
 77        return self._state
 78
 79    async def update(self,
 80                     mid: int,
 81                     global_components: list[common.ComponentInfo]):
 82        """Update server's monitor id and global components"""
 83        if (mid == self._state.mid and
 84                global_components == self._state.global_components):
 85            return
 86
 87        blessing_reqs = {i.cid: i.blessing_req
 88                         for i in global_components
 89                         if i.mid == mid}
 90
 91        local_components = [
 92            i._replace(mid=mid,
 93                       blessing_req=blessing_reqs.get(i.cid, i.blessing_req))
 94            for i in self._state.local_components]
 95
 96        await self._change_state(mid=mid,
 97                                 local_components=local_components,
 98                                 global_components=global_components)
 99
100    async def set_rank(self,
101                       cid: int,
102                       rank: int):
103        """Set component rank"""
104        info = util.first(self._state.local_components,
105                          lambda i: i.cid == cid)
106        if not info or info.rank == rank:
107            return
108
109        if info.name is not None:
110            self._rank_cache[info.name, info.group] = rank
111
112        updated_info = info._replace(rank=rank)
113        local_components = [(updated_info if i is info else i)
114                            for i in self._state.local_components]
115
116        await self._change_state(local_components=local_components)
117
118    async def _client_loop(self, conn):
119        cid = next(self._next_cids)
120        self._cid_conns[cid] = conn
121
122        mlog.debug('starting client loop (cid: %s)', cid)
123        try:
124            local_components = [*self._state.local_components,
125                                self._get_init_info(cid)]
126            await self._change_state(local_components=local_components)
127
128            while True:
129                msg_type, msg_data = await common.receive_msg(conn)
130
131                if msg_type != 'HatObserver.MsgClient':
132                    raise Exception('unsupported message type')
133
134                mlog.debug('received msg client (cid: %s)', cid)
135                await self._update_client(
136                    cid=cid,
137                    name=msg_data['name'],
138                    group=msg_data['group'],
139                    data=json.decode(msg_data['data']),
140                    blessing_res=common.blessing_res_from_sbs(
141                        msg_data['blessingRes']))
142
143        except ConnectionError:
144            pass
145
146        except Exception as e:
147            mlog.error('client loop error (cid: %s): %s', cid, e, exc_info=e)
148
149        finally:
150            mlog.debug('closing client loop (cid: %s)', cid)
151            await aio.uncancellable(self._remove_client(cid))
152
153    async def _change_state(self, **kwargs):
154        self._state = self._state._replace(**kwargs)
155
156        for cid, conn in list(self._cid_conns.items()):
157            with contextlib.suppress(ConnectionError):
158                await common.send_msg(conn, 'HatObserver.MsgServer', {
159                    'cid': cid,
160                    'mid': self._state.mid,
161                    'components': [common.component_info_to_sbs(info)
162                                   for info in self._state.global_components]})
163
164        if self._state_cb:
165            await aio.call(self._state_cb, self, self._state)
166
167    async def _remove_client(self, cid):
168        conn = self._cid_conns.pop(cid)
169
170        try:
171            local_components = [i for i in self._state.local_components
172                                if i.cid != cid]
173            await self._change_state(local_components=local_components)
174
175        except Exception as e:
176            mlog.error('change state error: %s', e, exc_info=e)
177
178        with contextlib.suppress(Exception):
179            await conn.send(chatter.Data('HatObserver.MsgClose', b''))
180            await aio.wait_for(conn.wait_closed(), self._close_timeout)
181
182        await conn.async_close()
183
184    async def _update_client(self, cid, name, group, data, blessing_res):
185        info = util.first(self._state.local_components,
186                          lambda i: i.cid == cid)
187        updated_info = info._replace(name=name,
188                                     group=group,
189                                     data=data,
190                                     blessing_res=blessing_res)
191
192        if info.name is None:
193            rank_cache_key = name, group
194            rank = self._rank_cache.get(rank_cache_key, info.rank)
195            updated_info = updated_info._replace(rank=rank)
196
197        if info == updated_info:
198            return
199
200        local_components = [(updated_info if i is info else i)
201                            for i in self._state.local_components]
202        await self._change_state(local_components=local_components)
203
204    def _get_init_info(self, cid):
205        return common.ComponentInfo(
206            cid=cid,
207            mid=self._state.mid,
208            name=None,
209            group=None,
210            data=None,
211            rank=self._default_rank,
212            blessing_req=common.BlessingReq(token=None,
213                                            timestamp=None),
214            blessing_res=common.BlessingRes(token=None,
215                                            ready=False))

Observer Server

For creating new instance of this class see listen coroutine.

async_group: hat.aio.group.Group
69    @property
70    def async_group(self) -> aio.Group:
71        """Async group"""
72        return self._srv.async_group

Async group

state: State
74    @property
75    def state(self) -> State:
76        """Server's state"""
77        return self._state

Server's state

async def update( self, mid: int, global_components: list[hat.monitor.common.ComponentInfo]):
79    async def update(self,
80                     mid: int,
81                     global_components: list[common.ComponentInfo]):
82        """Update server's monitor id and global components"""
83        if (mid == self._state.mid and
84                global_components == self._state.global_components):
85            return
86
87        blessing_reqs = {i.cid: i.blessing_req
88                         for i in global_components
89                         if i.mid == mid}
90
91        local_components = [
92            i._replace(mid=mid,
93                       blessing_req=blessing_reqs.get(i.cid, i.blessing_req))
94            for i in self._state.local_components]
95
96        await self._change_state(mid=mid,
97                                 local_components=local_components,
98                                 global_components=global_components)

Update server's monitor id and global components

async def set_rank(self, cid: int, rank: int):
100    async def set_rank(self,
101                       cid: int,
102                       rank: int):
103        """Set component rank"""
104        info = util.first(self._state.local_components,
105                          lambda i: i.cid == cid)
106        if not info or info.rank == rank:
107            return
108
109        if info.name is not None:
110            self._rank_cache[info.name, info.group] = rank
111
112        updated_info = info._replace(rank=rank)
113        local_components = [(updated_info if i is info else i)
114                            for i in self._state.local_components]
115
116        await self._change_state(local_components=local_components)

Set component rank

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close