Sun, 01 Nov 2020 22:16:33 +0100
Add a series of reference serial data sessions from a working 34970A unit.
These sessions do have a few glitches, however (random off by one bit,
probably due to a sampling freq a bit too low).
Comes with an adapted version of the uart_filter.py script to interpret the
communication protocol.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/serialdata/ref/README Sun Nov 01 22:16:33 2020 +0100 @@ -0,0 +1,13 @@ +Sessions of UART capture of the reference HP34970A unit. + +In these capture frames: +- RX: FP -> CPU +- TX: CPU -> FP + +Example extraction of data: + + sigrok-cli -i startup-no-key -P uart:baudrate=187500:parity=even:rx=RX:tx=TX \ + --channels RX,TX -A uart=rx-data:tx-data:rx-start + +Start-bit: following code is a RX, so generated by the FP. +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/serialdata/ref/uart_filter.py Sun Nov 01 22:16:33 2020 +0100 @@ -0,0 +1,313 @@ +import sys +import re +import click + +F = 1e6 + +reg = re.compile(r'^(?P<fr1>\d+)-(?P<fr2>\d+) uart-1: (?P<value>.+)$') # noqa +# to use with +# sigrok-cli -i startup-no-key -P uart:baudrate=187500:parity=even:rx=RX:tx=TX \ +# --channels RX,TX -A uart=rx-data:tx-data:tx-start --protocol-decoder-samplenum +# +# in this configuration: +# RX: FP --> CPU +# TX: CPU --> FP + +class ProtocolError(Exception): + pass + +class ProtocolReset(Exception): + pass + +pkts = [] + + +class Packet: + def __init__(self, frame, rxtx, value): + self.frame = frame + self.rxtx = rxtx + self.value = value + + @property + def timestamp(self): + return self.frame / F * 1000 + + @property + def relts(self): + return self.timestamp - pkts[0].timestamp + + def __str__(self): + return '[%06.2f] %s: %02X' % (self.timestamp, self.rxtx, self.value) + + +def read_line(seq): + rxtx = "rx" + for line in seq: + m = reg.match(line) + if m: + frame = int(m.group('fr1')) + v = m.group('value') + if v == "Start bit": + rxtx = "tx" + else: + v = int(v, base=16) + pkt = Packet(frame, rxtx, v) + pkts.append(pkt) + yield pkt + rxtx = "rx" + else: + print("nope", line) + + +def read_input(stream=None): + if stream is None: + stream = sys.stdin + for line in stream: + yield line + + +def print_input(): + t0 = None + for pkt in read_line(read_input()): + c = chr(pkt.value) if 32 <= pkt.value <= 127 else '' + if t0 is None: + t0 = pkt.timestamp + cnxdir = 'CPU-->DP' if pkt.rxtx == 'rx' else 'CPU<--DP' + print('[%s] %06.1f: %02X # %s' % ( + cnxdir, (pkt.timestamp-t0), pkt.value, c)) + + +def flipdir(rxtx): + return 'rx' if rxtx == 'tx' else 'tx' + + +def check_rxtx(cur, pkt): + if cur.rxtx == pkt.rxtx: + raise ProtocolError('Expected %s transmission %02X %02X' % + (flipdir(cur.rxtx), cur.value, pkt.value)) + + +def check_sot_ack(ack, sot): + check_rxtx(ack, sot) + if ack.value != (0xFF - sot.value): + if ack.value == 0x66: + raise ProtocolReset() + raise ProtocolError('Expected ACK value %02X, got %02X' % + (0xFF - sot.value, ack.value)) + + +def wait_for_sot(seq, sot_pkt=None): + if sot_pkt is None: + for sot_pkt in seq: + #print("PKT", sot_pkt) + if sot_pkt.value not in SOT_VALUES: + print('Unexpected value %s: %02X' % + (sot_pkt.rxtx, sot_pkt.value)) + if sot_pkt.value == 0x76: # off by one due to sampling freq + print(" off by one on 0x66, fixing value") + sot_pkt.value = 0x66 + break + else: + break + for ack_pkt in seq: + try: + check_sot_ack(sot_pkt, ack_pkt) + #print('New packet %s %s' % (sot_pkt, ack_pkt)) + return sot_pkt, ack_pkt + except ProtocolReset: + # print('reset') + sot_pkt = ack_pkt + #raise StopIteration() + return None, None + + +SOT_VALUES = [0x33, 0x66] + + +def recv_packet(seq): + sot_pkt = None + while True: + sot_pkt, ack_pkt = wait_for_sot(seq, sot_pkt) + if sot_pkt is None: + break + payload = [] + recv_char = None + may_stop = False + + cur_pkt = ack_pkt + for pkt in seq: + if pkt.value == 0x66: + sot_pkt = pkt + break + + check_rxtx(pkt, cur_pkt) + if recv_char is None: + if pkt.rxtx != sot_pkt.rxtx: + raise ProtocolError() + recv_char = pkt.value + cur_pkt = pkt + if may_stop: + if recv_char == 0x55: + cur_pkt = None + sot_pkt = None + break + else: + if pkt.rxtx == sot_pkt.rxtx: + raise ProtocolError() + if pkt.value == 0x00: # ack, previous recv-char is valid + payload.append(recv_char) + if check_payload(sot_pkt, payload): + may_stop = True + yield (sot_pkt, payload) + else: + may_stop = False + elif pkt.value != 0xFF: # really? + # if FF, ignore the char (ie. its a NACK, so resend) + raise ProtocolError('Not really but hey %02X [%02X] %s' % + (pkt.value, recv_char, payload)) + recv_char = None + cur_pkt = pkt + else: + break + + +def check_payload(sot, payload): + if sot.rxtx == 'rx': + if sot.value == 0x33: # init sequence, payload can be 2 or 3 chars: + if len(payload) > 1: + if payload[1] == 0xFF: # should be [0x02, 0xFF, <KEY>] + return len(payload) == 3 + else: # should be [0x02, 0x00] + return len(payload) == 2 + else: + return len(payload) == 1 + else: + if len(payload) > 1: + return len(payload) == (payload[1]+2) + return False + + +KEYCODES = { + 0x00: 'View', + 0x01: 'Mon', + 0x02: 'Sto/Rcl', + 0x03: 'Scan', + 0x04: 'Alarm', + 0x05: 'Mx+B', + 0x06: 'Measure', + 0x07: 'Interval', + 0x08: 'Card Reset', + 0x09: 'Close', + 0x0A: 'Open', + 0x0B: 'Read', + 0x0C: 'Shift', + 0x0D: 'Write', + 0x0E: 'Left', + 0x0F: 'Right', + 0x10: 'Advanced', + 0x11: 'Step', + 0x80: 'Knob right', + 0x81: 'Knob left', + } + + +FLAGS = [ + ['<Bell>', 'Mx+B', 'Ch. frame', 'Channel', 'LO', 'Alarm', 'HI', 'Al. frame'], + ['2', '4', '3', '1', '4W', 'OC', 'SHIFT?', 'AVG'], + ['MAX', 'MIN', 'LAST', 'MEM', '', 'ONCE', 'EXT', 'ERROR'], + ['REMOTE', 'ADRS', '*', 'VIEW', 'MON', 'SCAN', 'CONFIG', ''] +] + + +def flag(payload): + flags = [] + for flg, byte in zip(FLAGS, payload[2:]): + for i in range(8): + if byte & 2**i: + flags.append(flg[i]) + return ','.join(flags) + + +def flag_by_num(num): + for b in FLAGS: + for f in b: + if num == 0: + return f + num -= 1 + +CMDS = { + 0x00: ('DISPLAY ', lambda pl: ''.join(chr(x) for x in pl[2:])), + 0x0C: ('CHANNEL ', lambda pl: ''.join(chr(x) for x in pl[2:])), + 0x0A: ('FLAGS ', flag), + 0x01: ('CLR FLAG', lambda pl: flag_by_num(pl[2])), + 0x86: ('SHUTDOWN', None), + 0x0D: ('CHAR LO ', lambda pl: str(pl[2])), + 0x08: ('FLAG LO ', lambda pl: flag_by_num(pl[2])), + 0x09: ('FLAG HI ', lambda pl: flag_by_num(pl[2])), + 0x02: ('RST? ', lambda pl: "[%s]" % (",".join("%02X"%c for c in pl))), + } + + +def parse_packet(sot, payload): + if sot.value == 0x33: + if len(payload) == 2: + return 'INIT NO KEY' + else: + return 'INIT WITH KEY\t\t%s' % click.style(KEYCODES.get(payload[-1], 'None'), fg='cyan') + if sot.rxtx == 'rx': + kc = payload[0] + if kc in (0x80, 0x81): + return 'KNOB\t\t%s' % click.style(KEYCODES.get(kc), fg='magenta') + if kc & 0x40: + kc = kc & 0xBF + event = 'RELEASED' + else: + event = 'PRESSED ' + return 'KEY\t%s\t%s' % (click.style(event, fg='green'), + click.style(KEYCODES.get(kc, "[%02X]"%kc), fg='cyan')) + lbl, dspfunc = CMDS[payload[0]] + if dspfunc: + lbl += '\t%s' % click.style(dspfunc(payload), fg='green') + return lbl + + +@click.command() +@click.option('-p', '--packets/--no-packets', default=False) +@click.option('-f', '--filename', default=None) +def main(packets, filename): + if filename: + input_stream = open(filename) + else: + input_stream = None # == stdin + if packets: + try: + for sot, payload in recv_packet(read_line(read_input(input_stream))): + cnxdir = 'CPU=>DP' if sot.rxtx == 'tx' else 'CPU<-DP' + print('[%s]: %02X=[%s]' % + (cnxdir, sot.value, + ','.join('%02x' % x for x in payload))) + except: + print('\n'.join(str(x) for x in pkts[-10:])) + raise + else: + click.secho("Transmission CPU <--> DP", fg="red") + try: + t0 = None + for sot, payload in recv_packet(read_line(read_input(input_stream))): + cnxdir = '>>>' if sot.rxtx == 'tx' else '<<<' + if t0 is None: + t0 = sot.timestamp + click.secho('%08.0f ' % sot.relts, + fg='yellow', nl=False) + click.secho('[+%04.2f] ' % ((sot.timestamp - t0)/1000.), + fg='blue', nl=False) + click.secho(cnxdir + ' ', fg='red' if cnxdir == ">>>" else 'green', nl=False) + click.secho(parse_packet(sot, payload), fg='white') + t0 = sot.timestamp + except: + print('\n'.join(str(x) for x in pkts[-10:])) + raise + + +if __name__ == '__main__': + main()