Mon, 04 Jun 2018 22:28:05 +0200
[plotter] add a --mockup option to the demo main function
# This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. """Copyright (c) 2007-2018 David Douard (Paris, FRANCE). https://bitbucket.org/dddouard/pygpibtoolkit -- mailto:david.douard@sdfa3.org prologix emulator ================= Base class for writing an emulator of a Prologix dongle. This aims at writing tests and virtual devices to check tools. """ import os import glob import serial import time import subprocess import tempfile import threading from queue import Queue import logging from logging import debug, info, warning, error # noqa import random logging.basicConfig() logger = logging.getLogger(__name__) SOCAT = '/usr/bin/socat' class PrologixEmulator: params = ( ('mode', 1), ('addr', 5), ('auto', 0), ('eoi', 0), ('eos', 0), ('eot_enable', 0), ('eot_char', 0), ('read_tmo_ms', 1), ('savecfg', 1), ) params_constraints = ( ('mode', [0, 1]), ('auto', [0, 1]), ('eoi', [0, 1]), ('eos', [0, 1]), ('eot_enable', [0, 1]), ('eot_char', [0, 1]), ('read_tmo_ms', (1, 3000)), ('savecfg', [0, 1]), ) def __init__(self): """ """ self.tmpdir = tempfile.mkdtemp() busside = os.path.join(self.tmpdir, 'bus') serialside = os.path.join(self.tmpdir, 'serial') self.socatp = subprocess.Popen([ SOCAT, 'PTY,raw,echo=0,link={}'.format(busside), 'PTY,raw,echo=0,link={}'.format(serialside)]) self.serialurl = serialside time.sleep(0.1) # wait a bit for socat to be started self.cnx = serial.Serial(busside, timeout=0) self.devices = {} self.running = True self.output = Queue() self.mainloop = threading.Thread(target=self.run) self.reset() self.mainloop.start() def attach_device(self, addr, device): self.devices[addr] = device def reset(self): self.output.queue.clear() # should probably protect this... self.params = dict(self.__class__.params) def close(self): self.running = False self.socatp.kill() self.socatp.wait() self.mainloop.join() def run(self): while self.running: data = self.cnx.readline().strip() if data: for row in data.splitlines(): if row[:2] == b'++': self.parse_cmd(row) else: self.parse_data(row) elif not self.output.empty(): data = self.output.get() logger.info('Sending %r' % data) self.cnx.write(data) else: time.sleep(0.01) def send(self, data): for row in data.splitlines(): self.output.put(row) def parse_data(self, row): logger.info('Received %r' % row) addr = self.params['addr'] if addr in self.devices: device = self.devices[addr] device.parse(row) def parse_cmd(self, row): logger.info('Received CMD %r' % row) cmd = row.decode().split() args = cmd[1:] cmd = cmd[0][2:] if hasattr(self, 'parse_cmd_{}'.format(cmd)): return getattr(self, 'parse_cmd_{}'.format(cmd))(args) elif cmd in self.params: self.parse_generic_cmd(cmd, args) else: raise Exception() def parse_generic_cmd(self, cmd, args): if not args: self.output.put(('%s\r\n' % self.params[cmd]).encode()) else: try: if len(args) == 1: value = int(args[0]) if cmd in self.params_constraints: cstr = self.params_constraints[cmd] if isinstance(cstr, list): assert value in cstr elif isinstance(cstr, tuple): assert cstr[0] <= value <= cstr[1] else: raise Exception() logger.info('Set %s=%s' % (cmd, value)) self.params[cmd] = value else: raise Exception() except: self.output.put(b'Error\r\n') def parse_cmd_addr(self, args): if not args: addrs = self.params['addr'] if isinstance(addrs, int): addrs = [addrs, None] addrs = ' '.join(str(x) for x in addrs if x is not None) self.output.put(('%s\r\n' % addrs).encode()) else: try: if len(args) in (1, 2): values = [int(x) for x in args] if len(values) == 1: values.append(None) pad, sad = values assert 0 <= pad <= 30 if sad: assert 96 <= sad <= 126 logger.info('Set addr=%s' % (values)) if sad: self.params['addr'] = values else: self.params['addr'] = pad else: raise Exception() except: self.output.put(b'Error\r\n') def parse_cmd_ifc(self, args): pass class HPGLPlotingDevice: def __init__(self, emulator, address=5): self.plotter_address = address self.filenames = glob.glob('examples/hpgl_plots/*.plt') self.emulator = emulator def plot(self, fname=None): if fname is None: fname = random.choice(self.filenames) with open(fname) as fobj: ret = fobj.read().strip() if ret: self.emulator.send(ret.encode()) def main(): emul = PrologixEmulator() device = HPGLPlotingDevice(emul) emul.attach_device(10, device) from pygpibtoolkit.plotter.gpib_plotter import GPIBplotter plotter = GPIBplotter( device=emul.serialurl, address=5, timeout=0.06) device.plot() while True: plot = plotter.load_plot() if plot: print('PLOT') break emul.close() if __name__ == '__main__': logger.setLevel(logging.DEBUG) main()