Sun, 01 Nov 2020 22:16:33 +0100
Add a series of reference serial data sessions from a working 34970A unit.
These sessions contain a few glitches, however (random off-by-one-bit errors,
probably because the sampling frequency was slightly too low).
Comes with an adapted version of the uart_filter.py script to interpret the
communication protocol.
"""Decode sigrok UART annotations captured between the CPU and the front
panel (display/keyboard, "FP"/"DP") of a 34970A unit.

The script reads the text output of ``sigrok-cli`` (one annotation per line),
rebuilds the byte stream exchanged in both directions and prints either the
raw transmissions or a decoded, human readable version of them.
"""

import re
import sys

import click

# Used to convert a sample number into milliseconds (see Packet.timestamp).
# NOTE(review): presumably the capture sample rate in Hz -- confirm.
F = 1e6

reg = re.compile(r'^(?P<fr1>\d+)-(?P<fr2>\d+) uart-1: (?P<value>.+)$')  # noqa

# to use with
# sigrok-cli -i startup-no-key -P uart:baudrate=187500:parity=even:rx=RX:tx=TX \
#   --channels RX,TX -A uart=rx-data:tx-data:tx-start --protocol-decoder-samplenum
#
# in this configuration:
# RX: FP --> CPU
# TX: CPU --> FP


class ProtocolError(Exception):
    """Raised when the byte stream does not follow the expected protocol."""


class ProtocolReset(Exception):
    """Raised when a 0x66 byte is received where an SOT ack was expected."""


# Every data byte decoded so far, in input order (filled by read_line()).
pkts = []


class Packet:
    """A single data byte seen on the wire.

    Attributes are ``frame`` (first number of the annotation line), ``rxtx``
    (``'rx'`` or ``'tx'``) and ``value`` (the byte, as an int).
    """

    def __init__(self, frame, rxtx, value):
        self.frame = frame
        self.rxtx = rxtx
        self.value = value

    @property
    def timestamp(self):
        """``frame`` scaled by ``1000 / F`` (milliseconds if F is in Hz)."""
        return self.frame / F * 1000

    @property
    def relts(self):
        """Timestamp relative to the first packet recorded in ``pkts``."""
        return self.timestamp - pkts[0].timestamp

    def __str__(self):
        return '[%06.2f] %s: %02X' % (self.timestamp, self.rxtx, self.value)


def read_line(seq):
    """Turn annotation lines into Packet objects.

    A ``Start bit`` annotation marks the next data byte as ``'tx'``; any data
    byte not preceded by one is ``'rx'``.  Every packet is also appended to
    the module-level ``pkts`` list.  Lines that do not match ``reg`` are
    echoed with a ``nope`` prefix and skipped.
    """
    rxtx = "rx"
    for line in seq:
        m = reg.match(line)
        if not m:
            print("nope", line)
            continue
        value = m.group('value')
        if value == "Start bit":
            rxtx = "tx"
            continue
        pkt = Packet(int(m.group('fr1')), rxtx, int(value, base=16))
        pkts.append(pkt)
        yield pkt
        rxtx = "rx"


def read_input(stream=None):
    """Yield the lines of ``stream`` (``sys.stdin`` when it is None)."""
    if stream is None:
        stream = sys.stdin
    yield from stream


def print_input():
    """Print every byte read from stdin with its direction and time offset."""
    t0 = None
    for pkt in read_line(read_input()):
        # 127 (DEL) is a control character, so only 32..126 are displayed.
        c = chr(pkt.value) if 32 <= pkt.value < 127 else ''
        if t0 is None:
            t0 = pkt.timestamp
        # rx is FP --> CPU (see the header comment), same convention as main()
        cnxdir = 'CPU<--DP' if pkt.rxtx == 'rx' else 'CPU-->DP'
        print('[%s] %06.1f: %02X # %s' % (
            cnxdir, (pkt.timestamp-t0), pkt.value, c))


def flipdir(rxtx):
    """Return the opposite direction of ``rxtx`` (``'rx'`` <-> ``'tx'``)."""
    return 'rx' if rxtx == 'tx' else 'tx'


def check_rxtx(cur, pkt):
    """Raise ProtocolError if ``cur`` and ``pkt`` go in the same direction."""
    if cur.rxtx == pkt.rxtx:
        raise ProtocolError('Expected %s transmission %02X %02X' %
                            (flipdir(cur.rxtx), cur.value, pkt.value))


def check_sot_ack(ack, sot):
    """Check that ``ack`` acknowledges the start-of-transmission ``sot``.

    A valid ack comes from the other side and its value is ``0xFF - sot``.

    Raises:
        ProtocolReset: the would-be ack is a 0x66 byte.
        ProtocolError: wrong direction or any other unexpected value.
    """
    check_rxtx(ack, sot)
    expected = 0xFF - sot.value
    if ack.value == expected:
        return
    if ack.value == 0x66:
        raise ProtocolReset()
    raise ProtocolError('Expected ACK value %02X, got %02X' % (
        expected, ack.value))


