HP3562A/__init__.py

changeset 40
1bbea188a7e5
parent 39
8becd52c2171
child 41
9a453b2479c5
equal deleted inserted replaced
39:8becd52c2171 40:1bbea188a7e5
1 """
2 HP3562A
3 =======
4
5 Module for communicating with the HP 3562A Digital Signal Analyzer.
6
7 Subpackages
8 -----------
9
10
11 Constants
12 ---------
13
14 """
15 import struct
16 import re
17 import pygpib as gpib
18 import numpy
19
20 ########################################
21 # HP3562A internal binary types decoders
22
def decode_float(s):
    """
    Decode an HP3562A internal binary floating point number.

    `s` holds either 4 bytes (short float) or 8 bytes (long float). The
    value is decoded as a big-endian two's complement fractional mantissa
    (24 bits for the short form, 56 bits for the long form) followed by a
    one byte two's complement binary exponent:

        value = mantissa * 2**exponent,   with -1 <= mantissa < 1

    NOTE(review): this layout is the one implied by the leading term of
    the previous implementation (i1/32768 * 2**e) -- confirm against the
    data format appendix of the HP 3562A programming manual.

    Any input that is neither 4 nor 8 bytes long makes struct.unpack
    raise struct.error.
    """
    if len(s) == 4:
        fmt, mantissa_bits = '>i', 24
    else:
        fmt, mantissa_bits = '>q', 56
    word = struct.unpack(fmt, s)[0]
    # The arithmetic right shift drops the exponent byte and keeps the
    # sign of the mantissa; every lower mantissa byte is thereby treated
    # as unsigned, which is what two's complement requires.
    mantissa = word >> 8
    exponent = word & 0xFF
    if exponent & 0x80:
        exponent -= 256
    # The mantissa is a fraction: its most significant bit weighs -1.
    return mantissa * 2.0 ** (exponent - (mantissa_bits - 1))
40
41 # def decode_float(s):
42 # assert len(s) in [4,8]
43 # # exponential term
44 # e = ord(s[-1])
45 # if e & 0x80:
46 # e = e - 256
47
48 # # mantissa
49 # m = [ord(x) for x in s[:-1]]
50 # M = 0.
51 # for i in range(len(s)-1):
52 # #M += m[i]<<(i*8)
53 # M += float(m[i])/2**((i+1)*8)
54 # XXX how do we deal with negative numbers?
55 # #if m[0] & 0x80:
56 # # M = M - 2^(len(s))
57 # return M * 2**(e+1)
58
def decode_string(s):
    """
    Decode an HP3562A length-prefixed string.

    The first character of `s` is a count `nb`; the characters at
    positions 1 to nb+1 (inclusive) are returned with their high bit
    cleared.

    NOTE(review): the slice keeps nb+1 characters rather than nb --
    confirm whether the extra character is intended.
    """
    count = ord(s[0])
    payload = s[1:count + 2]
    # XXX why do we need to do this? It's not described in the manual...
    return "".join([chr(ord(char) & 0x7F) for char in payload])
67
68
69 ###
70 # Some useful functions
try:
    _STRING_TYPES = basestring  # Python 2: covers both str and unicode
except NameError:
    _STRING_TYPES = str  # Python 3


