serialdata/uart_filter.py

Thu, 07 Oct 2021 22:24:45 +0200

author
David Douard <david.douard@sdfa3.org>
date
Thu, 07 Oct 2021 22:24:45 +0200
changeset 69
516acbbce05a
parent 23
daf26b083899
permissions
-rw-r--r--

Possibly fix the 'reset FP while running' case

import sys
import re
import click

# Divisor applied to frame numbers by Packet.timestamp (frame / F * 1000).
# NOTE(review): presumably the capture sample rate in Hz (1 MHz), which would
# make timestamps milliseconds -- confirm against the capture settings.
F = 1e6

# Matches one input line of the form
#   <fr1>-<fr2> <anything>: rx-data: "<2 chars>"     (or tx-data)
# capturing both frame numbers, the direction and the two-character value.
# read_line() only uses <fr1> and parses the value as hexadecimal.
# NOTE(review): looks like UART protocol-decoder annotation output -- confirm
# which tool produces it.
reg = re.compile(r'^(?P<fr1>\d+)-(?P<fr2>\d+) .*: (?P<rxtx>rx|tx)-data: "(?P<value>..)"$')  # noqa


class ProtocolError(Exception):
    """Raised when the byte stream does not follow the expected exchange."""

class ProtocolReset(Exception):
    """Raised by check_sot_ack() when a mismatch involves the 0x66 byte."""

# Every Packet produced by read_line() is appended here, in input order.
# Packet.relts uses pkts[0] as its time origin and main() dumps pkts[-10:]
# when an exception escapes.
pkts = []


class Packet:
    """One decoded byte together with its position and direction.

    The three attributes are stored exactly as passed in: ``frame``
    (read_line() passes the integer start frame), ``rxtx`` (``'rx'`` or
    ``'tx'``) and ``value`` (the byte as an int).
    """

    def __init__(self, frame, rxtx, value):
        self.frame, self.rxtx, self.value = frame, rxtx, value

    @property
    def timestamp(self):
        """Frame number divided by the module constant ``F``, times 1000."""
        return self.frame / F * 1000

    @property
    def relts(self):
        """Timestamp relative to the first packet recorded in ``pkts``."""
        own = self.timestamp
        origin = pkts[0]
        return own - origin.timestamp

    def __str__(self):
        fields = (self.timestamp, self.rxtx, self.value)
        return '[%06.2f] %s: %02X' % fields


def read_line(seq):
    """Yield a Packet for every line of *seq* that matches ``reg``.

    Lines that do not match are skipped silently. Each packet is also
    appended to the module-level ``pkts`` list before it is yielded.
    """
    for text in seq:
        match = reg.match(text)
        if match is None:
            continue
        fields = match.groupdict()
        pkt = Packet(
            int(fields['fr1']),
            fields['rxtx'],
            int(fields['value'], base=16),
        )
        pkts.append(pkt)
        yield pkt


def read_input(stream=None):
    """Yield the items of *stream* one by one, defaulting to ``sys.stdin``."""
    source = sys.stdin if stream is None else stream
    yield from source


def print_input():
    """Print every decoded byte read from stdin, one line per byte.

    Each line shows the direction, the time elapsed since the first byte,
    the value in hex and, for printable ASCII values, the character itself.
    """
    t0 = None
    for pkt in read_line(read_input()):
        # Printable ASCII is 0x20..0x7E; 0x7F (DEL) is a control character
        # and must not be echoed to the terminal.
        c = chr(pkt.value) if 32 <= pkt.value < 127 else ''
        if t0 is None:
            # The first byte seen defines time zero.
            t0 = pkt.timestamp
        cnxdir = 'CPU-->DP' if pkt.rxtx == 'rx' else 'CPU<--DP'
        print('[%s] %06.1f: %02X # %s' % (
            cnxdir, (pkt.timestamp-t0), pkt.value, c))


def flipdir(rxtx):
    """Return ``'rx'`` for ``'tx'``; any other input yields ``'tx'``."""
    if rxtx == 'tx':
        return 'rx'
    return 'tx'


