[prologix] implement more prologix commands and add API to register emulated devices

Thu, 24 May 2018 23:08:14 +0200

author
David Douard <david.douard@logilab.fr>
date
Thu, 24 May 2018 23:08:14 +0200
changeset 102
91713944ebb0
parent 101
975576e55563
child 103
ad9b6e53ca68

[prologix] implement more prologix commands and add API to register emulated devices

also refactor tests as generative tests using pytest's parametrized API.

pygpibtoolkit/prologix_emulator.py file | annotate | diff | comparison | revisions
pygpibtoolkit/test/test_prologix_emulator.py file | annotate | diff | comparison | revisions
--- a/pygpibtoolkit/prologix_emulator.py	Sat May 19 00:07:15 2018 +0200
+++ b/pygpibtoolkit/prologix_emulator.py	Thu May 24 23:08:14 2018 +0200
@@ -23,6 +23,7 @@
 """
 
 import os
+import glob
 import serial
 import time
 import subprocess
@@ -31,6 +32,7 @@
 from queue import Queue
 import logging
 from logging import debug, info, warning, error  # noqa
+import random
 
 
 logging.basicConfig()
@@ -39,7 +41,28 @@
 SOCAT = '/usr/bin/socat'
 
 
-class BasePrologixEmulator:
+class PrologixEmulator:
+    params = (
+        ('mode', 1),
+        ('addr', 5),
+        ('auto', 0),
+        ('eoi', 0),
+        ('eos', 0),
+        ('eot_enable', 0),
+        ('eot_char', 0),
+        ('read_tmo_ms', 1),
+        ('savecfg', 1),
+    )
+    params_constraints = (
+        ('mode', [0, 1]),
+        ('auto', [0, 1]),
+        ('eoi', [0, 1]),
+        ('eos', [0, 1]),
+        ('eot_enable', [0, 1]),
+        ('eot_char', [0, 1]),
+        ('read_tmo_ms', (1, 3000)),
+        ('savecfg', [0, 1]),
+    )
 
     def __init__(self):
         """
@@ -55,6 +78,7 @@
 
         time.sleep(0.1)  # wait a bit for socat to be started
         self.cnx = serial.Serial(busside, timeout=0)
+        self.devices = {}
 
         self.running = True
         self.output = Queue()
@@ -63,10 +87,12 @@
 
         self.mainloop.start()
 
+    def attach_device(self, addr, device):
+        self.devices[addr] = device
+
     def reset(self):
         self.output.queue.clear()  # should probably protect this...
-        self.mode = 1
-        self.addr = 5
+        self.params = dict(self.__class__.params)
 
     def close(self):
         self.running = False
@@ -95,25 +121,40 @@
 
     def parse_data(self, row):
         logger.info('Received %r' % row)
-        self.output.put(b'OK\r\n')
+        addr = self.params['addr']
+        if addr in self.devices:
+            device = self.devices[addr]
+            device.parse(row)
 
     def parse_cmd(self, row):
         logger.info('Received CMD %r' % row)
         cmd = row.decode().split()
         args = cmd[1:]
         cmd = cmd[0][2:]
-        getattr(self, 'parse_cmd_{}'.format(cmd))(args)
+        if hasattr(self, 'parse_cmd_{}'.format(cmd)):
+            return getattr(self, 'parse_cmd_{}'.format(cmd))(args)
+        elif cmd in self.params:
+            self.parse_generic_cmd(cmd, args)
+        else:
+            raise Exception()
 
-    def parse_cmd_mode(self, args):
+    def parse_generic_cmd(self, cmd, args):
         if not args:
-            self.output.put(('%s\r\n' % self.mode).encode())
+            self.output.put(('%s\r\n' % self.params[cmd]).encode())
         else:
             try:
                 if len(args) == 1:
-                    mode = int(args[0])
-                    assert mode in (0, 1)
-                    logger.info('Set mode=%s' % mode)
-                    self.mode = mode
+                    value = int(args[0])
+                    if cmd in self.params_constraints:
+                        cstr = self.params_constraints[cmd]
+                        if isinstance(cstr, list):
+                            assert value in cstr
+                        elif isinstance(cstr, tuple):
+                            assert cstr[0] <= value <= cstr[1]
+                        else:
+                            raise Exception()
+                    logger.info('Set %s=%s' % (cmd, value))
+                    self.params[cmd] = value
                 else:
                     raise Exception()
             except:
@@ -121,22 +162,49 @@
 
     def parse_cmd_addr(self, args):
         if not args:
-            self.output.put(('%s\r\n' % self.addr).encode())
+            addrs = self.params['addr']
+            if isinstance(addrs, int):
+                addrs = [addrs, None]
+            addrs = ' '.join(str(x) for x in addrs if x is not None)
+            self.output.put(('%s\r\n' % addrs).encode())
         else:
             try:
-                if len(args) == 1:
-                    addr = int(args[0])
-                    assert 0 <= addr <= 31
-                    logger.info('Set addr=%s' % addr)
-                    self.addr = addr
+                if len(args) in (1, 2):
+                    values = [int(x) for x in args]
+                    if len(values) == 1:
+                        values.append(None)
+                    pad, sad = values
+                    assert 0 <= pad <= 30
+                    if sad:
+                        assert 96 <= sad <= 126
+                    logger.info('Set addr=%s' % (values))
+                    if sad:
+                        self.params['addr'] = values
+                    else:
+                        self.params['addr'] = pad
                 else:
                     raise Exception()
             except:
                 self.output.put(b'Error\r\n')
 
 
+class HPGLPlotingDevice:
+    def __init__(self, emulator, address=5):
+        self.plotter_address = address
+        self.filenames = glob.glob('examples/hpgl_plots/*.plt')
+        self.emulator = emulator
+
+    def plot(self, fname=None):
+        if fname is None:
+            fname = random.choice(self.filenames)
+        with open(fname) as fobj:
+            ret = fobj.read().strip()
+            if ret:
+                self.emulator.send(ret)
+
+
 def main():
-    emul = BasePrologixEmulator()
+    emul = PrologixEmulator()
     cnx = serial.Serial(emul.serialurl, timeout=0)
     for i in range(10):
         data = '++cmd %s' % i
--- a/pygpibtoolkit/test/test_prologix_emulator.py	Sat May 19 00:07:15 2018 +0200
+++ b/pygpibtoolkit/test/test_prologix_emulator.py	Thu May 24 23:08:14 2018 +0200
@@ -1,91 +1,83 @@
 #
-import serial
 import time
-import pygpibtoolkit.prologix_emulator as PE
-
 import pytest
 
 
-@pytest.fixture(scope='session')
-def emulator():
-    emul = PE.BasePrologixEmulator()
-    cnx = serial.Serial(emul.serialurl, timeout=0)
-    yield (emul, cnx)
-    emul.close()
-    cnx.close()
-
-
 def test_base(emulator):
     emul, cnx = emulator
-    emul.reset()
-    assert emul.mode == 1
-    assert emul.addr == 5
+    assert emul.params['mode'] == 1
+    assert emul.params['addr'] == 5
 
 
-def test_cmd_mode_get(emulator):
+def check_cmd_get(emulator, cmd, value):
     emul, cnx = emulator
-    emul.reset()
-    cnx.write(b'++mode\r\n')
+    emul.params[cmd] = value
+    cnx.write(b'++%s\r\n' % cmd.encode())
     time.sleep(0.1)
     result = cnx.readlines()
     assert len(result) == 1
     result = result[0].strip()
-    assert result == b'1'
+    if isinstance(value, list):
+        value = ' '.join(str(x) for x in value)
+    assert result == str(value).encode()
 
 
-def test_cmd_mode_set(emulator):
+def check_cmd_set(emulator, cmd, value):
     emul, cnx = emulator
