serialdata/uart_filter.py

changeset 23
daf26b083899
equal deleted inserted replaced
22:2f51221af82d 23:daf26b083899
1 import sys
2 import re
3 import click
4
# Packet.timestamp divides a frame number by F and multiplies by 1000.
# NOTE(review): presumably the capture sample rate in Hz, which would make
# timestamps milliseconds -- confirm against the tool producing the log.
F = 1e6
6
# Matches one log line describing a single byte:
#   <fr1>-<fr2> <anything>: rx-data: "<two chars>"   (or tx-data)
# Named groups: fr1, fr2 (decimal frame numbers), rxtx ('rx' or 'tx') and
# value (the two characters between the double quotes).
reg = re.compile(
    r'^(?P<fr1>\d+)-(?P<fr2>\d+) .*: '
    r'(?P<rxtx>rx|tx)-data: "(?P<value>..)"$'
)
8
9
class ProtocolError(Exception):
    """Raised when the observed byte sequence breaks the expected exchange."""
12
class ProtocolReset(Exception):
    """Raised by check_sot_ack() when the ACK test fails on a 0x66 byte."""
15
# Module-wide history of every Packet created by read_line(), in arrival
# order.  Packet.relts measures against its first entry and main() prints
# its last ten entries when decoding fails.
pkts = []
17
18
class Packet:
    """A single byte captured on the serial link.

    The constructor stores its three arguments unchanged:

    * ``frame`` -- frame number taken from the log line,
    * ``rxtx``  -- direction label, ``'rx'`` or ``'tx'``,
    * ``value`` -- the byte itself, as an integer.
    """

    def __init__(self, frame, rxtx, value):
        self.frame, self.rxtx, self.value = frame, rxtx, value

    @property
    def timestamp(self):
        """Frame number divided by the module constant ``F``, times 1000."""
        return self.frame / F * 1000

    @property
    def relts(self):
        """Timestamp measured from the first packet stored in ``pkts``."""
        first = pkts[0]
        return self.timestamp - first.timestamp

    def __str__(self):
        # Same layout as the debugging dump printed by main() on failure.
        fields = (self.timestamp, self.rxtx, self.value)
        return '[%06.2f] %s: %02X' % fields
35
36
def read_line(seq):
    """Lazily convert text lines into Packet objects.

    Lines that do not match ``reg`` are skipped silently.  For every
    matching line a Packet is built from the first frame number, the
    direction and the hexadecimal byte value; it is appended to the
    module-level ``pkts`` list and then yielded.

    :param seq: iterable of text lines.
    :raises ValueError: if the two captured value characters are not
        valid hexadecimal.
    """
    for text in seq:
        match = reg.match(text)
        if not match:
            continue
        frame_txt, direction, value_txt = match.group('fr1', 'rxtx', 'value')
        packet = Packet(int(frame_txt), direction, int(value_txt, base=16))
        pkts.append(packet)
        yield packet
47
48
def read_input(stream=None):
    """Yield each item of *stream* in order.

    :param stream: any iterable of lines; ``sys.stdin`` is used when it is
        ``None``.
    """
    source = sys.stdin if stream is None else stream
    for entry in source:
        yield entry
54
55
def print_input():
    """Print every byte decoded from standard input, one per line.

    Each line shows the direction (``CPU-->DP`` for ``'rx'`` packets,
    ``CPU<--DP`` otherwise), the time elapsed since the first packet, the
    byte in hexadecimal and, after ``#``, the character itself when the
    byte is printable ASCII.
    """
    t0 = None
    for pkt in read_line(read_input()):
        # Printable ASCII is 0x20..0x7E.  0x7F is DEL, a control character,
        # so the upper bound is exclusive to keep it out of the output.
        c = chr(pkt.value) if 32 <= pkt.value < 127 else ''
        if t0 is None:
            # The first packet seen becomes the time origin.
            t0 = pkt.timestamp
        cnxdir = 'CPU-->DP' if pkt.rxtx == 'rx' else 'CPU<--DP'
        print('[%s] %06.1f: %02X # %s' % (
            cnxdir, (pkt.timestamp - t0), pkt.value, c))
65
66
def flipdir(rxtx):
    """Return ``'rx'`` when given ``'tx'``; return ``'tx'`` for anything else."""
    if rxtx == 'tx':
        return 'rx'
    return 'tx'
