Mon, 23 Nov 2020 21:57:06 +0100
Do not go to the next line if the char to display does not fit
23 | 1 | import sys |
2 | import re | |
3 | import click | |
4 | ||
5 | F = 1e6 | |
6 | ||
7 | reg = re.compile(r'^(?P<fr1>\d+)-(?P<fr2>\d+) .*: (?P<rxtx>rx|tx)-data: "(?P<value>..)"$') # noqa | |
8 | ||
9 | ||
10 | class ProtocolError(Exception): | |
11 | pass | |
12 | ||
13 | class ProtocolReset(Exception): | |
14 | pass | |
15 | ||
16 | pkts = [] | |
17 | ||
18 | ||
19 | class Packet: | |
20 | def __init__(self, frame, rxtx, value): | |
21 | self.frame = frame | |
22 | self.rxtx = rxtx | |
23 | self.value = value | |
24 | ||
25 | @property | |
26 | def timestamp(self): | |
27 | return self.frame / F * 1000 | |
28 | ||
29 | @property | |
30 | def relts(self): | |
31 | return self.timestamp - pkts[0].timestamp | |
32 | ||
33 | def __str__(self): | |
34 | return '[%06.2f] %s: %02X' % (self.timestamp, self.rxtx, self.value) | |
35 | ||
36 | ||
37 | def read_line(seq): | |
38 | for line in seq: | |
39 | m = reg.match(line) | |
40 | if m: | |
41 | frame = int(m.group('fr1')) | |
42 | v = int(m.group('value'), base=16) | |
43 | rxtx = m.group('rxtx') | |
44 | pkt = Packet(frame, rxtx, v) | |
45 | pkts.append(pkt) | |
46 | yield pkt | |
47 | ||
48 | ||
49 | def read_input(stream=None): | |
50 | if stream is None: | |
51 | stream = sys.stdin | |
52 | for line in stream: | |
53 | yield line | |
54 | ||
55 | ||
56 | def print_input(): | |
57 | t0 = None | |
58 | for pkt in read_line(read_input()): | |
59 | c = chr(pkt.value) if 32 <= pkt.value <= 127 else '' | |
60 | if t0 is None: | |
61 | t0 = pkt.timestamp | |
62 | cnxdir = 'CPU-->DP' if pkt.rxtx == 'rx' else 'CPU<--DP' | |
63 | print('[%s] %06.1f: %02X # %s' % ( | |
64 | cnxdir, (pkt.timestamp-t0), pkt.value, c)) | |
65 | ||
66 | ||
67 | def flipdir(rxtx): | |
68 | return 'rx' if rxtx == 'tx' else 'tx' | |
69 | ||
70 | ||
71 | def check_rxtx(cur, pkt): | |
72 | if cur.rxtx == pkt.rxtx: | |
73 | raise ProtocolError('Expected %s transmission %02X %02X' % | |
74 | (flipdir(cur.rxtx), cur.value, pkt.value)) | |
75 | ||
76 | ||
77 | def check_sot_ack(ack, sot): | |
78 | check_rxtx(ack, sot) | |
79 | if ack.value != (0xFF - sot.value): | |
80 | if ack.value == 0x66: | |
81 | raise ProtocolReset() | |
82 | raise ProtocolError('Expected ACK value %02X, got %02X' % | |
83 | (0xFF - sot.value, ack.value)) | |
84 | ||
85 | ||
86 | def wait_for_sot(seq, sot_pkt=None): | |
87 | if sot_pkt is None: | |
88 | for sot_pkt in seq: | |
89 | if sot_pkt.value not in SOT_VALUES: | |
90 | print('Unexpected value %s: %02X' % | |
91 | (sot_pkt.rxtx, sot_pkt.value)) | |
92 | else: | |
93 | break | |
94 | for ack_pkt in seq: | |
95 | try: | |
96 | check_sot_ack(sot_pkt, ack_pkt) | |
97 | # print('New packet %s' % sot_pkt) | |
98 | return sot_pkt, ack_pkt | |
99 | except ProtocolReset: | |
100 | # print('reset') | |
101 | sot_pkt = ack_pkt | |
102 | raise StopIteration() | |
103 | ||
104 | ||
105 | SOT_VALUES = [0x33, 0x66] | |
106 | ||
107 | ||
108 | def recv_packet(seq): | |
109 | sot_pkt = None | |
110 | while True: | |
111 | sot_pkt, ack_pkt = wait_for_sot(seq, sot_pkt) | |
112 | payload = [] | |
113 | recv_char = None | |
114 | may_stop = False | |
115 | ||
116 | cur_pkt = ack_pkt | |
117 | for pkt in seq: | |
118 | if pkt.value == 0x66: | |
119 | sot_pkt = pkt | |
120 | break | |
121 | ||
122 | check_rxtx(pkt, cur_pkt) | |
123 | if recv_char is None: | |
124 | if pkt.rxtx != sot_pkt.rxtx: | |
125 | raise ProtocolError() | |
126 | recv_char = pkt.value | |
127 | cur_pkt = pkt | |
128 | if may_stop: | |
129 | if recv_char == 0x55: | |
130 | cur_pkt = None | |
131 | sot_pkt = None | |
132 | break | |
133 | else: | |
134 | if pkt.rxtx == sot_pkt.rxtx: | |
135 | raise ProtocolError() | |
136 | if pkt.value == 0x00: | |
137 | payload.append(recv_char) | |
138 | if check_payload(sot_pkt, payload): | |
139 | may_stop = True | |
140 | yield (sot_pkt, payload) | |
141 | else: | |
142 | may_stop = False | |
143 | elif pkt.value != 0xFF: | |
144 | # if FF, ignore the char (ie. its a NACK, so resend) | |
145 | raise ProtocolError('Not really but hey %02X [%02X] %s' % | |
146 | (pkt.value, recv_char, payload)) | |
147 | recv_char = None | |
148 | cur_pkt = pkt | |
149 | else: | |
150 | break | |
151 | ||
152 | ||
153 | def check_payload(sot, payload): | |
154 | if sot.rxtx == 'tx': | |
155 | if sot.value == 0x33: | |
156 | return len(payload) == 2 | |
157 | else: | |
158 | return len(payload) == 1 | |
159 | else: | |
160 | if len(payload) > 1: | |
161 | return len(payload) == (payload[1]+2) | |
162 | return False | |
163 | ||
164 | ||
165 | KEYCODES = { | |
166 | 0x00: 'View', | |
167 | 0x01: 'Mon', | |
168 | 0x02: 'Sto/Rcl', | |
169 | 0x03: 'Scan', | |
170 | 0x04: 'Alarm', | |
171 | 0x05: 'Mx+B', | |
172 | 0x06: 'Measure', | |
173 | 0x07: 'Interval', | |
174 | 0x08: 'Card Reset', | |
175 | 0x09: 'Close', | |
176 | 0x0A: 'Open', | |
177 | 0x0B: 'Read', | |
178 | 0x0C: 'Shift', | |
179 | 0x0D: 'Write', | |
180 | 0x0E: 'Left', | |
181 | 0x0F: 'Right', | |
182 | 0x10: 'Advanced', | |
183 | 0x11: 'Step', | |
184 | 0x80: 'Knob right', | |
185 | 0x81: 'Knob left', | |
186 | } | |
187 | ||
188 | ||
189 | FLAGS = [ | |
190 | ['<Bell>', 'Mx+B', 'Ch. frame', 'Channel', 'LO', 'Alarm', 'HI', 'Al. frame'], | |
191 | ['2', '4', '3', '1', '4W', 'OC', '', 'AVG'], | |
192 | ['MAX', 'MIN', 'LAST', 'MEM', '', 'ONCE', 'EXT', 'ERROR'], | |
193 | ['REMOTE', 'ADRS', '*', 'VIEW', 'MON', 'SCAN', 'CONFIG', ''] | |
194 | ] | |
195 | ||
196 | ||
197 | def flag(payload): | |
198 | flags = [] | |
199 | for flg, byte in zip(FLAGS, payload[2:]): | |
200 | for i in range(8): | |
201 | if byte & 2**i: | |
202 | flags.append(flg[i]) | |
203 | return ','.join(flags) | |
204 | ||
205 | ||
206 | def flag_by_num(num): | |
207 | for b in FLAGS: | |
208 | for f in b: | |
209 | if num == 0: | |
210 | return f | |
211 | num -= 1 | |
212 | ||
213 | CMDS = { | |
214 | 0x00: ('DISPLAY ', lambda pl: ''.join(chr(x) for x in pl[2:])), | |
215 | 0x0C: ('CHANNEL ', lambda pl: ''.join(chr(x) for x in pl[2:])), | |
216 | 0x0A: ('FLAGS ', flag), | |
217 | 0x01: ('UNSHIFT ', None), | |
218 | 0x86: ('SHUTDOWN', None), | |
219 | 0x0D: ('CHAR LO ', lambda pl: str(pl[2])), | |
220 | 0x08: ('FLAG LO ', lambda pl: flag_by_num(pl[2])), | |
221 | 0x09: ('FLAG HI ', lambda pl: flag_by_num(pl[2])), | |
222 | 0x02: ('RST? ', None), | |
223 | } | |
224 | ||
225 | ||
226 | def parse_packet(sot, payload): | |
227 | if sot.value == 0x33: | |
228 | return 'INIT KEY=%s' % KEYCODES.get(payload[1], 'None') | |
229 | if sot.rxtx == 'tx': | |
230 | kc = payload[0] | |
231 | if kc in (0x80, 0x81): | |
232 | return 'KNOB\t\t%s' % click.style(KEYCODES.get(kc), fg='magenta') | |
233 | if kc & 0x40: | |
234 | kc = kc & 0xBF | |
235 | event = 'RELEASED' | |
236 | else: | |
237 | event = 'PRESSED ' | |
238 | return 'KEY %s\t%s' % (click.style(event, fg='green'), | |
239 | click.style(KEYCODES.get(kc), fg='cyan')) | |
240 | lbl, dspfunc = CMDS[payload[0]] | |
241 | if dspfunc: | |
242 | lbl += '\t%s' % click.style(dspfunc(payload), fg='green') | |
243 | return lbl | |
244 | ||
245 | ||
246 | @click.command() | |
247 | @click.option('-p', '--packets/--no-packets', default=False) | |
248 | def main(packets): | |
249 | if packets: | |
250 | try: | |
251 | for sot, payload in recv_packet(read_line(read_input())): | |
252 | cnxdir = 'CPU=>DP' if sot.rxtx == 'rx' else 'CPU<-DP' | |
253 | print('[%s]: %02X=[%s]' % | |
254 | (cnxdir, sot.value, | |
255 | ','.join('%02x' % x for x in payload))) | |
256 | except: | |
257 | print('\n'.join(str(x) for x in pkts[-10:])) | |
258 | raise | |
259 | else: | |
260 | try: | |
261 | t0 = None | |
262 | for sot, payload in recv_packet(read_line(read_input())): | |
263 | cnxdir = '>>>' if sot.rxtx == 'rx' else '<<<' | |
264 | if t0 is None: | |
265 | t0 = sot.timestamp | |
266 | click.secho('%08.0f ' % sot.relts, | |
267 | fg='yellow', nl=False) | |
268 | click.secho('[+%04.2f] ' % ((sot.timestamp - t0)/1000.), | |
269 | fg='blue', nl=False) | |
270 | click.secho(cnxdir + ' ', fg='red', nl=False) | |
271 | click.secho(parse_packet(sot, payload), fg='white') | |
272 | t0 = sot.timestamp | |
273 | except: | |
274 | print('\n'.join(str(x) for x in pkts[-10:])) | |
275 | raise | |
276 | ||
277 | ||
278 | if __name__ == '__main__': | |
279 | main() |