|
1 import sys |
|
2 import re |
|
3 import click |
|
4 |
|
# Divisor applied to a packet's frame number by Packet.timestamp
# (timestamp = frame / F * 1000).  Presumably the capture sample rate in
# Hz, which would make timestamps milliseconds -- TODO confirm.
F = 1e6

# Matches one decoded data byte per input line:
#   <fr1>-<fr2> <anything>: rx-data|tx-data: "<two characters>"
# fr1/fr2 are decimal frame numbers, rxtx is the direction and value is the
# two-character payload that read_line() parses as hexadecimal.
reg = re.compile(
    r'^(?P<fr1>\d+)-(?P<fr2>\d+) .*: (?P<rxtx>rx|tx)-data: "(?P<value>..)"$'
)
|
8 |
|
9 |
|
class ProtocolError(Exception):
    """Raised when the observed byte exchange breaks the expected protocol."""
|
12 |
|
class ProtocolReset(Exception):
    """Signal raised by check_sot_ack() when a failed handshake involves 0x66.

    wait_for_sot() catches it and restarts the handshake from the latest
    packet instead of treating the mismatch as an error.
    """
|
15 |
|
# Every Packet produced by read_line(), in arrival order.  Packet.relts uses
# the first entry as its time origin and main() dumps the last ten entries
# when decoding fails.
pkts = []
|
17 |
|
18 |
|
class Packet:
    """A single data byte captured on the link.

    Attributes:
        frame: frame number at which the byte was seen.
        rxtx: direction of the byte, ``'rx'`` or ``'tx'``.
        value: the byte itself, as an integer.
    """

    def __init__(self, frame, rxtx, value):
        self.frame, self.rxtx, self.value = frame, rxtx, value

    @property
    def timestamp(self):
        """Frame number divided by ``F`` and scaled by 1000."""
        return self.frame / F * 1000

    @property
    def relts(self):
        """Timestamp relative to the first packet recorded in ``pkts``."""
        origin = pkts[0]
        return self.timestamp - origin.timestamp

    def __str__(self):
        fields = (self.timestamp, self.rxtx, self.value)
        return '[%06.2f] %s: %02X' % fields
|
35 |
|
36 |
|
def read_line(seq):
    """Turn text lines into Packet objects.

    Lines of *seq* that do not match ``reg`` are skipped.  For every matching
    line a Packet is built from the first frame number, the direction and
    the value parsed as hexadecimal; it is appended to the global ``pkts``
    list and then yielded.
    """
    for text in seq:
        match = reg.match(text)
        if match is None:
            continue
        packet = Packet(
            int(match.group('fr1')),
            match.group('rxtx'),
            int(match.group('value'), base=16),
        )
        pkts.append(packet)
        yield packet
|
47 |
|
48 |
|
def read_input(stream=None):
    """Yield the items of *stream* one by one, defaulting to ``sys.stdin``.

    The default is resolved when iteration starts, not when the generator
    is created.
    """
    source = sys.stdin if stream is None else stream
    for item in source:
        yield item
|
54 |
|
55 |
|
def print_input():
    """Print every byte read from standard input, one line per byte.

    Each line shows the direction ('rx' is labelled ``CPU-->DP``, anything
    else ``CPU<--DP``), the time elapsed since the first byte, the value in
    hexadecimal and, for values 32..127, the corresponding character.
    """
    origin = None
    for packet in read_line(read_input()):
        if origin is None:
            # The first packet defines time zero.
            origin = packet.timestamp
        if packet.rxtx == 'rx':
            direction = 'CPU-->DP'
        else:
            direction = 'CPU<--DP'
        char = chr(packet.value) if 32 <= packet.value <= 127 else ''
        elapsed = packet.timestamp - origin
        print('[%s] %06.1f: %02X # %s' % (
            direction, elapsed, packet.value, char))
|
65 |
|
66 |
|
def flipdir(rxtx):
    """Return the opposite direction: ``'rx'`` for ``'tx'``, else ``'tx'``."""
    if rxtx == 'tx':
        return 'rx'
    return 'tx'
|
69 |
|
70 |
|
def check_rxtx(cur, pkt):
    """Ensure *cur* and *pkt* travel in opposite directions.

    Raises:
        ProtocolError: if both packets have the same ``rxtx`` direction.
    """
    if cur.rxtx != pkt.rxtx:
        return
    expected = flipdir(cur.rxtx)
    raise ProtocolError('Expected %s transmission %02X %02X' %
                        (expected, cur.value, pkt.value))
