Thu, 12 Nov 2020 20:26:35 +0100
Reorganize the display + improvements for dimmed flags
- the whole upper zone is now dediacated to the main character line
- make sure eash flag has a dedicated non-overlaping area
- improve support for dimmed flags (not yet properly functionning since this
dimm state is actually stateful, so some major refactorings are needed to
properly handle this).
import sys import re import click F = 1e6 reg = re.compile(r'^(?P<fr1>\d+)-(?P<fr2>\d+) .*: (?P<rxtx>rx|tx)-data: "(?P<value>..)"$') # noqa class ProtocolError(Exception): pass class ProtocolReset(Exception): pass pkts = [] class Packet: def __init__(self, frame, rxtx, value): self.frame = frame self.rxtx = rxtx self.value = value @property def timestamp(self): return self.frame / F * 1000 @property def relts(self): return self.timestamp - pkts[0].timestamp def __str__(self): return '[%06.2f] %s: %02X' % (self.timestamp, self.rxtx, self.value) def read_line(seq): for line in seq: m = reg.match(line) if m: frame = int(m.group('fr1')) v = int(m.group('value'), base=16) rxtx = m.group('rxtx') pkt = Packet(frame, rxtx, v) pkts.append(pkt) yield pkt def read_input(stream=None): if stream is None: stream = sys.stdin for line in stream: yield line def print_input(): t0 = None for pkt in read_line(read_input()): c = chr(pkt.value) if 32 <= pkt.value <= 127 else '' if t0 is None: t0 = pkt.timestamp cnxdir = 'CPU-->DP' if pkt.rxtx == 'rx' else 'CPU<--DP' print('[%s] %06.1f: %02X # %s' % ( cnxdir, (pkt.timestamp-t0), pkt.value, c)) def flipdir(rxtx): return 'rx' if rxtx == 'tx' else 'tx' def check_rxtx(cur, pkt): if cur.rxtx == pkt.rxtx: raise ProtocolError('Expected %s transmission %02X %02X' % (flipdir(cur.rxtx), cur.value, pkt.value)) def check_sot_ack(ack, sot): check_rxtx(ack, sot) if ack.value != (0xFF - sot.value): if ack.value == 0x66: raise ProtocolReset() raise ProtocolError('Expected ACK value %02X, got %02X' % (0xFF - sot.value, ack.value)) def wait_for_sot(seq, sot_pkt=None): if sot_pkt is None: for sot_pkt in seq: if sot_pkt.value not in SOT_VALUES: print('Unexpected value %s: %02X' % (sot_pkt.rxtx, sot_pkt.value)) else: break for ack_pkt in seq: try: check_sot_ack(sot_pkt, ack_pkt) # print('New packet %s' % sot_pkt) return sot_pkt, ack_pkt except ProtocolReset: # print('reset') sot_pkt = ack_pkt raise StopIteration() SOT_VALUES = [0x33, 0x66] def recv_packet(seq): sot_pkt = None while True: sot_pkt, ack_pkt = wait_for_sot(seq, sot_pkt) payload = [] recv_char = None may_stop = False cur_pkt = ack_pkt for pkt in seq: if pkt.value == 0x66: sot_pkt = pkt break check_rxtx(pkt, cur_pkt) if recv_char is None: if pkt.rxtx != sot_pkt.rxtx: raise ProtocolError() recv_char = pkt.value cur_pkt = pkt if may_stop: if recv_char == 0x55: cur_pkt = None sot_pkt = None break else: if pkt.rxtx == sot_pkt.rxtx: raise ProtocolError() if pkt.value == 0x00: payload.append(recv_char) if check_payload(sot_pkt, payload): may_stop = True yield (sot_pkt, payload) else: may_stop = False elif pkt.value != 0xFF: # if FF, ignore the char (ie. its a NACK, so resend) raise ProtocolError('Not really but hey %02X [%02X] %s' % (pkt.value, recv_char, payload)) recv_char = None cur_pkt = pkt else: break def check_payload(sot, payload): if sot.rxtx == 'tx': if sot.value == 0x33: return len(payload) == 2 else: return len(payload) == 1 else: if len(payload) > 1: return len(payload) == (payload[1]+2) return False KEYCODES = { 0x00: 'View', 0x01: 'Mon', 0x02: 'Sto/Rcl', 0x03: 'Scan', 0x04: 'Alarm', 0x05: 'Mx+B', 0x06: 'Measure', 0x07: 'Interval', 0x08: 'Card Reset', 0x09: 'Close', 0x0A: 'Open', 0x0B: 'Read', 0x0C: 'Shift', 0x0D: 'Write', 0x0E: 'Left', 0x0F: 'Right', 0x10: 'Advanced', 0x11: 'Step', 0x80: 'Knob right', 0x81: 'Knob left', } FLAGS = [ ['<Bell>', 'Mx+B', 'Ch. frame', 'Channel', 'LO', 'Alarm', 'HI', 'Al. frame'], ['2', '4', '3', '1', '4W', 'OC', '', 'AVG'], ['MAX', 'MIN', 'LAST', 'MEM', '', 'ONCE', 'EXT', 'ERROR'], ['REMOTE', 'ADRS', '*', 'VIEW', 'MON', 'SCAN', 'CONFIG', ''] ] def flag(payload): flags = [] for flg, byte in zip(FLAGS, payload[2:]): for i in range(8): if byte & 2**i: flags.append(flg[i]) return ','.join(flags) def flag_by_num(num): for b in FLAGS: for f in b: if num == 0: return f num -= 1 CMDS = { 0x00: ('DISPLAY ', lambda pl: ''.join(chr(x) for x in pl[2:])), 0x0C: ('CHANNEL ', lambda pl: ''.join(chr(x) for x in pl[2:])), 0x0A: ('FLAGS ', flag), 0x01: ('UNSHIFT ', None), 0x86: ('SHUTDOWN', None), 0x0D: ('CHAR LO ', lambda pl: str(pl[2])), 0x08: ('FLAG LO ', lambda pl: flag_by_num(pl[2])), 0x09: ('FLAG HI ', lambda pl: flag_by_num(pl[2])), 0x02: ('RST? ', None), } def parse_packet(sot, payload): if sot.value == 0x33: return 'INIT KEY=%s' % KEYCODES.get(payload[1], 'None') if sot.rxtx == 'tx': kc = payload[0] if kc in (0x80, 0x81): return 'KNOB\t\t%s' % click.style(KEYCODES.get(kc), fg='magenta') if kc & 0x40: kc = kc & 0xBF event = 'RELEASED' else: event = 'PRESSED ' return 'KEY %s\t%s' % (click.style(event, fg='green'), click.style(KEYCODES.get(kc), fg='cyan')) lbl, dspfunc = CMDS[payload[0]] if dspfunc: lbl += '\t%s' % click.style(dspfunc(payload), fg='green') return lbl @click.command() @click.option('-p', '--packets/--no-packets', default=False) def main(packets): if packets: try: for sot, payload in recv_packet(read_line(read_input())): cnxdir = 'CPU=>DP' if sot.rxtx == 'rx' else 'CPU<-DP' print('[%s]: %02X=[%s]' % (cnxdir, sot.value, ','.join('%02x' % x for x in payload))) except: print('\n'.join(str(x) for x in pkts[-10:])) raise else: try: t0 = None for sot, payload in recv_packet(read_line(read_input())): cnxdir = '>>>' if sot.rxtx == 'rx' else '<<<' if t0 is None: t0 = sot.timestamp click.secho('%08.0f ' % sot.relts, fg='yellow', nl=False) click.secho('[+%04.2f] ' % ((sot.timestamp - t0)/1000.), fg='blue', nl=False) click.secho(cnxdir + ' ', fg='red', nl=False) click.secho(parse_packet(sot, payload), fg='white') t0 = sot.timestamp except: print('\n'.join(str(x) for x in pkts[-10:])) raise if __name__ == '__main__': main()