hat.monitor.client

Library used by components for communication with Monitor Server

This module provides a low-level interface (connect/Client) and a high-level interface (Component) for communication with the Monitor Server.

connect establishes a single chatter-based connection with the Monitor Server, which is represented by Client. Termination of the connection is signaled by Client.wait_closed.

Example of low-level interface usage::

    import asyncio

    import hat.monitor.client

    async def main():
        conf = {'name': 'client1',
                'group': 'group1',
                'monitor_address': 'tcp+sbs://127.0.0.1:23010'}
        client = await hat.monitor.client.connect(conf)

        def on_state_change():
            # called whenever client.info or client.components changes
            print(f"current global state: {client.components}")

        try:
            with client.register_change_cb(on_state_change):
                await client.wait_closed()
        finally:
            await client.async_close()

    asyncio.run(main())
Component provides a high-level interface for communication with the Monitor Server. It listens to client changes and, depending on the blessing and ready state, calls or cancels the run_cb callback. When the component is ready and the blessing token matches, run_cb is called. If the ready state or the blessing token changes while run_cb is running, run_cb is canceled. If run_cb finishes or raises an exception, or if the connection to the Monitor Server is closed, the component is closed.

Example of high-level interface usage::

    import asyncio

    import hat.monitor.client

    async def run_component(component):
        print("running component")
        try:
            # run until canceled by the component
            await asyncio.Future()
        finally:
            print("stopping component")

    async def main():
        conf = {'name': 'client',
                'group': 'test clients',
                'monitor_address': 'tcp+sbs://127.0.0.1:23010'}
        client = await hat.monitor.client.connect(conf)
        component = hat.monitor.client.Component(client, run_component)
        component.set_ready(True)
        try:
            await component.wait_closed()
        finally:
            await component.async_close()

    asyncio.run(main())
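
The cancellation behaviour described above can be tried without a Monitor Server. The following is only a sketch using plain asyncio: the `lost_blessing` event is a hypothetical stand-in for "ready or blessing token changed", and the explicit `cancel()` call plays the role that closing the subgroup plays in the module source.

```python
import asyncio


async def run_cb(log):
    log.append('running')
    try:
        await asyncio.Future()  # run until cancelled
    finally:
        log.append('stopped')


async def main():
    log = []
    # hypothetical stand-in for "ready or blessing token changed"
    lost_blessing = asyncio.Event()

    run_task = asyncio.ensure_future(run_cb(log))
    watch_task = asyncio.ensure_future(lost_blessing.wait())

    await asyncio.sleep(0)  # give run_cb a chance to start
    lost_blessing.set()

    # wait for whichever finishes first, as the component loop does
    await asyncio.wait([run_task, watch_task],
                       return_when=asyncio.FIRST_COMPLETED)

    if not run_task.done():
        run_task.cancel()
    try:
        await run_task
    except asyncio.CancelledError:
        pass
    return log


log = asyncio.run(main())
print(log)
```

Because `run_cb` is cancelled rather than abandoned, its `finally` block still runs, which is why cleanup code belongs there.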
import asyncio
import logging
import typing

from hat import aio
from hat import chatter
from hat import json
from hat import util
from hat.monitor import common


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""

RunCb = typing.Callable[..., typing.Awaitable]
"""Component run callback coroutine

First argument is the component instance and the remaining arguments are those
provided during component initialization.

"""


async def connect(conf: json.Data,
                  data: json.Data = None
                  ) -> 'Client':
    """Connect to local monitor server

    Connection is established once chatter communication is established.

    Args:
        conf: configuration as defined by ``hat://monitor/client.yaml#``
        data: component data sent to the server with each client message

    """
    client = Client()
    client._conf = conf
    client._data = data
    client._components = []
    client._info = None
    client._blessing_res = common.BlessingRes(token=None,
                                              ready=False)
    client._change_cbs = util.CallbackRegistry()

    client._conn = await chatter.connect(common.sbs_repo,
                                         conf['monitor_address'])
    client.async_group.spawn(client._receive_loop)

    mlog.debug("connected to local monitor server %s", conf['monitor_address'])
    return client


class Client(aio.Resource):

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._conn.async_group

    @property
    def info(self) -> typing.Optional[common.ComponentInfo]:
        """Client's component info"""
        return self._info

    @property
    def components(self) -> typing.List[common.ComponentInfo]:
        """Global component state"""
        return self._components

    def register_change_cb(self,
                           cb: typing.Callable[[], None]
                           ) -> util.RegisterCallbackHandle:
        """Register change callback

        Registered callback is called once info and/or components changes.

        """
        return self._change_cbs.register(cb)

    def set_blessing_res(self, res: common.BlessingRes):
        """Set blessing response"""
        if res == self._blessing_res:
            return

        self._blessing_res = res
        self._send_msg_client()

    async def _receive_loop(self):
        try:
            self._send_msg_client()

            while True:
                msg = await self._conn.receive()
                msg_type = msg.data.module, msg.data.type

                if msg_type == ('HatMonitor', 'MsgServer'):
                    msg_server = common.msg_server_from_sbs(msg.data.data)
                    self._process_msg_server(msg_server)

                else:
                    raise Exception('unsupported message type')

        except ConnectionError:
            mlog.debug("connection closed")

        except Exception as e:
            mlog.warning("monitor client error: %s", e, exc_info=e)

        finally:
            self.close()

    def _send_msg_client(self):
        msg_client = common.MsgClient(name=self._conf['name'],
                                      group=self._conf['group'],
                                      data=self._data,
                                      blessing_res=self._blessing_res)
        self._conn.send(chatter.Data(
            module='HatMonitor',
            type='MsgClient',
            data=common.msg_client_to_sbs(msg_client)))

    def _process_msg_server(self, msg_server):
        components = msg_server.components
        info = util.first(components, lambda i: (i.cid == msg_server.cid and
                                                 i.mid == msg_server.mid))

        if (self._components == components and self._info == info):
            return

        self._components = components
        self._info = info
        self._change_cbs.notify()


class Component(aio.Resource):
    """Monitor component

    Implementation of component behaviour according to BLESS_ALL and BLESS_ONE
    algorithms.

    Component runs client's loop which manages blessing/ready states based on
    provided monitor client. Initially, component's ready is disabled.

    Provided client is owned by component instance - closing component
    will close associated client.

    When component's ready is enabled and blessing token matches ready token,
    `run_cb` is called with component instance and additionally provided
    `args` and `kwargs` arguments. While `run_cb` is running, if ready enabled
    state or blessing token changes, `run_cb` is canceled.

    If `run_cb` finishes or raises exception, component is closed.

    """

    def __init__(self,
                 client: Client,
                 run_cb: RunCb,
                 *args, **kwargs):
        self._client = client
        self._run_cb = run_cb
        self._args = args
        self._kwargs = kwargs
        self._ready = False
        self._change_queue = aio.Queue()

        self.async_group.spawn(self._component_loop)

    @property
    def async_group(self) -> aio.Group:
        """Async group"""
        return self._client.async_group

    @property
    def client(self) -> Client:
        """Client"""
        return self._client

    @property
    def ready(self) -> bool:
        """Ready"""
        return self._ready

    def set_ready(self, ready: bool):
        """Set ready"""
        if self._ready == ready:
            return

        self._ready = ready
        self._change_queue.put_nowait(None)

    def _on_client_change(self):
        self._change_queue.put_nowait(None)

    async def _component_loop(self):
        try:
            with self._client.register_change_cb(self._on_client_change):
                while True:
                    mlog.debug("waiting blessing and ready")
                    await self._wait_until_blessed_and_ready()

                    async with self.async_group.create_subgroup() as subgroup:
                        mlog.debug("running component's run_cb")

                        run_future = subgroup.spawn(
                            self._run_cb, self, *self._args, **self._kwargs)
                        ready_future = subgroup.spawn(
                            self._wait_while_blessed_and_ready)

                        await asyncio.wait([run_future, ready_future],
                                           return_when=asyncio.FIRST_COMPLETED)

                        if run_future.done():
                            return

        except ConnectionError:
            raise

        except Exception as e:
            mlog.warning("component loop error: %s", e, exc_info=e)

        finally:
            self.close()

    async def _wait_until_blessed_and_ready(self):
        while True:
            info = self._client.info
            token = info.blessing_req.token if info and self._ready else None
            blessing_res = common.BlessingRes(token=token,
                                              ready=self._ready)

            self._client.set_blessing_res(blessing_res)
            if token is not None:
                break

            await self._change_queue.get_until_empty()

    async def _wait_while_blessed_and_ready(self):
        while True:
            info = self._client.info
            token = info.blessing_req.token if info and self._ready else None
            blessing_res = common.BlessingRes(token=token,
                                              ready=self._ready)

            if token is None:
                self._client.set_blessing_res(blessing_res)
                break

            await self._change_queue.get_until_empty()
mlog: logging.Logger = <Logger hat.monitor.client (WARNING)>

Module logger

RunCb = typing.Callable[..., typing.Awaitable]

Component run callback coroutine

First argument is the component instance and the remaining arguments are those provided during component initialization.
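
A minimal sketch of how the extra arguments reach the callback. `FakeComponent` and `call_run_cb` are hypothetical stand-ins so the example runs without a Monitor Server; only the call shape `run_cb(component, *args, **kwargs)` is taken from the module source.

```python
import asyncio


class FakeComponent:
    """Hypothetical stand-in for hat.monitor.client.Component."""


async def run_component(component, name, retries=3):
    # the first positional argument is always the component instance;
    # the rest are whatever was passed after run_cb at construction
    return f"{name}:{retries}"


async def call_run_cb(run_cb, component, *args, **kwargs):
    # same call shape the component loop uses for run_cb
    return await run_cb(component, *args, **kwargs)


result = asyncio.run(
    call_run_cb(run_component, FakeComponent(), "worker", retries=5))
print(result)
```

With the real class, the equivalent construction would presumably be `Component(client, run_component, "worker", retries=5)`.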

async def connect(conf: hat.json.Data, data: hat.json.Data = None) -> hat.monitor.client.Client:

Connect to local monitor server

Connection is established once chatter communication is established.

Args
  • conf: configuration as defined by hat://monitor/client.yaml#
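
The module source reads three keys from `conf`: `name`, `group` and `monitor_address`. The sketch below checks for them before connecting. `missing_conf_keys` is a hypothetical helper, not part of the library, and it does not replace validation against the `hat://monitor/client.yaml#` schema, which may define more than these keys.

```python
# keys that connect() and the client message builder read from conf
REQUIRED_KEYS = ('name', 'group', 'monitor_address')


def missing_conf_keys(conf):
    """Return required keys absent from conf (hypothetical helper)."""
    return [key for key in REQUIRED_KEYS if key not in conf]


conf = {'name': 'client1',
        'group': 'group1',
        'monitor_address': 'tcp+sbs://127.0.0.1:23010'}

print(missing_conf_keys(conf))
print(missing_conf_keys({'name': 'client1'}))
```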
class Client(hat.aio.Resource):

Resource with lifetime control based on Group.

Client()
async_group: hat.aio.Group

Async group

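info: Optional[hat.monitor.common.ComponentInfo]

Client's component info

components: List[hat.monitor.common.ComponentInfo]

Global component state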
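
According to the module source, `info` is the entry in `components` whose `cid` and `mid` match the identifiers sent by the server, or `None` when no entry matches. A rough sketch of that lookup follows; `ComponentInfo` here is a reduced, hypothetical stand-in for `hat.monitor.common.ComponentInfo`, and `find_own_info` is an illustrative name.

```python
from typing import NamedTuple


class ComponentInfo(NamedTuple):
    """Reduced, hypothetical stand-in for common.ComponentInfo."""
    cid: int
    mid: int
    name: str


def find_own_info(components, cid, mid):
    # first component matching both identifiers, else None
    return next((i for i in components
                 if i.cid == cid and i.mid == mid), None)


components = [ComponentInfo(cid=1, mid=0, name='a'),
              ComponentInfo(cid=2, mid=0, name='b'),
              ComponentInfo(cid=1, mid=1, name='c')]

own = find_own_info(components, cid=1, mid=1)
missing = find_own_info(components, cid=9, mid=0)
print(own, missing)
```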

def register_change_cb(self, cb: Callable[[], NoneType]) -> hat.util.RegisterCallbackHandle:

Register change callback

Registered callback is called once info and/or components changes.
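
The returned handle can be used as a context manager, as the usage examples do with `with client.register_change_cb(...)`. The sketch below imitates that register/notify/cancel pattern; `Registry` and `Handle` are hypothetical stand-ins for `hat.util.CallbackRegistry` and `hat.util.RegisterCallbackHandle`, not their actual implementations.

```python
class Handle:
    """Hypothetical stand-in for hat.util.RegisterCallbackHandle."""

    def __init__(self, cancel):
        self.cancel = cancel

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # leaving the with-block unregisters the callback
        self.cancel()


class Registry:
    """Hypothetical stand-in for hat.util.CallbackRegistry."""

    def __init__(self):
        self._cbs = []

    def register(self, cb):
        self._cbs.append(cb)
        return Handle(lambda: self._cbs.remove(cb))

    def notify(self):
        for cb in list(self._cbs):
            cb()


calls = []
registry = Registry()

with registry.register(lambda: calls.append('changed')):
    registry.notify()  # callback is registered, so it fires

registry.notify()      # handle was cancelled on exit, nothing fires
print(calls)
```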

def set_blessing_res(self, res: hat.monitor.common.BlessingRes)

Set blessing response
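
In the module source, a new client message is sent only when the response differs from the previous one. A sketch of that behaviour follows, assuming `BlessingRes` compares by value: `FakeClient` is a hypothetical stand-in that records messages instead of sending them, and the field types on `BlessingRes` are assumptions.

```python
from typing import NamedTuple, Optional


class BlessingRes(NamedTuple):
    """Hypothetical stand-in for hat.monitor.common.BlessingRes."""
    token: Optional[int]
    ready: bool


class FakeClient:
    """Hypothetical stand-in that records messages instead of sending."""

    def __init__(self):
        self._blessing_res = BlessingRes(token=None, ready=False)
        self.sent = []

    def set_blessing_res(self, res):
        if res == self._blessing_res:
            return  # unchanged response, nothing is sent
        self._blessing_res = res
        self.sent.append(res)


client = FakeClient()
client.set_blessing_res(BlessingRes(token=None, ready=False))  # initial value
client.set_blessing_res(BlessingRes(token=7, ready=True))      # recorded
client.set_blessing_res(BlessingRes(token=7, ready=True))      # duplicate
print(len(client.sent))
```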

Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close
class Component(hat.aio.Resource):

Monitor component

Implementation of component behaviour according to BLESS_ALL and BLESS_ONE algorithms.

Component runs client's loop which manages blessing/ready states based on provided monitor client. Initially, component's ready is disabled.

Provided client is owned by component instance - closing component will close associated client.

When component's ready is enabled and blessing token matches ready token, run_cb is called with component instance and additionally provided args and kwargs arguments. While run_cb is running, if ready enabled state or blessing token changes, run_cb is canceled.

If run_cb finishes or raises exception, component is closed.
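
The decision whether `run_cb` may run comes down to one expression in the module source: the response token equals the requested blessing token only when the client's info is known and ready is enabled, and is `None` otherwise. A small sketch of that rule; `response_token` is an illustrative name, and the `SimpleNamespace` objects are hypothetical stand-ins for `common.ComponentInfo`.

```python
from types import SimpleNamespace


def response_token(info, ready):
    """Token echoed back to the server, or None when not runnable."""
    return info.blessing_req.token if info and ready else None


# hypothetical stand-ins for ComponentInfo carrying a blessing request
blessed = SimpleNamespace(blessing_req=SimpleNamespace(token=42))
unblessed = SimpleNamespace(blessing_req=SimpleNamespace(token=None))

print(response_token(blessed, ready=True))     # run_cb may start
print(response_token(blessed, ready=False))    # not ready
print(response_token(unblessed, ready=True))   # no blessing requested
print(response_token(None, ready=True))        # info not yet received
```

`run_cb` starts when this value is not `None` and is cancelled once it becomes `None` again.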

Component( client: hat.monitor.client.Client, run_cb: Callable[..., Awaitable], *args, **kwargs)
async_group: hat.aio.Group

Async group

client: hat.monitor.client.Client

Client

ready: bool

Ready

def set_ready(self, ready: bool)

Set ready

Inherited Members
hat.aio.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close