def check_rxtx(cur, pkt):
    """Require *cur* and *pkt* to travel in opposite directions.

    Raises:
        ProtocolError: if both packets have the same ``rxtx`` value.
    """
    same_direction = cur.rxtx == pkt.rxtx
    if not same_direction:
        return
    raise ProtocolError('Expected %s transmission %02X %02X' %
                        (flipdir(cur.rxtx), cur.value, pkt.value))


def check_sot_ack(ack, sot):
    """Validate that *ack* and *sot* form a matching handshake pair.

    The two packets must travel in opposite directions and their values
    must add up to 0xFF.

    NOTE(review): wait_for_sot() calls this as
    ``check_sot_ack(sot_pkt, ack_pkt)``, so the start byte arrives in the
    *ack* parameter -- confirm the parameter names are not simply swapped.

    Raises:
        ProtocolError: same direction, or values do not complement.
        ProtocolReset: values do not complement and ``ack.value`` is 0x66.
    """
    check_rxtx(ack, sot)
    expected = 0xFF - sot.value
    if ack.value == expected:
        return
    if ack.value == 0x66:
        raise ProtocolReset()
    raise ProtocolError('Expected ACK value %02X, got %02X' %
                        (expected, ack.value))


def wait_for_sot(seq, sot_pkt=None):
    """Consume *seq* until a start byte and its acknowledgement are found.

    When *sot_pkt* is None, packets are read until one whose value is in
    ``SOT_VALUES`` shows up; every other value is reported on stdout and
    skipped. The following packet is then checked with check_sot_ack(). If
    that raises ProtocolReset, the packet just read becomes the new start
    candidate and the next packet is tried against it.

    Returns:
        The ``(sot_pkt, ack_pkt)`` pair that passed the check.

    Raises:
        StopIteration: *seq* ran out before a pair was found. Generators
            calling this function must catch it themselves, since a
            StopIteration escaping a generator body becomes RuntimeError.
        ProtocolError: propagated from check_sot_ack().
    """
    if sot_pkt is None:
        for candidate in seq:
            sot_pkt = candidate
            if candidate.value in SOT_VALUES:
                break
            print('Unexpected value %s: %02X' %
                  (candidate.rxtx, candidate.value))
    for ack_pkt in seq:
        try:
            check_sot_ack(sot_pkt, ack_pkt)
        except ProtocolReset:
            # Start over, using the packet just read as the start byte.
            sot_pkt = ack_pkt
        else:
            return sot_pkt, ack_pkt
    raise StopIteration()


# Byte values wait_for_sot() accepts as the start of a transfer; any other
# value seen while waiting is reported as unexpected and skipped.
# parse_packet() labels transfers started with 0x33 as 'INIT'.
SOT_VALUES = [0x33, 0x66]


def recv_packet(seq):
    """Group a stream of Packet objects into acknowledged transfers.

    A transfer starts with the start/ack pair returned by wait_for_sot().
    After that, bytes are expected to alternate in direction: a data byte
    from the side that sent the start byte, then a reply from the other
    side. A reply of 0x00 accepts the data byte (it is appended to the
    payload), 0xFF drops it, anything else is a protocol error.

    Each time check_payload() reports the payload as complete, the pair
    ``(sot_pkt, payload)`` is yielded. The same list object keeps being
    extended if more bytes follow, so consumers should use it right away.
    After a complete payload, a data byte of 0x55 closes the transfer.

    NOTE(review): any byte equal to 0x66 restarts the handshake with that
    byte as the new start byte, whatever its position in the exchange --
    confirm that 0x66 cannot legitimately appear as a data or reply byte.

    The generator ends normally when *seq* is exhausted.

    Raises:
        ProtocolError: the exchange does not follow the rules above.
    """
    sot_pkt = None
    while True:
        try:
            sot_pkt, ack_pkt = wait_for_sot(seq, sot_pkt)
        except StopIteration:
            # wait_for_sot() reports end of input with StopIteration. If it
            # escaped this generator it would be converted to RuntimeError
            # (PEP 479), so finish the generator explicitly instead.
            return
        payload = []
        recv_char = None
        may_stop = False

        cur_pkt = ack_pkt
        for pkt in seq:
            if pkt.value == 0x66:
                # Restart: this byte becomes the start byte of a new transfer.
                sot_pkt = pkt
                break

            check_rxtx(pkt, cur_pkt)
            if recv_char is None:
                # Expecting a data byte from the side that sent the start byte.
                if pkt.rxtx != sot_pkt.rxtx:
                    raise ProtocolError()
                recv_char = pkt.value
                cur_pkt = pkt
                if may_stop:
                    if recv_char == 0x55:
                        # End of transfer; look for a fresh start byte next.
                        cur_pkt = None
                        sot_pkt = None
                        break
            else:
                # Expecting the other side's reply to the pending data byte.
                if pkt.rxtx == sot_pkt.rxtx:
                    raise ProtocolError()
                if pkt.value == 0x00:
                    payload.append(recv_char)
                    if check_payload(sot_pkt, payload):
                        may_stop = True
                        yield (sot_pkt, payload)
                    else:
                        may_stop = False
                elif pkt.value != 0xFF:
                    # if FF, ignore the char (ie. its a NACK, so resend)
                    raise ProtocolError('Not really but hey %02X [%02X] %s' %
                                        (pkt.value, recv_char, payload))
                recv_char = None
                cur_pkt = pkt
        else:
            # Input exhausted in the middle of a transfer.
            break


