pygpibtoolkit/prologix_emulator.py

changeset 102
91713944ebb0
parent 101
975576e55563
child 105
89123c2af2fd
equal deleted inserted replaced
101:975576e55563 102:91713944ebb0
21 This aims at writing tests and virtual devices to check tools. 21 This aims at writing tests and virtual devices to check tools.
22 22
23 """ 23 """
24 24
25 import os 25 import os
26 import glob
26 import serial 27 import serial
27 import time 28 import time
28 import subprocess 29 import subprocess
29 import tempfile 30 import tempfile
30 import threading 31 import threading
31 from queue import Queue 32 from queue import Queue
32 import logging 33 import logging
33 from logging import debug, info, warning, error # noqa 34 from logging import debug, info, warning, error # noqa
35 import random
34 36
35 37
36 logging.basicConfig() 38 logging.basicConfig()
37 logger = logging.getLogger(__name__) 39 logger = logging.getLogger(__name__)
38 40
39 SOCAT = '/usr/bin/socat' 41 SOCAT = '/usr/bin/socat'
40 42
41 43
42 class BasePrologixEmulator: 44 class PrologixEmulator:
# Default value of every controller setting, as (name, value) pairs.
# reset() turns this table into a fresh per-instance dict.
params = (
    ('mode', 1),
    ('addr', 5),
    ('auto', 0),
    ('eoi', 0),
    ('eos', 0),
    ('eot_enable', 0),
    ('eot_char', 0),
    ('read_tmo_ms', 1),
    ('savecfg', 1),
)
# Accepted values per setting, as (name, constraint) pairs: a list
# enumerates the allowed values, a 2-tuple is an inclusive (min, max)
# range.  'addr' has no entry because parse_cmd_addr validates it itself.
params_constraints = (
    ('mode', [0, 1]),
    ('auto', [0, 1]),
    ('eoi', [0, 1]),
    # ++eos selects one of four terminators (CR+LF, CR, LF, none).
    ('eos', [0, 1, 2, 3]),
    ('eot_enable', [0, 1]),
    # ++eot_char takes the decimal code of any single byte.
    ('eot_char', (0, 255)),
    ('read_tmo_ms', (1, 3000)),
    ('savecfg', [0, 1]),
)
43 66
44 def __init__(self): 67 def __init__(self):
45 """ 68 """
46 """ 69 """
47 self.tmpdir = tempfile.mkdtemp() 70 self.tmpdir = tempfile.mkdtemp()
53 'PTY,raw,echo=0,link={}'.format(serialside)]) 76 'PTY,raw,echo=0,link={}'.format(serialside)])
54 self.serialurl = serialside 77 self.serialurl = serialside
55 78
56 time.sleep(0.1) # wait a bit for socat to be started 79 time.sleep(0.1) # wait a bit for socat to be started
57 self.cnx = serial.Serial(busside, timeout=0) 80 self.cnx = serial.Serial(busside, timeout=0)
81 self.devices = {}
58 82
59 self.running = True 83 self.running = True
60 self.output = Queue() 84 self.output = Queue()
61 self.mainloop = threading.Thread(target=self.run) 85 self.mainloop = threading.Thread(target=self.run)
62 self.reset() 86 self.reset()
63 87
64 self.mainloop.start() 88 self.mainloop.start()
65 89
def attach_device(self, addr, device):
    """Register *device* under the address key *addr*.

    A device already registered under the same key is replaced.
    """
    registry = self.devices
    registry[addr] = device
92
def reset(self):
    """Drop all pending output and restore every setting to its default.

    The settings dict is rebuilt from the class-level ``params`` table, so
    the defaults themselves are never mutated by later commands.
    """
    # The output queue is shared with other threads; hold the queue's own
    # lock while emptying the underlying deque.
    with self.output.mutex:
        self.output.queue.clear()
    # Read the table from the class: once this method has run, the
    # instance attribute of the same name shadows it.
    self.params = dict(type(self).params)
70 96
71 def close(self): 97 def close(self):
72 self.running = False 98 self.running = False
73 self.socatp.kill() 99 self.socatp.kill()
74 self.socatp.wait() 100 self.socatp.wait()
93 for row in data.splitlines(): 119 for row in data.splitlines():
94 self.output.put(row) 120 self.output.put(row)
95 121
def parse_data(self, row):
    """Forward a data row to the device at the currently selected address.

    Rows are dropped when no device is attached at that address.

    :param row: the raw row received from the serial side.
    """
    logger.info('Received %r', row)
    addr = self.params['addr']
    # The stored address is a [primary, secondary] list once a secondary
    # address has been set.  A list is unhashable and cannot be used as a
    # dict key, so route on the primary address.
    if isinstance(addr, (list, tuple)):
        addr = addr[0]
    device = self.devices.get(addr)
    if device is not None:
        device.parse(row)
