# serialdata/ref/uart_filter.py
#
# changeset 43
# c850674a3101
# equal deleted inserted replaced
# 42:13bc915b657b 43:c850674a3101
1 import sys
2 import re
3 import click
4
# Divisor applied to a sample number by Packet.timestamp
# (presumably the capture sample rate in Hz -- TODO confirm).
F = 1e6

# Matches one decoder output line of the form "<n>-<n> uart-1: <value>";
# only the first number (fr1) and the value are used by read_line().
reg = re.compile(r'^(?P<fr1>\d+)-(?P<fr2>\d+) uart-1: (?P<value>.+)$') # noqa
# Intended to be fed with the output of:
# sigrok-cli -i startup-no-key -P uart:baudrate=187500:parity=even:rx=RX:tx=TX \
# --channels RX,TX -A uart=rx-data:tx-data:tx-start --protocol-decoder-samplenum
#
# In this configuration:
# RX: FP --> CPU
# TX: CPU --> FP
15
class ProtocolError(Exception):
    """Raised when the packet stream does not follow the expected exchange."""
18
class ProtocolReset(Exception):
    """Signal that a start-of-transmission handshake has to be restarted.

    Raised by check_sot_ack() and caught by wait_for_sot().
    """
21
# Every Packet produced by read_line() is appended here. Packet.relts uses the
# first entry as time origin and main() dumps the last ten entries on error.
pkts = []
23
24
class Packet:
    """One decoded UART byte.

    Attributes:
        frame: sample number at which the byte was decoded.
        rxtx: direction of the byte, either 'rx' or 'tx'.
        value: integer value of the byte.
    """

    def __init__(self, frame, rxtx, value):
        self.frame, self.rxtx, self.value = frame, rxtx, value

    @property
    def timestamp(self):
        """Sample number scaled by the module constant F and by 1000.

        Presumably milliseconds, if F is the sample rate in Hz.
        """
        return self.frame / F * 1000

    @property
    def relts(self):
        """Timestamp relative to the first packet recorded in ``pkts``."""
        origin = pkts[0]
        return self.timestamp - origin.timestamp

    def __str__(self):
        fields = (self.timestamp, self.rxtx, self.value)
        return '[%06.2f] %s: %02X' % fields
41
42
def read_line(seq):
    """Turn decoder output lines into Packet objects.

    Each line matching ``reg`` is either a "Start bit" marker, which flags the
    next data byte as 'tx', or a hexadecimal byte value. A byte not preceded
    by a start marker is reported as 'rx'. Every packet is also appended to
    the module-level ``pkts`` list before being yielded. Lines that do not
    match are printed with a "nope" prefix and skipped.

    Args:
        seq: iterable of text lines.

    Yields:
        Packet: one per data byte, in input order.
    """
    direction = "rx"
    for text in seq:
        match = reg.match(text)
        if not match:
            print("nope", text)
            continue
        sample = int(match.group('fr1'))
        field = match.group('value')
        if field == "Start bit":
            # Only tx bytes are announced by a start-bit annotation.
            direction = "tx"
            continue
        packet = Packet(sample, direction, int(field, base=16))
        pkts.append(packet)
        yield packet
        direction = "rx"
60
61
def read_input(stream=None):
    """Yield the lines of *stream*, or of standard input when it is None."""
    source = sys.stdin if stream is None else stream
    yield from source
67
68
def print_input():
    """Print every byte read from standard input, one line per byte.

    Each line shows the transfer direction, the time elapsed since the first
    byte, the byte value in hexadecimal and, for printable ASCII, the
    corresponding character.

    The direction labels follow the capture notes at the top of the module and
    the labels used by main(): 'rx' bytes travel from the panel to the CPU
    ('CPU<--DP'), 'tx' bytes from the CPU to the panel ('CPU-->DP').
    """
    t0 = None
    for pkt in read_line(read_input()):
        # 127 (DEL) is a control character, so it is not echoed as text.
        c = chr(pkt.value) if 32 <= pkt.value < 127 else ''
        if t0 is None:
            t0 = pkt.timestamp
        cnxdir = 'CPU<--DP' if pkt.rxtx == 'rx' else 'CPU-->DP'
        print('[%s] %06.1f: %02X # %s' % (
            cnxdir, (pkt.timestamp - t0), pkt.value, c))
78
79
def flipdir(rxtx):
    """Return the opposite direction: 'rx' for 'tx', 'tx' for anything else."""
    if rxtx == 'tx':
        return 'rx'
    return 'tx'
