Thu, 24 May 2018 23:08:14 +0200
[prologix] implement more prologix commands and add API to register emulated devices
also refactor tests as generative tests using pytest's parametrized API.
pygpibtoolkit/prologix_emulator.py | file | annotate | diff | comparison | revisions | |
pygpibtoolkit/test/test_prologix_emulator.py | file | annotate | diff | comparison | revisions |
--- a/pygpibtoolkit/prologix_emulator.py Sat May 19 00:07:15 2018 +0200 +++ b/pygpibtoolkit/prologix_emulator.py Thu May 24 23:08:14 2018 +0200 @@ -23,6 +23,7 @@ """ import os +import glob import serial import time import subprocess @@ -31,6 +32,7 @@ from queue import Queue import logging from logging import debug, info, warning, error # noqa +import random logging.basicConfig() @@ -39,7 +41,28 @@ SOCAT = '/usr/bin/socat' -class BasePrologixEmulator: +class PrologixEmulator: + params = ( + ('mode', 1), + ('addr', 5), + ('auto', 0), + ('eoi', 0), + ('eos', 0), + ('eot_enable', 0), + ('eot_char', 0), + ('read_tmo_ms', 1), + ('savecfg', 1), + ) + params_constraints = ( + ('mode', [0, 1]), + ('auto', [0, 1]), + ('eoi', [0, 1]), + ('eos', [0, 1]), + ('eot_enable', [0, 1]), + ('eot_char', [0, 1]), + ('read_tmo_ms', (1, 3000)), + ('savecfg', [0, 1]), + ) def __init__(self): """ @@ -55,6 +78,7 @@ time.sleep(0.1) # wait a bit for socat to be started self.cnx = serial.Serial(busside, timeout=0) + self.devices = {} self.running = True self.output = Queue() @@ -63,10 +87,12 @@ self.mainloop.start() + def attach_device(self, addr, device): + self.devices[addr] = device + def reset(self): self.output.queue.clear() # should probably protect this... - self.mode = 1 - self.addr = 5 + self.params = dict(self.__class__.params) def close(self): self.running = False @@ -95,25 +121,40 @@ def parse_data(self, row): logger.info('Received %r' % row) - self.output.put(b'OK\r\n') + addr = self.params['addr'] + if addr in self.devices: + device = self.devices[addr] + device.parse(row) def parse_cmd(self, row): logger.info('Received CMD %r' % row) cmd = row.decode().split() args = cmd[1:] cmd = cmd[0][2:] - getattr(self, 'parse_cmd_{}'.format(cmd))(args) + if hasattr(self, 'parse_cmd_{}'.format(cmd)): + return getattr(self, 'parse_cmd_{}'.format(cmd))(args) + elif cmd in self.params: + self.parse_generic_cmd(cmd, args) + else: + raise Exception() - def parse_cmd_mode(self, args): + def parse_generic_cmd(self, cmd, args): if not args: - self.output.put(('%s\r\n' % self.mode).encode()) + self.output.put(('%s\r\n' % self.params[cmd]).encode()) else: try: if len(args) == 1: - mode = int(args[0]) - assert mode in (0, 1) - logger.info('Set mode=%s' % mode) - self.mode = mode + value = int(args[0]) + if cmd in self.params_constraints: + cstr = self.params_constraints[cmd] + if isinstance(cstr, list): + assert value in cstr + elif isinstance(cstr, tuple): + assert cstr[0] <= value <= cstr[1] + else: + raise Exception() + logger.info('Set %s=%s' % (cmd, value)) + self.params[cmd] = value else: raise Exception() except: @@ -121,22 +162,49 @@ def parse_cmd_addr(self, args): if not args: - self.output.put(('%s\r\n' % self.addr).encode()) + addrs = self.params['addr'] + if isinstance(addrs, int): + addrs = [addrs, None] + addrs = ' '.join(str(x) for x in addrs if x is not None) + self.output.put(('%s\r\n' % addrs).encode()) else: try: - if len(args) == 1: - addr = int(args[0]) - assert 0 <= addr <= 31 - logger.info('Set addr=%s' % addr) - self.addr = addr + if len(args) in (1, 2): + values = [int(x) for x in args] + if len(values) == 1: + values.append(None) + pad, sad = values + assert 0 <= pad <= 30 + if sad: + assert 96 <= sad <= 126 + logger.info('Set addr=%s' % (values)) + if sad: + self.params['addr'] = values + else: + self.params['addr'] = pad else: raise Exception() except: self.output.put(b'Error\r\n') +class HPGLPlotingDevice: + def __init__(self, emulator, address=5): + self.plotter_address = address + self.filenames = glob.glob('examples/hpgl_plots/*.plt') + self.emulator = emulator + + def plot(self, fname=None): + if fname is None: + fname = random.choice(self.filenames) + with open(fname) as fobj: + ret = fobj.read().strip() + if ret: + self.emulator.send(ret) + + def main(): - emul = BasePrologixEmulator() + emul = PrologixEmulator() cnx = serial.Serial(emul.serialurl, timeout=0) for i in range(10): data = '++cmd %s' % i
--- a/pygpibtoolkit/test/test_prologix_emulator.py Sat May 19 00:07:15 2018 +0200 +++ b/pygpibtoolkit/test/test_prologix_emulator.py Thu May 24 23:08:14 2018 +0200 @@ -1,91 +1,83 @@ # -import serial import time -import pygpibtoolkit.prologix_emulator as PE - import pytest -@pytest.fixture(scope='session') -def emulator(): - emul = PE.BasePrologixEmulator() - cnx = serial.Serial(emul.serialurl, timeout=0) - yield (emul, cnx) - emul.close() - cnx.close() - - def test_base(emulator): emul, cnx = emulator - emul.reset() - assert emul.mode == 1 - assert emul.addr == 5 + assert emul.params['mode'] == 1 + assert emul.params['addr'] == 5 -def test_cmd_mode_get(emulator): +def check_cmd_get(emulator, cmd, value): emul, cnx = emulator - emul.reset() - cnx.write(b'++mode\r\n') + emul.params[cmd] = value + cnx.write(b'++%s\r\n' % cmd.encode()) time.sleep(0.1) result = cnx.readlines() assert len(result) == 1 result = result[0].strip() - assert result == b'1' + if isinstance(value, list): + value = ' '.join(str(x) for x in value) + assert result == str(value).encode() -def test_cmd_mode_set(emulator): +def check_cmd_set(emulator, cmd, value): emul, cnx = emulator - emul.reset() - cnx.write(b'++mode 0\r\n') + expected = value + if isinstance(value, list): + value = ' '.join(str(x) for x in value) + cnx.write(b'++%s %s\r\n' % (cmd.encode(), str(value).encode())) time.sleep(0.1) result = cnx.readlines() assert len(result) == 0 - assert emul.mode == 0 + assert emul.params[cmd] == expected -def test_cmd_mode_err(emulator): +def check_cmd_err(emulator, cmd, value): emul, cnx = emulator - emul.reset() - for value in (b'2', b'0 0', b'a'): - cnx.write(b'++mode %s\r\n' % value) - time.sleep(0.1) - result = cnx.readlines() - assert len(result) == 1 - result = result[0].strip() - assert result == b'Error' - assert emul.mode == 1 - - -def test_cmd_addr_get(emulator): - emul, cnx = emulator - emul.reset() - cnx.write(b'++addr\r\n') + defaultvalue = emul.params[cmd] + cnx.write(b'++%s %s\r\n' % (cmd.encode(), str(value).encode())) time.sleep(0.1) result = cnx.readlines() assert len(result) == 1 result = result[0].strip() - assert result == b'5' + assert result == b'Error' + assert emul.params[cmd] == defaultvalue + + +test_cmd_get = pytest.mark.parametrize( + "cmd,value", [ + ('mode', 0), ('mode', 1), + ('addr', 0), ('addr', 1), ('addr', 30), + ('addr', [0, 96]), ('addr', [0, 126]), ('addr', [30, 96]), + ('auto', 0), ('auto', 1), + ('eoi', 0), ('eoi', 1), + ('eos', 0), ('eos', 1), ('eos', 3), + ('eot_enable', 0), ('eot_enable', 1), + ('eot_char', 0), ('eot_enable', 42), ('eot_char', 255), + ('read_tmo_ms', 1), ('read_tmo_ms', 3000), + ('savecfg', 0), ('savecfg', 1), + ])(check_cmd_get) -def test_cmd_addr_set(emulator): - emul, cnx = emulator - emul.reset() - for value in (0, 1, 10, 31): - cnx.write(b'++addr %s\r\n' % str(value).encode()) - time.sleep(0.1) - result = cnx.readlines() - assert len(result) == 0 - assert emul.addr == value +test_cmd_set = pytest.mark.parametrize( + "cmd,value", [ + ('mode', 0), ('mode', 1), + ('addr', 0), ('addr', 1), ('addr', 30), + ('addr', [0, 96]), ('addr', [0, 126]), ('addr', [30, 96]), + ('auto', 0), ('auto', 1), + ('eoi', 0), ('eoi', 1), + ('eos', 0), ('eos', 1), ('eos', 3), + ('eot_enable', 0), ('eot_enable', 1), + ('eot_char', 0), ('eot_enable', 42), ('eot_char', 255), + ('read_tmo_ms', 1), ('read_tmo_ms', 3000), + ('savecfg', 0), ('savecfg', 1), + ])(check_cmd_set) -def test_cmd_addr_err(emulator): - emul, cnx = emulator - emul.reset() - for value in (b'-2', b'0 0', b'a'): - cnx.write(b'++addr %s\r\n' % value) - time.sleep(0.1) - result = cnx.readlines() - assert len(result) == 1 - result = result[0].strip() - assert result == b'Error' - assert emul.addr == 5 +test_cmd_err = pytest.mark.parametrize( + "cmd,value", [ + ('mode', b'2'), ('mode', b'0 0'), ('mode', b'a'), + ('addr', b'-2'), ('addr', b'31'), ('addr', b'0 0'), ('addr', b'a'), + ])(check_cmd_err)