hat.monitor.server.runner
"""Monitor server runner

Wires the observer server, the observer master, the optional UI and the
observer slave connection into a single `Runner` resource.
"""

from pathlib import Path
import asyncio
import contextlib
import itertools
import logging

from hat import aio
from hat import json
from hat.drivers import tcp

import hat.monitor.observer.master
import hat.monitor.observer.server
import hat.monitor.observer.slave
import hat.monitor.server.blessing
import hat.monitor.server.ui


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


async def create(conf: json.Data) -> 'Runner':
    """Create and start a runner based on configuration

    The following `conf` entries are read:

        * ``slave`` - slave configuration with ``parents`` (list of
          ``host``/``port`` entries), ``connect_retry_count``,
          ``connect_retry_delay`` and ``connect_timeout``
        * ``default_algorithm`` and ``group_algorithms`` - blessing
          algorithm identifiers
        * ``server`` - ``host``, ``port`` and ``default_rank``
        * ``master`` - ``host`` and ``port``
        * ``ui`` (optional) - ``host``, ``port`` and optional ``htpasswd``
          path

    If starting of any component fails (or is cancelled), the partially
    initialized runner is closed before the exception is propagated.

    """
    runner = Runner()
    runner._loop = asyncio.get_running_loop()
    runner._async_group = aio.Group()
    runner._server = None
    runner._master = None
    runner._ui = None
    runner._slave = None
    runner._slave_conf = conf['slave']
    runner._slave_parents = [tcp.Address(i['host'], i['port'])
                             for i in conf['slave']['parents']]
    runner._default_algorithm = hat.monitor.server.blessing.Algorithm(
        conf['default_algorithm'])
    runner._group_algorithms = {k: hat.monitor.server.blessing.Algorithm(v)
                                for k, v in conf['group_algorithms'].items()}

    # components are closed by runner._on_close once the group is cancelled
    runner.async_group.spawn(aio.call_on_cancel, runner._on_close)

    try:
        mlog.debug('starting server')
        runner._server = await hat.monitor.observer.server.listen(
            tcp.Address(conf['server']['host'], conf['server']['port']),
            default_rank=conf['server']['default_rank'],
            state_cb=runner._on_server_state)
        runner._bind_resource(runner._server)

        mlog.debug('starting master')
        runner._master = await hat.monitor.observer.master.listen(
            tcp.Address(conf['master']['host'], conf['master']['port']),
            global_components_cb=runner._on_master_global_components,
            blessing_cb=runner._calculate_blessing)
        runner._bind_resource(runner._master)

        await runner._master.set_local_components(
            runner._server.state.local_components)

        ui_conf = conf.get('ui')
        if ui_conf:
            mlog.debug('starting ui')
            htpasswd = (Path(ui_conf['htpasswd']) if 'htpasswd' in ui_conf
                        else None)
            runner._ui = await hat.monitor.server.ui.create(
                ui_conf['host'],
                ui_conf['port'],
                runner._server.state,
                set_rank_cb=runner._on_ui_set_rank,
                htpasswd=htpasswd)
            runner._bind_resource(runner._ui)

        runner.async_group.spawn(runner._runner_loop)

    except BaseException:
        # BaseException so that cancellation also releases started components
        await aio.uncancellable(runner.async_close())
        raise

    return runner


