Sat, 19 May 2018 00:07:15 +0200
[prologix] beginning of a proper prologix dongle emulator
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
"""Copyright (c) 2007-2018 David Douard (Paris, FRANCE).
https://bitbucket.org/dddouard/pygpibtoolkit -- mailto:david.douard@sdfa3.org

prologix emulator
=================

Base class for writing an emulator of a Prologix dongle.

This aims at writing tests and virtual devices to check tools.

"""

import logging
import os
import shutil
import subprocess
import tempfile
import threading
import time
from logging import debug, info, warning, error  # noqa
from queue import Empty, Queue

import serial

logging.basicConfig()
logger = logging.getLogger(__name__)

SOCAT = '/usr/bin/socat'

# Upper bound, in seconds, on how long __init__ polls for socat's PTY links.
SOCAT_STARTUP_TIMEOUT = 2.0


class BasePrologixEmulator:
    """Emulate a Prologix dongle on one end of a socat PTY pair.

    socat links two pseudo-terminals: the emulator opens the ``bus`` end
    and a background thread (``run``) reads lines from it, while clients
    open the other end, whose path is published as ``serialurl``.

    Lines starting with ``++`` are dispatched to the ``parse_cmd_<name>``
    method matching the command name; any other line goes to
    ``parse_data``.  Replies are queued on ``output`` and written back by
    the same thread.
    """

    def __init__(self):
        """Start socat, open the bus-side port and start the worker thread.

        Raises whatever ``subprocess.Popen`` or ``serial.Serial`` raise
        when socat cannot be started or the port cannot be opened; socat
        and the temporary directory are cleaned up before re-raising.
        """
        self.tmpdir = tempfile.mkdtemp()
        busside = os.path.join(self.tmpdir, 'bus')
        serialside = os.path.join(self.tmpdir, 'serial')
        self.serialurl = serialside
        self.socatp = None
        try:
            self.socatp = subprocess.Popen([
                SOCAT,
                'PTY,raw,echo=0,link={}'.format(busside),
                'PTY,raw,echo=0,link={}'.format(serialside)])
            self._wait_for_socat(busside, serialside)
            self.cnx = serial.Serial(busside, timeout=0)
        except Exception:
            # Do not leave a stray socat process or temp dir behind.
            self._stop_socat()
            raise
        self.running = True
        self.output = Queue()
        self.mainloop = threading.Thread(target=self.run)
        self.reset()
        self.mainloop.start()

    def _wait_for_socat(self, *links):
        """Poll until every path in *links* exists, socat exits, or timeout.

        Returns silently in all three cases: if the links are missing the
        subsequent ``serial.Serial`` call reports the failure.
        """
        deadline = time.monotonic() + SOCAT_STARTUP_TIMEOUT
        while time.monotonic() < deadline:
            if all(os.path.exists(link) for link in links):
                return
            if self.socatp.poll() is not None:
                return
            time.sleep(0.01)

    def _stop_socat(self):
        """Kill socat (if it was started) and remove the temp directory."""
        if self.socatp is not None:
            self.socatp.kill()
            self.socatp.wait()
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def reset(self):
        """Discard pending output and restore mode=1, addr=5."""
        # Drain through the queue's own API so each removal is locked,
        # instead of clearing the underlying deque behind its back.
        while True:
            try:
                self.output.get_nowait()
            except Empty:
                break
        self.mode = 1
        self.addr = 5

    def close(self):
        """Stop the worker thread, then release the port, socat and tmpdir."""
        self.running = False
        # Join first: the thread must not be reading the PTY while socat
        # is being killed underneath it.
        self.mainloop.join()
        self.cnx.close()
        self._stop_socat()

    def run(self):
        """Worker loop: handle one incoming line, else send one reply.

        NOTE(review): the port is opened with timeout=0, so readline()
        may return a partial line if a sender's bytes arrive in pieces --
        confirm whether buffering until a newline is wanted.
        """
        while self.running:
            data = self.cnx.readline().strip()
            if data:
                if data.startswith(b'++'):
                    self.parse_cmd(data)
                else:
                    self.parse_data(data)
            elif not self.output.empty():
                data = self.output.get()
                logger.info('Sending %r', data)
                self.cnx.write(data)
            else:
                # Nothing to read or write: avoid a busy loop.
                time.sleep(0.01)

    def send(self, data):
        """Queue each line of *data* for writing to the serial link.

        NOTE(review): splitlines() drops the line terminators and none is
        added back before writing -- confirm this is intended.
        """
        for row in data.splitlines():
            self.output.put(row)

    def parse_data(self, row):
        """Handle a non-command line: log it and acknowledge with OK."""
        logger.info('Received %r', row)
        self.output.put(b'OK\r\n')

    def parse_cmd(self, row):
        """Dispatch a ``++name [args...]`` line to ``parse_cmd_<name>``.

        Unknown or empty command names get an ``Error`` reply instead of
        raising, so a bad command cannot kill the worker thread.
        """
        logger.info('Received CMD %r', row)
        words = row.decode(errors='replace').split()
        name = words[0][2:] if words else ''
        handler = None
        if name:
            handler = getattr(self, 'parse_cmd_{}'.format(name), None)
        if not callable(handler):
            logger.warning('Unknown command %r', name)
            self.output.put(b'Error\r\n')
            return
        handler(words[1:])

    @staticmethod
    def _single_int(args):
        """Return the sole integer in *args*.

        Raises ValueError when there is not exactly one argument or it is
        not an integer literal.
        """
        if len(args) != 1:
            raise ValueError('expected exactly one argument')
        return int(args[0])

    def parse_cmd_mode(self, args):
        """``++mode``: report the mode, or set it to 0 or 1."""
        if not args:
            self.output.put(('%s\r\n' % self.mode).encode())
            return
        try:
            mode = self._single_int(args)
            if mode not in (0, 1):
                raise ValueError('mode must be 0 or 1')
        except ValueError:
            self.output.put(b'Error\r\n')
        else:
            logger.info('Set mode=%s', mode)
            self.mode = mode

    def parse_cmd_addr(self, args):
        """``++addr``: report the address, or set it within 0..31."""
        if not args:
            self.output.put(('%s\r\n' % self.addr).encode())
            return
        try:
            addr = self._single_int(args)
            if not 0 <= addr <= 31:
                raise ValueError('addr must be within 0..31')
        except ValueError:
            self.output.put(b'Error\r\n')
        else:
            logger.info('Set addr=%s', addr)
            self.addr = addr


def main():
    """Smoke test: send ten ``++cmd N`` lines and print each reply."""
    emul = BasePrologixEmulator()
    try:
        cnx = serial.Serial(emul.serialurl, timeout=0)
        try:
            for i in range(10):
                data = '++cmd %s' % i
                print('SEND', data)
                cnx.write(data.encode() + b'\r\n')
                time.sleep(0.1)  # give the emulator thread time to answer
                print('GOT %r' % cnx.readall())
        finally:
            cnx.close()
    finally:
        # Always stop the worker thread, otherwise the process cannot exit.
        emul.close()


if __name__ == '__main__':
    logger.setLevel(logging.DEBUG)
    main()