hat.monitor.component
Monitor Component
"""Monitor Component"""

import asyncio
import logging
import typing

from hat import aio
from hat import json
from hat.drivers import tcp

from hat.monitor import common
from hat.monitor.observer import client


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

State: typing.TypeAlias = client.State
"""Component state"""

Runner: typing.TypeAlias = aio.Resource
"""Component runner"""

RunnerCb: typing.TypeAlias = aio.AsyncCallable[['Component'], Runner]
"""Runner callback"""

StateCb: typing.TypeAlias = aio.AsyncCallable[['Component', State], None]
"""State callback"""

CloseReqCb: typing.TypeAlias = aio.AsyncCallable[['Component'], None]
"""Close request callback"""


async def connect(addr: tcp.Address,
                  name: str,
                  group: str,
                  runner_cb: RunnerCb,
                  *,
                  data: json.Data = None,
                  state_cb: StateCb | None = None,
                  close_req_cb: CloseReqCb | None = None,
                  **kwargs
                  ) -> 'Component':
    """Connect to local monitor server and create component

    Implementation of component behavior according to BLESS_ALL and BLESS_ONE
    algorithms.

    Component runs client's loop which manages blessing req/res states based on
    provided monitor client. Initially, component's ready is disabled.

    Component is considered active when component's ready is ``True`` and
    blessing req/res tokens are matching.

    When component becomes active, `runner_cb` is called. Result of calling
    `runner_cb` should be runner representing user-defined component's
    activity. Once component stops being active, runner is closed. If
    component becomes active again, `runner_cb` call is repeated.

    If runner is closed, while component remains active, component is closed.

    If connection to Monitor Server is closed, component is also closed.
    If component is closed while active, runner is closed.

    Optional `state_cb` and `close_req_cb` are invoked, with the component as
    first argument, whenever the underlying client invokes its own state /
    close request callback.

    Additional arguments are passed to `hat.monitor.observer.client.connect`.

    """
    # component attributes are assigned here (Component defines no __init__)
    component = Component()
    component._runner_cb = runner_cb
    component._state_cb = state_cb
    component._close_req_cb = close_req_cb
    component._blessing_res = common.BlessingRes(token=None,
                                                 ready=False)
    # set on every client state change and every local blessing res change
    component._change_event = asyncio.Event()

    # client callbacks are routed through the component so that user
    # callbacks receive the component instead of the client
    component._client = await client.connect(
        addr, name, group,
        data=data,
        state_cb=component._on_client_state,
        close_req_cb=component._on_client_close_req,
        **kwargs)

    try:
        component.async_group.spawn(component._component_loop)

    except Exception:
        # loop could not be started - close the already connected client
        # before propagating the error
        await aio.uncancellable(component._client.async_close())
        raise

    return component


