hat.monitor.observer.common

 1from hat.monitor.common import *  # NOQA
 2
 3from hat import json
 4from hat import sbs
 5from hat.drivers import chatter
 6
 7from hat.monitor.common import (BlessingReq,
 8                                BlessingRes,
 9                                ComponentInfo,
10                                sbs_repo)
11
12
async def send_msg(conn: chatter.Connection,
                   msg_type: str,
                   msg_data: sbs.Data):
    """Encode data as `msg_type` with `sbs_repo` and send it over `conn`"""
    encoded = sbs_repo.encode(msg_type, msg_data)
    # the chatter payload carries the type name together with encoded bytes
    payload = chatter.Data(msg_type, encoded)
    await conn.send(payload)

19
20
async def receive_msg(conn: chatter.Connection) -> tuple[str, sbs.Data]:
    """Wait for next message on `conn` and return its type and decoded data"""
    received = await conn.receive()
    payload = received.data
    # payload type names the SBS type used for decoding payload bytes
    decoded = sbs_repo.decode(payload.type, payload.data)
    return received.data.type, decoded

26
27
def blessing_req_to_sbs(blessing: BlessingReq) -> sbs.Data:
    """Build SBS data from blessing request

    Both ``token`` and ``timestamp`` are wrapped as optional values.

    """
    token = _value_to_sbs_optional(blessing.token)
    timestamp = _value_to_sbs_optional(blessing.timestamp)
    return {'token': token,
            'timestamp': timestamp}
32
33
def blessing_req_from_sbs(data: sbs.Data) -> BlessingReq:
    """Build blessing request from SBS data

    Optional ``token`` and ``timestamp`` entries are unwrapped.

    """
    token = _value_from_sbs_maybe(data['token'])
    timestamp = _value_from_sbs_maybe(data['timestamp'])
    return BlessingReq(token=token,
                       timestamp=timestamp)
38
39
def blessing_res_to_sbs(res: BlessingRes) -> sbs.Data:
    """Build SBS data from blessing response

    ``token`` is wrapped as optional value, ``ready`` is passed unchanged.

    """
    token = _value_to_sbs_optional(res.token)
    return {'token': token,
            'ready': res.ready}
44
45
def blessing_res_from_sbs(data: sbs.Data) -> BlessingRes:
    """Build blessing response from SBS data

    Optional ``token`` entry is unwrapped, ``ready`` is passed unchanged.

    """
    token = _value_from_sbs_maybe(data['token'])
    return BlessingRes(token=token,
                       ready=data['ready'])
50
51
def component_info_to_sbs(info: ComponentInfo) -> sbs.Data:
    """Build SBS data from component info

    ``name`` and ``group`` are wrapped as optional values, ``data`` is
    serialized with `json.encode` and blessing request/response are converted
    with their dedicated functions.

    """
    name = _value_to_sbs_optional(info.name)
    group = _value_to_sbs_optional(info.group)
    encoded_data = json.encode(info.data)
    blessing_req = blessing_req_to_sbs(info.blessing_req)
    blessing_res = blessing_res_to_sbs(info.blessing_res)
    return {'cid': info.cid,
            'mid': info.mid,
            'name': name,
            'group': group,
            'data': encoded_data,
            'rank': info.rank,
            'blessingReq': blessing_req,
            'blessingRes': blessing_res}
62
63
def component_info_from_sbs(data: sbs.Data) -> ComponentInfo:
    """Build component info from SBS data

    Optional ``name`` and ``group`` entries are unwrapped, ``data`` is
    deserialized with `json.decode` and blessing request/response are
    converted with their dedicated functions.

    """
    name = _value_from_sbs_maybe(data['name'])
    group = _value_from_sbs_maybe(data['group'])
    decoded_data = json.decode(data['data'])
    blessing_req = blessing_req_from_sbs(data['blessingReq'])
    blessing_res = blessing_res_from_sbs(data['blessingRes'])
    return ComponentInfo(cid=data['cid'],
                         mid=data['mid'],
                         name=name,
                         group=group,
                         data=decoded_data,
                         rank=data['rank'],
                         blessing_req=blessing_req,
                         blessing_res=blessing_res)
75
76
77def _value_to_sbs_optional(value):
78    return ('value', value) if value is not None else ('none', None)
79
80
81def _value_from_sbs_maybe(maybe):
82    return maybe[1]
async def send_msg( conn: hat.drivers.chatter.Connection, msg_type: str, msg_data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]):
async def send_msg(conn: chatter.Connection,
                   msg_type: str,
                   msg_data: sbs.Data):
    """Encode data as `msg_type` with `sbs_repo` and send it over `conn`"""
    encoded = sbs_repo.encode(msg_type, msg_data)
    # the chatter payload carries the type name together with encoded bytes
    payload = chatter.Data(msg_type, encoded)
    await conn.send(payload)

