hat.monitor.server.runner
"""Monitor server runner"""

import asyncio
import contextlib
import itertools
import logging

from hat import aio
from hat import json
from hat.drivers import tcp

import hat.monitor.observer.master
import hat.monitor.observer.server
import hat.monitor.observer.slave
import hat.monitor.server.blessing
import hat.monitor.server.ui


mlog: logging.Logger = logging.getLogger(__name__)
"""Module logger"""


async def create(conf: json.Data) -> 'Runner':
    """Create and start a runner based on configuration `conf`.

    The observer server and observer master are started first, followed by
    the UI when `conf` contains a non-empty ``'ui'`` entry. Finally the
    runner loop is spawned. If any step fails (including cancellation), the
    partially created runner is closed and the exception is re-raised.

    Configuration keys read here: ``'slave'`` (with ``'parents'``),
    ``'default_algorithm'``, ``'group_algorithms'``, ``'server'``,
    ``'master'`` and optional ``'ui'``.

    """
    runner = Runner()
    runner._loop = asyncio.get_running_loop()
    runner._async_group = aio.Group()
    runner._server = None
    runner._master = None
    runner._ui = None
    runner._slave = None
    runner._slave_conf = conf['slave']
    runner._slave_parents = [
        tcp.Address(parent['host'], parent['port'])
        for parent in conf['slave']['parents']]
    runner._default_algorithm = hat.monitor.server.blessing.Algorithm(
        conf['default_algorithm'])
    runner._group_algorithms = {
        group: hat.monitor.server.blessing.Algorithm(algorithm)
        for group, algorithm in conf['group_algorithms'].items()}

    # owned resources are released once the runner's group is cancelled
    runner.async_group.spawn(aio.call_on_cancel, runner._on_close)

    try:
        mlog.debug('starting server')
        server_conf = conf['server']
        server_addr = tcp.Address(server_conf['host'], server_conf['port'])
        runner._server = await hat.monitor.observer.server.listen(
            server_addr,
            default_rank=server_conf['default_rank'],
            state_cb=runner._on_server_state)
        runner._bind_resource(runner._server)

        mlog.debug('starting master')
        master_conf = conf['master']
        master_addr = tcp.Address(master_conf['host'], master_conf['port'])
        runner._master = await hat.monitor.observer.master.listen(
            master_addr,
            local_components=runner._server.state.local_components,
            global_components_cb=runner._on_master_global_components,
            blessing_cb=runner._calculate_blessing)
        runner._bind_resource(runner._master)

        ui_conf = conf.get('ui')
        if ui_conf:
            mlog.debug('starting ui')
            runner._ui = await hat.monitor.server.ui.create(
                ui_conf['host'],
                ui_conf['port'],
                runner._server.state,
                set_rank_cb=runner._on_ui_set_rank)
            runner._bind_resource(runner._ui)

        runner.async_group.spawn(runner._runner_loop)

    except BaseException:
        # make sure cleanup finishes even if this task is being cancelled
        await aio.uncancellable(runner.async_close())
        raise

    return runner


