hat.monitor.server.runner

  1import asyncio
  2import contextlib
  3import itertools
  4import logging
  5
  6from hat import aio
  7from hat import json
  8from hat.drivers import tcp
  9
 10import hat.monitor.observer.master
 11import hat.monitor.observer.server
 12import hat.monitor.observer.slave
 13import hat.monitor.server.blessing
 14import hat.monitor.server.ui
 15
 16
 17mlog: logging.Logger = logging.getLogger(__name__)
 18"""Module logger"""
 19
 20
 21async def create(conf: json.Data) -> 'Runner':
 22    runner = Runner()
 23    runner._loop = asyncio.get_running_loop()
 24    runner._async_group = aio.Group()
 25    runner._server = None
 26    runner._master = None
 27    runner._ui = None
 28    runner._slave = None
 29    runner._slave_conf = conf['slave']
 30    runner._slave_parents = [tcp.Address(i['host'], i['port'])
 31                             for i in conf['slave']['parents']]
 32    runner._default_algorithm = hat.monitor.server.blessing.Algorithm(
 33        conf['default_algorithm'])
 34    runner._group_algorithms = {k: hat.monitor.server.blessing.Algorithm(v)
 35                                for k, v in conf['group_algorithms'].items()}
 36
 37    runner.async_group.spawn(aio.call_on_cancel, runner._on_close)
 38
 39    try:
 40        mlog.debug('starting server')
 41        runner._server = await hat.monitor.observer.server.listen(
 42            tcp.Address(conf['server']['host'], conf['server']['port']),
 43            default_rank=conf['server']['default_rank'],
 44            state_cb=runner._on_server_state)
 45        runner._bind_resource(runner._server)
 46
 47        mlog.debug('starting master')
 48        runner._master = await hat.monitor.observer.master.listen(
 49            tcp.Address(conf['master']['host'], conf['master']['port']),
 50            local_components=runner._server.state.local_components,
 51            global_components_cb=runner._on_master_global_components,
 52            blessing_cb=runner._calculate_blessing)
 53        runner._bind_resource(runner._master)
 54
 55        ui_conf = conf.get('ui')
 56        if ui_conf:
 57            mlog.debug('starting ui')
 58            runner._ui = await hat.monitor.server.ui.create(
 59                ui_conf['host'],
 60                ui_conf['port'],
 61                runner._server.state,
 62                set_rank_cb=runner._on_ui_set_rank)
 63            runner._bind_resource(runner._ui)
 64
 65        runner.async_group.spawn(runner._runner_loop)
 66
 67    except BaseException:
 68        await aio.uncancellable(runner.async_close())
 69        raise
 70
 71    return runner
 72
 73
 74class Runner(aio.Resource):
 75
 76    @property
 77    def async_group(self):
 78        return self._async_group
 79
 80    async def _on_close(self):
 81        if self._ui:
 82            await self._ui.async_close()
 83
 84        if self._server:
 85            await self._server.async_close()
 86
 87        if self._master:
 88            await self._master.async_close()
 89
 90        if self._slave:
 91            await self._slave.async_close()
 92
 93    async def _on_server_state(self, server, state):
 94        if self._ui:
 95            self._ui.set_state(state)
 96
 97        if self._master:
 98            await self._master.set_local_components(state.local_components)
 99
