serialdata/ref/uart_filter.py

Thu, 12 Nov 2020 20:26:35 +0100

author
David Douard <david.douard@sdf3.org>
date
Thu, 12 Nov 2020 20:26:35 +0100
changeset 53
74e85b34d26b
parent 43
c850674a3101
permissions
-rw-r--r--

Reorganize the display + improvements for dimmed flags

- the whole upper zone is now dediacated to the main character line
- make sure eash flag has a dedicated non-overlaping area
- improve support for dimmed flags (not yet properly functionning since this
dimm state is actually stateful, so some major refactorings are needed to
properly handle this).

import sys
import re
import click

F = 1e6

reg = re.compile(r'^(?P<fr1>\d+)-(?P<fr2>\d+) uart-1: (?P<value>.+)$')  # noqa
# to use with
# sigrok-cli -i startup-no-key -P uart:baudrate=187500:parity=even:rx=RX:tx=TX \
#            --channels RX,TX  -A uart=rx-data:tx-data:tx-start --protocol-decoder-samplenum
#
# in this configuration:
#  RX: FP --> CPU
#  TX: CPU --> FP

class ProtocolError(Exception):
    pass

class ProtocolReset(Exception):
    pass

pkts = []


class Packet:
    def __init__(self, frame, rxtx, value):
        self.frame = frame
        self.rxtx = rxtx
        self.value = value

    @property
    def timestamp(self):
        return self.frame / F * 1000

    @property
    def relts(self):
        return self.timestamp - pkts[0].timestamp

    def __str__(self):
        return '[%06.2f] %s: %02X' % (self.timestamp, self.rxtx, self.value)


def read_line(seq):
    rxtx = "rx"
    for line in seq:
        m = reg.match(line)
        if m:
            frame = int(m.group('fr1'))
            v = m.group('value')
            if v == "Start bit":
                rxtx = "tx"
            else:
                v = int(v, base=16)
                pkt = Packet(frame, rxtx, v)
                pkts.append(pkt)
                yield pkt
                rxtx = "rx"
        else:
            print("nope", line)


def read_input(stream=None):
    if stream is None:
        stream = sys.stdin
    for line in stream:
        yield line


def print_input():
    t0 = None
    for pkt in read_line(read_input()):
        c = chr(pkt.value) if 32 <= pkt.value <= 127 else ''
        if t0 is None:
            t0 = pkt.timestamp
        cnxdir = 'CPU-->DP' if pkt.rxtx == 'rx' else 'CPU<--DP'
        print('[%s] %06.1f: %02X # %s' % (
            cnxdir, (pkt.timestamp-t0), pkt.value, c))


def flipdir(rxtx):
    return 'rx' if rxtx == 'tx' else 'tx'


def check_rxtx(cur, pkt):
    if cur.rxtx == pkt.rxtx:
        raise ProtocolError('Expected %s transmission %02X %02X' %
                            (flipdir(cur.rxtx), cur.value, pkt.value))


def check_sot_ack(ack, sot):
    check_rxtx(ack, sot)
    if ack.value != (0xFF - sot.value):
        if ack.value == 0x66:
            raise ProtocolReset()
        raise ProtocolError('Expected ACK value %02X, got %02X' %
                            (0xFF - sot.value, ack.value))


def wait_for_sot(seq, sot_pkt=None):
    if sot_pkt is None:
        for sot_pkt in seq:
            #print("PKT", sot_pkt)
            if sot_pkt.value not in SOT_VALUES:
                print('Unexpected value %s: %02X' %
                      (sot_pkt.rxtx, sot_pkt.value))
                if sot_pkt.value == 0x76:  # off by one due to sampling freq 
                    print("  off by one on 0x66, fixing value")
                    sot_pkt.value = 0x66
                    break
            else:
                break
    for ack_pkt in seq:
        try:
            check_sot_ack(sot_pkt, ack_pkt)
            #print('New packet %s %s' % (sot_pkt, ack_pkt))
            return sot_pkt, ack_pkt
        except ProtocolReset:
            # print('reset')
            sot_pkt = ack_pkt
    #raise StopIteration()
    return None, None


SOT_VALUES = [0x33, 0x66]


def recv_packet(seq):
    sot_pkt = None
    while True:
        sot_pkt, ack_pkt = wait_for_sot(seq, sot_pkt)
        if sot_pkt is None:
            break
        payload = []
        recv_char = None
        may_stop = False

        cur_pkt = ack_pkt
        for pkt in seq:
            if pkt.value == 0x66:
                sot_pkt = pkt
                break

            check_rxtx(pkt, cur_pkt)
            if recv_char is None:
                if pkt.rxtx != sot_pkt.rxtx:
                    raise ProtocolError()
                recv_char = pkt.value
                cur_pkt = pkt
                if may_stop:
                    if recv_char == 0x55:
                        cur_pkt = None
                        sot_pkt = None
                        break
            else:
                if pkt.rxtx == sot_pkt.rxtx:
                    raise ProtocolError()
                if pkt.value == 0x00:  # ack, previous recv-char is valid
                    payload.append(recv_char)
                    if check_payload(sot_pkt, payload):
                        may_stop = True
                        yield (sot_pkt, payload)
                    else:
                        may_stop = False
                elif pkt.value != 0xFF:  # really?
                    # if FF, ignore the char (ie. its a NACK, so resend)
                    raise ProtocolError('Not really but hey %02X [%02X] %s' %
                                        (pkt.value, recv_char, payload))
                recv_char = None
                cur_pkt = pkt
        else:
            break


