hat.monitor.component

Monitor Component

  1"""Monitor Component"""
  2
  3import asyncio
  4import logging
  5import typing
  6
  7from hat import aio
  8from hat import json
  9from hat.drivers import tcp
 10
 11from hat.monitor import common
 12from hat.monitor.observer import client
 13
 14
# Logger named after this module; used by the component loop for its
# debug and warning messages.
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

# Plain alias of the observer client's state type.
State: typing.TypeAlias = client.State
"""Component state"""

# A runner is any `aio.Resource`.
Runner: typing.TypeAlias = aio.Resource
"""Component runner"""

# Callable (regular or async) that receives the component and provides a
# runner.
RunnerCb: typing.TypeAlias = aio.AsyncCallable[['Component'], Runner]
"""Runner callback"""

# Callable (regular or async) that receives the component and a state.
StateCb: typing.TypeAlias = aio.AsyncCallable[['Component', State], None]
"""State callback"""

# Callable (regular or async) that receives the component.
CloseReqCb: typing.TypeAlias = aio.AsyncCallable[['Component'], None]
"""Close request callback"""
 32
 33
 34async def connect(addr: tcp.Address,
 35                  name: str,
 36                  group: str,
 37                  runner_cb: RunnerCb,
 38                  *,
 39                  data: json.Data = None,
 40                  state_cb: StateCb | None = None,
 41                  close_req_cb: CloseReqCb | None = None,
 42                  **kwargs
 43                  ) -> 'Component':
 44    """Connect to local monitor server and create component
 45
 46    Implementation of component behavior according to BLESS_ALL and BLESS_ONE
 47    algorithms.
 48
 49    Component runs client's loop which manages blessing req/res states based on
 50    provided monitor client. Initially, component's ready is disabled.
 51
 52    Component is considered active when component's ready is ``True`` and
 53    blessing req/res tokens are matching.
 54
 55    When component becomes active, `component_cb` is called. Result of calling
 56    `component_cb` should be runner representing user defined components
 57    activity. Once component stops being active, runner is closed. If
 58    component becomes active again, `component_cb` call is repeated.
 59
 60    If runner is closed, while component remains active, component is closed.
 61
 62    If connection to Monitor Server is closed, component is also closed.
 63    If component is closed while active, runner is closed.
 64
 65    Additional arguments are passed to `hat.monitor.observer.client.connect`.
 66
 67    """
 68    component = Component()
 69    component._runner_cb = runner_cb
 70    component._state_cb = state_cb
 71    component._close_req_cb = close_req_cb
 72    component._blessing_res = common.BlessingRes(token=None,
 73                                                 ready=False)
 74    component._change_event = asyncio.Event()
 75
 76    component._client = await client.connect(
 77        addr, name, group,
 78        data=data,
 79        state_cb=component._on_client_state,
 80        close_req_cb=component._on_client_close_req,
 81        **kwargs)
 82
 83    try:
 84        component.async_group.spawn(component._component_loop)
 85
 86    except Exception:
 87        await aio.uncancellable(component._client.async_close())
 88        raise
 89
 90    return component
 91
 92