def wait_for_sot(seq, sot_pkt=None):
    """Consume ``seq`` until an acknowledged start-of-transmission is found.

    If ``sot_pkt`` is given it is used as the SOT candidate and only its ack
    is looked for.  Returns ``(sot_pkt, ack_pkt)``, or ``(None, None)`` when
    the input is exhausted first.
    """
    if sot_pkt is None:
        for sot_pkt in seq:
            if sot_pkt.value in SOT_VALUES:
                break
            print('Unexpected value %s: %02X' % (sot_pkt.rxtx, sot_pkt.value))
            if sot_pkt.value == 0x76:
                # off by one due to sampling freq
                print(" off by one on 0x66, fixing value")
                sot_pkt.value = 0x66
                break
    for ack_pkt in seq:
        try:
            # check_sot_ack() takes (ack, sot), in that order
            check_sot_ack(ack_pkt, sot_pkt)
        except ProtocolReset:
            # a 0x66 came in place of the ack: it becomes the new SOT
            sot_pkt = ack_pkt
        else:
            return sot_pkt, ack_pkt
    return None, None


# Byte values that may start a transmission.
SOT_VALUES = [0x33, 0x66]


def recv_packet(seq):
    """Yield ``(sot_pkt, payload)`` for each complete transmission in ``seq``.

    After an acknowledged SOT, each payload byte comes from the SOT sender
    and is answered by the other side: 0x00 accepts the byte, 0xFF rejects
    it (the byte is dropped), anything else raises ProtocolError.  A payload
    is yielded as soon as check_payload() accepts it; a following 0x55 from
    the sender ends the transmission.  A 0x66 at any point restarts the
    handshake with that byte as the new SOT.

    Note that the yielded ``payload`` list keeps being appended to if more
    bytes follow before the transmission ends.
    """
    sot_pkt = None
    while True:
        sot_pkt, ack_pkt = wait_for_sot(seq, sot_pkt)
        if sot_pkt is None:
            break
        payload = []
        recv_char = None   # payload byte waiting for its ack, if any
        may_stop = False   # True once the payload is complete
        cur_pkt = ack_pkt
        for pkt in seq:
            if pkt.value == 0x66:
                sot_pkt = pkt
                break
            check_rxtx(pkt, cur_pkt)
            if recv_char is None:
                # expecting a byte from the sender of the SOT
                if pkt.rxtx != sot_pkt.rxtx:
                    raise ProtocolError()
                recv_char = pkt.value
                cur_pkt = pkt
                if may_stop and recv_char == 0x55:
                    # end of transmission, look for a brand new SOT
                    cur_pkt = None
                    sot_pkt = None
                    break
            else:
                # expecting the answer of the other side
                if pkt.rxtx == sot_pkt.rxtx:
                    raise ProtocolError()
                if pkt.value == 0x00:
                    # ack, previous recv-char is valid
                    payload.append(recv_char)
                    may_stop = check_payload(sot_pkt, payload)
                    if may_stop:
                        yield (sot_pkt, payload)
                elif pkt.value != 0xFF:  # really?
                    # if FF, ignore the char (ie. its a NACK, so resend)
                    raise ProtocolError('Not really but hey %02X [%02X] %s' %
                                        (pkt.value, recv_char, payload))
                recv_char = None
                cur_pkt = pkt
        else:
            # input exhausted
            break


def check_payload(sot, payload):
    """Return True when ``payload`` is a complete payload for ``sot``."""
    if sot.rxtx != 'rx':
        # expected length is payload[1] + 2
        return len(payload) > 1 and len(payload) == (payload[1]+2)
    if sot.value != 0x33:
        return len(payload) == 1
    # init sequence, payload can be 2 or 3 chars:
    if len(payload) <= 1:
        return False
    if payload[1] == 0xFF:
        # should be [0x02, 0xFF, <KEY>]
        return len(payload) == 3
    # should be [0x02, 0x00]
    return len(payload) == 2


KEYCODES = {
    0x00: 'View',
    0x01: 'Mon',
    0x02: 'Sto/Rcl',
    0x03: 'Scan',
    0x04: 'Alarm',
    0x05: 'Mx+B',
    0x06: 'Measure',
    0x07: 'Interval',
    0x08: 'Card Reset',
    0x09: 'Close',
    0x0A: 'Open',
    0x0B: 'Read',
    0x0C: 'Shift',
    0x0D: 'Write',
    0x0E: 'Left',
    0x0F: 'Right',
    0x10: 'Advanced',
    0x11: 'Step',
    0x80: 'Knob right',
    0x81: 'Knob left',
}

# Flag names, one row per flag byte, indexed by bit number (bit 0 first).
FLAGS = [
    ['<Bell>', 'Mx+B', 'Ch. frame', 'Channel', 'LO', 'Alarm', 'HI', 'Al. frame'],
    ['2', '4', '3', '1', '4W', 'OC', 'SHIFT?', 'AVG'],
    ['MAX', 'MIN', 'LAST', 'MEM', '', 'ONCE', 'EXT', 'ERROR'],
    ['REMOTE', 'ADRS', '*', 'VIEW', 'MON', 'SCAN', 'CONFIG', ''],
]