-    emul.reset()
-    cnx.write(b'++mode 0\r\n')
+    expected = value
+    if isinstance(value, list):
+        value = ' '.join(str(x) for x in value)
+    cnx.write(b'++%s %s\r\n' % (cmd.encode(), str(value).encode()))
     time.sleep(0.1)
     result = cnx.readlines()
     assert len(result) == 0
-    assert emul.mode == 0
+    assert emul.params[cmd] == expected
 
 
-def test_cmd_mode_err(emulator):
+def check_cmd_err(emulator, cmd, value):
     emul, cnx = emulator
-    emul.reset()
-    for value in (b'2', b'0 0', b'a'):
-        cnx.write(b'++mode %s\r\n' % value)
-        time.sleep(0.1)
-        result = cnx.readlines()
-        assert len(result) == 1
-        result = result[0].strip()
-        assert result == b'Error'
-        assert emul.mode == 1
-
-
-def test_cmd_addr_get(emulator):
-    emul, cnx = emulator
-    emul.reset()
-    cnx.write(b'++addr\r\n')
+    defaultvalue = emul.params[cmd]
+    cnx.write(b'++%s %s\r\n' % (cmd.encode(), str(value).encode()))
     time.sleep(0.1)
     result = cnx.readlines()
     assert len(result) == 1
     result = result[0].strip()
-    assert result == b'5'
+    assert result == b'Error'
+    assert emul.params[cmd] == defaultvalue
+
+
+test_cmd_get = pytest.mark.parametrize(
+    "cmd,value", [
+        ('mode', 0), ('mode', 1),
+        ('addr', 0), ('addr', 1), ('addr', 30),
+        ('addr', [0, 96]), ('addr', [0, 126]), ('addr', [30, 96]),
+        ('auto', 0), ('auto', 1),
+        ('eoi', 0), ('eoi', 1),
+        ('eos', 0), ('eos', 1), ('eos', 3),
+        ('eot_enable', 0), ('eot_enable', 1),
+        ('eot_char', 0), ('eot_enable', 42), ('eot_char', 255),
+        ('read_tmo_ms', 1), ('read_tmo_ms', 3000),
+        ('savecfg', 0), ('savecfg', 1),
+    ])(check_cmd_get)
 
 
-def test_cmd_addr_set(emulator):
-    emul, cnx = emulator
-    emul.reset()
-    for value in (0, 1, 10, 31):
-        cnx.write(b'++addr %s\r\n' % str(value).encode())
-        time.sleep(0.1)
-        result = cnx.readlines()
-        assert len(result) == 0
-        assert emul.addr == value
+test_cmd_set = pytest.mark.parametrize(
+    "cmd,value", [
+        ('mode', 0), ('mode', 1),
+        ('addr', 0), ('addr', 1), ('addr', 30),
+        ('addr', [0, 96]), ('addr', [0, 126]), ('addr', [30, 96]),
+        ('auto', 0), ('auto', 1),
+        ('eoi', 0), ('eoi', 1),
+        ('eos', 0), ('eos', 1), ('eos', 3),
+        ('eot_enable', 0), ('eot_enable', 1),
+        ('eot_char', 0), ('eot_enable', 42), ('eot_char', 255),
+        ('read_tmo_ms', 1), ('read_tmo_ms', 3000),
+        ('savecfg', 0), ('savecfg', 1),
+    ])(check_cmd_set)
 
 
-def test_cmd_addr_err(emulator):
-    emul, cnx = emulator
-    emul.reset()
-    for value in (b'-2', b'0 0', b'a'):
-        cnx.write(b'++addr %s\r\n' % value)
-        time.sleep(0.1)
-        result = cnx.readlines()
-        assert len(result) == 1
-        result = result[0].strip()
-        assert result == b'Error'
-        assert emul.addr == 5
+test_cmd_err = pytest.mark.parametrize(
+    "cmd,value", [
+        ('mode', b'2'), ('mode', b'0 0'), ('mode', b'a'),
+        ('addr', b'-2'), ('addr', b'31'), ('addr', b'0 0'), ('addr', b'a'),
+    ])(check_cmd_err)

mercurial