pygpibtoolkit/prologix_emulator.py

changeset 101
975576e55563
child 102
91713944ebb0
equal deleted inserted replaced
100:709ffa28cf7e 101:975576e55563
1 # This program is free software; you can redistribute it and/or modify it under
2 # the terms of the GNU General Public License as published by the Free Software
3 # Foundation; either version 2 of the License, or (at your option) any later
4 # version.
5 #
6 # This program is distributed in the hope that it will be useful, but WITHOUT
7 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
8 # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details
9 #
10 # You should have received a copy of the GNU General Public License along with
11 # this program; if not, write to the Free Software Foundation, Inc.,
12 # 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
13 """Copyright (c) 2007-2018 David Douard (Paris, FRANCE).
14 https://bitbucket.org/dddouard/pygpibtoolkit -- mailto:david.douard@sdfa3.org
15
16 prologix emulator
17 =================
18
19 Base class for writing an emulator of a Prologix dongle.
20
21 This aims at writing tests and virtual devices to check tools.
22
23 """
24
25 import os
26 import serial
27 import time
28 import subprocess
29 import tempfile
30 import threading
31 from queue import Queue
32 import logging
33 from logging import debug, info, warning, error # noqa
34
35
# Absolute path of the socat executable used to link the two PTYs.
SOCAT = '/usr/bin/socat'