class Component(aio.Resource):
    """Monitor Component

    For creating new component see `connect` coroutine.

    Instances are not initialized by a constructor; `connect` assigns the
    private attributes used here (`_runner_cb`, `_state_cb`,
    `_close_req_cb`, `_blessing_res`, `_change_event`, `_client`) and spawns
    `_component_loop`.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group

        This is the async group of the underlying observer client.

        """
        return self._client.async_group

    @property
    def state(self) -> State:
        """Component's state

        Taken directly from the underlying observer client.

        """
        return self._client.state

    @property
    def ready(self) -> bool:
        """Ready

        Value of the ``ready`` field of the local blessing response.

        """
        return self._blessing_res.ready

    async def set_ready(self, ready: bool) -> None:
        """Set ready

        Does nothing if `ready` equals the current value. Otherwise the local
        blessing response is updated and sent to the client.

        """
        if self._blessing_res.ready == ready:
            return

        await self._change_blessing_res(ready=ready)

    async def _on_client_state(self, c, state):
        """Client state callback (registered in `connect`)

        `c` is not used; the user state callback, if any, receives this
        component instead.

        """
        # wake up whichever wait helper is currently blocked on the event
        self._change_event.set()

        if not self._state_cb:
            return

        await aio.call(self._state_cb, self, state)

    async def _on_client_close_req(self, c):
        """Client close request callback (registered in `connect`)

        `c` is not used; the user close request callback, if any, receives
        this component instead.

        """
        if not self._close_req_cb:
            return

        await aio.call(self._close_req_cb, self)

    async def _change_blessing_res(self, **kwargs):
        """Update local blessing response and send it to the client

        `kwargs` are the blessing response fields to replace. When called
        without arguments, the current blessing response is sent unchanged.

        """
        self._blessing_res = self._blessing_res._replace(**kwargs)
        await self._client.set_blessing_res(self._blessing_res)

        # local change - let the wait helpers re-evaluate their conditions
        self._change_event.set()

    async def _component_loop(self):
        """Main loop managing blessing response token and runner lifetime

        The loop ends when the runner closes while the component is still
        blessed and ready, on `ConnectionError` (silently) or on any other
        `Exception` (logged as warning). The component is closed when the
        loop exits.

        """
        mlog.debug("starting component loop")
        try:
            # send the initial blessing response as it is
            await self._change_blessing_res()

            while True:
                mlog.debug("waiting blessing and ready")
                token = await self._get_blessed_and_ready_token()

                # respond to the blessing request with the same token
                if self._blessing_res.token != token:
                    await self._change_blessing_res(token=token)

                # wait until client state reflects the response; start over
                # if the blessing was not confirmed
                ready = await self._wait_blessed_and_ready_token()
                if not ready:
                    continue

                try:
                    mlog.debug("creating component runner")
                    runner = await aio.call(self._runner_cb, self)

                    try:
                        async with self.async_group.create_subgroup() as subgroup:  # NOQA
                            blessed_and_ready_task = subgroup.spawn(
                                self._wait_while_blessed_and_ready)
                            runner_closing_task = subgroup.spawn(
                                runner.wait_closing)

                            mlog.debug("wait while blessed and ready")
                            await asyncio.wait(
                                [blessed_and_ready_task, runner_closing_task],
                                return_when=asyncio.FIRST_COMPLETED)

                            # runner stopped by itself while the component
                            # is still blessed and ready - leave the loop,
                            # which closes the component
                            if (runner_closing_task.done() and
                                    not blessed_and_ready_task.done()):
                                mlog.debug(
                                    "runner closed while blessed and ready")
                                break

                    finally:
                        mlog.debug("closing component runner")
                        await aio.uncancellable(runner.async_close())

                finally:
                    # withdraw the token - also when runner creation failed
                    await self._change_blessing_res(token=None)

        except ConnectionError:
            # connection errors end the loop without logging
            pass

        except Exception as e:
            mlog.warning("component loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping component loop")
            self.close()

    async def _get_blessed_and_ready_token(self):
        """Wait for a blessing request token while ready

        Returns the blessing request token from the client's state once
        ready is enabled and that token is not ``None``.

        """
        while True:
            if self._blessing_res.ready:
                info = self._client.state.info
                token = info.blessing_req.token if info else None

                if token is not None:
                    return token

            # condition not met - sleep until something changes, then
            # clear the event so that the next change can wake us again
            await self._change_event.wait()
            self._change_event.clear()

    async def _wait_blessed_and_ready_token(self) -> bool:
        """Wait until client state reflects local blessing response token

        Returns ``False`` if ready is disabled or the local token is
        ``None``. Once the blessing response token in the client's state
        equals the local token, returns whether it also equals the blessing
        request token.

        """
        while True:
            if not self._blessing_res.ready:
                return False

            if self._blessing_res.token is None:
                return False

            info = self._client.state.info
            token = info.blessing_res.token if info else None

            if token == self._blessing_res.token:
                # local token is not None here, so a match implies that
                # info is available
                return token == info.blessing_req.token

            await self._change_event.wait()
            self._change_event.clear()

    async def _wait_while_blessed_and_ready(self) -> None:
        """Wait while component is ready and blessing tokens are matching

        Returns as soon as ready is disabled, or the blessing request token
        in the client's state is ``None`` or differs from the local blessing
        response token.

        """
        while True:
            if not self._blessing_res.ready:
                return

            info = self._client.state.info
            token = info.blessing_req.token if info else None

            if token is None or token != self._blessing_res.token:
                return

            await self._change_event.wait()
            self._change_event.clear()
