hat.monitor.observer.common
from hat.monitor.common import *  # NOQA

from hat import json
from hat import sbs
from hat.drivers import chatter

from hat.monitor.common import (BlessingReq,
                                BlessingRes,
                                ComponentInfo,
                                sbs_repo)


async def send_msg(conn: chatter.Connection,
                   msg_type: str,
                   msg_data: sbs.Data):
    """Send Observer message

    `msg_data` is encoded with `sbs_repo` as SBS type `msg_type` and the
    encoded result is sent over `conn` wrapped in `chatter.Data`.

    """
    encoded = sbs_repo.encode(msg_type, msg_data)
    payload = chatter.Data(msg_type, encoded)
    await conn.send(payload)


async def receive_msg(conn: chatter.Connection) -> tuple[str, sbs.Data]:
    """Receive Observer message

    Waits for the next message on `conn`, decodes its data with `sbs_repo`
    and returns a ``(message type, decoded data)`` pair.

    """
    received = await conn.receive()
    payload = received.data
    decoded = sbs_repo.decode(payload.type, payload.data)
    return payload.type, decoded


def blessing_req_to_sbs(blessing: BlessingReq) -> sbs.Data:
    """Convert blessing request to SBS data

    Both `token` and `timestamp` are wrapped as optional values.

    """
    token = _value_to_sbs_optional(blessing.token)
    timestamp = _value_to_sbs_optional(blessing.timestamp)
    return {'token': token,
            'timestamp': timestamp}


def blessing_req_from_sbs(data: sbs.Data) -> BlessingReq:
    """Convert SBS data to blessing request

    Inverse of `blessing_req_to_sbs`: unwraps the optional `token` and
    `timestamp` entries of `data`.

    """
    token = _value_from_sbs_maybe(data['token'])
    timestamp = _value_from_sbs_maybe(data['timestamp'])
    return BlessingReq(token=token,
                       timestamp=timestamp)


def blessing_res_to_sbs(res: BlessingRes) -> sbs.Data:
    """Convert blessing response to SBS data

    `token` is wrapped as an optional value; `ready` is passed through.

    """
    token = _value_to_sbs_optional(res.token)
    return {'token': token,
            'ready': res.ready}


def blessing_res_from_sbs(data: sbs.Data) -> BlessingRes:
    """Convert SBS data to blessing response

    Inverse of `blessing_res_to_sbs`: unwraps the optional `token` entry and
    passes `ready` through.

    """
    token = _value_from_sbs_maybe(data['token'])
    return BlessingRes(token=token,
                       ready=data['ready'])


def component_info_to_sbs(info: ComponentInfo) -> sbs.Data:
    """Convert component info to SBS data

    `name` and `group` are wrapped as optional values, `data` is JSON
    encoded, and the blessing request/response are converted with their own
    ``*_to_sbs`` functions.

    """
    # entries are added in the same order as the SBS field listing so that
    # attribute access order and resulting key order stay stable
    encoded = {'cid': info.cid,
               'mid': info.mid}
    encoded['name'] = _value_to_sbs_optional(info.name)
    encoded['group'] = _value_to_sbs_optional(info.group)
    encoded['data'] = json.encode(info.data)
    encoded['rank'] = info.rank
    encoded['blessingReq'] = blessing_req_to_sbs(info.blessing_req)
    encoded['blessingRes'] = blessing_res_to_sbs(info.blessing_res)
    return encoded


def component_info_from_sbs(data: sbs.Data) -> ComponentInfo:
    """Convert SBS data to component info

    Inverse of `component_info_to_sbs`.

    """
    fields = {
        'cid': data['cid'],
        'mid': data['mid'],
        'name': _value_from_sbs_maybe(data['name']),
        'group': _value_from_sbs_maybe(data['group']),
        'data': json.decode(data['data']),
        'rank': data['rank'],
        'blessing_req': blessing_req_from_sbs(data['blessingReq']),
        'blessing_res': blessing_res_from_sbs(data['blessingRes'])}
    return ComponentInfo(**fields)


def _value_to_sbs_optional(value):
    # ``None`` becomes ('none', None); anything else becomes ('value', value)
    if value is None:
        return ('none', None)
    return ('value', value)


def _value_from_sbs_maybe(maybe):
    # the payload is the element at index 1; the tag at index 0 is not
    # inspected
    return maybe[1]
async def
send_msg( conn: hat.drivers.chatter.Connection, msg_type: str, msg_data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]):
async def send_msg(conn: chatter.Connection,
                   msg_type: str,
                   msg_data: sbs.Data):
    """Send Observer message

    `msg_data` is encoded with `sbs_repo` as SBS type `msg_type` and the
    encoded result is sent over `conn` wrapped in `chatter.Data`.

    """
    encoded = sbs_repo.encode(msg_type, msg_data)
    payload = chatter.Data(msg_type, encoded)
    await conn.send(payload)