|
75 |
|
76 |
|
def check_sot_ack(ack, sot):
    """Validate that *ack* acknowledges the start-of-transmission *sot*.

    The two packets must travel in opposite directions and ``ack.value``
    must be the complement of ``sot.value`` (``0xFF - sot.value``).

    NOTE(review): wait_for_sot() calls this as
    ``check_sot_ack(sot_pkt, ack_pkt)``, i.e. with the roles swapped relative
    to these parameter names.  The complement test is symmetric, but the
    0x66 test and the error message are not -- confirm which is intended.

    Raises:
        ProtocolError: same direction, or wrong acknowledge value.
        ProtocolReset: wrong acknowledge value and ``ack.value`` is 0x66.
    """
    check_rxtx(ack, sot)
    expected = 0xFF - sot.value
    if ack.value == expected:
        return
    if ack.value == 0x66:
        raise ProtocolReset()
    raise ProtocolError('Expected ACK value %02X, got %02X' %
                        (expected, ack.value))
|
84 |
|
85 |
|
def wait_for_sot(seq, sot_pkt=None):
    """Consume *seq* until a start-of-transmission handshake completes.

    When *sot_pkt* is not given, packets are read (and reported as
    unexpected) until one whose value is in ``SOT_VALUES`` shows up.  The
    following packet is then checked with check_sot_ack(); on ProtocolReset
    that packet becomes the new start packet and the next one is tried.

    Returns:
        The ``(sot_pkt, ack_pkt)`` pair of the completed handshake.

    Raises:
        StopIteration: *seq* ran out before a handshake completed.  Callers
            that are generators must catch it themselves (see PEP 479).
        ProtocolError: propagated from check_sot_ack().
    """
    if sot_pkt is None:
        for sot_pkt in seq:
            if sot_pkt.value in SOT_VALUES:
                break
            print('Unexpected value %s: %02X' %
                  (sot_pkt.rxtx, sot_pkt.value))
    for ack_pkt in seq:
        try:
            # Argument order kept exactly as in the original call site.
            check_sot_ack(sot_pkt, ack_pkt)
        except ProtocolReset:
            # Restart the handshake from the packet that triggered the reset.
            sot_pkt = ack_pkt
        else:
            return sot_pkt, ack_pkt
    raise StopIteration()
|
103 |
|
104 |
|
# Byte values that wait_for_sot() accepts as the start of a transmission.
SOT_VALUES = [
    0x33,
    0x66,
]
|
106 |
|
107 |
|
def recv_packet(seq):
    """Group the packets of *seq* into transmissions.

    After each handshake found by wait_for_sot(), packets are expected to
    alternate between a data byte sent in the same direction as the start
    packet and an answer sent in the opposite direction:

    * answer 0x00: the data byte is appended to the payload;
    * answer 0xFF: the data byte is dropped (treated as a NACK, the byte is
      expected to be sent again);
    * any other answer is a protocol error.

    Each time check_payload() reports the payload as complete the pair
    ``(sot_pkt, payload)`` is yielded.  The yielded list is the one being
    filled: it keeps growing if further bytes are acknowledged afterwards.
    After a complete payload, a data byte 0x55 ends the transmission.  A
    packet with value 0x66 at any point is taken as the start packet of a
    new transmission.

    The generator ends when *seq* is exhausted.

    Raises:
        ProtocolError: on a direction or value that does not fit the above.
    """
    sot_pkt = None
    while True:
        try:
            sot_pkt, ack_pkt = wait_for_sot(seq, sot_pkt)
        except StopIteration:
            # wait_for_sot() reports exhausted input with StopIteration.
            # Since PEP 479 (Python 3.7+) a StopIteration escaping a
            # generator body is turned into RuntimeError, so finish the
            # generator explicitly instead of letting it propagate.
            return
        payload = []
        recv_char = None  # data byte waiting for its acknowledge
        may_stop = False  # True once the payload has been seen complete

        cur_pkt = ack_pkt
        for pkt in seq:
            if pkt.value == 0x66:
                # New start of transmission: redo the handshake with it.
                sot_pkt = pkt
                break

            check_rxtx(pkt, cur_pkt)
            if recv_char is None:
                # Expecting a data byte from the side that sent the SOT.
                if pkt.rxtx != sot_pkt.rxtx:
                    raise ProtocolError(
                        'Expected a data byte from %s, got %s: %02X' %
                        (sot_pkt.rxtx, pkt.rxtx, pkt.value))
                recv_char = pkt.value
                cur_pkt = pkt
                if may_stop and recv_char == 0x55:
                    # End of transmission: wait for a fresh SOT.
                    sot_pkt = None
                    break
            else:
                # Expecting the answer of the other side to recv_char.
                if pkt.rxtx == sot_pkt.rxtx:
                    raise ProtocolError(
                        'Expected an answer to data byte %02X, got %s: %02X' %
                        (recv_char, pkt.rxtx, pkt.value))
                if pkt.value == 0x00:
                    payload.append(recv_char)
                    if check_payload(sot_pkt, payload):
                        may_stop = True
                        yield (sot_pkt, payload)
                    else:
                        may_stop = False
                elif pkt.value != 0xFF:
                    # if FF, ignore the char (ie. its a NACK, so resend)
                    raise ProtocolError('Not really but hey %02X [%02X] %s' %
                                        (pkt.value, recv_char, payload))
                recv_char = None
                cur_pkt = pkt
        else:
            # Input exhausted in the middle of a transmission.
            break