class Runner(aio.Resource):
    """Monitor server runner.

    Holds the observer server, the observer master, the optional UI and the
    slave connection towards a parent, and wires their callbacks together.
    Instances are populated and started by `create`.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling the runner's lifetime"""
        return self._async_group

    async def _on_close(self):
        # close every resource that was successfully created
        if self._ui:
            await self._ui.async_close()

        if self._server:
            await self._server.async_close()

        if self._master:
            await self._master.async_close()

        if self._slave:
            await self._slave.async_close()

    async def _on_server_state(self, server, state):
        # propagate server state to the UI, master and connected slave
        if self._ui:
            self._ui.set_state(state)

        if self._master:
            await self._master.set_local_components(state.local_components)

        if self._slave and self._slave.is_open:
            # slave connection may drop while updating - runner loop
            # takes care of reconnecting
            with contextlib.suppress(ConnectionError):
                await self._slave.update(state.local_components)

    async def _on_master_global_components(self, master, global_components):
        # local master is authoritative only while it is active
        if self._server and master.is_active:
            await self._server.update(0, global_components)

    async def _on_ui_set_rank(self, ui, cid, rank):
        if self._server:
            await self._server.set_rank(cid, rank)

    async def _on_slave_state(self, slave, state):
        # forward state received from the parent only while the local
        # master is inactive and the parent has assigned a mid
        if (self._server and
                self._master and
                not self._master.is_active and
                state.mid is not None):
            await self._server.update(state.mid, state.global_components)

    async def _runner_loop(self):
        try:
            await self._set_master_active(False)

            if not self._slave_parents:
                # no parents configured - local master stays active and
                # this task waits on a future that is never resolved
                self._master.set_active(True)
                await self._loop.create_future()

            while True:
                if not self._slave:
                    await self._server.update(0, [])
                    await self._create_slave_loop(
                        self._slave_conf['connect_retry_count'])

                if self._slave and self._slave.is_open:
                    # connected to a parent - act as slave until the
                    # connection closes
                    await self._set_master_active(False)
                    await self._slave.wait_closed()

                elif self._slave:
                    # connection is no longer open - drop it so that the
                    # next iteration reconnects
                    await self._slave.async_close()
                    self._slave = None

                else:
                    mlog.debug('no master detected - activating local master')
                    await self._set_master_active(True)
                    # keep trying without retry limit
                    await self._create_slave_loop(None)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error('runner loop error: %s', e, exc_info=e)

        finally:
            self.close()

    def _bind_resource(self, resource):
        # close the runner as soon as `resource` starts closing
        self.async_group.spawn(aio.call_on_done, resource.wait_closing(),
                               self.close)

    def _calculate_blessing(self, master, components):
        return hat.monitor.server.blessing.calculate(
            components=components,
            default_algorithm=self._default_algorithm,
            group_algorithms=self._group_algorithms)

    async def _set_master_active(self, active):
        self._master.set_active(active)
        await self._on_server_state(self._server, self._server.state)

        if active:
            await self._server.update(0, self._master.global_components)

        elif self._slave and self._slave.state.mid is not None:
            await self._server.update(self._slave.state.mid,
                                      self._slave.state.global_components)

    async def _create_slave_loop(self, retry_count):
        # Try to connect to any of the configured parents. Each round
        # iterates over all parents; `retry_count` additional rounds are
        # made (unlimited if `retry_count` is ``None``). On success
        # `self._slave` is set; otherwise it is left unchanged.
        counter = (range(retry_count + 1) if retry_count is not None
                   else itertools.repeat(None))

        for count in counter:
            for addr in self._slave_parents:
                try:
                    self._slave = await self._create_slave(addr)

                except Exception as e:
                    # failed attempt is expected while parent is
                    # unavailable - log it instead of hiding it
                    mlog.debug('error connecting to parent %s: %s',
                               addr, e, exc_info=e)
                    continue

                return

            # no delay after the last round
            if count is None or count < retry_count:
                await asyncio.sleep(self._slave_conf['connect_retry_delay'])

    async def _create_slave(self, addr):
        try:
            return await aio.wait_for(
                hat.monitor.observer.slave.connect(
                    addr,
                    local_components=self._server.state.local_components,
                    state_cb=self._on_slave_state),
                self._slave_conf['connect_timeout'])

        except aio.CancelledWithResultError as e:
            # connection was established while being cancelled - close it
            # so that it is not leaked
            if e.result:
                await aio.uncancellable(e.result.async_close())
            raise
Module logger
async def
create( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]) -> Runner:
async def create(conf: json.Data) -> 'Runner':
    """Create and start a runner based on configuration `conf`.

    The observer server and observer master are started first, followed by
    the UI when `conf` contains a non-empty ``'ui'`` entry. Finally the
    runner loop is spawned. If any step fails (including cancellation), the
    partially created runner is closed and the exception is re-raised.

    Configuration keys read here: ``'slave'`` (with ``'parents'``),
    ``'default_algorithm'``, ``'group_algorithms'``, ``'server'``,
    ``'master'`` and optional ``'ui'``.

    """
    runner = Runner()
    runner._loop = asyncio.get_running_loop()
    runner._async_group = aio.Group()
    runner._server = None
    runner._master = None
    runner._ui = None
    runner._slave = None
    runner._slave_conf = conf['slave']
    runner._slave_parents = [
        tcp.Address(parent['host'], parent['port'])
        for parent in conf['slave']['parents']]
    runner._default_algorithm = hat.monitor.server.blessing.Algorithm(
        conf['default_algorithm'])
    runner._group_algorithms = {
        group: hat.monitor.server.blessing.Algorithm(algorithm)
        for group, algorithm in conf['group_algorithms'].items()}

    # owned resources are released once the runner's group is cancelled
    runner.async_group.spawn(aio.call_on_cancel, runner._on_close)

    try:
        mlog.debug('starting server')
        server_conf = conf['server']
        server_addr = tcp.Address(server_conf['host'], server_conf['port'])
        runner._server = await hat.monitor.observer.server.listen(
            server_addr,
            default_rank=server_conf['default_rank'],
            state_cb=runner._on_server_state)
        runner._bind_resource(runner._server)

        mlog.debug('starting master')
        master_conf = conf['master']
        master_addr = tcp.Address(master_conf['host'], master_conf['port'])
        runner._master = await hat.monitor.observer.master.listen(
            master_addr,
            local_components=runner._server.state.local_components,
            global_components_cb=runner._on_master_global_components,
            blessing_cb=runner._calculate_blessing)
        runner._bind_resource(runner._master)

        ui_conf = conf.get('ui')
        if ui_conf:
            mlog.debug('starting ui')
            runner._ui = await hat.monitor.server.ui.create(
                ui_conf['host'],
                ui_conf['port'],
                runner._server.state,
                set_rank_cb=runner._on_ui_set_rank)
            runner._bind_resource(runner._ui)

        runner.async_group.spawn(runner._runner_loop)

    except BaseException:
        # make sure cleanup finishes even if this task is being cancelled
        await aio.uncancellable(runner.async_close())
        raise

    return runner