Send Observer message
async def
receive_msg( conn: hat.drivers.chatter.Connection) -> tuple[str, typing.Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')], typing.Tuple[str, ForwardRef('Data')]]]:
async def receive_msg(conn: chatter.Connection) -> tuple[str, sbs.Data]:
    """Receive Observer message

    Waits for the next message on `conn`, decodes its data with `sbs_repo`
    and returns a ``(message type, decoded data)`` pair.

    """
    received = await conn.receive()
    payload = received.data
    decoded = sbs_repo.decode(payload.type, payload.data)
    return payload.type, decoded
Receive Observer message
def
blessing_req_to_sbs( blessing: hat.monitor.common.BlessingReq) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def blessing_req_to_sbs(blessing: BlessingReq) -> sbs.Data:
    """Convert blessing request to SBS data

    Both `token` and `timestamp` are wrapped with `_value_to_sbs_optional`.

    """
    token = _value_to_sbs_optional(blessing.token)
    timestamp = _value_to_sbs_optional(blessing.timestamp)
    return {'token': token,
            'timestamp': timestamp}
Convert blessing request to SBS data
def
blessing_req_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> hat.monitor.common.BlessingReq:
def blessing_req_from_sbs(data: sbs.Data) -> BlessingReq:
    """Convert SBS data to blessing request

    The `token` and `timestamp` entries of `data` are unwrapped with
    `_value_from_sbs_maybe`.

    """
    token = _value_from_sbs_maybe(data['token'])
    timestamp = _value_from_sbs_maybe(data['timestamp'])
    return BlessingReq(token=token,
                       timestamp=timestamp)
Convert SBS data to blessing request
def
blessing_res_to_sbs( res: hat.monitor.common.BlessingRes) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def blessing_res_to_sbs(res: BlessingRes) -> sbs.Data:
    """Convert blessing response to SBS data

    `token` is wrapped with `_value_to_sbs_optional`; `ready` is passed
    through unchanged.

    """
    token = _value_to_sbs_optional(res.token)
    return {'token': token,
            'ready': res.ready}
Convert blessing response to SBS data
def
blessing_res_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> hat.monitor.common.BlessingRes:
def blessing_res_from_sbs(data: sbs.Data) -> BlessingRes:
    """Convert SBS data to blessing response

    The `token` entry of `data` is unwrapped with `_value_from_sbs_maybe`;
    `ready` is passed through unchanged.

    """
    token = _value_from_sbs_maybe(data['token'])
    return BlessingRes(token=token,
                       ready=data['ready'])
Convert SBS data to blessing response
def
component_info_to_sbs( info: hat.monitor.common.ComponentInfo) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def component_info_to_sbs(info: ComponentInfo) -> sbs.Data:
    """Convert component info to SBS data

    `name` and `group` are wrapped with `_value_to_sbs_optional`, `data` is
    JSON encoded, and the blessing request/response are converted with
    `blessing_req_to_sbs` / `blessing_res_to_sbs`.

    """
    # entries are added in a fixed order so that attribute access order and
    # resulting key order stay stable
    encoded = {'cid': info.cid,
               'mid': info.mid}
    encoded['name'] = _value_to_sbs_optional(info.name)
    encoded['group'] = _value_to_sbs_optional(info.group)
    encoded['data'] = json.encode(info.data)
    encoded['rank'] = info.rank
    encoded['blessingReq'] = blessing_req_to_sbs(info.blessing_req)
    encoded['blessingRes'] = blessing_res_to_sbs(info.blessing_res)
    return encoded
Convert component info to SBS data
def
component_info_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> hat.monitor.common.ComponentInfo:
def component_info_from_sbs(data: sbs.Data) -> ComponentInfo:
    """Convert SBS data to component info

    `name` and `group` are unwrapped with `_value_from_sbs_maybe`, `data` is
    JSON decoded, and the blessing request/response are converted with
    `blessing_req_from_sbs` / `blessing_res_from_sbs`.

    """
    fields = {
        'cid': data['cid'],
        'mid': data['mid'],
        'name': _value_from_sbs_maybe(data['name']),
        'group': _value_from_sbs_maybe(data['group']),
        'data': json.decode(data['data']),
        'rank': data['rank'],
        'blessing_req': blessing_req_from_sbs(data['blessingReq']),
        'blessing_res': blessing_res_from_sbs(data['blessingRes'])}
    return ComponentInfo(**fields)
Convert SBS data to component info