def flag(payload):
    """Return the comma separated names of the bits set in ``payload[2:]``."""
    flags = []
    for names, byte in zip(FLAGS, payload[2:]):
        flags.extend(
            name for bit, name in enumerate(names) if byte & (1 << bit))
    return ','.join(flags)


def flag_by_num(num):
    """Return the ``num``-th flag name of FLAGS read row by row.

    Returns None when ``num`` is out of range.
    """
    for row in FLAGS:
        for name in row:
            if num == 0:
                return name
            num -= 1
    return None


# Command byte (payload[0]) -> (label, payload formatter or None)
CMDS = {
    0x00: ('DISPLAY ', lambda pl: ''.join(chr(x) for x in pl[2:])),
    0x0C: ('CHANNEL ', lambda pl: ''.join(chr(x) for x in pl[2:])),
    0x0A: ('FLAGS ', flag),
    0x01: ('CLR FLAG', lambda pl: flag_by_num(pl[2])),
    0x86: ('SHUTDOWN', None),
    0x0D: ('CHAR LO ', lambda pl: str(pl[2])),
    0x08: ('FLAG LO ', lambda pl: flag_by_num(pl[2])),
    0x09: ('FLAG HI ', lambda pl: flag_by_num(pl[2])),
    0x02: ('RST? ', lambda pl: "[%s]" % (",".join("%02X" % c for c in pl))),
}


def parse_packet(sot, payload):
    """Return a (colorized) one-line description of a transmission."""
    if sot.value == 0x33:
        if len(payload) == 2:
            return 'INIT NO KEY'
        return 'INIT WITH KEY\t\t%s' % click.style(
            KEYCODES.get(payload[-1], 'None'), fg='cyan')

    if sot.rxtx == 'rx':
        kc = payload[0]
        if kc in (0x80, 0x81):
            return 'KNOB\t\t%s' % click.style(KEYCODES.get(kc), fg='magenta')
        if kc & 0x40:
            # bit 6 set: release of the key whose code is in the other bits
            kc = kc & 0xBF
            event = 'RELEASED'
        else:
            event = 'PRESSED '
        return 'KEY\t%s\t%s' % (
            click.style(event, fg='green'),
            click.style(KEYCODES.get(kc, "[%02X]" % kc), fg='cyan'))

    cmd = CMDS.get(payload[0])
    if cmd is None:
        # unknown (or glitched) command byte: show the raw bytes instead of
        # aborting the whole decoding on a KeyError
        raw = "[%s]" % ",".join("%02X" % c for c in payload)
        return 'UNKNOWN \t%s' % click.style(raw, fg='green')
    lbl, dspfunc = cmd
    if dspfunc:
        lbl += '\t%s' % click.style(dspfunc(payload), fg='green')
    return lbl


def _print_raw_packets(input_stream):
    """Print each transmission as direction, SOT value and payload bytes."""
    for sot, payload in recv_packet(read_line(read_input(input_stream))):
        cnxdir = 'CPU=>DP' if sot.rxtx == 'tx' else 'CPU<-DP'
        print('[%s]: %02X=[%s]' % (cnxdir, sot.value,
                                   ','.join('%02x' % x for x in payload)))


def _print_decoded_packets(input_stream):
    """Print each transmission decoded by parse_packet(), with timings."""
    click.secho("Transmission CPU <--> DP", fg="red")
    t0 = None
    for sot, payload in recv_packet(read_line(read_input(input_stream))):
        cnxdir = '>>>' if sot.rxtx == 'tx' else '<<<'
        if t0 is None:
            t0 = sot.timestamp
        # time since the first byte of the capture
        click.secho('%08.0f ' % sot.relts, fg='yellow', nl=False)
        # time since the previously displayed transmission
        click.secho('[+%04.2f] ' % ((sot.timestamp - t0)/1000.), fg='blue',
                    nl=False)
        click.secho(cnxdir + ' ', fg='red' if cnxdir == ">>>" else 'green',
                    nl=False)
        click.secho(parse_packet(sot, payload), fg='white')
        t0 = sot.timestamp


def _run(report, input_stream):
    """Call ``report``; on error print the last 10 bytes read and re-raise."""
    try:
        report(input_stream)
    except Exception:
        print('\n'.join(str(x) for x in pkts[-10:]))
        raise


@click.command()
@click.option('-p', '--packets/--no-packets', default=False)
@click.option('-f', '--filename', default=None)
def main(packets, filename):
    """Decode sigrok UART annotations read from a file or from stdin."""
    report = _print_raw_packets if packets else _print_decoded_packets
    if filename:
        # the context manager closes the file even if decoding fails
        with open(filename) as input_stream:
            _run(report, input_stream)
    else:
        _run(report, None)  # None == stdin


if __name__ == '__main__':
    main()