hat.monitor.observer.client

Observer Client

  1"""Observer Client"""
  2
  3import logging
  4import typing
  5
  6from hat import aio
  7from hat import json
  8from hat import util
  9from hat.drivers import chatter
 10from hat.drivers import tcp
 11
 12from hat.monitor.observer import common
 13
 14
# Logger shared by everything in this module, named after the module itself
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

# Signature of the callback that receives the client and its new state each
# time the client's state changes; the callback may return an awaitable
StateCb: typing.TypeAlias = aio.AsyncCallable[['Client', 'State'], None]
"""State callback"""

# Signature of the callback that receives the client when the server sends
# a close message; the callback may return an awaitable
CloseReqCb: typing.TypeAlias = aio.AsyncCallable[['Client'], None]
"""Close request callback"""
 23
 24
class State(typing.NamedTuple):
    """Client state

    Snapshot built from the most recent server message. Before any server
    message is processed, `info` is ``None`` and `components` is empty.

    """
    # component whose `cid` and `mid` match the values carried by the last
    # server message, or ``None`` when no component matches
    info: common.ComponentInfo | None
    # all components listed in the last server message
    components: list[common.ComponentInfo]
 29
 30
 31async def connect(addr: tcp.Address,
 32                  name: str,
 33                  group: str,
 34                  *,
 35                  data: json.Data = None,
 36                  state_cb: StateCb | None = None,
 37                  close_req_cb: CloseReqCb | None = None,
 38                  **kwargs
 39                  ) -> 'Client':
 40    """Connect to Observer Server
 41
 42    Additional arguments are passed directly to `hat.drivers.chatter.connect`.
 43
 44    """
 45    conn = await chatter.connect(addr, **kwargs)
 46
 47    try:
 48        return Client(conn=conn,
 49                      name=name,
 50                      group=group,
 51                      data=data,
 52                      state_cb=state_cb,
 53                      close_req_cb=close_req_cb)
 54
 55    except Exception:
 56        await aio.uncancellable(conn.async_close())
 57        raise
 58
 59
 60class Client(aio.Resource):
 61    """Observer Client
 62
 63    For creating new client see `connect` coroutine.
 64
 65    """
 66
 67    def __init__(self,
 68                 conn: chatter.Connection,
 69                 name: str,
 70                 group: str,
 71                 data: json.Data,
 72                 state_cb: StateCb | None,
 73                 close_req_cb: CloseReqCb | None):
 74        self._conn = conn
 75        self._name = name
 76        self._group = group
 77        self._data = json.encode(data)
 78        self._state_cb = state_cb
 79        self._close_req_cb = close_req_cb
 80        self._state = State(info=None,
 81                            components=[])
 82        self._blessing_res = common.BlessingRes(token=None,
 83                                                ready=False)
 84
 85        self.async_group.spawn(self._receive_loop)
 86
 87    @property
 88    def async_group(self) -> aio.Group:
 89        """Async group"""
 90        return self._conn.async_group
 91
 92    @property
 93    def state(self) -> State:
 94        """Client's state"""
 95        return self._state
 96
 97    async def set_blessing_res(self, res: common.BlessingRes):
 98        """Set blessing response"""
 99        if res == self._blessing_res:
