21 This aims at writing tests and virtual devices to check tools. |
21 This aims at writing tests and virtual devices to check tools. |
22 |
22 |
23 """ |
23 """ |
24 |
24 |
25 import os |
25 import os |
|
26 import glob |
26 import serial |
27 import serial |
27 import time |
28 import time |
28 import subprocess |
29 import subprocess |
29 import tempfile |
30 import tempfile |
30 import threading |
31 import threading |
31 from queue import Queue |
32 from queue import Queue |
32 import logging |
33 import logging |
33 from logging import debug, info, warning, error # noqa |
34 from logging import debug, info, warning, error # noqa |
|
35 import random |
34 |
36 |
35 |
37 |
36 logging.basicConfig() |
38 logging.basicConfig() |
37 logger = logging.getLogger(__name__) |
39 logger = logging.getLogger(__name__) |
38 |
40 |
39 SOCAT = '/usr/bin/socat' |
41 SOCAT = '/usr/bin/socat' |
40 |
42 |
41 |
43 |
42 class BasePrologixEmulator: |
44 class PrologixEmulator: |
|
45 params = ( |
|
46 ('mode', 1), |
|
47 ('addr', 5), |
|
48 ('auto', 0), |
|
49 ('eoi', 0), |
|
50 ('eos', 0), |
|
51 ('eot_enable', 0), |
|
52 ('eot_char', 0), |
|
53 ('read_tmo_ms', 1), |
|
54 ('savecfg', 1), |
|
55 ) |
|
56 params_constraints = ( |
|
57 ('mode', [0, 1]), |
|
58 ('auto', [0, 1]), |
|
59 ('eoi', [0, 1]), |
|
60 ('eos', [0, 1]), |
|
61 ('eot_enable', [0, 1]), |
|
62 ('eot_char', [0, 1]), |
|
63 ('read_tmo_ms', (1, 3000)), |
|
64 ('savecfg', [0, 1]), |
|
65 ) |
43 |
66 |
44 def __init__(self): |
67 def __init__(self): |
45 """ |
68 """ |
46 """ |
69 """ |
47 self.tmpdir = tempfile.mkdtemp() |
70 self.tmpdir = tempfile.mkdtemp() |
53 'PTY,raw,echo=0,link={}'.format(serialside)]) |
76 'PTY,raw,echo=0,link={}'.format(serialside)]) |
54 self.serialurl = serialside |
77 self.serialurl = serialside |
55 |
78 |
56 time.sleep(0.1) # wait a bit for socat to be started |
79 time.sleep(0.1) # wait a bit for socat to be started |
57 self.cnx = serial.Serial(busside, timeout=0) |
80 self.cnx = serial.Serial(busside, timeout=0) |
|
81 self.devices = {} |
58 |
82 |
59 self.running = True |
83 self.running = True |
60 self.output = Queue() |
84 self.output = Queue() |
61 self.mainloop = threading.Thread(target=self.run) |
85 self.mainloop = threading.Thread(target=self.run) |
62 self.reset() |
86 self.reset() |
63 |
87 |
64 self.mainloop.start() |
88 self.mainloop.start() |
65 |
89 |
|
90 def attach_device(self, addr, device): |
|
91 self.devices[addr] = device |
|
92 |
66 def reset(self): |
93 def reset(self): |
67 self.output.queue.clear() # should probably protect this... |
94 self.output.queue.clear() # should probably protect this... |
68 self.mode = 1 |
95 self.params = dict(self.__class__.params) |
69 self.addr = 5 |
|
70 |
96 |
71 def close(self): |
97 def close(self): |
72 self.running = False |
98 self.running = False |
73 self.socatp.kill() |
99 self.socatp.kill() |
74 self.socatp.wait() |
100 self.socatp.wait() |
93 for row in data.splitlines(): |
119 for row in data.splitlines(): |
94 self.output.put(row) |
120 self.output.put(row) |
95 |
121 |
96 def parse_data(self, row): |
122 def parse_data(self, row): |
97 logger.info('Received %r' % row) |
123 logger.info('Received %r' % row) |
98 self.output.put(b'OK\r\n') |
124 addr = self.params['addr'] |
|
125 if addr in self.devices: |
|
126 device = self.devices[addr] |
|
127 device.parse(row) |
99 |
128 |
100 def parse_cmd(self, row): |
129 def parse_cmd(self, row): |
101 logger.info('Received CMD %r' % row) |
130 logger.info('Received CMD %r' % row) |
102 cmd = row.decode().split() |
131 cmd = row.decode().split() |
103 args = cmd[1:] |
132 args = cmd[1:] |
104 cmd = cmd[0][2:] |
133 cmd = cmd[0][2:] |
105 getattr(self, 'parse_cmd_{}'.format(cmd))(args) |
134 if hasattr(self, 'parse_cmd_{}'.format(cmd)): |
106 |
135 return getattr(self, 'parse_cmd_{}'.format(cmd))(args) |
107 def parse_cmd_mode(self, args): |
136 elif cmd in self.params: |
|
137 self.parse_generic_cmd(cmd, args) |
|
138 else: |
|
139 raise Exception() |
|
140 |
|
141 def parse_generic_cmd(self, cmd, args): |
108 if not args: |
142 if not args: |
109 self.output.put(('%s\r\n' % self.mode).encode()) |
143 self.output.put(('%s\r\n' % self.params[cmd]).encode()) |
110 else: |
144 else: |
111 try: |
145 try: |
112 if len(args) == 1: |
146 if len(args) == 1: |
113 mode = int(args[0]) |
147 value = int(args[0]) |
114 assert mode in (0, 1) |
148 if cmd in self.params_constraints: |
115 logger.info('Set mode=%s' % mode) |
149 cstr = self.params_constraints[cmd] |
116 self.mode = mode |
150 if isinstance(cstr, list): |
|
151 assert value in cstr |
|
152 elif isinstance(cstr, tuple): |
|
153 assert cstr[0] <= value <= cstr[1] |
|
154 else: |
|
155 raise Exception() |
|
156 logger.info('Set %s=%s' % (cmd, value)) |
|
157 self.params[cmd] = value |
117 else: |
158 else: |
118 raise Exception() |
159 raise Exception() |
119 except: |
160 except: |
120 self.output.put(b'Error\r\n') |
161 self.output.put(b'Error\r\n') |
121 |
162 |
122 def parse_cmd_addr(self, args): |
163 def parse_cmd_addr(self, args): |
123 if not args: |
164 if not args: |
124 self.output.put(('%s\r\n' % self.addr).encode()) |
165 addrs = self.params['addr'] |
|
166 if isinstance(addrs, int): |
|
167 addrs = [addrs, None] |
|
168 addrs = ' '.join(str(x) for x in addrs if x is not None) |
|
169 self.output.put(('%s\r\n' % addrs).encode()) |
125 else: |
170 else: |
126 try: |
171 try: |
127 if len(args) == 1: |
172 if len(args) in (1, 2): |
128 addr = int(args[0]) |
173 values = [int(x) for x in args] |
129 assert 0 <= addr <= 31 |
174 if len(values) == 1: |
130 logger.info('Set addr=%s' % addr) |
175 values.append(None) |
131 self.addr = addr |
176 pad, sad = values |
|
177 assert 0 <= pad <= 30 |
|
178 if sad: |
|
179 assert 96 <= sad <= 126 |
|
180 logger.info('Set addr=%s' % (values)) |
|
181 if sad: |
|
182 self.params['addr'] = values |
|
183 else: |
|
184 self.params['addr'] = pad |
132 else: |
185 else: |
133 raise Exception() |
186 raise Exception() |
134 except: |
187 except: |
135 self.output.put(b'Error\r\n') |
188 self.output.put(b'Error\r\n') |
136 |
189 |
137 |
190 |
|
191 class HPGLPlotingDevice: |
|
192 def __init__(self, emulator, address=5): |
|
193 self.plotter_address = address |
|
194 self.filenames = glob.glob('examples/hpgl_plots/*.plt') |
|
195 self.emulator = emulator |
|
196 |
|
197 def plot(self, fname=None): |
|
198 if fname is None: |
|
199 fname = random.choice(self.filenames) |
|
200 with open(fname) as fobj: |
|
201 ret = fobj.read().strip() |
|
202 if ret: |
|
203 self.emulator.send(ret) |
|
204 |
|
205 |
138 def main(): |
206 def main(): |
139 emul = BasePrologixEmulator() |
207 emul = PrologixEmulator() |
140 cnx = serial.Serial(emul.serialurl, timeout=0) |
208 cnx = serial.Serial(emul.serialurl, timeout=0) |
141 for i in range(10): |
209 for i in range(10): |
142 data = '++cmd %s' % i |
210 data = '++cmd %s' % i |
143 print('SEND', data) |
211 print('SEND', data) |
144 cnx.write(data.encode() + b'\r\n') |
212 cnx.write(data.encode() + b'\r\n') |