class Component(aio.Resource):
    """Monitor Component

    For creating new component see `connect` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._client.async_group

    @property
    def state(self) -> State:
        """Component's state"""
        return self._client.state

    @property
    def ready(self) -> bool:
        """Ready"""
        return self._blessing_res.ready

    async def set_ready(self, ready: bool):
        """Set ready"""
        if self._blessing_res.ready == ready:
            return

        await self._change_blessing_res(ready=ready)

    async def _on_client_state(self, c, state):
        """Signal state change and forward `state` to `state_cb` (if set)

        Argument `c` is not used.

        """
        # wake up whichever wait helper is currently blocked on the event
        self._change_event.set()

        if not self._state_cb:
            return

        await aio.call(self._state_cb, self, state)

    async def _on_client_close_req(self, c):
        """Forward close request to `close_req_cb` (if set)

        Argument `c` is not used.

        """
        if not self._close_req_cb:
            return

        await aio.call(self._close_req_cb, self)

    async def _change_blessing_res(self, **kwargs):
        """Replace blessing res fields given as `kwargs` and send the result

        Updated blessing res is passed to client's `set_blessing_res` (also
        when `kwargs` is empty) and change event is set afterwards.

        """
        self._blessing_res = self._blessing_res._replace(**kwargs)
        await self._client.set_blessing_res(self._blessing_res)

        self._change_event.set()

    async def _component_loop(self):
        """Manage blessing res token and runner lifetime until closed

        `ConnectionError` ends the loop silently, other exceptions are logged
        as warnings. In all cases the component is closed on exit.

        """
        mlog.debug("starting component loop")
        try:
            # send the current blessing res through the client before waiting
            await self._change_blessing_res()

            while True:
                mlog.debug("waiting blessing and ready")
                token = await self._get_blessed_and_ready_token()

                # answer the blessing request with the same token
                if self._blessing_res.token != token:
                    await self._change_blessing_res(token=token)

                # False when ready was dropped or the request token changed
                # in the meantime - start over without creating a runner
                ready = await self._wait_blessed_and_ready_token()
                if not ready:
                    continue

                try:
                    mlog.debug("creating component runner")
                    runner = await aio.call(self._runner_cb, self)

                    try:
                        async with self.async_group.create_subgroup() as subgroup:  # NOQA
                            blessed_and_ready_task = subgroup.spawn(
                                self._wait_while_blessed_and_ready)
                            runner_closing_task = subgroup.spawn(
                                runner.wait_closing)

                            mlog.debug("wait while blessed and ready")
                            await asyncio.wait(
                                [blessed_and_ready_task, runner_closing_task],
                                return_when=asyncio.FIRST_COMPLETED)

                            # runner stopped on its own while the component
                            # is still active - leave the loop, which closes
                            # the component
                            if (runner_closing_task.done() and
                                    not blessed_and_ready_task.done()):
                                mlog.debug(
                                    "runner closed while blessed and ready")
                                break

                    finally:
                        mlog.debug("closing component runner")
                        await aio.uncancellable(runner.async_close())

                finally:
                    # withdraw the blessing res token once the runner is
                    # closed (or could not be created)
                    await self._change_blessing_res(token=None)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.warning("component loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping component loop")
            self.close()

    async def _get_blessed_and_ready_token(self):
        """Wait until ready is set and blessing req token is available

        Returns blessing req token (never ``None``) read from client's state.

        """
        while True:
            if self._blessing_res.ready:
                info = self._client.state.info
                token = info.blessing_req.token if info else None

                if token is not None:
                    return token

            await self._change_event.wait()
            self._change_event.clear()

    async def _wait_blessed_and_ready_token(self):
        """Wait until client's state contains local blessing res token

        Returns ``True`` if, at that moment, blessing req token in client's
        state equals the same token. Returns ``False`` if it differs or if
        local ready is unset or local blessing res token is ``None``.

        """
        while True:
            if not self._blessing_res.ready:
                return False

            if self._blessing_res.token is None:
                return False

            info = self._client.state.info
            token = info.blessing_res.token if info else None

            # local token is not None here, so a match implies info is set
            if token == self._blessing_res.token:
                return token == info.blessing_req.token

            await self._change_event.wait()
            self._change_event.clear()

    async def _wait_while_blessed_and_ready(self):
        """Wait while component is active

        Returns once local ready is unset or blessing req token in client's
        state is ``None`` or differs from local blessing res token.

        """
        while True:
            if not self._blessing_res.ready:
                return

            info = self._client.state.info
            token = info.blessing_req.token if info else None

            if token is None or token != self._blessing_res.token:
                return

            await self._change_event.wait()
            self._change_event.clear()
Module logger
class State(typing.NamedTuple):
    """Client state"""
    # info of the client's own component or None when it is not available
    # NOTE(review): presumably None until the server reports it - confirm
    info: common.ComponentInfo | None
    # NOTE(review): presumably infos of all components known to the server -
    # confirm against the observer client implementation
    components: list[common.ComponentInfo]
