serialdata/uart_filter.py

changeset 23
daf26b083899
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/serialdata/uart_filter.py	Mon Jan 28 21:50:09 2019 +0100
@@ -0,0 +1,279 @@
+import sys
+import re
+import click
+
+F = 1e6
+
+reg = re.compile(r'^(?P<fr1>\d+)-(?P<fr2>\d+) .*: (?P<rxtx>rx|tx)-data: "(?P<value>..)"$')  # noqa
+
+
+class ProtocolError(Exception):
+    pass
+
+class ProtocolReset(Exception):
+    pass
+
+pkts = []
+
+
+class Packet:
+    def __init__(self, frame, rxtx, value):
+        self.frame = frame
+        self.rxtx = rxtx
+        self.value = value
+
+    @property
+    def timestamp(self):
+        return self.frame / F * 1000
+
+    @property
+    def relts(self):
+        return self.timestamp - pkts[0].timestamp
+
+    def __str__(self):
+        return '[%06.2f] %s: %02X' % (self.timestamp, self.rxtx, self.value)
+
+
+def read_line(seq):
+    for line in seq:
+        m = reg.match(line)
+        if m:
+            frame = int(m.group('fr1'))
+            v = int(m.group('value'), base=16)
+            rxtx = m.group('rxtx')
+            pkt = Packet(frame, rxtx, v)
+            pkts.append(pkt)
+            yield pkt
+
+
+def read_input(stream=None):
+    if stream is None:
+        stream = sys.stdin
+    for line in stream:
+        yield line
+
+
+def print_input():
+    t0 = None
+    for pkt in read_line(read_input()):
+        c = chr(pkt.value) if 32 <= pkt.value <= 127 else ''
+        if t0 is None:
+            t0 = pkt.timestamp
+        cnxdir = 'CPU-->DP' if pkt.rxtx == 'rx' else 'CPU<--DP'
+        print('[%s] %06.1f: %02X # %s' % (
+            cnxdir, (pkt.timestamp-t0), pkt.value, c))
+
+
+def flipdir(rxtx):
+    return 'rx' if rxtx == 'tx' else 'tx'
+
+
+def check_rxtx(cur, pkt):
+    if cur.rxtx == pkt.rxtx:
+        raise ProtocolError('Expected %s transmission %02X %02X' %
+                            (flipdir(cur.rxtx), cur.value, pkt.value))
+
+
+def check_sot_ack(ack, sot):
+    check_rxtx(ack, sot)
+    if ack.value != (0xFF - sot.value):
+        if ack.value == 0x66:
+            raise ProtocolReset()
+        raise ProtocolError('Expected ACK value %02X, got %02X' %
+                            (0xFF - sot.value, ack.value))
+
+
+def wait_for_sot(seq, sot_pkt=None):
+    if sot_pkt is None:
+        for sot_pkt in seq:
+            if sot_pkt.value not in SOT_VALUES:
+                print('Unexpected value %s: %02X' %
+                      (sot_pkt.rxtx, sot_pkt.value))
+            else:
+                break
+    for ack_pkt in seq:
+        try:
+            check_sot_ack(sot_pkt, ack_pkt)
+            # print('New packet %s' % sot_pkt)
+            return sot_pkt, ack_pkt
+        except ProtocolReset:
+            # print('reset')
+            sot_pkt = ack_pkt
+    raise StopIteration()
+
+
+SOT_VALUES = [0x33, 0x66]
+
+
+def recv_packet(seq):
+    sot_pkt = None
+    while True:
+        sot_pkt, ack_pkt = wait_for_sot(seq, sot_pkt)
+        payload = []
+        recv_char = None
+        may_stop = False
+
+        cur_pkt = ack_pkt
+        for pkt in seq:
+            if pkt.value == 0x66:
+                sot_pkt = pkt
+                break
+
+            check_rxtx(pkt, cur_pkt)
+            if recv_char is None:
+                if pkt.rxtx != sot_pkt.rxtx:
+                    raise ProtocolError()
+                recv_char = pkt.value
+                cur_pkt = pkt
+                if may_stop:
+                    if recv_char == 0x55:
+                        cur_pkt = None
+                        sot_pkt = None
+                        break
+            else:
+                if pkt.rxtx == sot_pkt.rxtx:
+                    raise ProtocolError()
+                if pkt.value == 0x00:
+                    payload.append(recv_char)
+                    if check_payload(sot_pkt, payload):
+                        may_stop = True
+                        yield (sot_pkt, payload)
+                    else:
+                        may_stop = False
+                elif pkt.value != 0xFF:
+                    # if FF, ignore the char (ie. its a NACK, so resend)
+                    raise ProtocolError('Not really but hey %02X [%02X] %s' %
+                                        (pkt.value, recv_char, payload))
+                recv_char = None
+                cur_pkt = pkt
+        else:
+            break
+
+
+def check_payload(sot, payload):
+    if sot.rxtx == 'tx':
+        if sot.value == 0x33:
+            return len(payload) == 2
+        else:
+            return len(payload) == 1
+    else:
+        if len(payload) > 1:
+            return len(payload) == (payload[1]+2)
+    return False
+
+
+KEYCODES = {
+    0x00: 'View',
+    0x01: 'Mon',
+    0x02: 'Sto/Rcl',
+    0x03: 'Scan',
+    0x04: 'Alarm',
+    0x05: 'Mx+B',
+    0x06: 'Measure',
+    0x07: 'Interval',
+    0x08: 'Card Reset',
+    0x09: 'Close',
+    0x0A: 'Open',
+    0x0B: 'Read',
+    0x0C: 'Shift',
+    0x0D: 'Write',
+    0x0E: 'Left',
+    0x0F: 'Right',
+    0x10: 'Advanced',
+    0x11: 'Step',
+    0x80: 'Knob right',
+    0x81: 'Knob left',
+    }
+
+
+FLAGS = [
+    ['<Bell>', 'Mx+B', 'Ch. frame', 'Channel', 'LO', 'Alarm', 'HI', 'Al. frame'],
+    ['2', '4', '3', '1', '4W', 'OC', '', 'AVG'],
+    ['MAX', 'MIN', 'LAST', 'MEM', '', 'ONCE', 'EXT', 'ERROR'],
+    ['REMOTE', 'ADRS', '*', 'VIEW', 'MON', 'SCAN', 'CONFIG', '']
+]
+
+
+def flag(payload):
+    flags = []
+    for flg, byte in zip(FLAGS, payload[2:]):
+        for i in range(8):
+            if byte & 2**i:
+                flags.append(flg[i])
+    return ','.join(flags)
+
+
+def flag_by_num(num):
+    for b in FLAGS:
+        for f in b:
+            if num == 0:
+                return f
+            num -= 1
+
+CMDS = {
+    0x00: ('DISPLAY ', lambda pl: ''.join(chr(x) for x in pl[2:])),
+    0x0C: ('CHANNEL ', lambda pl: ''.join(chr(x) for x in pl[2:])),
+    0x0A: ('FLAGS   ', flag),
+    0x01: ('UNSHIFT ', None),
+    0x86: ('SHUTDOWN', None),
+    0x0D: ('CHAR LO ', lambda pl: str(pl[2])),
+    0x08: ('FLAG LO ', lambda pl: flag_by_num(pl[2])),
+    0x09: ('FLAG HI ', lambda pl: flag_by_num(pl[2])),
+    0x02: ('RST?    ', None),
+    }
+
+
+def parse_packet(sot, payload):
+    if sot.value == 0x33:
+        return 'INIT KEY=%s' % KEYCODES.get(payload[1], 'None')
+    if sot.rxtx == 'tx':
+        kc = payload[0]
+        if kc in (0x80, 0x81):
+            return 'KNOB\t\t%s' % click.style(KEYCODES.get(kc), fg='magenta')
+        if kc & 0x40:
+            kc = kc & 0xBF
+            event = 'RELEASED'
+        else:
+            event = 'PRESSED '
+        return 'KEY %s\t%s' % (click.style(event, fg='green'),
+                               click.style(KEYCODES.get(kc), fg='cyan'))
+    lbl, dspfunc = CMDS[payload[0]]
+    if dspfunc:
+        lbl += '\t%s' % click.style(dspfunc(payload), fg='green')
+    return lbl
+
+
+@click.command()
+@click.option('-p', '--packets/--no-packets', default=False)
+def main(packets):
+    if packets:
+        try:
+            for sot, payload in recv_packet(read_line(read_input())):
+                cnxdir = 'CPU=>DP' if sot.rxtx == 'rx' else 'CPU<-DP'
+                print('[%s]: %02X=[%s]' %
+                      (cnxdir, sot.value,
+                       ','.join('%02x' % x for x in payload)))
+        except:
+            print('\n'.join(str(x) for x in pkts[-10:]))
+            raise
+    else:
+        try:
+            t0 = None
+            for sot, payload in recv_packet(read_line(read_input())):
+                cnxdir = '>>>' if sot.rxtx == 'rx' else '<<<'
+                if t0 is None:
+                    t0 = sot.timestamp
+                click.secho('%08.0f ' % sot.relts,
+                            fg='yellow', nl=False)
+                click.secho('[+%04.2f] ' % ((sot.timestamp - t0)/1000.),
+                            fg='blue', nl=False)
+                click.secho(cnxdir + ' ', fg='red', nl=False)
+                click.secho(parse_packet(sot, payload), fg='white')
+                t0 = sot.timestamp
+        except:
+            print('\n'.join(str(x) for x in pkts[-10:]))
+            raise
+
+
+if __name__ == '__main__':
+    main()

mercurial