82
83
def check_rxtx(cur, pkt):
    """Ensure two packets travel in opposite directions.

    Args:
        cur: first packet; its direction and value appear in the error text.
        pkt: second packet.

    Raises:
        ProtocolError: if both packets have the same ``rxtx``.
    """
    if cur.rxtx != pkt.rxtx:
        return
    expected = flipdir(cur.rxtx)
    raise ProtocolError('Expected %s transmission %02X %02X' %
                        (expected, cur.value, pkt.value))
88
89
def check_sot_ack(ack, sot):
    """Validate a start-of-transmission / acknowledge pair.

    The pair is valid when the packets go in opposite directions and
    ``ack.value`` equals ``0xFF - sot.value``.

    NOTE(review): the only caller visible in this file, wait_for_sot(), passes
    the start packet as ``ack`` and the following packet as ``sot``.

    Raises:
        ProtocolError: same direction, or mismatching value.
        ProtocolReset: mismatching value while ``ack.value`` is 0x66.
    """
    check_rxtx(ack, sot)
    wanted = 0xFF - sot.value
    if ack.value == wanted:
        return
    if ack.value == 0x66:
        raise ProtocolReset()
    raise ProtocolError('Expected ACK value %02X, got %02X' %
                        (wanted, ack.value))
97
98
def wait_for_sot(seq, sot_pkt=None):
    """Consume packets until a start-of-transmission and its reply are found.

    Args:
        seq: iterator of Packet objects; it is consumed in place.
        sot_pkt: packet already identified as the start packet, or None to
            scan ``seq`` for one first.

    Returns:
        ``(sot_pkt, ack_pkt)`` for the accepted pair, or ``(None, None)`` when
        ``seq`` is exhausted before a pair is accepted.

    Raises:
        ProtocolError: propagated from check_sot_ack().
    """
    if sot_pkt is None:
        # Skip (and report) packets until one carries a value in SOT_VALUES.
        for sot_pkt in seq:
            #print("PKT", sot_pkt)
            if sot_pkt.value not in SOT_VALUES:
                print('Unexpected value %s: %02X' %
                      (sot_pkt.rxtx, sot_pkt.value))
                if sot_pkt.value == 0x76:  # off by one due to sampling freq
                    print(" off by one on 0x66, fixing value")
                    # The packet is patched in place and accepted as a start.
                    sot_pkt.value = 0x66
                    break
            else:
                break
    for ack_pkt in seq:
        try:
            # NOTE(review): check_sot_ack is declared as (ack, sot) but is
            # called with (sot_pkt, ack_pkt) -- confirm this is intended.
            check_sot_ack(sot_pkt, ack_pkt)
            #print('New packet %s %s' % (sot_pkt, ack_pkt))
            return sot_pkt, ack_pkt
        except ProtocolReset:
            # print('reset')
            # Restart the handshake with the newer packet as start packet.
            sot_pkt = ack_pkt
    #raise StopIteration()
    return None, None
122
123
# Byte values that wait_for_sot() accepts as the start of a transmission.
SOT_VALUES = [0x33, 0x66]
125
126
def recv_packet(seq):
    """Group a stream of Packet objects into protocol-level messages.

    After each start/acknowledge pair found by wait_for_sot(), bytes sent in
    the same direction as the start packet are collected into a payload. A
    byte is kept only once the other side answers 0x00. Whenever
    check_payload() accepts the payload, the message is yielded.

    Args:
        seq: iterator of Packet objects; it is consumed in place.

    Yields:
        ``(sot_pkt, payload)`` where ``payload`` is the list of byte values
        collected so far. The same list object keeps growing if more bytes
        follow, so it may be yielded more than once.

    Raises:
        ProtocolError: on a direction or acknowledge violation.
    """
    sot_pkt = None
    while True:
        sot_pkt, ack_pkt = wait_for_sot(seq, sot_pkt)
        if sot_pkt is None:
            # Input exhausted while waiting for a handshake.
            break
        payload = []
        recv_char = None  # byte waiting for its acknowledge, if any
        may_stop = False  # True once the payload is complete

        cur_pkt = ack_pkt
        for pkt in seq:
            if pkt.value == 0x66:
                # Treated as a new start packet; wait_for_sot() resumes from it.
                sot_pkt = pkt
                break

            check_rxtx(pkt, cur_pkt)
            if recv_char is None:
                # Expecting a data byte from the side that sent the start.
                if pkt.rxtx != sot_pkt.rxtx:
                    raise ProtocolError()
                recv_char = pkt.value
                cur_pkt = pkt
                if may_stop:
                    if recv_char == 0x55:
                        # 0x55 after a complete payload ends the message.
                        cur_pkt = None
                        sot_pkt = None
                        break
            else:
                # Expecting the other side's answer to the pending byte.
                if pkt.rxtx == sot_pkt.rxtx:
                    raise ProtocolError()
                if pkt.value == 0x00:  # ack, previous recv-char is valid
                    payload.append(recv_char)
                    if check_payload(sot_pkt, payload):
                        may_stop = True
                        yield (sot_pkt, payload)
                    else:
                        may_stop = False
                elif pkt.value != 0xFF:  # really?
                    # if FF, ignore the char (i.e. it's a NACK, so resend)
                    raise ProtocolError('Not really but hey %02X [%02X] %s' %
                                        (pkt.value, recv_char, payload))
                recv_char = None
                cur_pkt = pkt
        else:
            # seq ran out without a break: nothing more to decode.
            break