Send Observer message

async def receive_msg( conn: hat.drivers.chatter.Connection) -> tuple[str, typing.Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, typing.List[ForwardRef('Data')], typing.Dict[str, ForwardRef('Data')], typing.Tuple[str, ForwardRef('Data')]]]:
async def receive_msg(conn: chatter.Connection) -> tuple[str, sbs.Data]:
    """Wait for next message on `conn` and return its type and decoded data"""
    received = await conn.receive()
    payload = received.data
    # payload type names the SBS type used for decoding payload bytes
    decoded = sbs_repo.decode(payload.type, payload.data)
    return received.data.type, decoded

Receive Observer message

def blessing_req_to_sbs( blessing: hat.monitor.common.BlessingReq) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def blessing_req_to_sbs(blessing: BlessingReq) -> sbs.Data:
    """Build SBS data from blessing request

    Both ``token`` and ``timestamp`` are wrapped as optional values.

    """
    token = _value_to_sbs_optional(blessing.token)
    timestamp = _value_to_sbs_optional(blessing.timestamp)
    return {'token': token,
            'timestamp': timestamp}

Convert blessing request to SBS data

def blessing_req_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> hat.monitor.common.BlessingReq:
def blessing_req_from_sbs(data: sbs.Data) -> BlessingReq:
    """Build blessing request from SBS data

    Optional ``token`` and ``timestamp`` entries are unwrapped.

    """
    token = _value_from_sbs_maybe(data['token'])
    timestamp = _value_from_sbs_maybe(data['timestamp'])
    return BlessingReq(token=token,
                       timestamp=timestamp)

Convert SBS data to blessing request

def blessing_res_to_sbs( res: hat.monitor.common.BlessingRes) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def blessing_res_to_sbs(res: BlessingRes) -> sbs.Data:
    """Build SBS data from blessing response

    ``token`` is wrapped as optional value, ``ready`` is passed unchanged.

    """
    token = _value_to_sbs_optional(res.token)
    return {'token': token,
            'ready': res.ready}

Convert blessing response to SBS data

def blessing_res_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> hat.monitor.common.BlessingRes:
def blessing_res_from_sbs(data: sbs.Data) -> BlessingRes:
    """Build blessing response from SBS data

    Optional ``token`` entry is unwrapped, ``ready`` is passed unchanged.

    """
    token = _value_from_sbs_maybe(data['token'])
    return BlessingRes(token=token,
                       ready=data['ready'])

Convert SBS data to blessing response

def component_info_to_sbs( info: hat.monitor.common.ComponentInfo) -> Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]:
def component_info_to_sbs(info: ComponentInfo) -> sbs.Data:
    """Build SBS data from component info

    ``name`` and ``group`` are wrapped as optional values, ``data`` is
    serialized with `json.encode` and blessing request/response are converted
    with their dedicated functions.

    """
    name = _value_to_sbs_optional(info.name)
    group = _value_to_sbs_optional(info.group)
    encoded_data = json.encode(info.data)
    blessing_req = blessing_req_to_sbs(info.blessing_req)
    blessing_res = blessing_res_to_sbs(info.blessing_res)
    return {'cid': info.cid,
            'mid': info.mid,
            'name': name,
            'group': group,
            'data': encoded_data,
            'rank': info.rank,
            'blessingReq': blessing_req,
            'blessingRes': blessing_res}

Convert component info to SBS data

def component_info_from_sbs( data: Union[NoneType, bool, int, float, str, bytes, bytearray, memoryview, List[ForwardRef('Data')], Dict[str, ForwardRef('Data')], Tuple[str, ForwardRef('Data')]]) -> hat.monitor.common.ComponentInfo:
def component_info_from_sbs(data: sbs.Data) -> ComponentInfo:
    """Build component info from SBS data

    Optional ``name`` and ``group`` entries are unwrapped, ``data`` is
    deserialized with `json.decode` and blessing request/response are
    converted with their dedicated functions.

    """
    name = _value_from_sbs_maybe(data['name'])
    group = _value_from_sbs_maybe(data['group'])
    decoded_data = json.decode(data['data'])
    blessing_req = blessing_req_from_sbs(data['blessingReq'])
    blessing_res = blessing_res_from_sbs(data['blessingRes'])
    return ComponentInfo(cid=data['cid'],
                         mid=data['mid'],
                         name=name,
                         group=group,
                         data=decoded_data,
                         rank=data['rank'],
                         blessing_req=blessing_req,
                         blessing_res=blessing_res)

Convert SBS data to component info