def check_payload(sot, payload):
    """Tell whether *payload* is complete for the transfer started by *sot*.

    For ``'tx'`` transfers the expected size is fixed: two bytes when the
    start value is 0x33, one byte otherwise. For every other direction the
    second payload byte is read as a count and the payload is complete when
    its length equals that count plus two.
    """
    size = len(payload)
    if sot.rxtx == 'tx':
        wanted = 2 if sot.value == 0x33 else 1
        return size == wanted
    if size > 1:
        return size == payload[1] + 2
    return False


# Key code -> label, looked up by parse_packet(). For key events
# parse_packet() clears bit 0x40 (which it reports as 'RELEASED') before the
# lookup; 0x80 and 0x81 are looked up unchanged and reported as 'KNOB'.
KEYCODES = {
    0x00: 'View',
    0x01: 'Mon',
    0x02: 'Sto/Rcl',
    0x03: 'Scan',
    0x04: 'Alarm',
    0x05: 'Mx+B',
    0x06: 'Measure',
    0x07: 'Interval',
    0x08: 'Card Reset',
    0x09: 'Close',
    0x0A: 'Open',
    0x0B: 'Read',
    0x0C: 'Shift',
    0x0D: 'Write',
    0x0E: 'Left',
    0x0F: 'Right',
    0x10: 'Advanced',
    0x11: 'Step',
    0x80: 'Knob right',
    0x81: 'Knob left',
    }


# Flag labels, four banks of eight. flag() maps bit i of the k-th flag byte
# to FLAGS[k][i]; flag_by_num() indexes the banks as one flat list of 32.
# Empty strings are positions without a label.
FLAGS = [
    ['<Bell>', 'Mx+B', 'Ch. frame', 'Channel', 'LO', 'Alarm', 'HI', 'Al. frame'],
    ['2', '4', '3', '1', '4W', 'OC', '', 'AVG'],
    ['MAX', 'MIN', 'LAST', 'MEM', '', 'ONCE', 'EXT', 'ERROR'],
    ['REMOTE', 'ADRS', '*', 'VIEW', 'MON', 'SCAN', 'CONFIG', '']
]


def flag(payload):
    """Return the comma-separated labels of all bits set in ``payload[2:]``.

    The k-th byte after the two leading payload bytes is matched against
    ``FLAGS[k]``; bit *i* of that byte selects ``FLAGS[k][i]``. Bytes beyond
    the number of banks in ``FLAGS`` are ignored.
    """
    names = [
        labels[bit]
        for labels, byte in zip(FLAGS, payload[2:])
        for bit in range(8)
        if byte & (1 << bit)
    ]
    return ','.join(names)


def flag_by_num(num):
    """Return the *num*-th label of ``FLAGS`` read as one flat list.

    Returns None when *num* does not address any label.
    """
    flat = (label for bank in FLAGS for label in bank)
    for index, label in enumerate(flat):
        if index == num:
            return label
    return None

