Sat, 19 May 2018 00:07:15 +0200
[prologix] beginning of a proper prologix dongle emulator
--- a/.hgignore Sat May 19 00:06:38 2018 +0200 +++ b/.hgignore Sat May 19 00:07:15 2018 +0200 @@ -5,3 +5,4 @@ *.pyo *~ *.orig +.pytest_cache
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pygpibtoolkit/prologix_emulator.py Sat May 19 00:07:15 2018 +0200 @@ -0,0 +1,153 @@ +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +"""Copyright (c) 2007-2018 David Douard (Paris, FRANCE). +https://bitbucket.org/dddouard/pygpibtoolkit -- mailto:david.douard@sdfa3.org + +prologix emulator +================= + +Base class for writing an emulator of a Prologix dongle. + +This aims at writing tests and virtual devices to check tools. + +""" + +import os +import serial +import time +import subprocess +import tempfile +import threading +from queue import Queue +import logging +from logging import debug, info, warning, error # noqa + + +logging.basicConfig() +logger = logging.getLogger(__name__) + +SOCAT = '/usr/bin/socat' + + +class BasePrologixEmulator: + + def __init__(self): + """ + """ + self.tmpdir = tempfile.mkdtemp() + busside = os.path.join(self.tmpdir, 'bus') + serialside = os.path.join(self.tmpdir, 'serial') + self.socatp = subprocess.Popen([ + SOCAT, + 'PTY,raw,echo=0,link={}'.format(busside), + 'PTY,raw,echo=0,link={}'.format(serialside)]) + self.serialurl = serialside + + time.sleep(0.1) # wait a bit for socat to be started + self.cnx = serial.Serial(busside, timeout=0) + + self.running = True + self.output = Queue() + self.mainloop = threading.Thread(target=self.run) + self.reset() + + self.mainloop.start() + + def reset(self): + self.output.queue.clear() # should probably protect this... + self.mode = 1 + self.addr = 5 + + def close(self): + self.running = False + self.socatp.kill() + self.socatp.wait() + self.mainloop.join() + + def run(self): + while self.running: + data = self.cnx.readline().strip() + if data: + if data[:2] == b'++': + self.parse_cmd(data) + else: + self.parse_data(data) + elif not self.output.empty(): + data = self.output.get() + logger.info('Sending %r' % data) + self.cnx.write(data) + else: + time.sleep(0.01) + + def send(self, data): + for row in data.splitlines(): + self.output.put(row) + + def parse_data(self, row): + logger.info('Received %r' % row) + self.output.put(b'OK\r\n') + + def parse_cmd(self, row): + logger.info('Received CMD %r' % row) + cmd = row.decode().split() + args = cmd[1:] + cmd = cmd[0][2:] + getattr(self, 'parse_cmd_{}'.format(cmd))(args) + + def parse_cmd_mode(self, args): + if not args: + self.output.put(('%s\r\n' % self.mode).encode()) + else: + try: + if len(args) == 1: + mode = int(args[0]) + assert mode in (0, 1) + logger.info('Set mode=%s' % mode) + self.mode = mode + else: + raise Exception() + except: + self.output.put(b'Error\r\n') + + def parse_cmd_addr(self, args): + if not args: + self.output.put(('%s\r\n' % self.addr).encode()) + else: + try: + if len(args) == 1: + addr = int(args[0]) + assert 0 <= addr <= 31 + logger.info('Set addr=%s' % addr) + self.addr = addr + else: + raise Exception() + except: + self.output.put(b'Error\r\n') + + +def main(): + emul = BasePrologixEmulator() + cnx = serial.Serial(emul.serialurl, timeout=0) + for i in range(10): + data = '++cmd %s' % i + print('SEND', data) + cnx.write(data.encode() + b'\r\n') + time.sleep(0.1) + print('GOT %r' % cnx.readall()) + + emul.close() + + +if __name__ == '__main__': + logger.setLevel(logging.DEBUG) + main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pygpibtoolkit/test/test_prologix_emulator.py Sat May 19 00:07:15 2018 +0200 @@ -0,0 +1,91 @@ +# +import serial +import time +import pygpibtoolkit.prologix_emulator as PE + +import pytest + + +@pytest.fixture(scope='session') +def emulator(): + emul = PE.BasePrologixEmulator() + cnx = serial.Serial(emul.serialurl, timeout=0) + yield (emul, cnx) + emul.close() + cnx.close() + + +def test_base(emulator): + emul, cnx = emulator + emul.reset() + assert emul.mode == 1 + assert emul.addr == 5 + + +def test_cmd_mode_get(emulator): + emul, cnx = emulator + emul.reset() + cnx.write(b'++mode\r\n') + time.sleep(0.1) + result = cnx.readlines() + assert len(result) == 1 + result = result[0].strip() + assert result == b'1' + + +def test_cmd_mode_set(emulator): + emul, cnx = emulator + emul.reset() + cnx.write(b'++mode 0\r\n') + time.sleep(0.1) + result = cnx.readlines() + assert len(result) == 0 + assert emul.mode == 0 + + +def test_cmd_mode_err(emulator): + emul, cnx = emulator + emul.reset() + for value in (b'2', b'0 0', b'a'): + cnx.write(b'++mode %s\r\n' % value) + time.sleep(0.1) + result = cnx.readlines() + assert len(result) == 1 + result = result[0].strip() + assert result == b'Error' + assert emul.mode == 1 + + +def test_cmd_addr_get(emulator): + emul, cnx = emulator + emul.reset() + cnx.write(b'++addr\r\n') + time.sleep(0.1) + result = cnx.readlines() + assert len(result) == 1 + result = result[0].strip() + assert result == b'5' + + +def test_cmd_addr_set(emulator): + emul, cnx = emulator + emul.reset() + for value in (0, 1, 10, 31): + cnx.write(b'++addr %s\r\n' % str(value).encode()) + time.sleep(0.1) + result = cnx.readlines() + assert len(result) == 0 + assert emul.addr == value + + +def test_cmd_addr_err(emulator): + emul, cnx = emulator + emul.reset() + for value in (b'-2', b'0 0', b'a'): + cnx.write(b'++addr %s\r\n' % value) + time.sleep(0.1) + result = cnx.readlines() + assert len(result) == 1 + result = result[0].strip() + assert result == b'Error' + assert emul.addr == 5
--- a/setup.py Sat May 19 00:06:38 2018 +0200 +++ b/setup.py Sat May 19 00:07:15 2018 +0200 @@ -44,7 +44,9 @@ version='0.1.0', install_requires=[ 'pyserial', 'numpy', 'PyQt5', 'pyqtgraph'], - + extras_require={ + 'test': ['pytest'], + }, entry_points={ 'console_scripts': [ 'pygpib-detect=pygpibtoolkit.detect:main',