hat.monitor.server.main
Monitor server main
"""Monitor server main"""

from pathlib import Path
import argparse
import asyncio
import contextlib
import logging.config
import sys

import appdirs

from hat import aio
from hat import json

from hat.monitor import common
import hat.monitor.server.runner


mlog: logging.Logger = logging.getLogger('hat.monitor.server.main')
"""Module logger"""

user_conf_dir: Path = Path(appdirs.user_config_dir('hat'))
"""User configuration directory path"""


def create_argument_parser() -> argparse.ArgumentParser:
    """Build the command line parser for the monitor server.

    The parser accepts a single optional ``--conf PATH`` argument which is
    converted to `pathlib.Path` and defaults to ``None`` when omitted.

    """
    conf_help = ("configuration defined by hat-monitor://server.yaml "
                 "(default $XDG_CONFIG_HOME/hat/monitor.{yaml|yml|toml|json})")

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('--conf',
                            metavar='PATH',
                            type=Path,
                            default=None,
                            help=conf_help)
    return arg_parser


def main():
    """Command line entry point of the monitor server.

    Parses command line arguments, reads the configuration and passes it
    to `sync_main`.

    """
    args = create_argument_parser().parse_args()

    # second argument is the extension-less default location, see the
    # ``--conf`` help text
    default_conf_path = user_conf_dir / 'monitor'
    conf = json.read_conf(args.conf, default_conf_path)

    sync_main(conf)


def sync_main(conf: json.Data):
    """Synchronous entry point.

    Initializes asyncio, validates `conf` against the
    ``hat-monitor://server.yaml`` schema, applies the optional ``log``
    configuration and runs `async_main` until it finishes.

    """
    aio.init_asyncio()

    common.json_schema_repo.validate('hat-monitor://server.yaml', conf)

    # logging is configured only when a non-empty ``log`` entry exists
    if logging_conf := conf.get('log'):
        logging.config.dictConfig(logging_conf)

    # cancellation of the main coroutine is not propagated to the caller
    with contextlib.suppress(asyncio.CancelledError):
        aio.run_asyncio(async_main(conf))


async def async_main(conf: json.Data):
    """Asynchronous entry point.

    Creates the server runner from `conf` and waits until it starts
    closing. Exceptions raised on the way are logged as warnings and not
    re-raised. The runner, if created, is always closed before returning.

    """
    runner = None

    async def close_runner(active_runner):
        if active_runner:
            await active_runner.async_close()

        # NOTE(review): short pause after closing - presumably lets
        # pending callbacks finish; confirm
        await asyncio.sleep(0.1)

    try:
        runner = await hat.monitor.server.runner.create(conf)
        await runner.wait_closing()

    except Exception as exc:
        mlog.warning('async main error: %s', exc, exc_info=exc)

    finally:
        # `runner` is still ``None`` here if creation failed
        await aio.uncancellable(close_runner(runner))


if __name__ == '__main__':
    sys.argv[0] = 'hat-monitor'
    sys.exit(main())
Module logger
user_conf_dir: pathlib.Path =
PosixPath('/home/runner/.config/hat')
User configuration directory path
def
create_argument_parser() -> argparse.ArgumentParser:
def create_argument_parser() -> argparse.ArgumentParser:
    """Build the command line parser for the monitor server.

    The parser accepts a single optional ``--conf PATH`` argument which is
    converted to `pathlib.Path` and defaults to ``None`` when omitted.

    """
    conf_help = ("configuration defined by hat-monitor://server.yaml "
                 "(default $XDG_CONFIG_HOME/hat/monitor.{yaml|yml|toml|json})")

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('--conf',
                            metavar='PATH',
                            type=Path,
                            default=None,
                            help=conf_help)
    return arg_parser
Create argument parser
def
main():
def main():
    """Command line entry point of the monitor server.

    Parses command line arguments, reads the configuration and passes it
    to `sync_main`.

    """
    args = create_argument_parser().parse_args()

    # second argument is the extension-less default location, see the
    # ``--conf`` help text
    default_conf_path = user_conf_dir / 'monitor'
    conf = json.read_conf(args.conf, default_conf_path)

    sync_main(conf)
Monitor Server
def
sync_main( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]):
def sync_main(conf: json.Data):
    """Synchronous entry point.

    Initializes asyncio, validates `conf` against the
    ``hat-monitor://server.yaml`` schema, applies the optional ``log``
    configuration and runs `async_main` until it finishes.

    """
    aio.init_asyncio()

    common.json_schema_repo.validate('hat-monitor://server.yaml', conf)

    # logging is configured only when a non-empty ``log`` entry exists
    if logging_conf := conf.get('log'):
        logging.config.dictConfig(logging_conf)

    # cancellation of the main coroutine is not propagated to the caller
    with contextlib.suppress(asyncio.CancelledError):
        aio.run_asyncio(async_main(conf))
Sync main entry point
async def
async_main( conf: None | bool | int | float | str | list[ForwardRef('Data')] | dict[str, ForwardRef('Data')]):
async def async_main(conf: json.Data):
    """Asynchronous entry point.

    Creates the server runner from `conf` and waits until it starts
    closing. Exceptions raised on the way are logged as warnings and not
    re-raised. The runner, if created, is always closed before returning.

    """
    runner = None

    async def close_runner(active_runner):
        if active_runner:
            await active_runner.async_close()

        # NOTE(review): short pause after closing - presumably lets
        # pending callbacks finish; confirm
        await asyncio.sleep(0.1)

    try:
        runner = await hat.monitor.server.runner.create(conf)
        await runner.wait_closing()

    except Exception as exc:
        mlog.warning('async main error: %s', exc, exc_info=exc)

    finally:
        # `runner` is still ``None`` here if creation failed
        await aio.uncancellable(close_runner(runner))
Async main entry point