99 128
def parse_cmd(self, row):
    """Dispatch one controller command row (``++name [args...]``).

    A dedicated ``parse_cmd_<name>`` method takes precedence; otherwise a
    name that is a known setting is handled by ``parse_generic_cmd``.

    :param row: the raw command row as bytes.
    :returns: whatever the dedicated handler returns, else ``None``.
    :raises ValueError: if the row is empty or names an unknown command.
    """
    logger.info('Received CMD %r', row)
    words = row.decode().split()
    if not words:
        raise ValueError('empty command row')
    # Strip the two-character command prefix from the first word.
    name = words[0][2:]
    args = words[1:]

    handler = getattr(self, 'parse_cmd_{}'.format(name), None)
    if handler is not None:
        return handler(args)
    if name in self.params:
        self.parse_generic_cmd(name, args)
        return None
    raise ValueError('unknown command: %r' % name)
140
def parse_generic_cmd(self, cmd, args):
    """Query or set a plain integer setting.

    With no argument the current value is queued as ``<value>\\r\\n``.
    With exactly one argument the value is parsed as an integer, checked
    against the constraint declared for *cmd* (if any) and stored.  Any
    invalid request queues ``Error\\r\\n`` and leaves the setting untouched.

    :param cmd: name of the setting (a key of ``self.params``).
    :param args: list of string arguments following the command name.
    """
    if not args:
        self.output.put(('%s\r\n' % self.params[cmd]).encode())
        return

    try:
        if len(args) != 1:
            raise ValueError('expected exactly one argument')
        value = int(args[0])
        # params_constraints is a sequence of (name, constraint) pairs, so
        # it must be turned into a mapping before looking up the name.
        constraint = dict(self.params_constraints).get(cmd)
        if constraint is None:
            pass  # unconstrained setting
        elif isinstance(constraint, list):
            # A list enumerates the allowed values.
            if value not in constraint:
                raise ValueError('value not allowed')
        elif isinstance(constraint, tuple):
            # A tuple is an inclusive (min, max) range.
            low, high = constraint[0], constraint[1]
            if not low <= value <= high:
                raise ValueError('value out of range')
        else:
            raise ValueError('unsupported constraint type')
    except ValueError:
        self.output.put(b'Error\r\n')
        return

    logger.info('Set %s=%s', cmd, value)
    self.params[cmd] = value
121 162
def parse_cmd_addr(self, args):
    """Handle ``++addr [PAD [SAD]]``: query or set the selected address.

    With no argument the current address is queued as ``PAD\\r\\n`` or
    ``PAD SAD\\r\\n``.  Otherwise PAD must be in 0..30 and the optional SAD
    in 96..126; any invalid request queues ``Error\\r\\n`` and leaves the
    address untouched.  The address is stored as a bare int when only a
    primary address is given, and as a ``[pad, sad]`` list otherwise.

    :param args: list of string arguments following the command name.
    """
    if not args:
        current = self.params['addr']
        if isinstance(current, int):
            current = [current]
        text = ' '.join(str(part) for part in current if part is not None)
        self.output.put(('%s\r\n' % text).encode())
        return

    try:
        if len(args) not in (1, 2):
            raise ValueError('expected PAD [SAD]')
        values = [int(word) for word in args]
        pad = values[0]
        sad = values[1] if len(values) == 2 else None
        if not 0 <= pad <= 30:
            raise ValueError('primary address out of range')
        # Compare against None rather than truthiness so an explicit
        # secondary address of 0 is rejected instead of being ignored.
        if sad is not None and not 96 <= sad <= 126:
            raise ValueError('secondary address out of range')
    except ValueError:
        self.output.put(b'Error\r\n')
        return

    logger.info('Set addr=%s', [pad, sad])
    self.params['addr'] = pad if sad is None else [pad, sad]
136 189
137 190
class HPGLPlotingDevice:
    """Virtual plotter that feeds example HPGL files to an emulator.

    The candidate files are collected once, at construction time, from
    ``examples/hpgl_plots/*.plt`` relative to the current directory.
    """

    def __init__(self, emulator, address=5):
        """
        :param emulator: object whose ``send`` method receives the plot.
        :param address: address this plotter is meant to answer on.
        """
        self.plotter_address = address
        self.filenames = glob.glob('examples/hpgl_plots/*.plt')
        self.emulator = emulator

    def plot(self, fname=None):
        """Send the content of an HPGL file through the emulator.

        :param fname: file to send; a random example file when ``None``.

        Nothing is sent when the file is empty or whitespace-only, or when
        no file name was given and no example file is available.
        """
        if fname is None:
            if not self.filenames:
                # random.choice() would raise an opaque IndexError here.
                logging.getLogger(__name__).warning(
                    'no HPGL example file available, nothing to plot')
                return
            fname = random.choice(self.filenames)
        with open(fname) as fobj:
            content = fobj.read().strip()
        if content:
            self.emulator.send(content)
204
205
138 def main(): 206 def main():
139 emul = BasePrologixEmulator() 207 emul = PrologixEmulator()
140 cnx = serial.Serial(emul.serialurl, timeout=0) 208 cnx = serial.Serial(emul.serialurl, timeout=0)
141 for i in range(10): 209 for i in range(10):
142 data = '++cmd %s' % i 210 data = '++cmd %s' % i
143 print('SEND', data) 211 print('SEND', data)
144 cnx.write(data.encode() + b'\r\n') 212 cnx.write(data.encode() + b'\r\n')

mercurial