class Runner(aio.Resource):
    """Monitor server runner

    Holds the observer server, the observer master, the optional UI and the
    observer slave connection and passes state between them. Private
    attributes are initialized by `create`, which is the intended way of
    obtaining an instance.

    """

    @property
    def async_group(self):
        """Async group associated with this runner"""
        return self._async_group

    async def _on_close(self):
        # spawned by `create` through `aio.call_on_cancel`; closes every
        # component that was actually started
        if self._ui:
            await self._ui.async_close()

        if self._server:
            await self._server.async_close()

        if self._master:
            await self._master.async_close()

        if self._slave:
            await self._slave.async_close()

    async def _on_server_state(self, server, state):
        # propagate new server state to ui, master and (if open) slave
        if self._ui:
            self._ui.set_state(state)

        if self._master:
            await self._master.set_local_components(state.local_components)

        if self._slave and self._slave.is_open:
            # slave connection may break while the update is being sent
            with contextlib.suppress(ConnectionError):
                await self._slave.update(state.local_components)

    async def _on_master_global_components(self, master, global_components):
        # local master is authoritative only while it is active
        if self._server and master.is_active:
            await self._server.update(0, global_components)

    async def _on_ui_set_rank(self, ui, cid, rank):
        if self._server:
            await self._server.set_rank(cid, rank)

    async def _on_slave_state(self, slave, state):
        # state received from the remote master is applied only when server
        # and master exist, mid is known and the local master is not active
        if not self._server or not self._master:
            return

        if state.mid is None or self._master.is_active:
            return

        # blessing requests of components belonging to this monitor
        await self._master.set_local_blessing_reqs(
            (info.cid, info.blessing_req)
            for info in state.global_components
            if info.mid == state.mid)

        await self._server.update(state.mid, state.global_components)

    async def _runner_loop(self):
        """Switch between slave operation and local master operation

        Without configured parents, the local master is activated and the
        loop waits until it is cancelled. Otherwise, the loop tries to
        connect to a parent; while a slave connection is open the local
        master is inactive, and when no parent is reachable the local master
        is activated until a connection succeeds. The runner is closed when
        the loop finishes for any reason.

        """
        try:
            await self._set_master_active(False)

            if not self._slave_parents:
                # same activation path as the fallback below, so the server
                # is also updated with master's global components
                await self._set_master_active(True)
                # never resolved - wait here until cancelled
                await self._loop.create_future()

            while True:
                if not self._slave:
                    await self._create_slave_loop(
                        self._slave_conf['connect_retry_count'])

                if self._slave and self._slave.is_open:
                    await self._set_master_active(False)
                    await self._slave.wait_closed()

                elif self._slave:
                    # connection is no longer open - dispose and retry
                    await self._slave.async_close()
                    self._slave = None

                else:
                    mlog.debug('no master detected - activating local master')
                    await self._set_master_active(True)
                    await self._create_slave_loop(None)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error('runner loop error: %s', e, exc_info=e)

        finally:
            self.close()

    def _bind_resource(self, resource):
        # close the runner once `resource.wait_closing()` is done
        self.async_group.spawn(aio.call_on_done, resource.wait_closing(),
                               self.close)

    def _calculate_blessing(self, master, components):
        # blessing callback passed to the master in `create`
        yield from hat.monitor.server.blessing.calculate(
            components=components,
            default_algorithm=self._default_algorithm,
            group_algorithms=self._group_algorithms)

    async def _set_master_active(self, active):
        """Change local master activity and resynchronize the server"""
        self._master.set_active(active)
        await self._on_server_state(self._server, self._server.state)

        if active:
            await self._server.update(0, self._master.global_components)

        elif self._slave and self._slave.state.mid is not None:
            await self._server.update(self._slave.state.mid,
                                      self._slave.state.global_components)

    async def _create_slave_loop(self, retry_count):
        """Try to connect to one of the parents

        Parents are tried in configured order. A full pass over all parents
        is repeated `retry_count` additional times (indefinitely if
        `retry_count` is ``None``), with ``connect_retry_delay`` sleep
        between passes. On success the connection is stored in
        ``self._slave``; otherwise ``self._slave`` is left unchanged.

        """
        counter = (range(retry_count + 1) if retry_count is not None
                   else itertools.repeat(None))

        for count in counter:
            for addr in self._slave_parents:
                try:
                    self._slave = await self._create_slave(addr)

                except Exception as e:
                    # best effort - try next parent, but leave a trace
                    mlog.debug('connecting to parent %s failed: %s',
                               addr, e, exc_info=e)
                    continue

                return

            # no delay after the last pass
            if count is None or count < retry_count:
                await asyncio.sleep(self._slave_conf['connect_retry_delay'])

    async def _create_slave(self, addr):
        """Connect to parent at `addr` within ``connect_timeout``"""
        try:
            return await aio.wait_for(
                hat.monitor.observer.slave.connect(
                    addr,
                    local_components=self._server.state.local_components,
                    state_cb=self._on_slave_state),
                self._slave_conf['connect_timeout'])

        except aio.CancelledWithResultError as e:
            # cancelled although a connection was produced - do not leak it
            if e.result:
                await aio.uncancellable(e.result.async_close())
            raise
