HP3562A/__init__.py

Wed, 19 Dec 2007 00:19:25 +0100

author
David Douard <david.douard@logilab.fr>
date
Wed, 19 Dec 2007 00:19:25 +0100
changeset 16
de9122b5680a
parent 15
b930440af354
child 39
8becd52c2171
permissions
-rw-r--r--

Several improvements (code refactoring) + add stuff to decode binary coordinate data block ("DCBN" command)

"""
HP3562A
=======

Module for communicating with the HP 3562A Digital Signal Analyzer.

Subpackages
-----------


Constants
---------

"""
import struct
import re
import gpib
import numpy

########################################
# HP3562A internal binary types decoders

def decode_float(s):
    if len(s) == 4:
        i1, i2, e = struct.unpack('>hbb', s)
        i2 = i2 * 2**(-23)
        return (i1/32768. + i2)*2**e
    else:
        i1, i2, i3, i4, e = struct.unpack('>hhhbb', s)
        if i2 < 0:
            i2 = (i2+32768.)*2**(-15)
        else:
            i2 = i2*2**(-15)
        if i3 < 0:
            i3 = (i3+32768.)*2**(-30)
        else:
            i3 = i3*2**(-15)
        i4 = i4 * 2**(-38)
        return (i1+i2+i3+i4)*2**(e-15)
        
# def decode_float(s):
#     assert len(s) in [4,8]
#     # exponential term 
#     e = ord(s[-1])
#     if e & 0x80:
#         e = e - 256

#     # mantissa
#     m = [ord(x) for x in s[:-1]]
#     M = 0.
#     for i in range(len(s)-1):
#         #M += m[i]<<(i*8)
#         M += float(m[i])/2**((i+1)*8)
#     # XXX how do we deal negative numbers?
#     #if m[0] & 0x80:
#     #    M = M - 2^(len(s))
#     return M * 2**(e+1)

def decode_string(s):
    nb = ord(s[0])
    s = s[1:nb+2]
    r = ""
    # XXX why do we need to do this? It's not described in the manual...
    for c in s:
        r += chr(ord(c) & 0x7F)
    return r


###
# Some useful functions
def format_header(header, head_struct, columns=80):
    """
    Pretty print a data block (trace, state or coord) 
    """
    bool_re = re.compile(r'((?P<before>.*) )?(?P<flag>\w+/\w+)( (?P<after>.*))?')
    
    todisp = []
    for row in head_struct:
        key = row[0]
        typ = row[1]
        if typ is None:
            continue
        val = header.get(key, "N/A")
        if isinstance(val, basestring):
            val = repr(val)
        elif typ is bool and isinstance(val, typ):
            m = bool_re.match(key)
            if m:
                d = m.groupdict()
                key = ""
                if d['before']:
                    key += d['before']
                if d['after']:
                    key += d['after']
                key = key.capitalize()
                val = d['flag'].split('/')[not val]
            else:
                val = str(val)
        else:            
            val = str(val)
        todisp.append((key+":", val))
    maxk = max([len(k) for k, v in todisp])
    maxv = max([len(v) for k, v in todisp])
    fmt = "%%-%ds %%-%ds"%(maxk, maxv)
    w = maxk+maxv+4
    ncols = columns/w
    if ncols:
        nrows = len(todisp)/ncols
    else:
        nrows = len(todisp)
    res = ""
    for i in range(nrows):
        res += "| ".join([fmt%todisp[j*nrows+i] for j in range(ncols)]) + "\n"
    return res

def decode_header(data, header_struct, idx=0):    
    d = data
    if d[idx:].startswith('#'):
        # we have a preliminary header here...
        typ = d[idx:idx+2]
        assert typ == "#A"
        idx += 2
        totlen = struct.unpack('>h', d[idx:idx+2])[0]
        idx += 2
    tt=0
    header = {}
    for i, (nam, dtype, fmt, nbytes) in enumerate(header_struct):
        if dtype is None:
            idx += nbytes
            continue
        elif dtype == str:
            val = decode_string(d[idx:])
        else:
            if fmt:
                v = struct.unpack('>'+fmt, d[idx: idx+nbytes])[0]
                if isinstance(dtype, dict):
                    val = dtype.get(int(v), "N/A")
                else:
                    val = dtype(v)
            else:
                val = dtype(d[idx: idx+nbytes])
        header[nam] = val
        idx += nbytes
    return header, idx