|
151 |
|
152 |
|
def check_payload(sot, payload):
    """Tell whether *payload* has reached its expected length.

    For a transmission started in the ``'tx'`` direction the expected length
    is 2 when the start value is 0x33 and 1 otherwise.  In the other
    direction the second payload byte gives the number of bytes that follow
    it, so the payload is complete at ``payload[1] + 2`` bytes.
    """
    size = len(payload)
    if sot.rxtx == 'tx':
        expected = 2 if sot.value == 0x33 else 1
        return size == expected
    if size > 1:
        return size == payload[1] + 2
    return False
|
163 |
|
164 |
|
# Key code -> key name, as looked up by parse_packet().  For key events,
# parse_packet() strips bit 0x40 (the "released" marker) before the lookup;
# 0x80 and 0x81 are reported as knob rotations.
KEYCODES = {
    0x00: 'View',
    0x01: 'Mon',
    0x02: 'Sto/Rcl',
    0x03: 'Scan',
    0x04: 'Alarm',
    0x05: 'Mx+B',
    0x06: 'Measure',
    0x07: 'Interval',
    0x08: 'Card Reset',
    0x09: 'Close',
    0x0A: 'Open',
    0x0B: 'Read',
    0x0C: 'Shift',
    0x0D: 'Write',
    0x0E: 'Left',
    0x0F: 'Right',
    0x10: 'Advanced',
    0x11: 'Step',
    0x80: 'Knob right',
    0x81: 'Knob left',
}
|
187 |
|
188 |
|
# Flag names, one row per flag byte and one entry per bit (index 0 is bit 0).
# flag() maps the payload bytes after the first two onto these rows, and
# flag_by_num() numbers the entries row after row.
FLAGS = [
    ['<Bell>', 'Mx+B', 'Ch. frame', 'Channel', 'LO', 'Alarm', 'HI', 'Al. frame'],
    ['2', '4', '3', '1', '4W', 'OC', '', 'AVG'],
    ['MAX', 'MIN', 'LAST', 'MEM', '', 'ONCE', 'EXT', 'ERROR'],
    ['REMOTE', 'ADRS', '*', 'VIEW', 'MON', 'SCAN', 'CONFIG', ''],
]
|
195 |
|
196 |
|
def flag(payload):
    """Return the comma-separated names of the flags set in *payload*.

    The bytes of *payload* from index 2 onwards are paired with the rows of
    ``FLAGS``; bit ``n`` of a byte selects entry ``n`` of its row.
    """
    active = []
    for names, byte in zip(FLAGS, payload[2:]):
        active.extend(names[bit] for bit in range(8) if byte & (1 << bit))
    return ','.join(active)
|
204 |
|
205 |
|
def flag_by_num(num):
    """Return the *num*-th flag name, counting through ``FLAGS`` row by row.

    Returns None when *num* does not designate any entry.
    """
    names = (name for row in FLAGS for name in row)
    for index, name in enumerate(names):
        if index == num:
            return name
    return None