172
173
def check_payload(sot, payload):
    """Tell whether *payload* is a complete message for start packet *sot*.

    Rules, by direction and start value:
      * 'rx' with start 0x33 (init sequence): complete at 3 bytes when the
        second byte is 0xFF ([0x02, 0xFF, <KEY>]), otherwise at 2 bytes
        ([0x02, 0x00]).
      * 'rx' with any other start value: complete at exactly 1 byte.
      * any other direction: complete when the length equals the second
        byte plus 2.

    Returns:
        bool: True when the payload has exactly the expected length.
    """
    size = len(payload)
    if sot.rxtx != 'rx':
        return size > 1 and size == (payload[1] + 2)
    if sot.value != 0x33:
        return size == 1
    if size <= 1:
        return False
    if payload[1] == 0xFF:
        return size == 3
    return size == 2
188
189
# Key code -> label, as looked up by parse_packet(). For received key events
# parse_packet() clears bit 0x40 (release marker) before the lookup.
KEYCODES = {
    0x00: 'View',
    0x01: 'Mon',
    0x02: 'Sto/Rcl',
    0x03: 'Scan',
    0x04: 'Alarm',
    0x05: 'Mx+B',
    0x06: 'Measure',
    0x07: 'Interval',
    0x08: 'Card Reset',
    0x09: 'Close',
    0x0A: 'Open',
    0x0B: 'Read',
    0x0C: 'Shift',
    0x0D: 'Write',
    0x0E: 'Left',
    0x0F: 'Right',
    0x10: 'Advanced',
    0x11: 'Step',
    0x80: 'Knob right',
    0x81: 'Knob left',
}
212
213
# Flag names, one row per flag byte; within a row, index i names bit i
# (least significant bit first), as decoded by flag(). flag_by_num() indexes
# the same names with the rows laid end to end.
FLAGS = [
    ['<Bell>', 'Mx+B', 'Ch. frame', 'Channel', 'LO', 'Alarm', 'HI', 'Al. frame'],
    ['2', '4', '3', '1', '4W', 'OC', 'SHIFT?', 'AVG'],
    ['MAX', 'MIN', 'LAST', 'MEM', '', 'ONCE', 'EXT', 'ERROR'],
    ['REMOTE', 'ADRS', '*', 'VIEW', 'MON', 'SCAN', 'CONFIG', '']
]
220
221
def flag(payload):
    """Decode the flag bytes of *payload* into a comma-separated name list.

    Bytes from index 2 onwards are paired with the rows of ``FLAGS``; for each
    set bit (least significant first) the matching name is emitted.
    """
    names = [
        row[bit]
        for row, byte in zip(FLAGS, payload[2:])
        for bit in range(8)
        if byte & (1 << bit)
    ]
    return ','.join(names)
229
230
def flag_by_num(num):
    """Return the name of flag number *num*, counting through ``FLAGS`` row
    by row from zero, or None when *num* is out of range."""
    flattened = (name for row in FLAGS for name in row)
    for index, name in enumerate(flattened):
        if index == num:
            return name
    return None
