[prologix] beginning of a proper prologix dongle emulator

Sat, 19 May 2018 00:07:15 +0200

author
David Douard <david.douard@logilab.fr>
date
Sat, 19 May 2018 00:07:15 +0200
changeset 101
975576e55563
parent 100
709ffa28cf7e
child 102
91713944ebb0

[prologix] beginning of a proper prologix dongle emulator

.hgignore file | annotate | diff | comparison | revisions
pygpibtoolkit/prologix_emulator.py file | annotate | diff | comparison | revisions
pygpibtoolkit/test/test_prologix_emulator.py file | annotate | diff | comparison | revisions
setup.py file | annotate | diff | comparison | revisions
--- a/.hgignore	Sat May 19 00:06:38 2018 +0200
+++ b/.hgignore	Sat May 19 00:07:15 2018 +0200
@@ -5,3 +5,4 @@
 *.pyo
 *~
 *.orig
+.pytest_cache
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pygpibtoolkit/prologix_emulator.py	Sat May 19 00:07:15 2018 +0200
@@ -0,0 +1,153 @@
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free Software
+# Foundation; either version 2 of the License, or (at your option) any later
+# version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details
+#
+# You should have received a copy of the GNU General Public License along with
+# this program; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+"""Copyright (c) 2007-2018 David Douard (Paris, FRANCE).
+https://bitbucket.org/dddouard/pygpibtoolkit -- mailto:david.douard@sdfa3.org
+
+prologix emulator
+=================
+
+Base class for writing an emulator of a Prologix dongle.
+
+This aims at writing tests and virtual devices to check tools.
+
+"""
+
+import os
+import serial
+import time
+import subprocess
+import tempfile
+import threading
+from queue import Queue
+import logging
+from logging import debug, info, warning, error  # noqa
+
+
+logging.basicConfig()
+logger = logging.getLogger(__name__)
+
+SOCAT = '/usr/bin/socat'
+
+
+class BasePrologixEmulator:
+
+    def __init__(self):
+        """
+        """
+        self.tmpdir = tempfile.mkdtemp()
+        busside = os.path.join(self.tmpdir, 'bus')
+        serialside = os.path.join(self.tmpdir, 'serial')
+        self.socatp = subprocess.Popen([
+            SOCAT,
+            'PTY,raw,echo=0,link={}'.format(busside),
+            'PTY,raw,echo=0,link={}'.format(serialside)])
+        self.serialurl = serialside
+
+        time.sleep(0.1)  # wait a bit for socat to be started
+        self.cnx = serial.Serial(busside, timeout=0)
+
+        self.running = True
+        self.output = Queue()
+        self.mainloop = threading.Thread(target=self.run)
+        self.reset()
+
+        self.mainloop.start()
+
+    def reset(self):
+        self.output.queue.clear()  # should probably protect this...
+        self.mode = 1
+        self.addr = 5
+
+    def close(self):
+        self.running = False
+        self.socatp.kill()
+        self.socatp.wait()
+        self.mainloop.join()
+
+    def run(self):
+        while self.running:
+            data = self.cnx.readline().strip()
+            if data:
+                if data[:2] == b'++':
+                    self.parse_cmd(data)
+                else:
+                    self.parse_data(data)
+            elif not self.output.empty():
+                data = self.output.get()
+                logger.info('Sending %r' % data)
+                self.cnx.write(data)
+            else:
+                time.sleep(0.01)
+
+    def send(self, data):
+        for row in data.splitlines():
+            self.output.put(row)
+
+    def parse_data(self, row):
+        logger.info('Received %r' % row)
+        self.output.put(b'OK\r\n')
+
+    def parse_cmd(self, row):
+        logger.info('Received CMD %r' % row)
+        cmd = row.decode().split()
+        args = cmd[1:]
+        cmd = cmd[0][2:]
+        getattr(self, 'parse_cmd_{}'.format(cmd))(args)
+
+    def parse_cmd_mode(self, args):
+        if not args:
+            self.output.put(('%s\r\n' % self.mode).encode())
+        else:
+            try:
+                if len(args) == 1:
+                    mode = int(args[0])
+                    assert mode in (0, 1)
+                    logger.info('Set mode=%s' % mode)
+                    self.mode = mode
+                else:
+                    raise Exception()
+            except:
+                self.output.put(b'Error\r\n')
+
+    def parse_cmd_addr(self, args):
+        if not args:
+            self.output.put(('%s\r\n' % self.addr).encode())
+        else:
+            try:
+                if len(args) == 1:
+                    addr = int(args[0])
+                    assert 0 <= addr <= 31
+                    logger.info('Set addr=%s' % addr)
+                    self.addr = addr
+                else:
+                    raise Exception()
+            except:
+                self.output.put(b'Error\r\n')
+
+
+def main():
+    emul = BasePrologixEmulator()
+    cnx = serial.Serial(emul.serialurl, timeout=0)
+    for i in range(10):
+        data = '++cmd %s' % i
+        print('SEND', data)
+        cnx.write(data.encode() + b'\r\n')
+        time.sleep(0.1)
+        print('GOT %r' % cnx.readall())
+
+    emul.close()
+
+
+if __name__ == '__main__':
+    logger.setLevel(logging.DEBUG)
+    main()
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pygpibtoolkit/test/test_prologix_emulator.py	Sat May 19 00:07:15 2018 +0200
@@ -0,0 +1,91 @@
+#
+import serial
+import time
+import pygpibtoolkit.prologix_emulator as PE
+
+import pytest
+
+
+@pytest.fixture(scope='session')
+def emulator():
+    emul = PE.BasePrologixEmulator()
+    cnx = serial.Serial(emul.serialurl, timeout=0)
+    yield (emul, cnx)
+    emul.close()
+    cnx.close()
+
+
+def test_base(emulator):
+    emul, cnx = emulator
+    emul.reset()
+    assert emul.mode == 1
+    assert emul.addr == 5
+
+
+def test_cmd_mode_get(emulator):
+    emul, cnx = emulator
+    emul.reset()
+    cnx.write(b'++mode\r\n')
+    time.sleep(0.1)
+    result = cnx.readlines()
+    assert len(result) == 1
+    result = result[0].strip()
+    assert result == b'1'
+
+
+def test_cmd_mode_set(emulator):
+    emul, cnx = emulator
+    emul.reset()
+    cnx.write(b'++mode 0\r\n')
+    time.sleep(0.1)
+    result = cnx.readlines()
+    assert len(result) == 0
+    assert emul.mode == 0
+
+
+def test_cmd_mode_err(emulator):
+    emul, cnx = emulator
+    emul.reset()
+    for value in (b'2', b'0 0', b'a'):
+        cnx.write(b'++mode %s\r\n' % value)
+        time.sleep(0.1)
+        result = cnx.readlines()
+        assert len(result) == 1
+        result = result[0].strip()
+        assert result == b'Error'
+        assert emul.mode == 1
+
+
+def test_cmd_addr_get(emulator):
+    emul, cnx = emulator
+    emul.reset()
+    cnx.write(b'++addr\r\n')
+    time.sleep(0.1)
+    result = cnx.readlines()
+    assert len(result) == 1
+    result = result[0].strip()
+    assert result == b'5'
+
+
+def test_cmd_addr_set(emulator):
+    emul, cnx = emulator
+    emul.reset()
+    for value in (0, 1, 10, 31):
+        cnx.write(b'++addr %s\r\n' % str(value).encode())
+        time.sleep(0.1)
+        result = cnx.readlines()
+        assert len(result) == 0
+        assert emul.addr == value
+
+
+def test_cmd_addr_err(emulator):
+    emul, cnx = emulator
+    emul.reset()
+    for value in (b'-2', b'0 0', b'a'):
+        cnx.write(b'++addr %s\r\n' % value)
+        time.sleep(0.1)
+        result = cnx.readlines()
+        assert len(result) == 1
+        result = result[0].strip()
+        assert result == b'Error'
+        assert emul.addr == 5
--- a/setup.py	Sat May 19 00:06:38 2018 +0200
+++ b/setup.py	Sat May 19 00:07:15 2018 +0200
@@ -44,7 +44,9 @@
       version='0.1.0',
       install_requires=[
           'pyserial', 'numpy', 'PyQt5', 'pyqtgraph'],
-
+      extras_require={
+          'test': ['pytest'],
+      },
       entry_points={
           'console_scripts': [
               'pygpib-detect=pygpibtoolkit.detect:main',

mercurial