serialdata/ref/uart_filter.py

Mon, 09 Nov 2020 23:05:24 +0100

author
David Douard <david.douard@sdf3.org>
date
Mon, 09 Nov 2020 23:05:24 +0100
changeset 49
c146d19101a3
parent 43
c850674a3101
permissions
-rw-r--r--

Refactor HPSerial to get rid of packet collision misbehavior

completely split the key sending code from the irq-based receiveing logic.
When sending keycodes, disable the RxIrq callback and handle send and recv
of bytes synchronously.

The keycode sending routine rus in a dedicated thread.

import sys
import re
import click

F = 1e6

reg = re.compile(r'^(?P<fr1>\d+)-(?P<fr2>\d+) uart-1: (?P<value>.+)$')  # noqa
# to use with
# sigrok-cli -i startup-no-key -P uart:baudrate=187500:parity=even:rx=RX:tx=TX \
#            --channels RX,TX  -A uart=rx-data:tx-data:tx-start --protocol-decoder-samplenum
#
# in this configuration:
#  RX: FP --> CPU
#  TX: CPU --> FP

class ProtocolError(Exception):
    pass

class ProtocolReset(Exception):
    pass

pkts = []


class Packet:
    def __init__(self, frame, rxtx, value):
        self.frame = frame
        self.rxtx = rxtx
        self.value = value

    @property
    def timestamp(self):
        return self.frame / F * 1000

    @property
    def relts(self):
        return self.timestamp - pkts[0].timestamp

    def __str__(self):
        return '[%06.2f] %s: %02X' % (self.timestamp, self.rxtx, self.value)


def read_line(seq):
    rxtx = "rx"
    for line in seq:
        m = reg.match(line)
        if m:
            frame = int(m.group('fr1'))
            v = m.group('value')
            if v == "Start bit":
                rxtx = "tx"
            else:
                v = int(v, base=16)
                pkt = Packet(frame, rxtx, v)
                pkts.append(pkt)
                yield pkt
                rxtx = "rx"
        else:
            print("nope", line)


def read_input(stream=None):
    if stream is None:
        stream = sys.stdin
    for line in stream:
        yield line


def print_input():
    t0 = None
    for pkt in read_line(read_input()):
        c = chr(pkt.value) if 32 <= pkt.value <= 127 else ''
        if t0 is None:
            t0 = pkt.timestamp
        cnxdir = 'CPU-->DP' if pkt.rxtx == 'rx' else 'CPU<--DP'
        print('[%s] %06.1f: %02X # %s' % (
            cnxdir, (pkt.timestamp-t0), pkt.value, c))


def flipdir(rxtx):
    return 'rx' if rxtx == 'tx' else 'tx'


def check_rxtx(cur, pkt):
    if cur.rxtx == pkt.rxtx:
        raise ProtocolError('Expected %s transmission %02X %02X' %
                            (flipdir(cur.rxtx), cur.value, pkt.value))


def check_sot_ack(ack, sot):
    check_rxtx(ack, sot)
    if ack.value != (0xFF - sot.value):
        if ack.value == 0x66:
            raise ProtocolReset()
        raise ProtocolError('Expected ACK value %02X, got %02X' %
                            (0xFF - sot.value, ack.value))


def wait_for_sot(seq, sot_pkt=None):
    if sot_pkt is None:
        for sot_pkt in seq:
            #print("PKT", sot_pkt)
            if sot_pkt.value not in SOT_VALUES:
                print('Unexpected value %s: %02X' %
                      (sot_pkt.rxtx, sot_pkt.value))
                if sot_pkt.value == 0x76:  # off by one due to sampling freq 
                    print("  off by one on 0x66, fixing value")
                    sot_pkt.value = 0x66
                    break
            else:
                break
    for ack_pkt in seq:
        try:
            check_sot_ack(sot_pkt, ack_pkt)
            #print('New packet %s %s' % (sot_pkt, ack_pkt))
            return sot_pkt, ack_pkt
        except ProtocolReset:
            # print('reset')
            sot_pkt = ack_pkt
    #raise StopIteration()
    return None, None


SOT_VALUES = [0x33, 0x66]


def recv_packet(seq):
    sot_pkt = None
    while True:
        sot_pkt, ack_pkt = wait_for_sot(seq, sot_pkt)
        if sot_pkt is None:
            break
        payload = []
        recv_char = None
        may_stop = False

        cur_pkt = ack_pkt
        for pkt in seq:
            if pkt.value == 0x66:
                sot_pkt = pkt
                break

            check_rxtx(pkt, cur_pkt)
            if recv_char is None:
                if pkt.rxtx != sot_pkt.rxtx:
                    raise ProtocolError()
                recv_char = pkt.value
                cur_pkt = pkt
                if may_stop:
                    if recv_char == 0x55:
                        cur_pkt = None
                        sot_pkt = None
                        break
            else:
                if pkt.rxtx == sot_pkt.rxtx:
                    raise ProtocolError()
                if pkt.value == 0x00:  # ack, previous recv-char is valid
                    payload.append(recv_char)
                    if check_payload(sot_pkt, payload):
                        may_stop = True
                        yield (sot_pkt, payload)
                    else:
                        may_stop = False
                elif pkt.value != 0xFF:  # really?
                    # if FF, ignore the char (ie. its a NACK, so resend)
                    raise ProtocolError('Not really but hey %02X [%02X] %s' %
                                        (pkt.value, recv_char, payload))
                recv_char = None
                cur_pkt = pkt
        else:
            break