def format_header(header, head_struct, columns=80):
    """
    Pretty print a data block (trace, state or coord)

    `header` is a dict of decoded values and `head_struct` the sequence of
    rows describing it; only row[0] (the key) and row[1] (the type) are
    used here. Rows whose type is None are skipped, and keys missing from
    `header` are shown as "N/A".

    Boolean values whose key contains a "yes/no"-like pair of words are
    rendered with the matching word (first one when true, second one when
    false), the pair being removed from the displayed key.

    The entries are laid out column by column in as many columns as fit
    in `columns` characters (at least one). Returns the text, one line per
    row, or an empty string when there is nothing to display.
    """
    bool_re = re.compile(r'((?P<before>.*) )?(?P<flag>\w+/\w+)( (?P<after>.*))?')

    todisp = []
    for row in head_struct:
        key, typ = row[0], row[1]
        if typ is None:
            continue
        val = header.get(key, "N/A")
        if isinstance(val, _STRING_TYPES):
            val = repr(val)
        elif typ is bool and isinstance(val, typ):
            m = bool_re.match(key)
            if m:
                d = m.groupdict()
                # keep the words around the flag separated by a space
                words = [part for part in (d['before'], d['after']) if part]
                key = " ".join(words).capitalize()
                val = d['flag'].split('/')[not val]
            else:
                val = str(val)
        else:
            val = str(val)
        todisp.append((key + ":", val))
    if not todisp:
        return ""

    maxk = max(len(k) for k, v in todisp)
    maxv = max(len(v) for k, v in todisp)
    fmt = "%%-%ds %%-%ds" % (maxk, maxv)
    w = maxk + maxv + 4
    # always display at least one column, even if it is too wide
    ncols = max(1, columns // w)
    # round up so that a partially filled last column is not dropped
    nrows = -(-len(todisp) // ncols)
    lines = []
    for i in range(nrows):
        # entries are stored column-major: row i holds i, i+nrows, ...
        cells = [fmt % todisp[j] for j in range(i, len(todisp), nrows)]
        lines.append("| ".join(cells) + "\n")
    return "".join(lines)
115
def decode_header(data, header_struct, idx=0):
    """
    Decode the header found in `data`, starting at offset `idx`.

    `header_struct` is a sequence of (name, dtype, fmt, nbytes) rows:

    - dtype None: the field is skipped (nbytes are consumed);
    - dtype str: the field is decoded with decode_string();
    - fmt set: the field is unpacked with struct (big-endian `fmt`), then
      looked up in `dtype` if it is a dict ("N/A" when missing) or
      converted by calling dtype();
    - otherwise dtype() is called on the raw nbytes.

    If the data starts with '#', it must be a "#A" preliminary header,
    which is skipped together with the 2 byte length that follows it.

    Returns a (header dict, offset just past the header) tuple.
    """
    if data.startswith('#', idx):
        # we have a preliminary header here...
        marker = data[idx:idx + 2]
        assert marker == "#A"
        idx += 2
        # the total length is decoded (truncated data raises) but not used
        struct.unpack('>h', data[idx:idx + 2])
        idx += 2

    fields = {}
    for name, dtype, fmt, nbytes in header_struct:
        if dtype is None:
            idx += nbytes
            continue
        end = idx + nbytes
        if dtype == str:
            value = decode_string(data[idx:])
        elif not fmt:
            value = dtype(data[idx:end])
        else:
            raw = struct.unpack('>' + fmt, data[idx:end])[0]
            if isinstance(dtype, dict):
                value = dtype.get(int(raw), "N/A")
            else:
                value = dtype(raw)
        fields[name] = value
        idx = end
    return fields, idx
145
def read_trace(data, idx, nelts):
    """
    Read `nelts` consecutive 4 byte floats from `data`, starting at
    offset `idx`, each one decoded with decode_float().

    Returns them as a numpy array of floats. An AssertionError is raised
    when fewer than nelts*4 bytes are available after `idx`.
    """
    assert len(data[idx:]) >= (nelts*4), "data[idx:] is too small (%s for %s)"%(len(data[idx:]), (nelts*4))
    stop = idx + 4 * nelts
    values = [decode_float(data[pos:pos + 4]) for pos in range(idx, stop, 4)]
    return numpy.array(values, dtype=float)
153
154 #####################
155 # HP3562A constants
156
157 # GPIB buffer size is three lines of 80 characters
class STATUS_BYTE(gpib.Constants):
    # HP3562A Status Byte, as returned by a serial poll
    _constants = [(0x40, "RQS", "Request Service"), # when sent in response to a serial poll
                  (0x20, "ERR", "GPIB error"),
                  (0x10, "RDY", "ready to accept GPIB commands"),
                  ]

    # (code, short name, description) of the condition encoded in the
    # status byte bits that are not flags of _constants
    conditions = [(0, "NSR", "No service requested"),
                  (1, "USRQ1", "User SRQ #1"),
                  (2, "USRQ2", "User SRQ #2"),
                  (3, "USRQ3", "User SRQ #3"),
                  (4, "USRQ4", "User SRQ #4"),
                  (5, "USRQ5", "User SRQ #5"),
                  (6, "USRQ6", "User SRQ #6"),
                  (7, "USRQ7", "User SRQ #7"),
                  (8, "USRQ8", "User SRQ #8"),
                  (9, "EOD", "End of disk action"),
                  (10, "EOP", "End of plot action"),
                  (11, "STCH", "Instrument status changed"), # any change in
                  # the status register sets this condition
                  (12, "PWR", "Power on"),
                  (13, "KEY", "Key pressed"),
                  (14, "DCP", "Device clear plotter (listen)"),
                  # ...
                  ]

    def __init__(self):
        """Build the code/name/description lookup tables of `conditions`."""
        super(STATUS_BYTE, self).__init__()
        # code -> short name
        self._conditions = dict((x[0], x[1]) for x in self.conditions)
        # short name -> code (short names must be unique for this to work)
        self._rev_conditions = dict((x[1], x[0]) for x in self.conditions)
        # code -> description
        self._long_conditions = dict((x[0], x[2]) for x in self.conditions)

    def byte_condition(self, byte):
        """
        Return the short name of the condition encoded in status `byte`,
        or "N/A" if the code is not listed in `conditions`.
        """
        # drop the RQS (0x40), ERR (0x20) and RDY (0x10) flag bits
        byte = byte & 0x8F
        return self._conditions.get(byte, "N/A")
192
class IS_REGISTER(gpib.Constants):
    # (bit mask, short name, description) of each flag of this register
    _constants = [(0x01, "MEASP", "measurement pause"),
                  (0x02, "ASQP", "Auto sequence pause"),
                  (0x04, "EOM", "End of measurement, capture or throughput"),
                  (0x08, "EOAS", "End of auto sequence"),
                  (0x10, "SWPR", "Sweep point ready"),
                  (0x20, "CH1OV", "Channel 1 overrange"),
                  (0x40, "CH2OV", "Channel 2 overrange"),
                  (0x80, "CH1HR", "Channel 1 half range"),
                  (0x100, "CH2HR", "Channel 2 half range"),
                  (0x200, "SFALT", "Source fault"),
                  (0x400, "RUNL", "Reference unlock"),
                  (0x800, "RMKT", "Remote marker knob turn"),
                  (0x1000, "REKT", "Remote entry knob turn"),
                  (0x2000, "ASRC", "Activity Status Register changed"),
                  (0x4000, "PWRF", "Power-on test failed"),
                  ]
210
class StatusQuery(gpib.Constants):
    # (bit mask, short name, description) of each bit of the answer to
    # the query command below
    _command = "STA?"
    _constants = [(0x01, "N/A", "Not used"),
                  (0x02, "N/A", "Not used"),
                  (0x04, "KEY", "Key pressed"),
                  (0x08, "N/A", "Not used"),
                  (0x10, "RDY", "Ready"),
                  (0x20, "ERR", "Error"),
                  (0x40, "RQS", "Request"),
                  (0x80, "MOS", "Message on screen"),
                  (0x100, "MEASP", "measurement pause"),
                  (0x200, "ASQP", "Auto sequence pause"),
                  (0x400, "EOM", "End of measurement, capture or throughput"),
                  (0x800, "EOAS", "End of auto sequence"),
                  (0x1000, "SWPR", "Sweep point ready"),
                  (0x2000, "CH1OV", "Channel 1 overrange"),
                  (0x4000, "CH2OV", "Channel 2 overrange"),
                  (0x8000, "MAOV", "Math overflow"),
                  ]
class ActivityStatysRegister(gpib.Constants):
    # (bit mask, short name, description) of each bit of the answer to
    # the query command below.
    # NOTE: the class name is misspelled ("Statys"); it is kept unchanged
    # so that existing callers keep working.
    _command = "AS?"
    _constants = [(0x01, "CKFL", "Check fault log"),
                  (0x02, "FITR", "Filling time record"),
                  (0x04, "FLTR", "Filters settling"),
                  (0x08, "CFTP", "Curve fit in progress"),
                  (0x10, "MSSM", "Missed sample"),
                  (0x20, "TMPR", "Timed preview"),
                  (0x40, "ACDA", "Accept data"),
                  #...
                  ]
241
242

mercurial