serialdata/uart_filter.py

Thu, 12 Nov 2020 20:26:35 +0100

author
David Douard <david.douard@sdf3.org>
date
Thu, 12 Nov 2020 20:26:35 +0100
changeset 53
74e85b34d26b
parent 23
daf26b083899
permissions
-rw-r--r--

Reorganize the display + improvements for dimmed flags

- the whole upper zone is now dediacated to the main character line
- make sure eash flag has a dedicated non-overlaping area
- improve support for dimmed flags (not yet properly functionning since this
dimm state is actually stateful, so some major refactorings are needed to
properly handle this).

import sys
import re
import click

F = 1e6

reg = re.compile(r'^(?P<fr1>\d+)-(?P<fr2>\d+) .*: (?P<rxtx>rx|tx)-data: "(?P<value>..)"$')  # noqa


class ProtocolError(Exception):
    pass

class ProtocolReset(Exception):
    pass

pkts = []


class Packet:
    def __init__(self, frame, rxtx, value):
        self.frame = frame
        self.rxtx = rxtx
        self.value = value

    @property
    def timestamp(self):
        return self.frame / F * 1000

    @property
    def relts(self):
        return self.timestamp - pkts[0].timestamp

    def __str__(self):
        return '[%06.2f] %s: %02X' % (self.timestamp, self.rxtx, self.value)


def read_line(seq):
    for line in seq:
        m = reg.match(line)
        if m:
            frame = int(m.group('fr1'))
            v = int(m.group('value'), base=16)
            rxtx = m.group('rxtx')
            pkt = Packet(frame, rxtx, v)
            pkts.append(pkt)
            yield pkt


def read_input(stream=None):
    if stream is None:
        stream = sys.stdin
    for line in stream:
        yield line


def print_input():
    t0 = None
    for pkt in read_line(read_input()):
        c = chr(pkt.value) if 32 <= pkt.value <= 127 else ''
        if t0 is None:
            t0 = pkt.timestamp
        cnxdir = 'CPU-->DP' if pkt.rxtx == 'rx' else 'CPU<--DP'
        print('[%s] %06.1f: %02X # %s' % (
            cnxdir, (pkt.timestamp-t0), pkt.value, c))


def flipdir(rxtx):
    return 'rx' if rxtx == 'tx' else 'tx'


def check_rxtx(cur, pkt):
    if cur.rxtx == pkt.rxtx:
        raise ProtocolError('Expected %s transmission %02X %02X' %
                            (flipdir(cur.rxtx), cur.value, pkt.value))


def check_sot_ack(ack, sot):
    check_rxtx(ack, sot)
    if ack.value != (0xFF - sot.value):
        if ack.value == 0x66:
            raise ProtocolReset()
        raise ProtocolError('Expected ACK value %02X, got %02X' %
                            (0xFF - sot.value, ack.value))


def wait_for_sot(seq, sot_pkt=None):
    if sot_pkt is None:
        for sot_pkt in seq:
            if sot_pkt.value not in SOT_VALUES:
                print('Unexpected value %s: %02X' %
                      (sot_pkt.rxtx, sot_pkt.value))
            else:
                break
    for ack_pkt in seq:
        try:
            check_sot_ack(sot_pkt, ack_pkt)
            # print('New packet %s' % sot_pkt)
            return sot_pkt, ack_pkt
        except ProtocolReset:
            # print('reset')
            sot_pkt = ack_pkt
    raise StopIteration()


SOT_VALUES = [0x33, 0x66]


def recv_packet(seq):
    sot_pkt = None
    while True:
        sot_pkt, ack_pkt = wait_for_sot(seq, sot_pkt)
        payload = []
        recv_char = None
        may_stop = False

        cur_pkt = ack_pkt
        for pkt in seq:
            if pkt.value == 0x66:
                sot_pkt = pkt
                break

            check_rxtx(pkt, cur_pkt)
            if recv_char is None:
                if pkt.rxtx != sot_pkt.rxtx:
                    raise ProtocolError()
                recv_char = pkt.value
                cur_pkt = pkt
                if may_stop:
                    if recv_char == 0x55:
                        cur_pkt = None
                        sot_pkt = None
                        break
            else:
                if pkt.rxtx == sot_pkt.rxtx:
                    raise ProtocolError()
                if pkt.value == 0x00:
                    payload.append(recv_char)
                    if check_payload(sot_pkt, payload):
                        may_stop = True
                        yield (sot_pkt, payload)
                    else:
                        may_stop = False
                elif pkt.value != 0xFF:
                    # if FF, ignore the char (ie. its a NACK, so resend)
                    raise ProtocolError('Not really but hey %02X [%02X] %s' %
                                        (pkt.value, recv_char, payload))
                recv_char = None
                cur_pkt = pkt
        else:
            break