Component state
Create new instance of State(info, components)
Inherited Members
- builtins.tuple
- index
- count
Component runner
Runner callback
State callback
Close request callback
async def connect(addr: tcp.Address,
                  name: str,
                  group: str,
                  runner_cb: RunnerCb,
                  *,
                  data: json.Data = None,
                  state_cb: StateCb | None = None,
                  close_req_cb: CloseReqCb | None = None,
                  **kwargs
                  ) -> 'Component':
    """Connect to local monitor server and create component

    Implementation of component behavior according to BLESS_ALL and BLESS_ONE
    algorithms.

    Component runs client's loop which manages blessing req/res states based on
    provided monitor client. Initially, component's ready is disabled.

    Component is considered active when component's ready is ``True`` and
    blessing req/res tokens are matching.

    When component becomes active, `runner_cb` is called. Result of calling
    `runner_cb` should be runner representing user-defined component's
    activity. Once component stops being active, runner is closed. If
    component becomes active again, `runner_cb` call is repeated.

    If runner is closed, while component remains active, component is closed.

    If connection to Monitor Server is closed, component is also closed.
    If component is closed while active, runner is closed.

    Optional `state_cb` and `close_req_cb` are invoked, with the component as
    first argument, whenever the underlying client invokes its own state /
    close request callback.

    Additional arguments are passed to `hat.monitor.observer.client.connect`.

    """
    # component attributes are assigned here (Component defines no __init__)
    component = Component()
    component._runner_cb = runner_cb
    component._state_cb = state_cb
    component._close_req_cb = close_req_cb
    component._blessing_res = common.BlessingRes(token=None,
                                                 ready=False)
    # set on every client state change and every local blessing res change
    component._change_event = asyncio.Event()

    # client callbacks are routed through the component so that user
    # callbacks receive the component instead of the client
    component._client = await client.connect(
        addr, name, group,
        data=data,
        state_cb=component._on_client_state,
        close_req_cb=component._on_client_close_req,
        **kwargs)

    try:
        component.async_group.spawn(component._component_loop)

    except Exception:
        # loop could not be started - close the already connected client
        # before propagating the error
        await aio.uncancellable(component._client.async_close())
        raise

    return component
