Mon, 28 Jan 2019 21:50:09 +0100
Add the serialdata tool
to read and decode serial data from the hp34970 using the dslogic.
import sys import re import click F = 1e6 reg = re.compile(r'^(?P<fr1>\d+)-(?P<fr2>\d+) .*: (?P<rxtx>rx|tx)-data: "(?P<value>..)"$') # noqa class ProtocolError(Exception): pass class ProtocolReset(Exception): pass pkts = [] class Packet: def __init__(self, frame, rxtx, value): self.frame = frame self.rxtx = rxtx self.value = value @property def timestamp(self): return self.frame / F * 1000 @property def relts(self): return self.timestamp - pkts[0].timestamp def __str__(self): return '[%06.2f] %s: %02X' % (self.timestamp, self.rxtx, self.value) def read_line(seq): for line in seq: m = reg.match(line) if m: frame = int(m.group('fr1')) v = int(m.group('value'), base=16) rxtx = m.group('rxtx') pkt = Packet(frame, rxtx, v) pkts.append(pkt) yield pkt def read_input(stream=None): if stream is None: stream = sys.stdin for line in stream: yield line def print_input(): t0 = None for pkt in read_line(read_input()): c = chr(pkt.value) if 32 <= pkt.value <= 127 else '' if t0 is None: t0 = pkt.timestamp cnxdir = 'CPU-->DP' if pkt.rxtx == 'rx' else 'CPU<--DP' print('[%s] %06.1f: %02X # %s' % ( cnxdir, (pkt.timestamp-t0), pkt.value, c)) def flipdir(rxtx): return 'rx' if rxtx == 'tx' else 'tx' def check_rxtx(cur, pkt): if cur.rxtx == pkt.rxtx: raise ProtocolError('Expected %s transmission %02X %02X' % (flipdir(cur.rxtx), cur.value, pkt.value)) def check_sot_ack(ack, sot): check_rxtx(ack, sot) if ack.value != (0xFF - sot.value): if ack.value == 0x66: raise ProtocolReset() raise ProtocolError('Expected ACK value %02X, got %02X' % (0xFF - sot.value, ack.value)) def wait_for_sot(seq, sot_pkt=None): if sot_pkt is None: for sot_pkt in seq: if sot_pkt.value not in SOT_VALUES: print('Unexpected value %s: %02X' % (sot_pkt.rxtx, sot_pkt.value)) else: break for ack_pkt in seq: try: check_sot_ack(sot_pkt, ack_pkt) # print('New packet %s' % sot_pkt) return sot_pkt, ack_pkt except ProtocolReset: # print('reset') sot_pkt = ack_pkt raise StopIteration() SOT_VALUES = [0x33, 0x66] def recv_packet(seq): sot_pkt = None while True: sot_pkt, ack_pkt = wait_for_sot(seq, sot_pkt) payload = [] recv_char = None may_stop = False cur_pkt = ack_pkt for pkt in seq: if pkt.value == 0x66: sot_pkt = pkt break check_rxtx(pkt, cur_pkt) if recv_char is None: if pkt.rxtx != sot_pkt.rxtx: raise ProtocolError() recv_char = pkt.value cur_pkt = pkt if may_stop: if recv_char == 0x55: cur_pkt = None sot_pkt = None break else: if pkt.rxtx == sot_pkt.rxtx: raise ProtocolError() if pkt.value == 0x00: payload.append(recv_char) if check_payload(sot_pkt, payload): may_stop = True yield (sot_pkt, payload) else: may_stop = False elif pkt.value != 0xFF: # if FF, ignore the char (ie. its a NACK, so resend) raise ProtocolError('Not really but hey %02X [%02X] %s' % (pkt.value, recv_char, payload)) recv_char = None cur_pkt = pkt else: break def check_payload(sot, payload): if sot.rxtx == 'tx': if sot.value == 0x33: return len(payload) == 2 else: return len(payload) == 1 else: if len(payload) > 1: return len(payload) == (payload[1]+2) return False KEYCODES = { 0x00: 'View', 0x01: 'Mon', 0x02: 'Sto/Rcl', 0x03: 'Scan', 0x04: 'Alarm', 0x05: 'Mx+B', 0x06: 'Measure', 0x07: 'Interval', 0x08: 'Card Reset', 0x09: 'Close', 0x0A: 'Open', 0x0B: 'Read', 0x0C: 'Shift', 0x0D: 'Write', 0x0E: 'Left', 0x0F: 'Right', 0x10: 'Advanced', 0x11: 'Step', 0x80: 'Knob right', 0x81: 'Knob left', } FLAGS = [ ['<Bell>', 'Mx+B', 'Ch. frame', 'Channel', 'LO', 'Alarm', 'HI', 'Al. frame'], ['2', '4', '3', '1', '4W', 'OC', '', 'AVG'], ['MAX', 'MIN', 'LAST', 'MEM', '', 'ONCE', 'EXT', 'ERROR'], ['REMOTE', 'ADRS', '*', 'VIEW', 'MON', 'SCAN', 'CONFIG', ''] ] def flag(payload): flags = [] for flg, byte in zip(FLAGS, payload[2:]): for i in range(8): if byte & 2**i: flags.append(flg[i]) return ','.join(flags) def flag_by_num(num): for b in FLAGS: for f in b: if num == 0: return f num -= 1 CMDS = { 0x00: ('DISPLAY ', lambda pl: ''.join(chr(x) for x in pl[2:])), 0x0C: ('CHANNEL ', lambda pl: ''.join(chr(x) for x in pl[2:])), 0x0A: ('FLAGS ', flag), 0x01: ('UNSHIFT ', None), 0x86: ('SHUTDOWN', None), 0x0D: ('CHAR LO ', lambda pl: str(pl[2])), 0x08: ('FLAG LO ', lambda pl: flag_by_num(pl[2])), 0x09: ('FLAG HI ', lambda pl: flag_by_num(pl[2])), 0x02: ('RST? ', None), } def parse_packet(sot, payload): if sot.value == 0x33: return 'INIT KEY=%s' % KEYCODES.get(payload[1], 'None') if sot.rxtx == 'tx': kc = payload[0] if kc in (0x80, 0x81): return 'KNOB\t\t%s' % click.style(KEYCODES.get(kc), fg='magenta') if kc & 0x40: kc = kc & 0xBF event = 'RELEASED' else: event = 'PRESSED ' return 'KEY %s\t%s' % (click.style(event, fg='green'), click.style(KEYCODES.get(kc), fg='cyan')) lbl, dspfunc = CMDS[payload[0]] if dspfunc: lbl += '\t%s' % click.style(dspfunc(payload), fg='green') return lbl @click.command() @click.option('-p', '--packets/--no-packets', default=False) def main(packets): if packets: try: for sot, payload in recv_packet(read_line(read_input())): cnxdir = 'CPU=>DP' if sot.rxtx == 'rx' else 'CPU<-DP' print('[%s]: %02X=[%s]' % (cnxdir, sot.value, ','.join('%02x' % x for x in payload))) except: print('\n'.join(str(x) for x in pkts[-10:])) raise else: try: t0 = None for sot, payload in recv_packet(read_line(read_input())): cnxdir = '>>>' if sot.rxtx == 'rx' else '<<<' if t0 is None: t0 = sot.timestamp click.secho('%08.0f ' % sot.relts, fg='yellow', nl=False) click.secho('[+%04.2f] ' % ((sot.timestamp - t0)/1000.), fg='blue', nl=False) click.secho(cnxdir + ' ', fg='red', nl=False) click.secho(parse_packet(sot, payload), fg='white') t0 = sot.timestamp except: print('\n'.join(str(x) for x in pkts[-10:])) raise if __name__ == '__main__': main()