69
70
def check_rxtx(cur, pkt):
    """Require *cur* and *pkt* to travel in opposite directions.

    :param cur: first packet; its direction and value go into the message.
    :param pkt: second packet, compared against *cur*.
    :raises ProtocolError: when both packets carry the same ``rxtx`` label.
    """
    if cur.rxtx != pkt.rxtx:
        return
    details = (flipdir(cur.rxtx), cur.value, pkt.value)
    raise ProtocolError('Expected %s transmission %02X %02X' % details)
75
76
def check_sot_ack(ack, sot):
    """Validate that *ack* acknowledges the start-of-transmission *sot*.

    The two packets must travel in opposite directions and the ACK value
    must be the complement of the SOT value (``0xFF - sot.value``).

    :raises ProtocolError: same direction, or wrong ACK value.
    :raises ProtocolReset: wrong ACK value and that value is 0x66.
    """
    check_rxtx(ack, sot)
    expected = 0xFF - sot.value
    if ack.value == expected:
        return
    if ack.value == 0x66:
        raise ProtocolReset()
    raise ProtocolError('Expected ACK value %02X, got %02X' %
                        (expected, ack.value))
84
85
def wait_for_sot(seq, sot_pkt=None):
    """Consume *seq* until an acknowledged start-of-transmission is found.

    :param seq: iterator of packets; it is advanced in place.
    :param sot_pkt: a packet already identified as SOT by the caller, or
        ``None`` to scan *seq* for the next value listed in ``SOT_VALUES``
        (every skipped packet is reported on standard output).
    :returns: the ``(sot_pkt, ack_pkt)`` pair once the ACK check passes.
    :raises ProtocolError: propagated from check_sot_ack().
    :raises StopIteration: when *seq* runs out before a SOT is acknowledged.
    """
    if sot_pkt is None:
        for sot_pkt in seq:
            if sot_pkt.value not in SOT_VALUES:
                print('Unexpected value %s: %02X' %
                      (sot_pkt.rxtx, sot_pkt.value))
            else:
                break
    for ack_pkt in seq:
        try:
            # check_sot_ack() is declared as (ack, sot).  Passing the reply
            # first makes its 0x66 reset test look at the reply, and makes
            # its error message report expected/actual the right way round.
            check_sot_ack(ack_pkt, sot_pkt)
            return sot_pkt, ack_pkt
        except ProtocolReset:
            # The reply was itself a 0x66: treat it as the new SOT and
            # wait for its acknowledgement instead.
            sot_pkt = ack_pkt
    raise StopIteration()
103
104
# Byte values that wait_for_sot() accepts as the start of a transmission.
SOT_VALUES = [0x33, 0x66]
106
107
def recv_packet(seq):
    """Group a stream of byte packets into ``(sot_pkt, payload)`` messages.

    After each SOT/ACK handshake (see wait_for_sot) the stream is read as
    alternating pairs: a data byte travelling in the SOT direction, then
    a reply travelling the other way.  A ``0x00`` reply stores the data
    byte in the payload, a ``0xFF`` reply discards it, and any other reply
    is an error.  Whenever check_payload() accepts the payload the message
    is yielded; a following ``0x55`` data byte then closes it.  A ``0x66``
    byte at any point starts a new message with that byte as its SOT.

    The yielded ``payload`` is the list still being filled: it grows if
    more bytes are accepted before the message is closed.

    :param seq: iterator of packets; it is consumed.
    :raises ProtocolError: on a direction or reply-value violation.
    """
    sot_pkt = None
    while True:
        try:
            sot_pkt, ack_pkt = wait_for_sot(seq, sot_pkt)
        except StopIteration:
            # wait_for_sot() signals end of input this way.  Letting it
            # escape from a generator is turned into RuntimeError by
            # PEP 479 (Python 3.7+), so finish cleanly instead.
            return
        payload = []
        recv_char = None     # data byte waiting for its reply
        may_stop = False     # True once the payload has been yielded

        cur_pkt = ack_pkt
        for pkt in seq:
            if pkt.value == 0x66:
                # Restart: this byte is the SOT of the next message.
                sot_pkt = pkt
                break

            check_rxtx(pkt, cur_pkt)
            if recv_char is None:
                # Expecting a data byte from the side that sent the SOT.
                if pkt.rxtx != sot_pkt.rxtx:
                    raise ProtocolError()
                recv_char = pkt.value
                cur_pkt = pkt
                if may_stop and recv_char == 0x55:
                    # End of message; scan for a fresh SOT next.
                    sot_pkt = None
                    break
            else:
                # Expecting the reply from the opposite side.
                if pkt.rxtx == sot_pkt.rxtx:
                    raise ProtocolError()
                if pkt.value == 0x00:
                    payload.append(recv_char)
                    if check_payload(sot_pkt, payload):
                        may_stop = True
                        yield (sot_pkt, payload)
                    else:
                        may_stop = False
                elif pkt.value != 0xFF:
                    # if FF, ignore the char (ie. its a NACK, so resend)
                    raise ProtocolError('Not really but hey %02X [%02X] %s' %
                                        (pkt.value, recv_char, payload))
                recv_char = None
                cur_pkt = pkt
        else:
            # Input exhausted in the middle of a message: nothing left.
            return
