hat.monitor.server.runner

  1from pathlib import Path
  2import asyncio
  3import contextlib
  4import itertools
  5import logging
  6
  7from hat import aio
  8from hat import json
  9from hat.drivers import tcp
 10
 11import hat.monitor.observer.master
 12import hat.monitor.observer.server
 13import hat.monitor.observer.slave
 14import hat.monitor.server.blessing
 15import hat.monitor.server.ui
 16
 17
# Logger named after this module (``__name__``); `create` uses it for
# start-up debug messages and `Runner` for runner loop debug/error messages.
mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""
 20
 21
 22async def create(conf: json.Data) -> 'Runner':
 23    runner = Runner()
 24    runner._loop = asyncio.get_running_loop()
 25    runner._async_group = aio.Group()
 26    runner._server = None
 27    runner._master = None
 28    runner._ui = None
 29    runner._slave = None
 30    runner._slave_conf = conf['slave']
 31    runner._slave_parents = [tcp.Address(i['host'], i['port'])
 32                             for i in conf['slave']['parents']]
 33    runner._default_algorithm = hat.monitor.server.blessing.Algorithm(
 34        conf['default_algorithm'])
 35    runner._group_algorithms = {k: hat.monitor.server.blessing.Algorithm(v)
 36                                for k, v in conf['group_algorithms'].items()}
 37
 38    runner.async_group.spawn(aio.call_on_cancel, runner._on_close)
 39
 40    try:
 41        mlog.debug('starting server')
 42        runner._server = await hat.monitor.observer.server.listen(
 43            tcp.Address(conf['server']['host'], conf['server']['port']),
 44            default_rank=conf['server']['default_rank'],
 45            state_cb=runner._on_server_state)
 46        runner._bind_resource(runner._server)
 47
 48        mlog.debug('starting master')
 49        runner._master = await hat.monitor.observer.master.listen(
 50            tcp.Address(conf['master']['host'], conf['master']['port']),
 51            global_components_cb=runner._on_master_global_components,
 52            blessing_cb=runner._calculate_blessing)
 53        runner._bind_resource(runner._master)
 54
 55        await runner._master.set_local_components(
 56            runner._server.state.local_components)
 57
 58        ui_conf = conf.get('ui')
 59        if ui_conf:
 60            mlog.debug('starting ui')
 61            htpasswd = (Path(ui_conf['htpasswd']) if 'htpasswd' in ui_conf
 62                        else None)
 63            runner._ui = await hat.monitor.server.ui.create(
 64                ui_conf['host'],
 65                ui_conf['port'],
 66                runner._server.state,
 67                set_rank_cb=runner._on_ui_set_rank,
 68                htpasswd=htpasswd)
 69            runner._bind_resource(runner._ui)
 70
 71        runner.async_group.spawn(runner._runner_loop)
 72
 73    except BaseException:
 74        await aio.uncancellable(runner.async_close())
 75        raise
 76
 77    return runner
 78
 79
 80class Runner(aio.Resource):
 81
 82    @property
 83    def async_group(self):
 84        return self._async_group
 85
 86    async def _on_close(self):
 87        if self._ui:
 88            await self._ui.async_close()
 89
 90        if self._server:
 91            await self._server.async_close()
 92
 93        if self._master:
 94            await self._master.async_close()
 95
 96        if self._slave:
 97            await self._slave.async_close()
 98
 99    async def _on_server_state(self, server, state):