100            return
101
102        self._blessing_res = res
103        await self._send_msg_client(res)
104
105    async def _receive_loop(self):
106        mlog.debug("starting receive loop")
107        try:
108            await self._send_msg_client(self._blessing_res)
109
110            while True:
111                msg_type, msg_data = await common.receive_msg(self._conn)
112
113                if msg_type == 'HatObserver.MsgServer':
114                    mlog.debug("received msg server")
115                    components = [common.component_info_from_sbs(i)
116                                  for i in msg_data['components']]
117                    await self._process_msg_server(cid=msg_data['cid'],
118                                                   mid=msg_data['mid'],
119                                                   components=components)
120
121                elif msg_type == 'HatObserver.MsgClose':
122                    mlog.debug("received msg close")
123                    if self._close_req_cb:
124                        await aio.call(self._close_req_cb, self)
125                    break
126
127                else:
128                    raise Exception('unsupported message type')
129
130        except ConnectionError:
131            mlog.debug("connection closed")
132
133        except Exception as e:
134            mlog.warning("monitor client error: %s", e, exc_info=e)
135
136        finally:
137            mlog.debug("stopping receive loop")
138            self.close()
139
140    async def _send_msg_client(self, blessing_res):
141        await common.send_msg(self._conn, 'HatObserver.MsgClient', {
142            'name': self._name,
143            'group': self._group,
144            'data': self._data,
145            'blessingRes': common.blessing_res_to_sbs(blessing_res)})
146
147    async def _process_msg_server(self, cid, mid, components):
148        info = util.first(components, lambda i: (i.cid == cid and
149                                                 i.mid == mid))
150        state = State(info=info,
151                      components=components)
152        if self._state == state:
153            return
154
155        self._state = state
156        if self._state_cb:
157            await aio.call(self._state_cb, self, state)
mlog: logging.Logger = <Logger hat.monitor.observer.client (WARNING)>

Module logger

StateCb: TypeAlias = Callable[[ForwardRef('Client'), ForwardRef('State')], Optional[Awaitable[NoneType]]]

State callback

CloseReqCb: TypeAlias = Callable[[ForwardRef('Client')], Optional[Awaitable[NoneType]]]

Close request callback

class State(typing.NamedTuple):
class State(typing.NamedTuple):
    """Client state

    Snapshot built from the most recent server message. Before any server
    message is processed, `info` is ``None`` and `components` is empty.

    """
    # component whose `cid` and `mid` match the values carried by the last
    # server message, or ``None`` when no component matches
    info: common.ComponentInfo | None
    # all components listed in the last server message
    components: list[common.ComponentInfo]

Client state

State( info: hat.monitor.common.ComponentInfo | None, components: list[hat.monitor.common.ComponentInfo])

Create new instance of State(info, components)

Alias for field number 0

Alias for field number 1

