Fri, 21 Dec 2007 23:51:36 +0100
added a (very) beginning HPGL parser (with a simplist matplotlib backend)
""" HP3562A ======= Module for communicating with the HP 3562A Digital Signal Analyzer. Subpackages ----------- Constants --------- """ import struct import re import gpib import numpy ######################################## # HP3562A internal binary types decoders def decode_float(s): if len(s) == 4: i1, i2, e = struct.unpack('>hbb', s) i2 = i2 * 2**(-23) return (i1/32768. + i2)*2**e else: i1, i2, i3, i4, e = struct.unpack('>hhhbb', s) if i2 < 0: i2 = (i2+32768.)*2**(-15) else: i2 = i2*2**(-15) if i3 < 0: i3 = (i3+32768.)*2**(-30) else: i3 = i3*2**(-15) i4 = i4 * 2**(-38) return (i1+i2+i3+i4)*2**(e-15) # def decode_float(s): # assert len(s) in [4,8] # # exponential term # e = ord(s[-1]) # if e & 0x80: # e = e - 256 # # mantissa # m = [ord(x) for x in s[:-1]] # M = 0. # for i in range(len(s)-1): # #M += m[i]<<(i*8) # M += float(m[i])/2**((i+1)*8) # # XXX how do we deal negative numbers? # #if m[0] & 0x80: # # M = M - 2^(len(s)) # return M * 2**(e+1) def decode_string(s): nb = ord(s[0]) s = s[1:nb+2] r = "" # XXX why do we need to do this? It's not described in the manual... for c in s: r += chr(ord(c) & 0x7F) return r ### # Some useful functions def format_header(header, head_struct, columns=80): """ Pretty print a data block (trace, state or coord) """ bool_re = re.compile(r'((?P<before>.*) )?(?P<flag>\w+/\w+)( (?P<after>.*))?') todisp = [] for row in head_struct: key = row[0] typ = row[1] if typ is None: continue val = header.get(key, "N/A") if isinstance(val, basestring): val = repr(val) elif typ is bool and isinstance(val, typ): m = bool_re.match(key) if m: d = m.groupdict() key = "" if d['before']: key += d['before'] if d['after']: key += d['after'] key = key.capitalize() val = d['flag'].split('/')[not val] else: val = str(val) else: val = str(val) todisp.append((key+":", val)) maxk = max([len(k) for k, v in todisp]) maxv = max([len(v) for k, v in todisp]) fmt = "%%-%ds %%-%ds"%(maxk, maxv) w = maxk+maxv+4 ncols = columns/w if ncols: nrows = len(todisp)/ncols else: nrows = len(todisp) res = "" for i in range(nrows): res += "| ".join([fmt%todisp[j*nrows+i] for j in range(ncols)]) + "\n" return res def decode_header(data, header_struct, idx=0): d = data if d[idx:].startswith('#'): # we have a preliminary header here... typ = d[idx:idx+2] assert typ == "#A" idx += 2 totlen = struct.unpack('>h', d[idx:idx+2])[0] idx += 2 tt=0 header = {} for i, (nam, dtype, fmt, nbytes) in enumerate(header_struct): if dtype is None: idx += nbytes continue elif dtype == str: val = decode_string(d[idx:]) else: if fmt: v = struct.unpack('>'+fmt, d[idx: idx+nbytes])[0] if isinstance(dtype, dict): val = dtype.get(int(v), "N/A") else: val = dtype(v) else: val = dtype(d[idx: idx+nbytes]) header[nam] = val idx += nbytes return header, idx def read_trace(data, idx, nelts): assert len(data[idx:]) >= (nelts*4), "data[idx:] is too small (%s for %s)"%(len(data[idx:]), (nelts*4)) resu = [] for i in range(nelts): resu.append(decode_float(data[idx: idx+4])) idx += 4 return numpy.array(resu, dtype=float) ##################### # HP3562A constants # GPIB buffer size is 3x80 characters lines class STATUS_BYTE(gpib.Constants): # HP3562A Status Byte, as returned by a serial poll _constants = [(0x40, "RQS", "Request Service"), # when sent in response to a serial poll (0x20, "ERR", "GPIB error"), (0x10, "RDY", "ready to accept GPIB commands"), ] conditions = [(0, "NSR", "No service requested"), (1, "USRQ1", "User SRQ #1"), (2, "USRQ1", "User SRQ #2"), (3, "USRQ1", "User SRQ #3"), (4, "USRQ1", "User SRQ #4"), (5, "USRQ1", "User SRQ #5"), (6, "USRQ1", "User SRQ #6"), (7, "USRQ1", "User SRQ #7"), (8, "USRQ1", "User SRQ #8"), (9, "EOD", "End of disk action"), (10, "EOP", "End of plot action"), (11, "STCH", "Instrument status changed"), # any change in # the status register sets this condition (12, "PWR", "Power on"), (13, "KEY", "Key pressed"), (14, "DCP", "Device clear plotter (listen)"), # ... ] def __init__(self): super(STATUS_BYTE, self).__init__() self._conditions = dict([(x[0], x[1]) for x in self.conditions]) self._rev_conditions = dict([(x[1], x[0]) for x in self.conditions]) self._long_conditions = dict([(x[0], x[2]) for x in self.conditions]) def byte_condition(self, byte): byte = byte & 0x8F return self._conditions.get(byte, "N/A") class IS_REGISTER(gpib.Constants): _constants = [(0x01, "MEASP", "measeurement pause"), (0x02, "ASQP", "Auto sequence pause"), (0X04, "EOM", "End of measurement, capture or throughput"), (0x08, "EOAS", "End of auto sequence"), (0x10, "SWPR", "Sweep point ready"), (0x20, "CH1OV", "Channel 1 overrange"), (0x40, "CH2OV", "Channel 2 overrange"), (0X80, "CH1HR", "Channel 1 half range"), (0x100, "CH2HR", "Channel 2 half range"), (0x200, "SFALT", "Source falt"), (0x400, "RUNL", "Reference unlock"), (0x800, "RMKT", "Remote marker knob turn"), (0x1000, "REKT", "Remote entry knob turn"), (0x2000, "ASRC", "Asctive Status Register changed"), (0x4000, "PWRF", "Power-on test failed"), ] class StatusQuery(gpib.Constants): _command = "STA?" _constants = [(0x01, "N/A", "Not used"), (0x02, "N/A", "Not used"), (0x04, "KEY", "Key pressed"), (0x08, "N/A", "Not used"), (0x10, "RDY", "Ready"), (0x20, "ERR", "Error"), (0x40, "RQS", "Request"), (0x80, "MOS", "Message on screen"), (0x100, "MEASP", "measeurement pause"), (0x200, "ASQP", "Auto sequence pause"), (0X400, "EOM", "End of measurement, capture or throughput"), (0x800, "EOAS", "End of auto sequence"), (0x1000, "SWPR", "Sweep point ready"), (0x2000, "CH1OV", "Channel 1 overrange"), (0x4000, "CH2OV", "Channel 2 overrange"), (0x8000, "MAOV", "Math overflow"), ] class ActivityStatysRegister(gpib.Constants): _command = "AS?" _constants = [(0x01, "CKFL", "Check fault log"), (0x02, "FITR", "Filling time record"), (0x04, "FLTR", "Filters settings"), (0x08, "CFTP", "Curve fir in progress"), (0x10, "MSSM", "Missed sample"), (0x20, "TMPR", "Timed preview"), (0x40, "ACDA", "Accept date"), #... ]