# Give the root logger a default handler, then create this module's logger.
logging.basicConfig()
logger = logging.getLogger(__name__)
40
41
class BasePrologixEmulator:
    """Base class emulating a Prologix dongle behind a pair of linked PTYs.

    A ``socat`` child process links two pseudo-terminals inside a temporary
    directory. The emulator reads and writes the ``bus`` side; client code
    opens the path stored in ``serialurl``. A background thread reads
    incoming lines: those starting with ``++`` are dispatched to a
    ``parse_cmd_<name>(args)`` method, anything else goes to
    :meth:`parse_data`. Replies are queued in ``output`` and written back
    by the same thread.

    Subclasses add commands by defining ``parse_cmd_<name>(self, args)``.
    Call :meth:`close` to stop the thread and release the resources.
    """

    def __init__(self):
        """Start socat, open the bus-side PTY and launch the I/O thread."""
        self.tmpdir = tempfile.mkdtemp()
        busside = os.path.join(self.tmpdir, 'bus')
        serialside = os.path.join(self.tmpdir, 'serial')
        self.socatp = subprocess.Popen([
            SOCAT,
            'PTY,raw,echo=0,link={}'.format(busside),
            'PTY,raw,echo=0,link={}'.format(serialside)])
        self.serialurl = serialside

        time.sleep(0.1)  # wait a bit for socat to be started
        try:
            self.cnx = serial.Serial(busside, timeout=0)
        except Exception:
            # Do not leave an orphan socat and temp directory behind when
            # the PTY cannot be opened.
            self._release_socat()
            raise

        self.running = True
        self.output = Queue()
        self.mainloop = threading.Thread(target=self.run)
        self.reset()

        self.mainloop.start()

    def _release_socat(self):
        """Stop the socat child and delete the temporary link directory."""
        import shutil  # local import: only needed at teardown

        self.socatp.kill()
        self.socatp.wait()
        # socat is killed, so it cannot tidy its own links; remove the
        # whole directory and ignore anything already gone.
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def reset(self):
        """Drop any pending replies and restore mode=1 and addr=5."""
        # Hold the queue's own lock so the clear cannot interleave with a
        # concurrent put()/get() from another thread.
        with self.output.mutex:
            self.output.queue.clear()
        self.mode = 1
        self.addr = 5

    def close(self):
        """Stop the I/O thread, close the port and tear down socat."""
        self.running = False
        # Join first: the thread must finish its current iteration before
        # the PTY disappears underneath it.
        self.mainloop.join()
        self.cnx.close()
        self._release_socat()

    def run(self):
        """Thread body: dispatch received lines, otherwise flush replies.

        The port is opened with ``timeout=0``, so ``readline()`` returns
        immediately with whatever is available. Queued replies are only
        written when nothing was received on this iteration.
        """
        while self.running:
            data = self.cnx.readline().strip()
            if data:
                try:
                    if data.startswith(b'++'):
                        self.parse_cmd(data)
                    else:
                        self.parse_data(data)
                except Exception:
                    # A failing handler must not silently kill the thread
                    # that serves every other request.
                    logger.exception('Handler failed for %r', data)
            elif not self.output.empty():
                data = self.output.get()
                logger.info('Sending %r', data)
                self.cnx.write(data)
            else:
                time.sleep(0.01)

    def send(self, data):
        """Queue each line of *data* for transmission.

        ``splitlines()`` drops the line terminators, so each row is queued
        (and later written) without one.
        NOTE(review): confirm that clients do not expect a terminator here.
        """
        for row in data.splitlines():
            self.output.put(row)

    def parse_data(self, row):
        """Handle a non-command line; the base class just answers ``OK``."""
        logger.info('Received %r', row)
        self.output.put(b'OK\r\n')

    def parse_cmd(self, row):
        """Dispatch a ``++name arg...`` line to ``parse_cmd_<name>(args)``.

        Undecodable input, an empty line or an unknown command name queue
        ``b'Error\\r\\n'`` instead of raising.
        """
        logger.info('Received CMD %r', row)
        try:
            tokens = row.decode().split()
        except UnicodeDecodeError:
            self.output.put(b'Error\r\n')
            return
        if not tokens:
            self.output.put(b'Error\r\n')
            return
        name = tokens[0][2:]  # strip the leading '++'
        handler = getattr(self, 'parse_cmd_{}'.format(name), None)
        if not callable(handler):
            logger.warning('Unknown command %r', name)
            self.output.put(b'Error\r\n')
            return
        handler(tokens[1:])

    def _set_or_query(self, attr, args, allowed):
        """Report or update the integer attribute *attr*.

        With no *args* the current value is queued as ``b'<value>\\r\\n'``.
        With exactly one argument that parses as an integer contained in
        *allowed*, the attribute is updated and nothing is queued. Any
        other input queues ``b'Error\\r\\n'`` and leaves the value as is.
        """
        if not args:
            self.output.put(('%s\r\n' % getattr(self, attr)).encode())
            return
        try:
            if len(args) != 1:
                raise ValueError('expected exactly one argument')
            value = int(args[0])
            # Explicit check instead of assert, which -O would strip.
            if value not in allowed:
                raise ValueError('value out of range')
        except (TypeError, ValueError):
            self.output.put(b'Error\r\n')
            return
        logger.info('Set %s=%s', attr, value)
        setattr(self, attr, value)

    def parse_cmd_mode(self, args):
        """Handle ``++mode``: query the mode, or set it to 0 or 1."""
        self._set_or_query('mode', args, range(0, 2))

    def parse_cmd_addr(self, args):
        """Handle ``++addr``: query the address, or set it within 0..31."""
        self._set_or_query('addr', args, range(0, 32))
136
137
def main():
    """Demo exchange: send ten ``++cmd <i>`` lines and print what comes back.

    The emulator is always closed on exit, even if the exchange fails, so
    its worker thread and socat child do not outlive the demo.
    """
    emul = BasePrologixEmulator()
    try:
        # The context manager closes the client side of the PTY pair.
        with serial.Serial(emul.serialurl, timeout=0) as cnx:
            for i in range(10):
                data = '++cmd %s' % i
                print('SEND', data)
                cnx.write(data.encode() + b'\r\n')
                time.sleep(0.1)  # give the emulator thread time to answer
                print('GOT %r' % cnx.readall())
    finally:
        emul.close()


if __name__ == '__main__':
    # Script entry point: show the emulator's INFO/DEBUG traffic logs.
    logger.setLevel(logging.DEBUG)
    main()

mercurial