Module logger
async def create(conf: Union[NoneType, bool, int, float, str, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')]]) -> Runner:
async def create(conf: json.Data) -> 'Runner':
    """Build a `Runner` from configuration and start its components

    Starts the observer server and the observer master, starts the UI when
    a non-empty ``ui`` entry is present in `conf`, and finally spawns the
    runner loop. If anything fails or is cancelled during startup, the
    runner is closed and the exception is re-raised.

    """
    runner = Runner()
    runner._loop = asyncio.get_running_loop()
    runner._async_group = aio.Group()

    # components are assigned as they are started
    runner._server = runner._master = runner._ui = runner._slave = None

    slave_conf = conf['slave']
    runner._slave_conf = slave_conf
    runner._slave_parents = [tcp.Address(parent['host'], parent['port'])
                             for parent in slave_conf['parents']]

    to_algorithm = hat.monitor.server.blessing.Algorithm
    runner._default_algorithm = to_algorithm(conf['default_algorithm'])
    runner._group_algorithms = {
        group: to_algorithm(algorithm)
        for group, algorithm in conf['group_algorithms'].items()}

    # started components are released by runner._on_close on cancellation
    runner.async_group.spawn(aio.call_on_cancel, runner._on_close)

    try:
        mlog.debug('starting server')
        server_conf = conf['server']
        server_addr = tcp.Address(server_conf['host'], server_conf['port'])
        runner._server = await hat.monitor.observer.server.listen(
            server_addr,
            default_rank=server_conf['default_rank'],
            state_cb=runner._on_server_state)
        runner._bind_resource(runner._server)

        mlog.debug('starting master')
        master_conf = conf['master']
        master_addr = tcp.Address(master_conf['host'], master_conf['port'])
        runner._master = await hat.monitor.observer.master.listen(
            master_addr,
            global_components_cb=runner._on_master_global_components,
            blessing_cb=runner._calculate_blessing)
        runner._bind_resource(runner._master)

        local_components = runner._server.state.local_components
        await runner._master.set_local_components(local_components)

        ui_conf = conf.get('ui')
        if ui_conf:
            mlog.debug('starting ui')

            htpasswd = None
            if 'htpasswd' in ui_conf:
                htpasswd = Path(ui_conf['htpasswd'])

            runner._ui = await hat.monitor.server.ui.create(
                ui_conf['host'],
                ui_conf['port'],
                runner._server.state,
                set_rank_cb=runner._on_ui_set_rank,
                htpasswd=htpasswd)
            runner._bind_resource(runner._ui)

        runner.async_group.spawn(runner._runner_loop)

    except BaseException:
        # includes cancellation - closing must not be interrupted
        await aio.uncancellable(runner.async_close())
        raise

    return runner
class Runner(hat.aio.group.Resource):
class Runner(aio.Resource):
    """Monitor server runner

    Holds the observer server, the observer master, the optional UI and the
    observer slave connection and passes state between them. Private
    attributes are initialized by `create`, which is the intended way of
    obtaining an instance.

    """

    @property
    def async_group(self):
        """Async group associated with this runner"""
        return self._async_group

    async def _on_close(self):
        # spawned by `create` through `aio.call_on_cancel`; closes every
        # component that was actually started
        if self._ui:
            await self._ui.async_close()

        if self._server:
            await self._server.async_close()

        if self._master:
            await self._master.async_close()

        if self._slave:
            await self._slave.async_close()

    async def _on_server_state(self, server, state):
        # propagate new server state to ui, master and (if open) slave
        if self._ui:
            self._ui.set_state(state)

        if self._master:
            await self._master.set_local_components(state.local_components)

        if self._slave and self._slave.is_open:
            # slave connection may break while the update is being sent
            with contextlib.suppress(ConnectionError):
                await self._slave.update(state.local_components)

    async def _on_master_global_components(self, master, global_components):
        # local master is authoritative only while it is active
        if self._server and master.is_active:
            await self._server.update(0, global_components)

    async def _on_ui_set_rank(self, ui, cid, rank):
        if self._server:
            await self._server.set_rank(cid, rank)

    async def _on_slave_state(self, slave, state):
        # state received from the remote master is applied only when server
        # and master exist, mid is known and the local master is not active
        if not self._server or not self._master:
            return

        if state.mid is None or self._master.is_active:
            return

        # blessing requests of components belonging to this monitor
        await self._master.set_local_blessing_reqs(
            (info.cid, info.blessing_req)
            for info in state.global_components
            if info.mid == state.mid)

        await self._server.update(state.mid, state.global_components)

    async def _runner_loop(self):
        """Switch between slave operation and local master operation

        Without configured parents, the local master is activated and the
        loop waits until it is cancelled. Otherwise, the loop tries to
        connect to a parent; while a slave connection is open the local
        master is inactive, and when no parent is reachable the local master
        is activated until a connection succeeds. The runner is closed when
        the loop finishes for any reason.

        """
        try:
            await self._set_master_active(False)

            if not self._slave_parents:
                # same activation path as the fallback below, so the server
                # is also updated with master's global components
                await self._set_master_active(True)
                # never resolved - wait here until cancelled
                await self._loop.create_future()

            while True:
                if not self._slave:
                    await self._create_slave_loop(
                        self._slave_conf['connect_retry_count'])

                if self._slave and self._slave.is_open:
                    await self._set_master_active(False)
                    await self._slave.wait_closed()

                elif self._slave:
                    # connection is no longer open - dispose and retry
                    await self._slave.async_close()
                    self._slave = None

                else:
                    mlog.debug('no master detected - activating local master')
                    await self._set_master_active(True)
                    await self._create_slave_loop(None)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error('runner loop error: %s', e, exc_info=e)

        finally:
            self.close()

    def _bind_resource(self, resource):
        # close the runner once `resource.wait_closing()` is done
        self.async_group.spawn(aio.call_on_done, resource.wait_closing(),
                               self.close)

    def _calculate_blessing(self, master, components):
        # blessing callback passed to the master in `create`
        yield from hat.monitor.server.blessing.calculate(
            components=components,
            default_algorithm=self._default_algorithm,
            group_algorithms=self._group_algorithms)

    async def _set_master_active(self, active):
        """Change local master activity and resynchronize the server"""
        self._master.set_active(active)
        await self._on_server_state(self._server, self._server.state)

        if active:
            await self._server.update(0, self._master.global_components)

        elif self._slave and self._slave.state.mid is not None:
            await self._server.update(self._slave.state.mid,
                                      self._slave.state.global_components)

    async def _create_slave_loop(self, retry_count):
        """Try to connect to one of the parents

        Parents are tried in configured order. A full pass over all parents
        is repeated `retry_count` additional times (indefinitely if
        `retry_count` is ``None``), with ``connect_retry_delay`` sleep
        between passes. On success the connection is stored in
        ``self._slave``; otherwise ``self._slave`` is left unchanged.

        """
        counter = (range(retry_count + 1) if retry_count is not None
                   else itertools.repeat(None))

        for count in counter:
            for addr in self._slave_parents:
                try:
                    self._slave = await self._create_slave(addr)

                except Exception as e:
                    # best effort - try next parent, but leave a trace
                    mlog.debug('connecting to parent %s failed: %s',
                               addr, e, exc_info=e)
                    continue

                return

            # no delay after the last pass
            if count is None or count < retry_count:
                await asyncio.sleep(self._slave_conf['connect_retry_delay'])

    async def _create_slave(self, addr):
        """Connect to parent at `addr` within ``connect_timeout``"""
        try:
            return await aio.wait_for(
                hat.monitor.observer.slave.connect(
                    addr,
                    local_components=self._server.state.local_components,
                    state_cb=self._on_slave_state),
                self._slave_conf['connect_timeout'])

        except aio.CancelledWithResultError as e:
            # cancelled although a connection was produced - do not leak it
            if e.result:
                await aio.uncancellable(e.result.async_close())
            raise
Resource with lifetime control based on Group.