def check_payload(sot, payload):
    if sot.rxtx == 'tx':
        if sot.value == 0x33:
            return len(payload) == 2
        else:
            return len(payload) == 1
    else:
        if len(payload) > 1:
            return len(payload) == (payload[1]+2)
    return False


KEYCODES = {
    0x00: 'View',
    0x01: 'Mon',
    0x02: 'Sto/Rcl',
    0x03: 'Scan',
    0x04: 'Alarm',
    0x05: 'Mx+B',
    0x06: 'Measure',
    0x07: 'Interval',
    0x08: 'Card Reset',
    0x09: 'Close',
    0x0A: 'Open',
    0x0B: 'Read',
    0x0C: 'Shift',
    0x0D: 'Write',
    0x0E: 'Left',
    0x0F: 'Right',
    0x10: 'Advanced',
    0x11: 'Step',
    0x80: 'Knob right',
    0x81: 'Knob left',
    }


FLAGS = [
    ['<Bell>', 'Mx+B', 'Ch. frame', 'Channel', 'LO', 'Alarm', 'HI', 'Al. frame'],
    ['2', '4', '3', '1', '4W', 'OC', '', 'AVG'],
    ['MAX', 'MIN', 'LAST', 'MEM', '', 'ONCE', 'EXT', 'ERROR'],
    ['REMOTE', 'ADRS', '*', 'VIEW', 'MON', 'SCAN', 'CONFIG', '']
]


def flag(payload):
    flags = []
    for flg, byte in zip(FLAGS, payload[2:]):
        for i in range(8):
            if byte & 2**i:
                flags.append(flg[i])
    return ','.join(flags)


def flag_by_num(num):
    for b in FLAGS:
        for f in b:
            if num == 0:
                return f
            num -= 1

CMDS = {
    0x00: ('DISPLAY ', lambda pl: ''.join(chr(x) for x in pl[2:])),
    0x0C: ('CHANNEL ', lambda pl: ''.join(chr(x) for x in pl[2:])),
    0x0A: ('FLAGS   ', flag),
    0x01: ('UNSHIFT ', None),
    0x86: ('SHUTDOWN', None),
    0x0D: ('CHAR LO ', lambda pl: str(pl[2])),
    0x08: ('FLAG LO ', lambda pl: flag_by_num(pl[2])),
    0x09: ('FLAG HI ', lambda pl: flag_by_num(pl[2])),
    0x02: ('RST?    ', None),
    }


def parse_packet(sot, payload):
    if sot.value == 0x33:
        return 'INIT KEY=%s' % KEYCODES.get(payload[1], 'None')
    if sot.rxtx == 'tx':
        kc = payload[0]
        if kc in (0x80, 0x81):
            return 'KNOB\t\t%s' % click.style(KEYCODES.get(kc), fg='magenta')
        if kc & 0x40:
            kc = kc & 0xBF
            event = 'RELEASED'
        else:
            event = 'PRESSED '
        return 'KEY %s\t%s' % (click.style(event, fg='green'),
                               click.style(KEYCODES.get(kc), fg='cyan'))
    lbl, dspfunc = CMDS[payload[0]]
    if dspfunc:
        lbl += '\t%s' % click.style(dspfunc(payload), fg='green')
    return lbl


@click.command()
@click.option('-p', '--packets/--no-packets', default=False)
def main(packets):
    if packets:
        try:
            for sot, payload in recv_packet(read_line(read_input())):
                cnxdir = 'CPU=>DP' if sot.rxtx == 'rx' else 'CPU<-DP'
                print('[%s]: %02X=[%s]' %
                      (cnxdir, sot.value,
                       ','.join('%02x' % x for x in payload)))
        except:
            print('\n'.join(str(x) for x in pkts[-10:]))
            raise
    else:
        try:
            t0 = None
            for sot, payload in recv_packet(read_line(read_input())):
                cnxdir = '>>>' if sot.rxtx == 'rx' else '<<<'
                if t0 is None:
                    t0 = sot.timestamp
                click.secho('%08.0f ' % sot.relts,
                            fg='yellow', nl=False)
                click.secho('[+%04.2f] ' % ((sot.timestamp - t0)/1000.),
                            fg='blue', nl=False)
                click.secho(cnxdir + ' ', fg='red', nl=False)
                click.secho(parse_packet(sot, payload), fg='white')
                t0 = sot.timestamp
        except:
            print('\n'.join(str(x) for x in pkts[-10:]))
            raise


if __name__ == '__main__':
    main()

mercurial