def check_payload(sot, payload):
    if sot.rxtx == 'rx':
        if sot.value == 0x33: # init sequence, payload can be 2 or 3 chars: 
            if len(payload) > 1:
                if payload[1] == 0xFF:  # should be [0x02, 0xFF, <KEY>]
                    return len(payload) == 3
                else:  # should be [0x02, 0x00]
                    return len(payload) == 2
        else:
            return len(payload) == 1
    else:
        if len(payload) > 1:
            return len(payload) == (payload[1]+2)
    return False


KEYCODES = {
    0x00: 'View',
    0x01: 'Mon',
    0x02: 'Sto/Rcl',
    0x03: 'Scan',
    0x04: 'Alarm',
    0x05: 'Mx+B',
    0x06: 'Measure',
    0x07: 'Interval',
    0x08: 'Card Reset',
    0x09: 'Close',
    0x0A: 'Open',
    0x0B: 'Read',
    0x0C: 'Shift',
    0x0D: 'Write',
    0x0E: 'Left',
    0x0F: 'Right',
    0x10: 'Advanced',
    0x11: 'Step',
    0x80: 'Knob right',
    0x81: 'Knob left',
    }


FLAGS = [
    ['<Bell>', 'Mx+B', 'Ch. frame', 'Channel', 'LO', 'Alarm', 'HI', 'Al. frame'],
    ['2', '4', '3', '1', '4W', 'OC', 'SHIFT?', 'AVG'],
    ['MAX', 'MIN', 'LAST', 'MEM', '', 'ONCE', 'EXT', 'ERROR'],
    ['REMOTE', 'ADRS', '*', 'VIEW', 'MON', 'SCAN', 'CONFIG', '']
]


def flag(payload):
    flags = []
    for flg, byte in zip(FLAGS, payload[2:]):
        for i in range(8):
            if byte & 2**i:
                flags.append(flg[i])
    return ','.join(flags)


def flag_by_num(num):
    for b in FLAGS:
        for f in b:
            if num == 0:
                return f
            num -= 1

CMDS = {
    0x00: ('DISPLAY ', lambda pl: ''.join(chr(x) for x in pl[2:])),
    0x0C: ('CHANNEL ', lambda pl: ''.join(chr(x) for x in pl[2:])),
    0x0A: ('FLAGS   ', flag),
    0x01: ('CLR FLAG', lambda pl: flag_by_num(pl[2])),
    0x86: ('SHUTDOWN', None),
    0x0D: ('CHAR LO ', lambda pl: str(pl[2])),
    0x08: ('FLAG LO ', lambda pl: flag_by_num(pl[2])),
    0x09: ('FLAG HI ', lambda pl: flag_by_num(pl[2])),
    0x02: ('RST?    ', lambda pl: "[%s]" % (",".join("%02X"%c for c in pl))),
    }


def parse_packet(sot, payload):
    if sot.value == 0x33:
        if len(payload) == 2:
            return 'INIT NO KEY'
        else:
            return 'INIT WITH KEY\t\t%s' % click.style(KEYCODES.get(payload[-1], 'None'), fg='cyan')
    if sot.rxtx == 'rx':
        kc = payload[0]
        if kc in (0x80, 0x81):
            return 'KNOB\t\t%s' % click.style(KEYCODES.get(kc), fg='magenta')
        if kc & 0x40:
            kc = kc & 0xBF
            event = 'RELEASED'
        else:
            event = 'PRESSED '
        return 'KEY\t%s\t%s' % (click.style(event, fg='green'),
                               click.style(KEYCODES.get(kc, "[%02X]"%kc), fg='cyan'))
    lbl, dspfunc = CMDS[payload[0]]
    if dspfunc:
        lbl += '\t%s' % click.style(dspfunc(payload), fg='green')
    return lbl


@click.command()
@click.option('-p', '--packets/--no-packets', default=False)
@click.option('-f', '--filename', default=None)
def main(packets, filename):
    if filename:
        input_stream = open(filename)
    else:
        input_stream = None # == stdin
    if packets:
        try:
            for sot, payload in recv_packet(read_line(read_input(input_stream))):
                cnxdir = 'CPU=>DP' if sot.rxtx == 'tx' else 'CPU<-DP'
                print('[%s]: %02X=[%s]' %
                      (cnxdir, sot.value,
                       ','.join('%02x' % x for x in payload)))
        except:
            print('\n'.join(str(x) for x in pkts[-10:]))
            raise
    else:
        click.secho("Transmission CPU <--> DP", fg="red") 
        try:
            t0 = None
            for sot, payload in recv_packet(read_line(read_input(input_stream))):
                cnxdir = '>>>' if sot.rxtx == 'tx' else '<<<'
                if t0 is None:
                    t0 = sot.timestamp
                click.secho('%08.0f ' % sot.relts,
                            fg='yellow', nl=False)
                click.secho('[+%04.2f] ' % ((sot.timestamp - t0)/1000.),
                            fg='blue', nl=False)
                click.secho(cnxdir + ' ', fg='red' if cnxdir == ">>>" else 'green', nl=False)
                click.secho(parse_packet(sot, payload), fg='white')
                t0 = sot.timestamp
        except:
            print('\n'.join(str(x) for x in pkts[-10:]))
            raise


if __name__ == '__main__':
    main()

mercurial