|
1 # This program is free software; you can redistribute it and/or modify it under |
|
2 # the terms of the GNU General Public License as published by the Free Software |
|
3 # Foundation; either version 2 of the License, or (at your option) any later |
|
4 # version. |
|
5 # |
|
6 # This program is distributed in the hope that it will be useful, but WITHOUT |
|
7 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS |
|
8 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details |
|
9 # |
|
10 # You should have received a copy of the GNU General Public License along with |
|
11 # this program; if not, write to the Free Software Foundation, Inc., |
|
12 # 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. |
|
13 """Copyright (c) 2007-2018 David Douard (Paris, FRANCE). |
|
14 https://bitbucket.org/dddouard/pygpibtoolkit -- mailto:david.douard@sdfa3.org |
|
15 |
|
16 prologix emulator |
|
17 ================= |
|
18 |
|
19 Base class for writing an emulator of a Prologix dongle. |
|
20 |
|
21 This aims at writing tests and virtual devices to check tools. |
|
22 |
|
23 """ |
|
24 |
|
25 import os |
|
26 import serial |
|
27 import time |
|
28 import subprocess |
|
29 import tempfile |
|
30 import threading |
|
31 from queue import Queue |
|
32 import logging |
|
33 from logging import debug, info, warning, error # noqa |
|
34 |
|
35 |
|
36 logging.basicConfig() |
|
37 logger = logging.getLogger(__name__) |
|
38 |
|
39 SOCAT = '/usr/bin/socat' |
|
40 |
|
41 |
|
42 class BasePrologixEmulator: |
|
43 |
|
44 def __init__(self): |
|
45 """ |
|
46 """ |
|
47 self.tmpdir = tempfile.mkdtemp() |
|
48 busside = os.path.join(self.tmpdir, 'bus') |
|
49 serialside = os.path.join(self.tmpdir, 'serial') |
|
50 self.socatp = subprocess.Popen([ |
|
51 SOCAT, |
|
52 'PTY,raw,echo=0,link={}'.format(busside), |
|
53 'PTY,raw,echo=0,link={}'.format(serialside)]) |
|
54 self.serialurl = serialside |
|
55 |
|
56 time.sleep(0.1) # wait a bit for socat to be started |
|
57 self.cnx = serial.Serial(busside, timeout=0) |
|
58 |
|
59 self.running = True |
|
60 self.output = Queue() |
|
61 self.mainloop = threading.Thread(target=self.run) |
|
62 self.reset() |
|
63 |
|
64 self.mainloop.start() |
|
65 |
|
66 def reset(self): |
|
67 self.output.queue.clear() # should probably protect this... |
|
68 self.mode = 1 |
|
69 self.addr = 5 |
|
70 |
|
71 def close(self): |
|
72 self.running = False |
|
73 self.socatp.kill() |
|
74 self.socatp.wait() |
|
75 self.mainloop.join() |
|
76 |
|
77 def run(self): |
|
78 while self.running: |
|
79 data = self.cnx.readline().strip() |
|
80 if data: |
|
81 if data[:2] == b'++': |
|
82 self.parse_cmd(data) |
|
83 else: |
|
84 self.parse_data(data) |
|
85 elif not self.output.empty(): |
|
86 data = self.output.get() |
|
87 logger.info('Sending %r' % data) |
|
88 self.cnx.write(data) |
|
89 else: |
|
90 time.sleep(0.01) |
|
91 |
|
92 def send(self, data): |
|
93 for row in data.splitlines(): |
|
94 self.output.put(row) |
|
95 |
|
96 def parse_data(self, row): |
|
97 logger.info('Received %r' % row) |
|
98 self.output.put(b'OK\r\n') |
|
99 |
|
100 def parse_cmd(self, row): |
|
101 logger.info('Received CMD %r' % row) |
|
102 cmd = row.decode().split() |
|
103 args = cmd[1:] |
|
104 cmd = cmd[0][2:] |
|
105 getattr(self, 'parse_cmd_{}'.format(cmd))(args) |
|
106 |
|
107 def parse_cmd_mode(self, args): |
|
108 if not args: |
|
109 self.output.put(('%s\r\n' % self.mode).encode()) |
|
110 else: |
|
111 try: |
|
112 if len(args) == 1: |
|
113 mode = int(args[0]) |
|
114 assert mode in (0, 1) |
|
115 logger.info('Set mode=%s' % mode) |
|
116 self.mode = mode |
|
117 else: |
|
118 raise Exception() |
|
119 except: |
|
120 self.output.put(b'Error\r\n') |
|
121 |
|
122 def parse_cmd_addr(self, args): |
|
123 if not args: |
|
124 self.output.put(('%s\r\n' % self.addr).encode()) |
|
125 else: |
|
126 try: |
|
127 if len(args) == 1: |
|
128 addr = int(args[0]) |
|
129 assert 0 <= addr <= 31 |
|
130 logger.info('Set addr=%s' % addr) |
|
131 self.addr = addr |
|
132 else: |
|
133 raise Exception() |
|
134 except: |
|
135 self.output.put(b'Error\r\n') |
|
136 |
|
137 |
|
138 def main(): |
|
139 emul = BasePrologixEmulator() |
|
140 cnx = serial.Serial(emul.serialurl, timeout=0) |
|
141 for i in range(10): |
|
142 data = '++cmd %s' % i |
|
143 print('SEND', data) |
|
144 cnx.write(data.encode() + b'\r\n') |
|
145 time.sleep(0.1) |
|
146 print('GOT %r' % cnx.readall()) |
|
147 |
|
148 emul.close() |
|
149 |
|
150 |
|
151 if __name__ == '__main__': |
|
152 logger.setLevel(logging.DEBUG) |
|
153 main() |