mlog: logging.Logger = <Logger hat.monitor.component (WARNING)>

Module logger

class State(typing.NamedTuple):
class State(typing.NamedTuple):
    """Client state"""
    # may be ``None``; presumably info of the client's own component -
    # TODO confirm against hat.monitor.observer.client
    info: common.ComponentInfo | None
    # presumably infos of all components known to the server - TODO confirm
    components: list[common.ComponentInfo]

Component state

State( info: hat.monitor.common.ComponentInfo | None, components: list[hat.monitor.common.ComponentInfo])

Create new instance of State(info, components)

Alias for field number 0

Alias for field number 1

Inherited Members
builtins.tuple
index
count
Runner: TypeAlias = hat.aio.group.Resource

Component runner

RunnerCb: TypeAlias = Callable[[ForwardRef('Component')], Union[hat.aio.group.Resource, Awaitable[hat.aio.group.Resource]]]

Runner callback

StateCb: TypeAlias = Callable[[ForwardRef('Component'), State], Optional[Awaitable[NoneType]]]

State callback

CloseReqCb: TypeAlias = Callable[[ForwardRef('Component')], Optional[Awaitable[NoneType]]]

Close request callback

async def connect( addr: hat.drivers.tcp.Address, name: str, group: str, runner_cb: Callable[[Component], Union[hat.aio.group.Resource, Awaitable[hat.aio.group.Resource]]], *, data: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')] = None, state_cb: Optional[Callable[[Component, State], Optional[Awaitable[NoneType]]]] = None, close_req_cb: Optional[Callable[[Component], Optional[Awaitable[NoneType]]]] = None, **kwargs) -> Component:
async def connect(addr: tcp.Address,
                  name: str,
                  group: str,
                  runner_cb: RunnerCb,
                  *,
                  data: json.Data = None,
                  state_cb: StateCb | None = None,
                  close_req_cb: CloseReqCb | None = None,
                  **kwargs
                  ) -> 'Component':
    """Connect to local monitor server and create component

    Implementation of component behavior according to BLESS_ALL and BLESS_ONE
    algorithms.

    Component runs client's loop which manages blessing req/res states based on
    provided monitor client. Initially, component's ready is disabled.

    Component is considered active when component's ready is ``True`` and
    blessing req/res tokens are matching.

    When component becomes active, `runner_cb` is called with the component.
    Result of calling `runner_cb` should be a runner representing the user
    defined component's activity. Once component stops being active, runner
    is closed. If component becomes active again, `runner_cb` call is
    repeated.

    If runner is closed, while component remains active, component is closed.

    If connection to Monitor Server is closed, component is also closed.
    If component is closed while active, runner is closed.

    If `state_cb` is provided, it is called with the component and the state
    whenever the underlying client invokes its state callback. If
    `close_req_cb` is provided, it is called with the component whenever the
    underlying client invokes its close request callback.

    `addr`, `name`, `group`, `data` and all additional keyword arguments are
    passed to `hat.monitor.observer.client.connect`.

    """
    # component starts without a token and with ready disabled
    initial_blessing_res = common.BlessingRes(token=None, ready=False)

    comp = Component()
    comp._change_event = asyncio.Event()
    comp._blessing_res = initial_blessing_res
    comp._close_req_cb = close_req_cb
    comp._state_cb = state_cb
    comp._runner_cb = runner_cb

    # client callbacks are routed through the component so that user
    # callbacks receive the component instead of the client
    comp._client = await client.connect(
        addr, name, group,
        data=data,
        state_cb=comp._on_client_state,
        close_req_cb=comp._on_client_close_req,
        **kwargs)

    try:
        comp.async_group.spawn(comp._component_loop)

    except Exception:
        # loop could not be started - do not leave the client connected
        await aio.uncancellable(comp._client.async_close())
        raise

    return comp

Connect to local monitor server and create component

Implementation of component behavior according to BLESS_ALL and BLESS_ONE algorithms.

Component runs client's loop which manages blessing req/res states based on provided monitor client. Initially, component's ready is disabled.

Component is considered active when component's ready is True and blessing req/res tokens are matching.

When component becomes active, runner_cb is called. Result of calling runner_cb should be a runner representing the user-defined component's activity. Once component stops being active, runner is closed. If component becomes active again, runner_cb call is repeated.

If runner is closed, while component remains active, component is closed.

If connection to Monitor Server is closed, component is also closed. If component is closed while active, runner is closed.

Additional arguments are passed to hat.monitor.observer.client.connect.

class Component(hat.aio.group.Resource):
class Component(aio.Resource):
    """Monitor Component

    For creating new component see `connect` coroutine.

    Instances are not initialized by a constructor; `connect` assigns the
    private attributes used here (`_runner_cb`, `_state_cb`,
    `_close_req_cb`, `_blessing_res`, `_change_event`, `_client`) and spawns
    `_component_loop`.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group

        This is the async group of the underlying observer client.

        """
        return self._client.async_group

    @property
    def state(self) -> State:
        """Component's state

        Taken directly from the underlying observer client.

        """
        return self._client.state

    @property
    def ready(self) -> bool:
        """Ready

        Value of the ``ready`` field of the local blessing response.

        """
        return self._blessing_res.ready

    async def set_ready(self, ready: bool) -> None:
        """Set ready

        Does nothing if `ready` equals the current value. Otherwise the local
        blessing response is updated and sent to the client.

        """
        if self._blessing_res.ready == ready:
            return

        await self._change_blessing_res(ready=ready)

    async def _on_client_state(self, c, state):
        """Client state callback (registered in `connect`)

        `c` is not used; the user state callback, if any, receives this
        component instead.

        """
        # wake up whichever wait helper is currently blocked on the event
        self._change_event.set()

        if not self._state_cb:
            return

        await aio.call(self._state_cb, self, state)

    async def _on_client_close_req(self, c):
        """Client close request callback (registered in `connect`)

        `c` is not used; the user close request callback, if any, receives
        this component instead.

        """
        if not self._close_req_cb:
            return

        await aio.call(self._close_req_cb, self)

    async def _change_blessing_res(self, **kwargs):
        """Update local blessing response and send it to the client

        `kwargs` are the blessing response fields to replace. When called
        without arguments, the current blessing response is sent unchanged.

        """
        self._blessing_res = self._blessing_res._replace(**kwargs)
        await self._client.set_blessing_res(self._blessing_res)

        # local change - let the wait helpers re-evaluate their conditions
        self._change_event.set()

    async def _component_loop(self):
        """Main loop managing blessing response token and runner lifetime

        The loop ends when the runner closes while the component is still
        blessed and ready, on `ConnectionError` (silently) or on any other
        `Exception` (logged as warning). The component is closed when the
        loop exits.

        """
        mlog.debug("starting component loop")
        try:
            # send the initial blessing response as it is
            await self._change_blessing_res()

            while True:
                mlog.debug("waiting blessing and ready")
                token = await self._get_blessed_and_ready_token()

                # respond to the blessing request with the same token
                if self._blessing_res.token != token:
                    await self._change_blessing_res(token=token)

                # wait until client state reflects the response; start over
                # if the blessing was not confirmed
                ready = await self._wait_blessed_and_ready_token()
                if not ready:
                    continue

                try:
                    mlog.debug("creating component runner")
                    runner = await aio.call(self._runner_cb, self)

                    try:
                        async with self.async_group.create_subgroup() as subgroup:  # NOQA
                            blessed_and_ready_task = subgroup.spawn(
                                self._wait_while_blessed_and_ready)
                            runner_closing_task = subgroup.spawn(
                                runner.wait_closing)

                            mlog.debug("wait while blessed and ready")
                            await asyncio.wait(
                                [blessed_and_ready_task, runner_closing_task],
                                return_when=asyncio.FIRST_COMPLETED)

                            # runner stopped by itself while the component
                            # is still blessed and ready - leave the loop,
                            # which closes the component
                            if (runner_closing_task.done() and
                                    not blessed_and_ready_task.done()):
                                mlog.debug(
                                    "runner closed while blessed and ready")
                                break

                    finally:
                        mlog.debug("closing component runner")
                        await aio.uncancellable(runner.async_close())

                finally:
                    # withdraw the token - also when runner creation failed
                    await self._change_blessing_res(token=None)

        except ConnectionError:
            # connection errors end the loop without logging
            pass

        except Exception as e:
            mlog.warning("component loop error: %s", e, exc_info=e)

        finally:
            mlog.debug("stopping component loop")
            self.close()

    async def _get_blessed_and_ready_token(self):
        """Wait for a blessing request token while ready

        Returns the blessing request token from the client's state once
        ready is enabled and that token is not ``None``.

        """
        while True:
            if self._blessing_res.ready:
                info = self._client.state.info
                token = info.blessing_req.token if info else None

                if token is not None:
                    return token

            # condition not met - sleep until something changes, then
            # clear the event so that the next change can wake us again
            await self._change_event.wait()
            self._change_event.clear()

    async def _wait_blessed_and_ready_token(self) -> bool:
        """Wait until client state reflects local blessing response token

        Returns ``False`` if ready is disabled or the local token is
        ``None``. Once the blessing response token in the client's state
        equals the local token, returns whether it also equals the blessing
        request token.

        """
        while True:
            if not self._blessing_res.ready:
                return False

            if self._blessing_res.token is None:
                return False

            info = self._client.state.info
            token = info.blessing_res.token if info else None

            if token == self._blessing_res.token:
                # local token is not None here, so a match implies that
                # info is available
                return token == info.blessing_req.token

            await self._change_event.wait()
            self._change_event.clear()

    async def _wait_while_blessed_and_ready(self) -> None:
        """Wait while component is ready and blessing tokens are matching

        Returns as soon as ready is disabled, or the blessing request token
        in the client's state is ``None`` or differs from the local blessing
        response token.

        """
        while True:
            if not self._blessing_res.ready:
                return

            info = self._client.state.info
            token = info.blessing_req.token if info else None

            if token is None or token != self._blessing_res.token:
                return

            await self._change_event.wait()
            self._change_event.clear()

Monitor Component

To create a new component, see the connect coroutine.

async_group: hat.aio.group.Group
    @property
    def async_group(self) -> aio.Group:
        """Async group

        This is the async group of the underlying observer client.

        """
        return self._client.async_group

Async group

state: State
    @property
    def state(self) -> State:
        """Component's state

        Taken directly from the underlying observer client.

        """
        return self._client.state

Component's state

ready: bool
    @property
    def ready(self) -> bool:
        """Ready

        Value of the ``ready`` field of the local blessing response.

        """
        return self._blessing_res.ready

Ready

async def set_ready(self, ready: bool):
116    async def set_ready(self, ready: bool):
117        """Set ready"""
118        if self._blessing_res.ready == ready:
119            return
120
121        await self._change_blessing_res(ready=ready)

Set ready

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close