def check_payload(sot, payload):
    if sot.rxtx == 'rx':
        if sot.value == 0x33: # init sequence, payload can be 2 or 3 chars: 
            if len(payload) > 1:
                if payload[1] == 0xFF:  # should be [0x02, 0xFF, <KEY>]
                    return len(payload) == 3
                else:  # should be [0x02, 0x00]
                    return len(payload) == 2
        else:
            return len(payload) == 1
    else:
        if len(payload) > 1:
            return len(payload) == (payload[1]+2)
    return False


KEYCODES = {
    0x00: 'View',
    0x01: 'Mon',
    0x02: 'Sto/Rcl',
    0x03: 'Scan',
    0x04: 'Alarm',
    0x05: 'Mx+B',
    0x06: 'Measure',
    0x07: 'Interval',
    0x08: 'Card Reset',
    0x09: 'Close',
    0x0A: 'Open',
    0x0B: 'Read',
    0x0C: 'Shift',
    0x0D: 'Write',
    0x0E: 'Left',
    0x0F: 'Right',
    0x10: 'Advanced',
    0x11: 'Step',
    0x80: 'Knob right',
    0x81: 'Knob left',
    }


FLAGS = [
    ['<Bell>', 'Mx+B', 'Ch. frame', 'Channel', 'LO', 'Alarm', 'HI', 'Al. frame'],
    ['2', '4', '3', '1', '4W', 'OC', 'SHIFT?', 'AVG'],
    ['MAX', 'MIN', 'LAST', 'MEM', '', 'ONCE', 'EXT', 'ERROR'],
    ['REMOTE', 'ADRS', '*', 'VIEW', 'MON', 'SCAN', 'CONFIG', '']
]


def flag(payload):
    flags = []
    for flg, byte in zip(FLAGS, payload[2:]):
        for i in range(8):
            if byte & 2**i:
                flags.append(flg[i])
    return ','.join(flags)


def flag_by_num(num):
    for b in FLAGS:
        for f in b:
            if num == 0:
                return f
            num -= 1

CMDS = {
    0x00: ('DISPLAY ', lambda pl: ''.join(chr(x) for x in pl[2:])),
    0x0C: ('CHANNEL ', lambda pl: ''.join(chr(x) for x in pl[2:])),
    0x0A: ('FLAGS   ', flag),
    0x01: ('CLR FLAG', lambda pl: flag_by_num(pl[2])),
    0x86: ('SHUTDOWN', None),
    0x0D: ('CHAR LO ', lambda pl: str(pl[2])),
    0x08: ('FLAG LO ', lambda pl: flag_by_num(pl[2])),
    0x09: ('FLAG HI ', lambda pl: flag_by_num(pl[2])),
    0x02: ('RST?    ', lambda pl: "[%s]" % (",".join("%02X"%c for c in pl))),
    }


def parse_packet(sot, payload):
    if sot.value == 0x33:
        if len(payload) == 2:
            return 'INIT NO KEY'
        else:
            return 'INIT WITH KEY\t\t%s' % click.style(KEYCODES.get(payload[-1], 'None'), fg='cyan')
    if sot.rxtx == 'rx':
        kc = payload[0]
        if kc in (0x80, 0x81):
            return 'KNOB\t\t%s' % click.style(KEYCODES.get(kc), fg='magenta')
        if kc & 0x40:
            kc = kc & 0xBF
            event = 'RELEASED'
        else:
            event = 'PRESSED '
        return 'KEY\t%s\t%s' % (click.style(event, fg='green'),
                               click.style(KEYCODES.get(kc, "[%02X]"%kc), fg='cyan'))
    lbl, dspfunc = CMDS[payload[0]]
    if dspfunc:
        lbl += '\t%s' % click.style(dspfunc(payload), fg='green')
    return lbl


@click.command()
@click.option('-p', '--packets/--no-packets', default=False)
@click.option('-f', '--filename', default=None)
def main(packets, filename):
    if filename:
        input_stream = open(filename)
    else:
        input_stream = None # == stdin
    if packets:
        try:
            for sot, payload in recv_packet(read_line(read_input(input_stream))):
                cnxdir = 'CPU=>DP' if sot.rxtx == 'tx' else 'CPU<-DP'
                print('[%s]: %02X=[%s]' %
                      (cnxdir, sot.value,
                       ','.join('%02x' % x for x in payload)))
        except:
            print('\n'.join(str(x) for x in pkts[-10:]))
            raise
    else:
        click.secho("Transmission CPU <--> DP", fg="red") 
        try:
            t0 = None
            for sot, payload in recv_packet(read_line(read_input(input_stream))):
                cnxdir = '>>>' if sot.rxtx == 'tx' else '<<<'
                if t0 is None:
                    t0 = sot.timestamp
                click.secho('%08.0f ' % sot.relts,
                            fg='yellow', nl=False)
                click.secho('[+%04.2f] ' % ((sot.timestamp - t0)/1000.),
                            fg='blue', nl=False)
                click.secho(cnxdir + ' ', fg='red' if cnxdir == ">>>" else 'green', nl=False)
                click.secho(parse_packet(sot, payload), fg='white')
                t0 = sot.timestamp
        except:
            print('\n'.join(str(x) for x in pkts[-10:]))
            raise


if __name__ == '__main__':
    main()

mercurial