Thu, 12 Nov 2020 20:26:35 +0100
Reorganize the display + improvements for dimmed flags
- the whole upper zone is now dediacated to the main character line
- make sure eash flag has a dedicated non-overlaping area
- improve support for dimmed flags (not yet properly functionning since this
dimm state is actually stateful, so some major refactorings are needed to
properly handle this).
import sys import re import click F = 1e6 reg = re.compile(r'^(?P<fr1>\d+)-(?P<fr2>\d+) uart-1: (?P<value>.+)$') # noqa # to use with # sigrok-cli -i startup-no-key -P uart:baudrate=187500:parity=even:rx=RX:tx=TX \ # --channels RX,TX -A uart=rx-data:tx-data:tx-start --protocol-decoder-samplenum # # in this configuration: # RX: FP --> CPU # TX: CPU --> FP class ProtocolError(Exception): pass class ProtocolReset(Exception): pass pkts = [] class Packet: def __init__(self, frame, rxtx, value): self.frame = frame self.rxtx = rxtx self.value = value @property def timestamp(self): return self.frame / F * 1000 @property def relts(self): return self.timestamp - pkts[0].timestamp def __str__(self): return '[%06.2f] %s: %02X' % (self.timestamp, self.rxtx, self.value) def read_line(seq): rxtx = "rx" for line in seq: m = reg.match(line) if m: frame = int(m.group('fr1')) v = m.group('value') if v == "Start bit": rxtx = "tx" else: v = int(v, base=16) pkt = Packet(frame, rxtx, v) pkts.append(pkt) yield pkt rxtx = "rx" else: print("nope", line) def read_input(stream=None): if stream is None: stream = sys.stdin for line in stream: yield line def print_input(): t0 = None for pkt in read_line(read_input()): c = chr(pkt.value) if 32 <= pkt.value <= 127 else '' if t0 is None: t0 = pkt.timestamp cnxdir = 'CPU-->DP' if pkt.rxtx == 'rx' else 'CPU<--DP' print('[%s] %06.1f: %02X # %s' % ( cnxdir, (pkt.timestamp-t0), pkt.value, c)) def flipdir(rxtx): return 'rx' if rxtx == 'tx' else 'tx' def check_rxtx(cur, pkt): if cur.rxtx == pkt.rxtx: raise ProtocolError('Expected %s transmission %02X %02X' % (flipdir(cur.rxtx), cur.value, pkt.value)) def check_sot_ack(ack, sot): check_rxtx(ack, sot) if ack.value != (0xFF - sot.value): if ack.value == 0x66: raise ProtocolReset() raise ProtocolError('Expected ACK value %02X, got %02X' % (0xFF - sot.value, ack.value)) def wait_for_sot(seq, sot_pkt=None): if sot_pkt is None: for sot_pkt in seq: #print("PKT", sot_pkt) if sot_pkt.value not in SOT_VALUES: print('Unexpected value %s: %02X' % (sot_pkt.rxtx, sot_pkt.value)) if sot_pkt.value == 0x76: # off by one due to sampling freq print(" off by one on 0x66, fixing value") sot_pkt.value = 0x66 break else: break for ack_pkt in seq: try: check_sot_ack(sot_pkt, ack_pkt) #print('New packet %s %s' % (sot_pkt, ack_pkt)) return sot_pkt, ack_pkt except ProtocolReset: # print('reset') sot_pkt = ack_pkt #raise StopIteration() return None, None SOT_VALUES = [0x33, 0x66] def recv_packet(seq): sot_pkt = None while True: sot_pkt, ack_pkt = wait_for_sot(seq, sot_pkt) if sot_pkt is None: break payload = [] recv_char = None may_stop = False cur_pkt = ack_pkt for pkt in seq: if pkt.value == 0x66: sot_pkt = pkt break check_rxtx(pkt, cur_pkt) if recv_char is None: if pkt.rxtx != sot_pkt.rxtx: raise ProtocolError() recv_char = pkt.value cur_pkt = pkt if may_stop: if recv_char == 0x55: cur_pkt = None sot_pkt = None break else: if pkt.rxtx == sot_pkt.rxtx: raise ProtocolError() if pkt.value == 0x00: # ack, previous recv-char is valid payload.append(recv_char) if check_payload(sot_pkt, payload): may_stop = True yield (sot_pkt, payload) else: may_stop = False elif pkt.value != 0xFF: # really? # if FF, ignore the char (ie. its a NACK, so resend) raise ProtocolError('Not really but hey %02X [%02X] %s' % (pkt.value, recv_char, payload)) recv_char = None cur_pkt = pkt else: break def check_payload(sot, payload): if sot.rxtx == 'rx': if sot.value == 0x33: # init sequence, payload can be 2 or 3 chars: if len(payload) > 1: if payload[1] == 0xFF: # should be [0x02, 0xFF, <KEY>] return len(payload) == 3 else: # should be [0x02, 0x00] return len(payload) == 2 else: return len(payload) == 1 else: if len(payload) > 1: return len(payload) == (payload[1]+2) return False KEYCODES = { 0x00: 'View', 0x01: 'Mon', 0x02: 'Sto/Rcl', 0x03: 'Scan', 0x04: 'Alarm', 0x05: 'Mx+B', 0x06: 'Measure', 0x07: 'Interval', 0x08: 'Card Reset', 0x09: 'Close', 0x0A: 'Open', 0x0B: 'Read', 0x0C: 'Shift', 0x0D: 'Write', 0x0E: 'Left', 0x0F: 'Right', 0x10: 'Advanced', 0x11: 'Step', 0x80: 'Knob right', 0x81: 'Knob left', } FLAGS = [ ['<Bell>', 'Mx+B', 'Ch. frame', 'Channel', 'LO', 'Alarm', 'HI', 'Al. frame'], ['2', '4', '3', '1', '4W', 'OC', 'SHIFT?', 'AVG'], ['MAX', 'MIN', 'LAST', 'MEM', '', 'ONCE', 'EXT', 'ERROR'], ['REMOTE', 'ADRS', '*', 'VIEW', 'MON', 'SCAN', 'CONFIG', ''] ] def flag(payload): flags = [] for flg, byte in zip(FLAGS, payload[2:]): for i in range(8): if byte & 2**i: flags.append(flg[i]) return ','.join(flags) def flag_by_num(num): for b in FLAGS: for f in b: if num == 0: return f num -= 1 CMDS = { 0x00: ('DISPLAY ', lambda pl: ''.join(chr(x) for x in pl[2:])), 0x0C: ('CHANNEL ', lambda pl: ''.join(chr(x) for x in pl[2:])), 0x0A: ('FLAGS ', flag), 0x01: ('CLR FLAG', lambda pl: flag_by_num(pl[2])), 0x86: ('SHUTDOWN', None), 0x0D: ('CHAR LO ', lambda pl: str(pl[2])), 0x08: ('FLAG LO ', lambda pl: flag_by_num(pl[2])), 0x09: ('FLAG HI ', lambda pl: flag_by_num(pl[2])), 0x02: ('RST? ', lambda pl: "[%s]" % (",".join("%02X"%c for c in pl))), } def parse_packet(sot, payload): if sot.value == 0x33: if len(payload) == 2: return 'INIT NO KEY' else: return 'INIT WITH KEY\t\t%s' % click.style(KEYCODES.get(payload[-1], 'None'), fg='cyan') if sot.rxtx == 'rx': kc = payload[0] if kc in (0x80, 0x81): return 'KNOB\t\t%s' % click.style(KEYCODES.get(kc), fg='magenta') if kc & 0x40: kc = kc & 0xBF event = 'RELEASED' else: event = 'PRESSED ' return 'KEY\t%s\t%s' % (click.style(event, fg='green'), click.style(KEYCODES.get(kc, "[%02X]"%kc), fg='cyan')) lbl, dspfunc = CMDS[payload[0]] if dspfunc: lbl += '\t%s' % click.style(dspfunc(payload), fg='green') return lbl @click.command() @click.option('-p', '--packets/--no-packets', default=False) @click.option('-f', '--filename', default=None) def main(packets, filename): if filename: input_stream = open(filename) else: input_stream = None # == stdin if packets: try: for sot, payload in recv_packet(read_line(read_input(input_stream))): cnxdir = 'CPU=>DP' if sot.rxtx == 'tx' else 'CPU<-DP' print('[%s]: %02X=[%s]' % (cnxdir, sot.value, ','.join('%02x' % x for x in payload))) except: print('\n'.join(str(x) for x in pkts[-10:])) raise else: click.secho("Transmission CPU <--> DP", fg="red") try: t0 = None for sot, payload in recv_packet(read_line(read_input(input_stream))): cnxdir = '>>>' if sot.rxtx == 'tx' else '<<<' if t0 is None: t0 = sot.timestamp click.secho('%08.0f ' % sot.relts, fg='yellow', nl=False) click.secho('[+%04.2f] ' % ((sot.timestamp - t0)/1000.), fg='blue', nl=False) click.secho(cnxdir + ' ', fg='red' if cnxdir == ">>>" else 'green', nl=False) click.secho(parse_packet(sot, payload), fg='white') t0 = sot.timestamp except: print('\n'.join(str(x) for x in pkts[-10:])) raise if __name__ == '__main__': main()