100        if self._slave and self._slave.is_open:
101            with contextlib.suppress(ConnectionError):
102                await self._slave.update(state.local_components)
103
104    async def _on_master_global_components(self, master, global_components):
105        if self._server and master.is_active:
106            await self._server.update(0, global_components)
107
108    async def _on_ui_set_rank(self, ui, cid, rank):
109        if self._server:
110            await self._server.set_rank(cid, rank)
111
112    async def _on_slave_state(self, slave, state):
113        if (self._server and
114                self._master and
115                not self._master.is_active and
116                state.mid is not None):
117            await self._server.update(state.mid, state.global_components)
118
119    async def _runner_loop(self):
120        try:
121            await self._set_master_active(False)
122
123            if not self._slave_parents:
124                self._master.set_active(True)
125                await self._loop.create_future()
126
127            while True:
128                if not self._slave:
129                    await self._server.update(0, [])
130                    await self._create_slave_loop(
131                        self._slave_conf['connect_retry_count'])
132
133                if self._slave and self._slave.is_open:
134                    await self._set_master_active(False)
135                    await self._slave.wait_closed()
136
137                elif self._slave:
138                    await self._slave.async_close()
139                    self._slave = None
140
141                else:
142                    mlog.debug('no master detected - activating local master')
143                    await self._set_master_active(True)
144                    await self._create_slave_loop(None)
145
146        except ConnectionError:
147            pass
148
149        except Exception as e:
150            mlog.error('runner loop error: %s', e, exc_info=e)
151
152        finally:
153            self.close()
154
155    def _bind_resource(self, resource):
156        self.async_group.spawn(aio.call_on_done, resource.wait_closing(),
157                               self.close)
158
159    def _calculate_blessing(self, master, components):
160        return hat.monitor.server.blessing.calculate(
161            components=components,
162            default_algorithm=self._default_algorithm,
163            group_algorithms=self._group_algorithms)
164
165    async def _set_master_active(self, active):
166        self._master.set_active(active)
167        await self._on_server_state(self._server, self._server.state)
168
169        if active:
170            await self._server.update(0, self._master.global_components)
171
172        elif self._slave and self._slave.state.mid is not None:
173            await self._server.update(self._slave.state.mid,
174                                      self._slave.state.global_components)
175
176    async def _create_slave_loop(self, retry_count):
177        counter = (range(retry_count + 1) if retry_count is not None
178                   else itertools.repeat(None))
179
180        for count in counter:
181            for addr in self._slave_parents:
182                with contextlib.suppress(Exception):
183                    self._slave = await self._create_slave(addr)
184                    return
185
186            if count is None or count < retry_count:
187                await asyncio.sleep(self._slave_conf['connect_retry_delay'])
188
189    async def _create_slave(self, addr):
190        try:
191            return await aio.wait_for(
192                hat.monitor.observer.slave.connect(
193                    addr,
194                    local_components=self._server.state.local_components,
195                    state_cb=self._on_slave_state),
196                self._slave_conf['connect_timeout'])
197
198        except aio.CancelledWithResultError as e:
199            if e.result:
200                await aio.uncancellable(e.result.async_close())
201            raise
mlog: logging.Logger = <Logger hat.monitor.server.runner (WARNING)>

Module logger

async def create( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]) -> Runner:
async def create(conf: json.Data) -> 'Runner':
    """Create and start a monitor server runner.

    Starts the observer server, then the observer master and, when a
    non-empty ``ui`` section is configured, the UI. Each started component
    is bound to the runner, so closing any of them closes the runner.
    Finally the runner loop, which manages the connection to parent
    monitors, is spawned.

    Args:
        conf: monitor server configuration; this function reads the
            ``slave``, ``default_algorithm``, ``group_algorithms``,
            ``server``, ``master`` and optional ``ui`` sections

    Returns:
        the started runner

    If starting any component fails (or the call is cancelled), the
    partially started runner is closed before the exception propagates.

    """
    runner = Runner()
    runner._loop = asyncio.get_running_loop()
    runner._async_group = aio.Group()

    # components are started below; ``None`` marks "not started (yet)"
    runner._server = runner._master = runner._ui = runner._slave = None

    slave_conf = conf['slave']
    runner._slave_conf = slave_conf
    runner._slave_parents = [tcp.Address(parent['host'], parent['port'])
                             for parent in slave_conf['parents']]

    algorithm_cls = hat.monitor.server.blessing.Algorithm
    runner._default_algorithm = algorithm_cls(conf['default_algorithm'])
    runner._group_algorithms = {
        group: algorithm_cls(algorithm)
        for group, algorithm in conf['group_algorithms'].items()}

    # closing the runner's group closes every started component
    runner.async_group.spawn(aio.call_on_cancel, runner._on_close)

    try:
        mlog.debug('starting server')
        server_conf = conf['server']
        server_addr = tcp.Address(server_conf['host'], server_conf['port'])
        runner._server = await hat.monitor.observer.server.listen(
            server_addr,
            default_rank=server_conf['default_rank'],
            state_cb=runner._on_server_state)
        runner._bind_resource(runner._server)

        mlog.debug('starting master')
        master_conf = conf['master']
        master_addr = tcp.Address(master_conf['host'], master_conf['port'])
        runner._master = await hat.monitor.observer.master.listen(
            master_addr,
            local_components=runner._server.state.local_components,
            global_components_cb=runner._on_master_global_components,
            blessing_cb=runner._calculate_blessing)
        runner._bind_resource(runner._master)

        ui_conf = conf.get('ui')
        if ui_conf:
            mlog.debug('starting ui')
            runner._ui = await hat.monitor.server.ui.create(
                ui_conf['host'], ui_conf['port'], runner._server.state,
                set_rank_cb=runner._on_ui_set_rank)
            runner._bind_resource(runner._ui)

        runner.async_group.spawn(runner._runner_loop)

    except BaseException:
        await aio.uncancellable(runner.async_close())
        raise

    return runner