class
Runner(hat.aio.group.Resource):
class Runner(aio.Resource):
    """Monitor server runner.

    Holds the observer server, the observer master, the optional UI and the
    slave connection towards a parent, and wires their callbacks together.
    Instances are populated and started by `create`.

    """

    @property
    def async_group(self) -> aio.Group:
        """Async group controlling the runner's lifetime"""
        return self._async_group

    async def _on_close(self):
        # close every resource that was successfully created
        if self._ui:
            await self._ui.async_close()

        if self._server:
            await self._server.async_close()

        if self._master:
            await self._master.async_close()

        if self._slave:
            await self._slave.async_close()

    async def _on_server_state(self, server, state):
        # propagate server state to the UI, master and connected slave
        if self._ui:
            self._ui.set_state(state)

        if self._master:
            await self._master.set_local_components(state.local_components)

        if self._slave and self._slave.is_open:
            # slave connection may drop while updating - runner loop
            # takes care of reconnecting
            with contextlib.suppress(ConnectionError):
                await self._slave.update(state.local_components)

    async def _on_master_global_components(self, master, global_components):
        # local master is authoritative only while it is active
        if self._server and master.is_active:
            await self._server.update(0, global_components)

    async def _on_ui_set_rank(self, ui, cid, rank):
        if self._server:
            await self._server.set_rank(cid, rank)

    async def _on_slave_state(self, slave, state):
        # forward state received from the parent only while the local
        # master is inactive and the parent has assigned a mid
        if (self._server and
                self._master and
                not self._master.is_active and
                state.mid is not None):
            await self._server.update(state.mid, state.global_components)

    async def _runner_loop(self):
        try:
            await self._set_master_active(False)

            if not self._slave_parents:
                # no parents configured - local master stays active and
                # this task waits on a future that is never resolved
                self._master.set_active(True)
                await self._loop.create_future()

            while True:
                if not self._slave:
                    await self._server.update(0, [])
                    await self._create_slave_loop(
                        self._slave_conf['connect_retry_count'])

                if self._slave and self._slave.is_open:
                    # connected to a parent - act as slave until the
                    # connection closes
                    await self._set_master_active(False)
                    await self._slave.wait_closed()

                elif self._slave:
                    # connection is no longer open - drop it so that the
                    # next iteration reconnects
                    await self._slave.async_close()
                    self._slave = None

                else:
                    mlog.debug('no master detected - activating local master')
                    await self._set_master_active(True)
                    # keep trying without retry limit
                    await self._create_slave_loop(None)

        except ConnectionError:
            pass

        except Exception as e:
            mlog.error('runner loop error: %s', e, exc_info=e)

        finally:
            self.close()

    def _bind_resource(self, resource):
        # close the runner as soon as `resource` starts closing
        self.async_group.spawn(aio.call_on_done, resource.wait_closing(),
                               self.close)

    def _calculate_blessing(self, master, components):
        return hat.monitor.server.blessing.calculate(
            components=components,
            default_algorithm=self._default_algorithm,
            group_algorithms=self._group_algorithms)

    async def _set_master_active(self, active):
        self._master.set_active(active)
        await self._on_server_state(self._server, self._server.state)

        if active:
            await self._server.update(0, self._master.global_components)

        elif self._slave and self._slave.state.mid is not None:
            await self._server.update(self._slave.state.mid,
                                      self._slave.state.global_components)

    async def _create_slave_loop(self, retry_count):
        # Try to connect to any of the configured parents. Each round
        # iterates over all parents; `retry_count` additional rounds are
        # made (unlimited if `retry_count` is ``None``). On success
        # `self._slave` is set; otherwise it is left unchanged.
        counter = (range(retry_count + 1) if retry_count is not None
                   else itertools.repeat(None))

        for count in counter:
            for addr in self._slave_parents:
                try:
                    self._slave = await self._create_slave(addr)

                except Exception as e:
                    # failed attempt is expected while parent is
                    # unavailable - log it instead of hiding it
                    mlog.debug('error connecting to parent %s: %s',
                               addr, e, exc_info=e)
                    continue

                return

            # no delay after the last round
            if count is None or count < retry_count:
                await asyncio.sleep(self._slave_conf['connect_retry_delay'])

    async def _create_slave(self, addr):
        try:
            return await aio.wait_for(
                hat.monitor.observer.slave.connect(
                    addr,
                    local_components=self._server.state.local_components,
                    state_cb=self._on_slave_state),
                self._slave_conf['connect_timeout'])

        except aio.CancelledWithResultError as e:
            # connection was established while being cancelled - close it
            # so that it is not leaked
            if e.result:
                await aio.uncancellable(e.result.async_close())
            raise
Resource with lifetime control based on `Group`.
Inherited Members
- hat.aio.group.Resource
- is_open
- is_closing
- is_closed
- wait_closing
- wait_closed
- close
- async_close