151
152
def check_payload(sot, payload):
    """Tell whether *payload* has reached the length expected for *sot*.

    * ``'tx'`` messages: 2 bytes when the SOT value is 0x33, else 1 byte.
    * other messages: the second payload byte plus 2; never complete with
      fewer than two bytes.

    :returns: ``True`` when the length matches exactly, else ``False``.
    """
    size = len(payload)
    if sot.rxtx == 'tx':
        wanted = 2 if sot.value == 0x33 else 1
        return size == wanted
    if size > 1:
        return size == payload[1] + 2
    return False
163
164
# Human-readable names for the key codes handled by parse_packet().
# parse_packet() clears bit 0x40 (its "released" marker) before looking a
# code up here, and reports 0x80/0x81 as knob events rather than keys.
KEYCODES = {
    0x00: 'View',
    0x01: 'Mon',
    0x02: 'Sto/Rcl',
    0x03: 'Scan',
    0x04: 'Alarm',
    0x05: 'Mx+B',
    0x06: 'Measure',
    0x07: 'Interval',
    0x08: 'Card Reset',
    0x09: 'Close',
    0x0A: 'Open',
    0x0B: 'Read',
    0x0C: 'Shift',
    0x0D: 'Write',
    0x0E: 'Left',
    0x0F: 'Right',
    0x10: 'Advanced',
    0x11: 'Step',
    # Knob events.
    0x80: 'Knob right',
    0x81: 'Knob left',
}
187
188
# Names of the individual flag bits.  flag() pairs each row with one
# payload byte (starting at payload[2]) and indexes the row by bit number,
# least significant bit first.  Empty strings are bits without a name.
FLAGS = [
    # bit 0 ... bit 7
    ['<Bell>', 'Mx+B', 'Ch. frame', 'Channel', 'LO', 'Alarm', 'HI', 'Al. frame'],
    ['2', '4', '3', '1', '4W', 'OC', '', 'AVG'],
    ['MAX', 'MIN', 'LAST', 'MEM', '', 'ONCE', 'EXT', 'ERROR'],
    ['REMOTE', 'ADRS', '*', 'VIEW', 'MON', 'SCAN', 'CONFIG', ''],
]
195
196
def flag(payload):
    """Return the comma-separated names of all flag bits set in *payload*.

    Bytes from ``payload[2]`` onwards are matched, in order, with the rows
    of ``FLAGS``; bit *i* of a byte selects entry *i* of its row.  Extra
    bytes or extra rows are ignored.
    """
    names = [
        row[bit]
        for row, byte in zip(FLAGS, payload[2:])
        for bit in range(8)
        if byte & (1 << bit)
    ]
    return ','.join(names)
204
205
def flag_by_num(num):
    """Return the flag name at position *num* of ``FLAGS`` read row by row.

    Position 0 is the first entry of the first row.  ``None`` is returned
    when *num* does not designate any entry.
    """
    position = 0
    for row in FLAGS:
        for name in row:
            if position == num:
                return name
            position += 1
    return None