Inherited Members
builtins.tuple
index
count
async def connect( addr: hat.drivers.tcp.Address, name: str, group: str, *, data: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')] = None, state_cb: Optional[Callable[[Client, State], Optional[Awaitable[NoneType]]]] = None, close_req_cb: Optional[Callable[[Client], Optional[Awaitable[NoneType]]]] = None, **kwargs) -> Client:
async def connect(addr: tcp.Address,
                  name: str,
                  group: str,
                  *,
                  data: json.Data = None,
                  state_cb: StateCb | None = None,
                  close_req_cb: CloseReqCb | None = None,
                  **kwargs
                  ) -> 'Client':
    """Connect to Observer Server

    Opens a chatter connection to `addr` and wraps it in a new `Client`.
    If creating the client raises, the connection is closed (through
    `aio.uncancellable`) before the exception is propagated.

    Additional arguments are passed directly to `hat.drivers.chatter.connect`.

    Args:
        addr: Observer Server address
        name: component name reported to the server
        group: component group reported to the server
        data: component data reported to the server
        state_cb: optional callback called on each client state change
        close_req_cb: optional callback called when the server sends a
            close message

    Returns:
        newly created client

    """
    conn = await chatter.connect(addr, **kwargs)

    try:
        client = Client(conn=conn,
                        name=name,
                        group=group,
                        data=data,
                        state_cb=state_cb,
                        close_req_cb=close_req_cb)

    except Exception:
        # do not leave the connection open when the client can't be created
        await aio.uncancellable(conn.async_close())
        raise

    return client

Connect to Observer Server

Additional arguments are passed directly to hat.drivers.chatter.connect.

class Client(hat.aio.group.Resource):
 61class Client(aio.Resource):
 62    """Observer Client
 63
 64    For creating new client see `connect` coroutine.
 65
 66    """
 67
 68    def __init__(self,
 69                 conn: chatter.Connection,
 70                 name: str,
 71                 group: str,
 72                 data: json.Data,
 73                 state_cb: StateCb | None,
 74                 close_req_cb: CloseReqCb | None):
 75        self._conn = conn
 76        self._name = name
 77        self._group = group
 78        self._data = json.encode(data)
 79        self._state_cb = state_cb
 80        self._close_req_cb = close_req_cb
 81        self._state = State(info=None,
 82                            components=[])
 83        self._blessing_res = common.BlessingRes(token=None,
 84                                                ready=False)
 85
 86        self.async_group.spawn(self._receive_loop)
 87
 88    @property
 89    def async_group(self) -> aio.Group:
 90        """Async group"""
 91        return self._conn.async_group
 92
 93    @property
 94    def state(self) -> State:
 95        """Client's state"""
 96        return self._state
 97
 98    async def set_blessing_res(self, res: common.BlessingRes):
 99        """Set blessing response"""
100        if res == self._blessing_res:
101            return
102
103        self._blessing_res = res
104        await self._send_msg_client(res)
105
106    async def _receive_loop(self):
107        mlog.debug("starting receive loop")
108        try:
109            await self._send_msg_client(self._blessing_res)
110
111            while True:
112                msg_type, msg_data = await common.receive_msg(self._conn)
113
114                if msg_type == 'HatObserver.MsgServer':
115                    mlog.debug("received msg server")
116                    components = [common.component_info_from_sbs(i)
117                                  for i in msg_data['components']]
118                    await self._process_msg_server(cid=msg_data['cid'],
119                                                   mid=msg_data['mid'],
120                                                   components=components)
121
122                elif msg_type == 'HatObserver.MsgClose':
123                    mlog.debug("received msg close")
124                    if self._close_req_cb:
125                        await aio.call(self._close_req_cb, self)
126                    break
127
128                else:
129                    raise Exception('unsupported message type')
130
131        except ConnectionError:
132            mlog.debug("connection closed")
133
134        except Exception as e:
135            mlog.warning("monitor client error: %s", e, exc_info=e)
136
137        finally:
138            mlog.debug("stopping receive loop")
139            self.close()
140
141    async def _send_msg_client(self, blessing_res):
142        await common.send_msg(self._conn, 'HatObserver.MsgClient', {
143            'name': self._name,
144            'group': self._group,
145            'data': self._data,
146            'blessingRes': common.blessing_res_to_sbs(blessing_res)})
147
148    async def _process_msg_server(self, cid, mid, components):
149        info = util.first(components, lambda i: (i.cid == cid and
150                                                 i.mid == mid))
151        state = State(info=info,
152                      components=components)
153        if self._state == state:
154            return
155
156        self._state = state
157        if self._state_cb:
158            await aio.call(self._state_cb, self, state)

Observer Client

To create a new client, use the connect coroutine.

Client( conn: hat.drivers.chatter.Connection, name: str, group: str, data: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], state_cb: Optional[Callable[[Client, State], Optional[Awaitable[NoneType]]]], close_req_cb: Optional[Callable[[Client], Optional[Awaitable[NoneType]]]])
68    def __init__(self,
69                 conn: chatter.Connection,
70                 name: str,
71                 group: str,
72                 data: json.Data,
73                 state_cb: StateCb | None,
74                 close_req_cb: CloseReqCb | None):
75        self._conn = conn
76        self._name = name
77        self._group = group
78        self._data = json.encode(data)
79        self._state_cb = state_cb
80        self._close_req_cb = close_req_cb
81        self._state = State(info=None,
82                            components=[])
83        self._blessing_res = common.BlessingRes(token=None,
84                                                ready=False)
85
86        self.async_group.spawn(self._receive_loop)
async_group: hat.aio.group.Group
88    @property
89    def async_group(self) -> aio.Group:
90        """Async group"""
91        return self._conn.async_group

Async group

state: State
93    @property
94    def state(self) -> State:
95        """Client's state"""
96        return self._state

Client's state

async def set_blessing_res(self, res: hat.monitor.common.BlessingRes):
 98    async def set_blessing_res(self, res: common.BlessingRes):
 99        """Set blessing response"""
100        if res == self._blessing_res:
101            return
102
103        self._blessing_res = res
104        await self._send_msg_client(res)

Set blessing response

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close