237
# Command code (first payload byte) -> (label, formatter), used by
# parse_packet(). The formatter receives the whole payload and returns the
# text shown after the label; None means the label is shown alone.
CMDS = {
    0x00: ('DISPLAY ', lambda pl: ''.join(chr(x) for x in pl[2:])),
    0x0C: ('CHANNEL ', lambda pl: ''.join(chr(x) for x in pl[2:])),
    0x0A: ('FLAGS ', flag),
    0x01: ('CLR FLAG', lambda pl: flag_by_num(pl[2])),
    0x86: ('SHUTDOWN', None),
    0x0D: ('CHAR LO ', lambda pl: str(pl[2])),
    0x08: ('FLAG LO ', lambda pl: flag_by_num(pl[2])),
    0x09: ('FLAG HI ', lambda pl: flag_by_num(pl[2])),
    0x02: ('RST? ', lambda pl: "[%s]" % (",".join("%02X"%c for c in pl))),
}
249
250
def parse_packet(sot, payload):
    """Render one decoded message as a human-readable, colourised string.

    Args:
        sot: start packet of the message (``value`` and ``rxtx`` are read).
        payload: non-empty list of byte values of the message.

    Returns:
        str: one of
          * 'INIT NO KEY' / 'INIT WITH KEY ...' when the start value is 0x33;
          * 'KNOB ...' or 'KEY PRESSED/RELEASED ...' for 'rx' messages, where
            bit 0x40 of the key code marks a release;
          * the label from ``CMDS`` plus its formatted argument otherwise.
        A command code missing from ``CMDS`` is rendered as 'UNKNOWN ' followed
        by a hex dump of the payload instead of raising KeyError, so that one
        unrecognised command does not abort the whole decode.
    """
    if sot.value == 0x33:
        if len(payload) == 2:
            return 'INIT NO KEY'
        return 'INIT WITH KEY\t\t%s' % click.style(
            KEYCODES.get(payload[-1], 'None'), fg='cyan')
    if sot.rxtx == 'rx':
        kc = payload[0]
        if kc in (0x80, 0x81):
            return 'KNOB\t\t%s' % click.style(KEYCODES.get(kc), fg='magenta')
        if kc & 0x40:
            # Clear the release marker to recover the plain key code.
            kc = kc & 0xBF
            event = 'RELEASED'
        else:
            event = 'PRESSED '
        return 'KEY\t%s\t%s' % (
            click.style(event, fg='green'),
            click.style(KEYCODES.get(kc, "[%02X]" % kc), fg='cyan'))
    cmd = payload[0]
    if cmd not in CMDS:
        dump = "[%s]" % ",".join("%02X" % c for c in payload)
        return 'UNKNOWN \t%s' % click.style(dump, fg='red')
    lbl, dspfunc = CMDS[cmd]
    if dspfunc:
        lbl += '\t%s' % click.style(dspfunc(payload), fg='green')
    return lbl
272
273
@click.command()
@click.option('-p', '--packets/--no-packets', default=False)
@click.option('-f', '--filename', default=None)
def main(packets, filename):
    """Decode a capture and print it, either raw or interpreted.

    Args:
        packets: when true, print each message as direction, start value and
            hex payload; otherwise print timestamps and the text produced by
            parse_packet().
        filename: file to read decoder output from; standard input when empty.

    On any exception the last ten raw packets are printed for context and the
    exception is re-raised. The input file, if one was opened, is always
    closed before returning.
    """
    input_stream = open(filename) if filename else None  # None == stdin
    try:
        if not packets:
            click.secho("Transmission CPU <--> DP", fg="red")
        try:
            t0 = None
            for sot, payload in recv_packet(read_line(read_input(input_stream))):
                if packets:
                    cnxdir = 'CPU=>DP' if sot.rxtx == 'tx' else 'CPU<-DP'
                    print('[%s]: %02X=[%s]' %
                          (cnxdir, sot.value,
                           ','.join('%02x' % x for x in payload)))
                    continue
                cnxdir = '>>>' if sot.rxtx == 'tx' else '<<<'
                if t0 is None:
                    t0 = sot.timestamp
                click.secho('%08.0f ' % sot.relts,
                            fg='yellow', nl=False)
                # Delta with respect to the previously printed message.
                click.secho('[+%04.2f] ' % ((sot.timestamp - t0)/1000.),
                            fg='blue', nl=False)
                click.secho(cnxdir + ' ',
                            fg='red' if cnxdir == ">>>" else 'green', nl=False)
                click.secho(parse_packet(sot, payload), fg='white')
                t0 = sot.timestamp
        except BaseException:
            # Show the most recent raw packets to help locate the failure,
            # then let the original exception propagate.
            print('\n'.join(str(x) for x in pkts[-10:]))
            raise
    finally:
        if input_stream is not None:
            input_stream.close()
310
311
# Run the click command only when executed as a script.
if __name__ == '__main__':
    main()

# mercurial