Connect to local monitor server and create component
Implementation of component behavior according to BLESS_ALL and BLESS_ONE algorithms.
Component runs client's loop which manages blessing req/res states based on provided monitor client. Initially, component's ready is disabled.
Component is considered active when component's ready is True
and
blessing req/res tokens are matching.
When component becomes active, runner_cb
is called. Result of calling
runner_cb
should be a runner representing the user-defined component's
activity. Once component stops being active, runner is closed. If
component becomes active again, runner_cb
call is repeated.
If runner is closed, while component remains active, component is closed.
If connection to Monitor Server is closed, component is also closed. If component is closed while active, runner is closed.
Additional arguments are passed to hat.monitor.observer.client.connect
.
class Component(aio.Resource):
    """Monitor Component

    For creating new component see `connect` coroutine.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._client.async_group

    @property
    def state(self) -> State:
        """Component's state"""
        return self._client.state

    @property
    def ready(self) -> bool:
        """Ready"""
        return self._blessing_res.ready

    async def set_ready(self, ready: bool):
        """Set ready"""
        if self._blessing_res.ready == ready:
            return

        await self._change_blessing_res(ready=ready)

    async def _on_client_state(self, c, state):
        """Signal state change and forward `state` to `state_cb` (if set)

        Argument `c` is not used.

        """
        # wake up whichever wait helper is currently blocked on the event
        self._change_event.set()

        if not self._state_cb:
            return

        await aio.call(self._state_cb, self, state)

    async def _on_client_close_req(self, c):
        """Forward close request to `close_req_cb` (if set)

        Argument `c` is not used.

        """
        if not self._close_req_cb:
            return

        await aio.call(self._close_req_cb, self)

    async def _change_blessing_res(self, **kwargs):
        """Replace blessing res fields given as `kwargs` and send the result

        Updated blessing res is passed to client's `set_blessing_res` (also
        when `kwargs` is empty) and change event is set afterwards.

        """
        self._blessing_res = self._blessing_res._replace(**kwargs)
        await self._client.set_blessing_res(self._blessing_res)

        self._change_event.set()

    async def _component_loop(self):
        """Manage blessing res token and runner lifetime until closed

        `ConnectionError` ends the loop silently, other exceptions are logged
        as warnings. In all cases the component is closed on exit.

        """
        mlog.debug("starting component loop")
        try:
            # send the current blessing res through the client before waiting
            await self._change_blessing_res()

            while True:
                mlog.debug("waiting blessing and ready")
                token = await self._get_blessed_and_ready_token()

                # answer the blessing request with the same token
                if self._blessing_res.token != token:
                    await self._change_blessing_res(token=token)

                # False when ready was dropped or the request token changed
                # in the meantime - start over without creating a runner
                ready = await self._wait_blessed_and_ready_token()
                if not ready:
                    continue

                try:
                    mlog.debug("creating component runner")
                    runner = await aio.call(self._runner_cb, self)

                    try:
                        async with self.async_group.create_subgroup() as subgroup:  # NOQA
                            blessed_and_ready_task = subgroup.spawn(
                                self._wait_while_blessed_and_ready)
                            runner_closing_task = subgroup.spawn(
                                runner.wait_closing)

                            mlog.debug("wait while blessed and ready")
                            await asyncio.wait(
                                [blessed_and_ready_task, runner_closing_task],
                                return_when=asyncio.FIRST_COMPLETED)

                            # runner stopped on its own while the component
                            # is still active - leave the loop, which closes
                            # the component
                            if (runner_closing_task.done() and
                                    not blessed_and_ready_task.done()):
                                mlog.debug(
                                    "runner closed while blessed and ready")
                                break

                    finally:
                        mlog.debug("closing component runner")
                        await aio.uncancellable(runner.async_close())

                finally:
                    # withdraw the blessing res token once the runner is
                    # closed (or could not be created)
                    await self._change_blessing_res(token=None)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.warning("component loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping component loop")
            self.close()

    async def _get_blessed_and_ready_token(self):
        """Wait until ready is set and blessing req token is available

        Returns blessing req token (never ``None``) read from client's state.

        """
        while True:
            if self._blessing_res.ready:
                info = self._client.state.info
                token = info.blessing_req.token if info else None

                if token is not None:
                    return token

            await self._change_event.wait()
            self._change_event.clear()

    async def _wait_blessed_and_ready_token(self):
        """Wait until client's state contains local blessing res token

        Returns ``True`` if, at that moment, blessing req token in client's
        state equals the same token. Returns ``False`` if it differs or if
        local ready is unset or local blessing res token is ``None``.

        """
        while True:
            if not self._blessing_res.ready:
                return False

            if self._blessing_res.token is None:
                return False

            info = self._client.state.info
            token = info.blessing_res.token if info else None

            # local token is not None here, so a match implies info is set
            if token == self._blessing_res.token:
                return token == info.blessing_req.token

            await self._change_event.wait()
            self._change_event.clear()

    async def _wait_while_blessed_and_ready(self):
        """Wait while component is active

        Returns once local ready is unset or blessing req token in client's
        state is ``None`` or differs from local blessing res token.

        """
        while True:
            if not self._blessing_res.ready:
                return

            info = self._client.state.info
            token = info.blessing_req.token if info else None

            if token is None or token != self._blessing_res.token:
                return

            await self._change_event.wait()
            self._change_event.clear()
Monitor Component
For creating new component see connect
coroutine.
@property
def async_group(self) -> aio.Group:
    """Async group

    This is the async group of the underlying monitor client.

    """
    return self._client.async_group
Async group
@property
def state(self) -> State:
    """Component's state

    This is the current state of the underlying monitor client.

    """
    return self._client.state
Component's state
async def set_ready(self, ready: bool):
    """Set ready

    Blessing res is changed (and sent) only when `ready` differs from the
    current ready value; otherwise nothing is done.

    """
    if ready != self._blessing_res.ready:
        await self._change_blessing_res(ready=ready)
Set ready
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close