100        if self._ui:
101            self._ui.set_state(state)
102
103        if self._master:
104            await self._master.set_local_components(state.local_components)
105
106        if self._slave and self._slave.is_open:
107            with contextlib.suppress(ConnectionError):
108                await self._slave.update(state.local_components)
109
110    async def _on_master_global_components(self, master, global_components):
111        if self._server and master.is_active:
112            await self._server.update(0, global_components)
113
114    async def _on_ui_set_rank(self, ui, cid, rank):
115        if self._server:
116            await self._server.set_rank(cid, rank)
117
118    async def _on_slave_state(self, slave, state):
119        if not self._server or not self._master:
120            return
121
122        if state.mid is None or self._master.is_active:
123            return
124
125        await self._master.set_local_blessing_reqs(
126            (info.cid, info.blessing_req)
127            for info in state.global_components
128            if info.mid == state.mid)
129
130        await self._server.update(state.mid, state.global_components)
131
132    async def _runner_loop(self):
133        try:
134            await self._set_master_active(False)
135
136            if not self._slave_parents:
137                self._master.set_active(True)
138                await self._loop.create_future()
139
140            while True:
141                if not self._slave:
142                    await self._create_slave_loop(
143                        self._slave_conf['connect_retry_count'])
144
145                if self._slave and self._slave.is_open:
146                    await self._set_master_active(False)
147                    await self._slave.wait_closed()
148
149                elif self._slave:
150                    await self._slave.async_close()
151                    self._slave = None
152
153                else:
154                    mlog.debug('no master detected - activating local master')
155                    await self._set_master_active(True)
156                    await self._create_slave_loop(None)
157
158        except ConnectionError:
159            pass
160
161        except Exception as e:
162            mlog.error('runner loop error: %s', e, exc_info=e)
163
164        finally:
165            self.close()
166
167    def _bind_resource(self, resource):
168        self.async_group.spawn(aio.call_on_done, resource.wait_closing(),
169                               self.close)
170
171    def _calculate_blessing(self, master, components):
172        yield from hat.monitor.server.blessing.calculate(
173            components=components,
174            default_algorithm=self._default_algorithm,
175            group_algorithms=self._group_algorithms)
176
177    async def _set_master_active(self, active):
178        self._master.set_active(active)
179        await self._on_server_state(self._server, self._server.state)
180
181        if active:
182            await self._server.update(0, self._master.global_components)
183
184        elif self._slave and self._slave.state.mid is not None:
185            await self._server.update(self._slave.state.mid,
186                                      self._slave.state.global_components)
187
188    async def _create_slave_loop(self, retry_count):
189        counter = (range(retry_count + 1) if retry_count is not None
190                   else itertools.repeat(None))
191
192        for count in counter:
193            for addr in self._slave_parents:
194                with contextlib.suppress(Exception):
195                    self._slave = await self._create_slave(addr)
196                    return
197
198            if count is None or count < retry_count:
199                await asyncio.sleep(self._slave_conf['connect_retry_delay'])
200
201    async def _create_slave(self, addr):
202        try:
203            return await aio.wait_for(
204                hat.monitor.observer.slave.connect(
205                    addr,
206                    local_components=self._server.state.local_components,
207                    state_cb=self._on_slave_state),
208                self._slave_conf['connect_timeout'])
209
210        except aio.CancelledWithResultError as e:
211            if e.result:
212                await aio.uncancellable(e.result.async_close())
213            raise
mlog: logging.Logger = <Logger hat.monitor.server.runner (WARNING)>

Module logger