|
212 |
|
# Command byte (payload[0]) -> (label, formatter).  The formatter receives
# the whole payload and returns the text parse_packet() shows after the
# label; None means the command is shown by its label alone.
# NOTE(review): the labels look like they were meant to share one width
# ('DISPLAY ', 'SHUTDOWN'); the shorter ones may have lost padding spaces in
# this copy of the file -- confirm against the original source.
CMDS = {
    0x00: ('DISPLAY ', lambda pl: ''.join(map(chr, pl[2:]))),
    0x0C: ('CHANNEL ', lambda pl: ''.join(map(chr, pl[2:]))),
    0x0A: ('FLAGS ', flag),
    0x01: ('UNSHIFT ', None),
    0x86: ('SHUTDOWN', None),
    0x0D: ('CHAR LO ', lambda pl: str(pl[2])),
    0x08: ('FLAG LO ', lambda pl: flag_by_num(pl[2])),
    0x09: ('FLAG HI ', lambda pl: flag_by_num(pl[2])),
    0x02: ('RST? ', None),
}
|
224 |
|
225 |
|
def parse_packet(sot, payload):
    """Return a human-readable (ANSI-styled) description of a transmission.

    * Start value 0x33: ``INIT`` with the key named by ``payload[1]``.
    * Other transmissions in the ``'tx'`` direction: a knob rotation for key
      codes 0x80/0x81, otherwise a key event where bit 0x40 of the code marks
      a release.
    * Anything else: a command looked up in ``CMDS`` by ``payload[0]``,
      followed by the output of its formatter when it has one.

    Key codes missing from ``KEYCODES`` are shown as ``None`` and command
    bytes missing from ``CMDS`` as ``UNKNOWN xx``, so an unexpected byte does
    not abort the decoding.
    """
    if sot.value == 0x33:
        return 'INIT KEY=%s' % KEYCODES.get(payload[1], 'None')
    if sot.rxtx == 'tx':
        kc = payload[0]
        if kc in (0x80, 0x81):
            return 'KNOB\t\t%s' % click.style(KEYCODES.get(kc, 'None'),
                                              fg='magenta')
        if kc & 0x40:
            kc &= 0xBF  # clear the "released" bit to get the key code
            event = 'RELEASED'
        else:
            event = 'PRESSED '
        # Explicit 'None' default, as in the INIT branch: click.style() is
        # never handed a non-string, whatever the click version.
        return 'KEY %s\t%s' % (click.style(event, fg='green'),
                               click.style(KEYCODES.get(kc, 'None'),
                                           fg='cyan'))
    cmd = payload[0]
    if cmd not in CMDS:
        return 'UNKNOWN %02X' % cmd
    lbl, dspfunc = CMDS[cmd]
    if dspfunc:
        lbl += '\t%s' % click.style(dspfunc(payload), fg='green')
    return lbl
|
244 |
|
245 |
|
@click.command()
@click.option('-p', '--packets/--no-packets', default=False)
def main(packets):
    # Decode the capture read from standard input.
    #
    # With --packets each transmission is printed raw (direction, start
    # value and payload bytes in hex).  Otherwise each transmission is
    # printed decoded, preceded by its time relative to the first packet and
    # the delay since the previous transmission.
    #
    # On any failure the last ten packets read are dumped before the
    # exception is re-raised.
    # (Comments rather than a docstring: click would show a docstring as the
    # command's help text.)
    transmissions = recv_packet(read_line(read_input()))
    try:
        if packets:
            for sot, payload in transmissions:
                cnxdir = 'CPU=>DP' if sot.rxtx == 'rx' else 'CPU<-DP'
                hexdump = ','.join('%02x' % x for x in payload)
                print('[%s]: %02X=[%s]' % (cnxdir, sot.value, hexdump))
        else:
            previous = None
            for sot, payload in transmissions:
                cnxdir = '>>>' if sot.rxtx == 'rx' else '<<<'
                if previous is None:
                    previous = sot.timestamp
                click.secho('%08.0f ' % sot.relts,
                            fg='yellow', nl=False)
                click.secho('[+%04.2f] ' % ((sot.timestamp - previous)/1000.),
                            fg='blue', nl=False)
                click.secho(cnxdir + ' ', fg='red', nl=False)
                click.secho(parse_packet(sot, payload), fg='white')
                previous = sot.timestamp
    except BaseException:
        print('\n'.join(str(x) for x in pkts[-10:]))
        raise
|
276 |
|
277 |
|
# Run the command-line interface only when executed as a script.
if __name__ == '__main__':
    main()