hat.monitor.observer.client
Observer Client
1"""Observer Client""" 2 3import logging 4import typing 5 6from hat import aio 7from hat import json 8from hat import util 9from hat.drivers import chatter 10from hat.drivers import tcp 11 12from hat.monitor.observer import common 13 14 15mlog: logging.Logger = logging.getLogger(__name__) 16"""Module logger""" 17 18StateCb: typing.TypeAlias = aio.AsyncCallable[['Client', 'State'], None] 19"""State callback""" 20 21CloseReqCb: typing.TypeAlias = aio.AsyncCallable[['Client'], None] 22"""Close request callback""" 23 24 25class State(typing.NamedTuple): 26 """Client state""" 27 info: common.ComponentInfo | None 28 components: list[common.ComponentInfo] 29 30 31async def connect(addr: tcp.Address, 32 name: str, 33 group: str, 34 *, 35 data: json.Data = None, 36 state_cb: StateCb | None = None, 37 close_req_cb: CloseReqCb | None = None, 38 **kwargs 39 ) -> 'Client': 40 """Connect to Observer Server 41 42 Additional arguments are passed directly to `hat.drivers.chatter.connect`. 43 44 """ 45 conn = await chatter.connect(addr, **kwargs) 46 47 try: 48 return Client(conn=conn, 49 name=name, 50 group=group, 51 data=data, 52 state_cb=state_cb, 53 close_req_cb=close_req_cb) 54 55 except Exception: 56 await aio.uncancellable(conn.async_close()) 57 raise 58 59 60class Client(aio.Resource): 61 """Observer Client 62 63 For creating new client see `connect` coroutine. 64 65 """ 66 67 def __init__(self, 68 conn: chatter.Connection, 69 name: str, 70 group: str, 71 data: json.Data, 72 state_cb: StateCb | None, 73 close_req_cb: CloseReqCb | None): 74 self._conn = conn 75 self._name = name 76 self._group = group 77 self._data = json.encode(data) 78 self._state_cb = state_cb 79 self._close_req_cb = close_req_cb 80 self._state = State(info=None, 81 components=[]) 82 self._blessing_res = common.BlessingRes(token=None, 83 ready=False) 84 85 self.async_group.spawn(self._receive_loop) 86 87 @property 88 def async_group(self) -> aio.Group: 89 """Async group""" 90 return self._conn.async_group 91 92 @property 93 def state(self) -> State: 94 """Client's state""" 95 return self._state 96 97 async def set_blessing_res(self, res: common.BlessingRes): 98 """Set blessing response""" 99 if res == self._blessing_res: 100 return 101 102 self._blessing_res = res 103 await self._send_msg_client(res) 104 105 async def _receive_loop(self): 106 mlog.debug("starting receive loop") 107 try: 108 await self._send_msg_client(self._blessing_res) 109 110 while True: 111 msg_type, msg_data = await common.receive_msg(self._conn) 112 113 if msg_type == 'HatObserver.MsgServer': 114 mlog.debug("received msg server") 115 components = [common.component_info_from_sbs(i) 116 for i in msg_data['components']] 117 await self._process_msg_server(cid=msg_data['cid'], 118 mid=msg_data['mid'], 119 components=components) 120 121 elif msg_type == 'HatObserver.MsgClose': 122 mlog.debug("received msg close") 123 if self._close_req_cb: 124 await aio.call(self._close_req_cb, self) 125 break 126 127 else: 128 raise Exception('unsupported message type') 129 130 except ConnectionError: 131 mlog.debug("connection closed") 132 133 except Exception as e: 134 mlog.warning("monitor client error: %s", e, exc_info=e) 135 136 finally: 137 mlog.debug("stopping receive loop") 138 self.close() 139 140 async def _send_msg_client(self, blessing_res): 141 await common.send_msg(self._conn, 'HatObserver.MsgClient', { 142 'name': self._name, 143 'group': self._group, 144 'data': self._data, 145 'blessingRes': common.blessing_res_to_sbs(blessing_res)}) 146 147 async def _process_msg_server(self, cid, mid, components): 148 info = util.first(components, lambda i: (i.cid == cid and 149 i.mid == mid)) 150 state = State(info=info, 151 components=components) 152 if self._state == state: 153 return 154 155 self._state = state 156 if self._state_cb: 157 await aio.call(self._state_cb, self, state)
Module logger
StateCb: TypeAlias =
Callable[[ForwardRef('Client'), ForwardRef('State')], Optional[Awaitable[NoneType]]]
State callback
CloseReqCb: TypeAlias =
Callable[[ForwardRef('Client')], Optional[Awaitable[NoneType]]]
Close request callback
class
State(typing.NamedTuple):
26class State(typing.NamedTuple): 27 """Client state""" 28 info: common.ComponentInfo | None 29 components: list[common.ComponentInfo]
Client state
State( info: hat.monitor.common.ComponentInfo | None, components: list[hat.monitor.common.ComponentInfo])
Create new instance of State(info, components)
Inherited Members
- builtins.tuple
- index
- count
async def
connect( addr: hat.drivers.tcp.Address, name: str, group: str, *, data: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')] = None, state_cb: Optional[Callable[[Client, State], Optional[Awaitable[NoneType]]]] = None, close_req_cb: Optional[Callable[[Client], Optional[Awaitable[NoneType]]]] = None, **kwargs) -> Client:
32async def connect(addr: tcp.Address, 33 name: str, 34 group: str, 35 *, 36 data: json.Data = None, 37 state_cb: StateCb | None = None, 38 close_req_cb: CloseReqCb | None = None, 39 **kwargs 40 ) -> 'Client': 41 """Connect to Observer Server 42 43 Additional arguments are passed directly to `hat.drivers.chatter.connect`. 44 45 """ 46 conn = await chatter.connect(addr, **kwargs) 47 48 try: 49 return Client(conn=conn, 50 name=name, 51 group=group, 52 data=data, 53 state_cb=state_cb, 54 close_req_cb=close_req_cb) 55 56 except Exception: 57 await aio.uncancellable(conn.async_close()) 58 raise
Connect to Observer Server
Additional arguments are passed directly to hat.drivers.chatter.connect
.
class
Client(hat.aio.group.Resource):
61class Client(aio.Resource): 62 """Observer Client 63 64 For creating new client see `connect` coroutine. 65 66 """ 67 68 def __init__(self, 69 conn: chatter.Connection, 70 name: str, 71 group: str, 72 data: json.Data, 73 state_cb: StateCb | None, 74 close_req_cb: CloseReqCb | None): 75 self._conn = conn 76 self._name = name 77 self._group = group 78 self._data = json.encode(data) 79 self._state_cb = state_cb 80 self._close_req_cb = close_req_cb 81 self._state = State(info=None, 82 components=[]) 83 self._blessing_res = common.BlessingRes(token=None, 84 ready=False) 85 86 self.async_group.spawn(self._receive_loop) 87 88 @property 89 def async_group(self) -> aio.Group: 90 """Async group""" 91 return self._conn.async_group 92 93 @property 94 def state(self) -> State: 95 """Client's state""" 96 return self._state 97 98 async def set_blessing_res(self, res: common.BlessingRes): 99 """Set blessing response""" 100 if res == self._blessing_res: 101 return 102 103 self._blessing_res = res 104 await self._send_msg_client(res) 105 106 async def _receive_loop(self): 107 mlog.debug("starting receive loop") 108 try: 109 await self._send_msg_client(self._blessing_res) 110 111 while True: 112 msg_type, msg_data = await common.receive_msg(self._conn) 113 114 if msg_type == 'HatObserver.MsgServer': 115 mlog.debug("received msg server") 116 components = [common.component_info_from_sbs(i) 117 for i in msg_data['components']] 118 await self._process_msg_server(cid=msg_data['cid'], 119 mid=msg_data['mid'], 120 components=components) 121 122 elif msg_type == 'HatObserver.MsgClose': 123 mlog.debug("received msg close") 124 if self._close_req_cb: 125 await aio.call(self._close_req_cb, self) 126 break 127 128 else: 129 raise Exception('unsupported message type') 130 131 except ConnectionError: 132 mlog.debug("connection closed") 133 134 except Exception as e: 135 mlog.warning("monitor client error: %s", e, exc_info=e) 136 137 finally: 138 mlog.debug("stopping receive loop") 139 self.close() 140 141 async def _send_msg_client(self, blessing_res): 142 await common.send_msg(self._conn, 'HatObserver.MsgClient', { 143 'name': self._name, 144 'group': self._group, 145 'data': self._data, 146 'blessingRes': common.blessing_res_to_sbs(blessing_res)}) 147 148 async def _process_msg_server(self, cid, mid, components): 149 info = util.first(components, lambda i: (i.cid == cid and 150 i.mid == mid)) 151 state = State(info=info, 152 components=components) 153 if self._state == state: 154 return 155 156 self._state = state 157 if self._state_cb: 158 await aio.call(self._state_cb, self, state)
Observer Client
For creating new client see connect
coroutine.
Client( conn: hat.drivers.chatter.Connection, name: str, group: str, data: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')], state_cb: Optional[Callable[[Client, State], Optional[Awaitable[NoneType]]]], close_req_cb: Optional[Callable[[Client], Optional[Awaitable[NoneType]]]])
68 def __init__(self, 69 conn: chatter.Connection, 70 name: str, 71 group: str, 72 data: json.Data, 73 state_cb: StateCb | None, 74 close_req_cb: CloseReqCb | None): 75 self._conn = conn 76 self._name = name 77 self._group = group 78 self._data = json.encode(data) 79 self._state_cb = state_cb 80 self._close_req_cb = close_req_cb 81 self._state = State(info=None, 82 components=[]) 83 self._blessing_res = common.BlessingRes(token=None, 84 ready=False) 85 86 self.async_group.spawn(self._receive_loop)
async_group: hat.aio.group.Group
88 @property 89 def async_group(self) -> aio.Group: 90 """Async group""" 91 return self._conn.async_group
Async group
98 async def set_blessing_res(self, res: common.BlessingRes): 99 """Set blessing response""" 100 if res == self._blessing_res: 101 return 102 103 self._blessing_res = res 104 await self._send_msg_client(res)
Set blessing response
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close