async def create( conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> Runner:
async def create(conf: json.Data) -> 'Runner':
    """Create a monitor server runner from configuration.

    The runner's private state is initialized first; then the observer
    server, the observer master and - only when ``conf`` holds a truthy
    ``'ui'`` entry - the UI are started in that order. Every started
    component is registered with the runner through ``_bind_resource``.
    Once everything is up, the runner loop is spawned in the runner's
    async group.

    Args:
        conf: configuration; the keys read here are ``'slave'`` (with
            ``'parents'``), ``'default_algorithm'``, ``'group_algorithms'``,
            ``'server'``, ``'master'`` and the optional ``'ui'``

    Returns:
        the started runner

    Raises:
        BaseException: anything raised while starting components is
            re-raised after the partially started runner has been closed

    """
    algorithm_cls = hat.monitor.server.blessing.Algorithm

    runner = Runner()
    runner._loop = asyncio.get_running_loop()
    runner._async_group = aio.Group()

    # components are created later - `_on_close` skips the unset ones
    runner._server = None
    runner._master = None
    runner._ui = None
    runner._slave = None

    slave_conf = conf['slave']
    runner._slave_conf = slave_conf
    runner._slave_parents = [tcp.Address(parent['host'], parent['port'])
                             for parent in slave_conf['parents']]

    runner._default_algorithm = algorithm_cls(conf['default_algorithm'])
    runner._group_algorithms = {
        group: algorithm_cls(name)
        for group, name in conf['group_algorithms'].items()}

    # run `_on_close` once the runner's async group gets cancelled
    runner.async_group.spawn(aio.call_on_cancel, runner._on_close)

    try:
        mlog.debug('starting server')
        server_conf = conf['server']
        server_addr = tcp.Address(server_conf['host'], server_conf['port'])
        runner._server = await hat.monitor.observer.server.listen(
            server_addr,
            default_rank=server_conf['default_rank'],
            state_cb=runner._on_server_state)
        runner._bind_resource(runner._server)

        mlog.debug('starting master')
        master_conf = conf['master']
        master_addr = tcp.Address(master_conf['host'], master_conf['port'])
        runner._master = await hat.monitor.observer.master.listen(
            master_addr,
            global_components_cb=runner._on_master_global_components,
            blessing_cb=runner._calculate_blessing)
        runner._bind_resource(runner._master)

        # hand server's current local components to the fresh master
        local_components = runner._server.state.local_components
        await runner._master.set_local_components(local_components)

        ui_conf = conf.get('ui')
        if ui_conf:
            mlog.debug('starting ui')

            htpasswd = None
            if 'htpasswd' in ui_conf:
                htpasswd = Path(ui_conf['htpasswd'])

            runner._ui = await hat.monitor.server.ui.create(
                ui_conf['host'],
                ui_conf['port'],
                runner._server.state,
                set_rank_cb=runner._on_ui_set_rank,
                htpasswd=htpasswd)
            runner._bind_resource(runner._ui)

        runner.async_group.spawn(runner._runner_loop)

    except BaseException:
        # includes cancellation - never leave a half started runner behind
        await aio.uncancellable(runner.async_close())
        raise

    else:
        return runner
class Runner(hat.aio.group.Resource):
 81class Runner(aio.Resource):
 82
 83    @property
 84    def async_group(self):
 85        return self._async_group
 86
 87    async def _on_close(self):
 88        if self._ui:
 89            await self._ui.async_close()
 90
 91        if self._server:
 92            await self._server.async_close()
 93
 94        if self._master:
 95            await self._master.async_close()
 96
 97        if self._slave:
 98            await self._slave.async_close()
 99
100    async def _on_server_state(self, server, state):
101        if self._ui:
102            self._ui.set_state(state)
103
104        if self._master:
105            await self._master.set_local_components(state.local_components)
106
107        if self._slave and self._slave.is_open:
108            with contextlib.suppress(ConnectionError):
109                await self._slave.update(state.local_components)
110
111    async def _on_master_global_components(self, master, global_components):
112        if self._server and master.is_active:
113            await self._server.update(0, global_components)
114
115    async def _on_ui_set_rank(self, ui, cid, rank):
116        if self._server:
117            await self._server.set_rank(cid, rank)
118
119    async def _on_slave_state(self, slave, state):
120        if not self._server or not self._master:
121            return
122
123        if state.mid is None or self._master.is_active:
124            return
125
126        await self._master.set_local_blessing_reqs(
127            (info.cid, info.blessing_req)
128            for info in state.global_components
129            if info.mid == state.mid)
130
131        await self._server.update(state.mid, state.global_components)
132
133    async def _runner_loop(self):
134        try:
135            await self._set_master_active(False)
136
137            if not self._slave_parents:
138                self._master.set_active(True)
139                await self._loop.create_future()
140
141            while True:
142                if not self._slave:
143                    await self._create_slave_loop(
144                        self._slave_conf['connect_retry_count'])
145
146                if self._slave and self._slave.is_open:
147                    await self._set_master_active(False)
148                    await self._slave.wait_closed()
149
150                elif self._slave:
151                    await self._slave.async_close()
152                    self._slave = None
153
154                else:
155                    mlog.debug('no master detected - activating local master')
156                    await self._set_master_active(True)
157                    await self._create_slave_loop(None)
158
159        except ConnectionError:
160            pass
161
162        except Exception as e:
163            mlog.error('runner loop error: %s', e, exc_info=e)
164
165        finally:
166            self.close()
167
168    def _bind_resource(self, resource):
169        self.async_group.spawn(aio.call_on_done, resource.wait_closing(),
170                               self.close)
171
172    def _calculate_blessing(self, master, components):
173        yield from hat.monitor.server.blessing.calculate(
174            components=components,
175            default_algorithm=self._default_algorithm,
176            group_algorithms=self._group_algorithms)
177
178    async def _set_master_active(self, active):
179        self._master.set_active(active)
180        await self._on_server_state(self._server, self._server.state)
181
182        if active:
183            await self._server.update(0, self._master.global_components)
184
185        elif self._slave and self._slave.state.mid is not None:
186            await self._server.update(self._slave.state.mid,
187                                      self._slave.state.global_components)
188
189    async def _create_slave_loop(self, retry_count):
190        counter = (range(retry_count + 1) if retry_count is not None
191                   else itertools.repeat(None))
192
193        for count in counter:
194            for addr in self._slave_parents:
195                with contextlib.suppress(Exception):
196                    self._slave = await self._create_slave(addr)
197                    return
198
199            if count is None or count < retry_count:
200                await asyncio.sleep(self._slave_conf['connect_retry_delay'])
201
202    async def _create_slave(self, addr):
203        try:
204            return await aio.wait_for(
205                hat.monitor.observer.slave.connect(
206                    addr,
207                    local_components=self._server.state.local_components,
208                    state_cb=self._on_slave_state),
209                self._slave_conf['connect_timeout'])
210
211        except aio.CancelledWithResultError as e:
212            if e.result:
213                await aio.uncancellable(e.result.async_close())
214            raise

Resource with lifetime control based on Group.

async_group
83    @property
84    def async_group(self):
85        return self._async_group

Group controlling resource's lifetime.