212
# Commands recognised by parse_packet(), keyed by the first payload byte.
# Each value is (label, formatter): the formatter receives the complete
# payload and returns the text shown after the label; None means the
# command is displayed with its label only.
CMDS = {
    # Text commands: the bytes after the first two are rendered as chars.
    0x00: ('DISPLAY ', lambda pl: ''.join(chr(x) for x in pl[2:])),
    0x0C: ('CHANNEL ', lambda pl: ''.join(chr(x) for x in pl[2:])),
    # Bitmask of flags, decoded by flag().
    0x0A: ('FLAGS   ', flag),
    0x01: ('UNSHIFT ', None),
    0x86: ('SHUTDOWN', None),
    # Third payload byte shown as a decimal number.
    0x0D: ('CHAR LO ', lambda pl: str(pl[2])),
    # Third payload byte is a flag position, see flag_by_num().
    0x08: ('FLAG LO ', lambda pl: flag_by_num(pl[2])),
    0x09: ('FLAG HI ', lambda pl: flag_by_num(pl[2])),
    0x02: ('RST?    ', None),
}
224
225
def parse_packet(sot, payload):
    """Render one decoded message as a (partly colourised) text line.

    :param sot: start-of-transmission packet of the message.
    :param payload: list of payload bytes collected by recv_packet().
    :returns: a string; parts of it are wrapped with ``click.style``.

    Three shapes are handled:

    * SOT value 0x33: ``INIT KEY=<name>`` from the second payload byte.
    * ``'tx'`` messages: a knob event (codes 0x80/0x81) or a key event,
      where bit 0x40 of the code marks a release.
    * anything else: a command looked up in ``CMDS`` by the first byte.

    Unknown key codes are shown as ``None`` and unknown commands as
    ``UNKNOWN`` followed by the raw payload, so that one unexpected byte
    does not abort the decoding of the whole capture.
    """
    if sot.value == 0x33:
        return 'INIT KEY=%s' % KEYCODES.get(payload[1], 'None')
    if sot.rxtx == 'tx':
        kc = payload[0]
        if kc in (0x80, 0x81):
            return 'KNOB\t\t%s' % click.style(KEYCODES.get(kc), fg='magenta')
        if kc & 0x40:
            # Bit 0x40 flags a release; clear it to recover the key code.
            kc = kc & 0xBF
            event = 'RELEASED'
        else:
            event = 'PRESSED '
        # Same 'None' default as the INIT branch: click < 8 cannot style a
        # non-string and click >= 8 would print str(None) anyway.
        key_name = KEYCODES.get(kc, 'None')
        return 'KEY %s\t%s' % (click.style(event, fg='green'),
                              click.style(key_name, fg='cyan'))
    entry = CMDS.get(payload[0])
    if entry is None:
        # Unknown command byte: show the raw bytes instead of raising.
        raw = ','.join('%02x' % x for x in payload)
        return 'UNKNOWN \t%s' % click.style(raw, fg='red')
    lbl, dspfunc = entry
    if dspfunc:
        # str() keeps click < 8 happy when a formatter returns None.
        lbl += '\t%s' % click.style(str(dspfunc(payload)), fg='green')
    return lbl
244
245
def _dump_recent_packets():
    """Print the last ten raw packets recorded in ``pkts`` (debug aid)."""
    print('\n'.join(str(x) for x in pkts[-10:]))


@click.command()
@click.option('-p', '--packets/--no-packets', default=False)
def main(packets):
    """Decode a UART byte log read from standard input.

    With --packets each message is printed as raw hexadecimal bytes;
    otherwise each message is printed decoded, with timing information.
    """
    try:
        if packets:
            for sot, payload in recv_packet(read_line(read_input())):
                cnxdir = 'CPU=>DP' if sot.rxtx == 'rx' else 'CPU<-DP'
                print('[%s]: %02X=[%s]' %
                      (cnxdir, sot.value,
                       ','.join('%02x' % x for x in payload)))
        else:
            t0 = None
            for sot, payload in recv_packet(read_line(read_input())):
                cnxdir = '>>>' if sot.rxtx == 'rx' else '<<<'
                if t0 is None:
                    t0 = sot.timestamp
                # Time since the very first packet of the capture.
                click.secho('%08.0f ' % sot.relts,
                            fg='yellow', nl=False)
                # Time since the previous message, timestamp / 1000.
                click.secho('[+%04.2f] ' % ((sot.timestamp - t0)/1000.),
                            fg='blue', nl=False)
                click.secho(cnxdir + ' ', fg='red', nl=False)
                click.secho(parse_packet(sot, payload), fg='white')
                t0 = sot.timestamp
    except Exception:
        # Show what was on the wire just before the failure, then let the
        # error propagate.  ``Exception`` rather than a bare ``except`` so
        # that Ctrl-C and SystemExit are not intercepted.
        _dump_recent_packets()
        raise
276
277
# Start the command-line interface only when run as a script, not on import.
if __name__ == '__main__':
    main()

mercurial