def read_trace(data, idx, nelts):
    assert len(data[idx:]) >= (nelts*4), "data[idx:] is too small (%s for %s)"%(len(data[idx:]), (nelts*4))
    resu = []
    for i in range(nelts):
        resu.append(decode_float(data[idx: idx+4]))
        idx += 4
    return numpy.array(resu, dtype=float)
    
#####################
# HP3562A constants

# GPIB buffer size is 3x80 characters lines
class STATUS_BYTE(gpib.Constants):
    # HP3562A Status Byte, as returned by a serial poll
    _constants = [(0x40, "RQS", "Request Service"), # when sent in response to a serial poll
                  (0x20, "ERR", "GPIB error"),
                  (0x10, "RDY", "ready to accept GPIB commands"),
                  ]

    conditions = [(0, "NSR", "No service requested"),
                  (1, "USRQ1", "User SRQ #1"),
                  (2, "USRQ1", "User SRQ #2"),
                  (3, "USRQ1", "User SRQ #3"),
                  (4, "USRQ1", "User SRQ #4"),
                  (5, "USRQ1", "User SRQ #5"),
                  (6, "USRQ1", "User SRQ #6"),
                  (7, "USRQ1", "User SRQ #7"),
                  (8, "USRQ1", "User SRQ #8"),
                  (9, "EOD", "End of disk action"),
                  (10, "EOP", "End of plot action"),
                  (11, "STCH", "Instrument status changed"), # any change in 
                  # the status register sets this condition
                  (12, "PWR", "Power on"),
                  (13, "KEY", "Key pressed"),
                  (14, "DCP", "Device clear plotter (listen)"),
                  # ...
                  ]
    def __init__(self):
        super(STATUS_BYTE, self).__init__()
        self._conditions = dict([(x[0], x[1]) for x in self.conditions])
        self._rev_conditions = dict([(x[1], x[0]) for x in self.conditions])
        self._long_conditions = dict([(x[0], x[2]) for x in self.conditions])
               
    def byte_condition(self, byte):
        byte = byte & 0x8F
        return self._conditions.get(byte, "N/A")

class IS_REGISTER(gpib.Constants):
    _constants = [(0x01, "MEASP", "measeurement pause"),
                  (0x02, "ASQP", "Auto sequence pause"),
                  (0X04, "EOM", "End of measurement, capture or throughput"),
                  (0x08, "EOAS", "End of auto sequence"),
                  (0x10, "SWPR", "Sweep point ready"),
                  (0x20, "CH1OV", "Channel 1 overrange"),
                  (0x40, "CH2OV", "Channel 2 overrange"),
                  (0X80, "CH1HR", "Channel 1 half range"),
                  (0x100, "CH2HR", "Channel 2 half range"),
                  (0x200, "SFALT", "Source falt"),
                  (0x400, "RUNL", "Reference unlock"),
                  (0x800, "RMKT", "Remote marker knob turn"),
                  (0x1000, "REKT", "Remote entry knob turn"),
                  (0x2000, "ASRC", "Asctive Status Register changed"),
                  (0x4000, "PWRF", "Power-on test failed"),
                  ]
    
class StatusQuery(gpib.Constants):
    _command = "STA?"
    _constants = [(0x01, "N/A", "Not used"),
                  (0x02, "N/A", "Not used"),
                  (0x04, "KEY", "Key pressed"),
                  (0x08, "N/A", "Not used"),
                  (0x10, "RDY", "Ready"),
                  (0x20, "ERR", "Error"),
                  (0x40, "RQS", "Request"),
                  (0x80, "MOS", "Message on screen"),
                  (0x100, "MEASP", "measeurement pause"),
                  (0x200, "ASQP", "Auto sequence pause"),
                  (0X400, "EOM", "End of measurement, capture or throughput"),
                  (0x800, "EOAS", "End of auto sequence"),
                  (0x1000, "SWPR", "Sweep point ready"),
                  (0x2000, "CH1OV", "Channel 1 overrange"),
                  (0x4000, "CH2OV", "Channel 2 overrange"),
                  (0x8000, "MAOV", "Math overflow"),
                  ]
class ActivityStatysRegister(gpib.Constants):
    _command = "AS?"
    _constants = [(0x01, "CKFL", "Check fault log"),
                  (0x02, "FITR", "Filling time record"),
                  (0x04, "FLTR", "Filters settings"),
                  (0x08, "CFTP", "Curve fir in progress"),
                  (0x10, "MSSM", "Missed sample"),
                  (0x20, "TMPR", "Timed preview"),
                  (0x40, "ACDA", "Accept date"),
                  #...
                  ]

    

mercurial