# Command byte (payload[0]) -> (label, formatter), used by parse_packet().
# The formatter receives the whole payload list and returns the text shown
# after the label; None means only the label is shown. Formatters read from
# payload[2:] (check_payload() treats payload[1] as a byte count).
CMDS = {
    0x00: ('DISPLAY ', lambda pl: ''.join(chr(x) for x in pl[2:])),
    0x0C: ('CHANNEL ', lambda pl: ''.join(chr(x) for x in pl[2:])),
    0x0A: ('FLAGS   ', flag),
    0x01: ('UNSHIFT ', None),
    0x86: ('SHUTDOWN', None),
    0x0D: ('CHAR LO ', lambda pl: str(pl[2])),
    0x08: ('FLAG LO ', lambda pl: flag_by_num(pl[2])),
    0x09: ('FLAG HI ', lambda pl: flag_by_num(pl[2])),
    0x02: ('RST?    ', None),
    }


def parse_packet(sot, payload):
    """Return a human-readable, colorized description of one transfer.

    * Start value 0x33: ``'INIT KEY=<label>'`` using ``payload[1]``.
    * Direction ``'tx'``: a knob event (codes 0x80/0x81) or a key event,
      where bit 0x40 of ``payload[0]`` marks a release.
    * Anything else: the command named by ``payload[0]`` in ``CMDS``,
      followed by its formatted arguments when the command has a formatter.

    Unknown key codes are shown as ``'None'`` (the same fallback the INIT
    branch uses) and unknown command bytes as ``'UNKNOWN '`` followed by a
    hex dump of the payload, so one unexpected byte does not abort decoding.
    """
    if sot.value == 0x33:
        return 'INIT KEY=%s' % KEYCODES.get(payload[1], 'None')
    if sot.rxtx == 'tx':
        kc = payload[0]
        if kc in (0x80, 0x81):
            return 'KNOB\t\t%s' % click.style(KEYCODES.get(kc), fg='magenta')
        if kc & 0x40:
            # Bit 0x40 flags a key release; strip it to get the key code.
            kc = kc & 0xBF
            event = 'RELEASED'
        else:
            event = 'PRESSED '
        # Always hand click.style() a string: older click releases raise
        # TypeError when asked to style None.
        key_label = KEYCODES.get(kc, 'None')
        return 'KEY %s\t%s' % (click.style(event, fg='green'),
                               click.style(key_label, fg='cyan'))
    cmd = payload[0]
    if cmd not in CMDS:
        raw = ','.join('%02X' % x for x in payload)
        return 'UNKNOWN \t%s' % click.style(raw, fg='green')
    lbl, dspfunc = CMDS[cmd]
    if dspfunc:
        lbl += '\t%s' % click.style(dspfunc(payload), fg='green')
    return lbl


# Command-line entry point: decode the capture read from stdin.
#
# With -p/--packets each transfer is printed as raw hex; otherwise each
# transfer is printed as a colorized line holding the time since the first
# recorded packet, the delay since the previous transfer, the direction and
# the text produced by parse_packet(). If anything is raised, the last ten
# recorded packets are printed before the exception is re-raised.
#
# (Plain comments are used on purpose: click would show a docstring in the
# --help output.)
@click.command()
@click.option('-p', '--packets/--no-packets', default=False)
def main(packets):
    try:
        transfers = recv_packet(read_line(read_input()))
        if packets:
            for sot, payload in transfers:
                arrow = 'CPU=>DP' if sot.rxtx == 'rx' else 'CPU<-DP'
                hexbytes = ','.join('%02x' % x for x in payload)
                print('[%s]: %02X=[%s]' % (arrow, sot.value, hexbytes))
        else:
            previous = None
            for sot, payload in transfers:
                arrow = '>>>' if sot.rxtx == 'rx' else '<<<'
                if previous is None:
                    # First transfer: its delay is reported as zero.
                    previous = sot.timestamp
                click.secho('%08.0f ' % sot.relts,
                            fg='yellow', nl=False)
                click.secho('[+%04.2f] ' % ((sot.timestamp - previous)/1000.),
                            fg='blue', nl=False)
                click.secho(arrow + ' ', fg='red', nl=False)
                click.secho(parse_packet(sot, payload), fg='white')
                previous = sot.timestamp
    except BaseException:
        print('\n'.join(str(x) for x in pkts[-10:]))
        raise


# Run the click command only when the file is executed as a script.
if __name__ == '__main__':
    main()

mercurial