class Runner(hat.aio.group.Resource):
 75class Runner(aio.Resource):
 76
 77    @property
 78    def async_group(self):
 79        return self._async_group
 80
 81    async def _on_close(self):
 82        if self._ui:
 83            await self._ui.async_close()
 84
 85        if self._server:
 86            await self._server.async_close()
 87
 88        if self._master:
 89            await self._master.async_close()
 90
 91        if self._slave:
 92            await self._slave.async_close()
 93
 94    async def _on_server_state(self, server, state):
 95        if self._ui:
 96            self._ui.set_state(state)
 97
 98        if self._master:
 99            await self._master.set_local_components(state.local_components)
100
101        if self._slave and self._slave.is_open:
102            with contextlib.suppress(ConnectionError):
103                await self._slave.update(state.local_components)
104
105    async def _on_master_global_components(self, master, global_components):
106        if self._server and master.is_active:
107            await self._server.update(0, global_components)
108
109    async def _on_ui_set_rank(self, ui, cid, rank):
110        if self._server:
111            await self._server.set_rank(cid, rank)
112
113    async def _on_slave_state(self, slave, state):
114        if (self._server and
115                self._master and
116                not self._master.is_active and
117                state.mid is not None):
118            await self._server.update(state.mid, state.global_components)
119
120    async def _runner_loop(self):
121        try:
122            await self._set_master_active(False)
123
124            if not self._slave_parents:
125                self._master.set_active(True)
126                await self._loop.create_future()
127
128            while True:
129                if not self._slave:
130                    await self._server.update(0, [])
131                    await self._create_slave_loop(
132                        self._slave_conf['connect_retry_count'])
133
134                if self._slave and self._slave.is_open:
135                    await self._set_master_active(False)
136                    await self._slave.wait_closed()
137
138                elif self._slave:
139                    await self._slave.async_close()
140                    self._slave = None
141
142                else:
143                    mlog.debug('no master detected - activating local master')
144                    await self._set_master_active(True)
145                    await self._create_slave_loop(None)
146
147        except ConnectionError:
148            pass
149
150        except Exception as e:
151            mlog.error('runner loop error: %s', e, exc_info=e)
152
153        finally:
154            self.close()
155
156    def _bind_resource(self, resource):
157        self.async_group.spawn(aio.call_on_done, resource.wait_closing(),
158                               self.close)
159
160    def _calculate_blessing(self, master, components):
161        return hat.monitor.server.blessing.calculate(
162            components=components,
163            default_algorithm=self._default_algorithm,
164            group_algorithms=self._group_algorithms)
165
166    async def _set_master_active(self, active):
167        self._master.set_active(active)
168        await self._on_server_state(self._server, self._server.state)
169
170        if active:
171            await self._server.update(0, self._master.global_components)
172
173        elif self._slave and self._slave.state.mid is not None:
174            await self._server.update(self._slave.state.mid,
175                                      self._slave.state.global_components)
176
177    async def _create_slave_loop(self, retry_count):
178        counter = (range(retry_count + 1) if retry_count is not None
179                   else itertools.repeat(None))
180
181        for count in counter:
182            for addr in self._slave_parents:
183                with contextlib.suppress(Exception):
184                    self._slave = await self._create_slave(addr)
185                    return
186
187            if count is None or count < retry_count:
188                await asyncio.sleep(self._slave_conf['connect_retry_delay'])
189
190    async def _create_slave(self, addr):
191        try:
192            return await aio.wait_for(
193                hat.monitor.observer.slave.connect(
194                    addr,
195                    local_components=self._server.state.local_components,
196                    state_cb=self._on_slave_state),
197                self._slave_conf['connect_timeout'])
198
199        except aio.CancelledWithResultError as e:
200            if e.result:
201                await aio.uncancellable(e.result.async_close())
202            raise

Resource with lifetime control based on Group.

async_group
77    @property
78    def async_group(self):
79        return self._async_group

Group controlling resource's lifetime.

Inherited Members
hat.aio.group.Resource
is_open
is_closing
is_closed
wait_closing
wait_closed
close
async_close