pygpibtoolkit/prologix_emulator.py

changeset 101
975576e55563
child 102
91713944ebb0
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pygpibtoolkit/prologix_emulator.py	Sat May 19 00:07:15 2018 +0200
@@ -0,0 +1,153 @@
+# This program is free software; you can redistribute it and/or modify it under
+# the terms of the GNU General Public License as published by the Free Software
+# Foundation; either version 2 of the License, or (at your option) any later
+# version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details
+#
+# You should have received a copy of the GNU General Public License along with
+# this program; if not, write to the Free Software Foundation, Inc.,
+# 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+"""Copyright (c) 2007-2018 David Douard (Paris, FRANCE).
+https://bitbucket.org/dddouard/pygpibtoolkit -- mailto:david.douard@sdfa3.org
+
+prologix emulator
+=================
+
+Base class for writing an emulator of a Prologix dongle.
+
+This aims at writing tests and virtual devices to check tools.
+
+"""
+
+import os
+import serial
+import time
+import subprocess
+import tempfile
+import threading
+from queue import Queue
+import logging
+from logging import debug, info, warning, error  # noqa
+
+
+logging.basicConfig()
+logger = logging.getLogger(__name__)
+
+SOCAT = '/usr/bin/socat'
+
+
+class BasePrologixEmulator:
+
+    def __init__(self):
+        """
+        """
+        self.tmpdir = tempfile.mkdtemp()
+        busside = os.path.join(self.tmpdir, 'bus')
+        serialside = os.path.join(self.tmpdir, 'serial')
+        self.socatp = subprocess.Popen([
+            SOCAT,
+            'PTY,raw,echo=0,link={}'.format(busside),
+            'PTY,raw,echo=0,link={}'.format(serialside)])
+        self.serialurl = serialside
+
+        time.sleep(0.1)  # wait a bit for socat to be started
+        self.cnx = serial.Serial(busside, timeout=0)
+
+        self.running = True
+        self.output = Queue()
+        self.mainloop = threading.Thread(target=self.run)
+        self.reset()
+
+        self.mainloop.start()
+
+    def reset(self):
+        self.output.queue.clear()  # should probably protect this...
+        self.mode = 1
+        self.addr = 5
+
+    def close(self):
+        self.running = False
+        self.socatp.kill()
+        self.socatp.wait()
+        self.mainloop.join()
+
+    def run(self):
+        while self.running:
+            data = self.cnx.readline().strip()
+            if data:
+                if data[:2] == b'++':
+                    self.parse_cmd(data)
+                else:
+                    self.parse_data(data)
+            elif not self.output.empty():
+                data = self.output.get()
+                logger.info('Sending %r' % data)
+                self.cnx.write(data)
+            else:
+                time.sleep(0.01)
+
+    def send(self, data):
+        for row in data.splitlines():
+            self.output.put(row)
+
+    def parse_data(self, row):
+        logger.info('Received %r' % row)
+        self.output.put(b'OK\r\n')
+
+    def parse_cmd(self, row):
+        logger.info('Received CMD %r' % row)
+        cmd = row.decode().split()
+        args = cmd[1:]
+        cmd = cmd[0][2:]
+        getattr(self, 'parse_cmd_{}'.format(cmd))(args)
+
+    def parse_cmd_mode(self, args):
+        if not args:
+            self.output.put(('%s\r\n' % self.mode).encode())
+        else:
+            try:
+                if len(args) == 1:
+                    mode = int(args[0])
+                    assert mode in (0, 1)
+                    logger.info('Set mode=%s' % mode)
+                    self.mode = mode
+                else:
+                    raise Exception()
+            except:
+                self.output.put(b'Error\r\n')
+
+    def parse_cmd_addr(self, args):
+        if not args:
+            self.output.put(('%s\r\n' % self.addr).encode())
+        else:
+            try:
+                if len(args) == 1:
+                    addr = int(args[0])
+                    assert 0 <= addr <= 31
+                    logger.info('Set addr=%s' % addr)
+                    self.addr = addr
+                else:
+                    raise Exception()
+            except:
+                self.output.put(b'Error\r\n')
+
+
+def main():
+    emul = BasePrologixEmulator()
+    cnx = serial.Serial(emul.serialurl, timeout=0)
+    for i in range(10):
+        data = '++cmd %s' % i
+        print('SEND', data)
+        cnx.write(data.encode() + b'\r\n')
+        time.sleep(0.1)
+        print('GOT %r' % cnx.readall())
+
+    emul.close()
+
+
+if __name__ == '__main__':
+    logger.setLevel(logging.DEBUG)
+    main()

mercurial