pygpibtoolkit/prologix_emulator.py

Mon, 04 Jun 2018 22:28:05 +0200

author
David Douard <david.douard@logilab.fr>
date
Mon, 04 Jun 2018 22:28:05 +0200
changeset 109
c2d93abebab3
parent 105
89123c2af2fd
permissions
-rw-r--r--

[plotter] add a --mockup option to the demo main function

# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
"""Copyright (c) 2007-2018 David Douard (Paris, FRANCE).
https://bitbucket.org/dddouard/pygpibtoolkit -- mailto:david.douard@sdfa3.org

prologix emulator
=================

Base class for writing an emulator of a Prologix dongle.

This aims at writing tests and virtual devices to check tools.

"""

import os
import glob
import serial
import time
import subprocess
import tempfile
import threading
from queue import Queue
import logging
from logging import debug, info, warning, error  # noqa
import random


logging.basicConfig()
logger = logging.getLogger(__name__)

SOCAT = '/usr/bin/socat'


class PrologixEmulator:
    params = (
        ('mode', 1),
        ('addr', 5),
        ('auto', 0),
        ('eoi', 0),
        ('eos', 0),
        ('eot_enable', 0),
        ('eot_char', 0),
        ('read_tmo_ms', 1),
        ('savecfg', 1),
    )
    params_constraints = (
        ('mode', [0, 1]),
        ('auto', [0, 1]),
        ('eoi', [0, 1]),
        ('eos', [0, 1]),
        ('eot_enable', [0, 1]),
        ('eot_char', [0, 1]),
        ('read_tmo_ms', (1, 3000)),
        ('savecfg', [0, 1]),
    )

    def __init__(self):
        """
        """
        self.tmpdir = tempfile.mkdtemp()
        busside = os.path.join(self.tmpdir, 'bus')
        serialside = os.path.join(self.tmpdir, 'serial')
        self.socatp = subprocess.Popen([
            SOCAT,
            'PTY,raw,echo=0,link={}'.format(busside),
            'PTY,raw,echo=0,link={}'.format(serialside)])
        self.serialurl = serialside

        time.sleep(0.1)  # wait a bit for socat to be started
        self.cnx = serial.Serial(busside, timeout=0)
        self.devices = {}

        self.running = True
        self.output = Queue()
        self.mainloop = threading.Thread(target=self.run)
        self.reset()

        self.mainloop.start()

    def attach_device(self, addr, device):
        self.devices[addr] = device

    def reset(self):
        self.output.queue.clear()  # should probably protect this...
        self.params = dict(self.__class__.params)

    def close(self):
        self.running = False
        self.socatp.kill()
        self.socatp.wait()
        self.mainloop.join()

    def run(self):
        while self.running:
            data = self.cnx.readline().strip()
            if data:
                for row in data.splitlines():
                    if row[:2] == b'++':
                        self.parse_cmd(row)
                    else:
                        self.parse_data(row)
            elif not self.output.empty():
                data = self.output.get()
                logger.info('Sending %r' % data)
                self.cnx.write(data)
            else:
                time.sleep(0.01)

    def send(self, data):
        for row in data.splitlines():
            self.output.put(row)

    def parse_data(self, row):
        logger.info('Received %r' % row)
        addr = self.params['addr']
        if addr in self.devices:
            device = self.devices[addr]
            device.parse(row)

    def parse_cmd(self, row):
        logger.info('Received CMD %r' % row)
        cmd = row.decode().split()
        args = cmd[1:]
        cmd = cmd[0][2:]
        if hasattr(self, 'parse_cmd_{}'.format(cmd)):
            return getattr(self, 'parse_cmd_{}'.format(cmd))(args)
        elif cmd in self.params:
            self.parse_generic_cmd(cmd, args)
        else:
            raise Exception()

    def parse_generic_cmd(self, cmd, args):
        if not args:
            self.output.put(('%s\r\n' % self.params[cmd]).encode())
        else:
            try:
                if len(args) == 1:
                    value = int(args[0])
                    if cmd in self.params_constraints:
                        cstr = self.params_constraints[cmd]
                        if isinstance(cstr, list):
                            assert value in cstr
                        elif isinstance(cstr, tuple):
                            assert cstr[0] <= value <= cstr[1]
                        else:
                            raise Exception()
                    logger.info('Set %s=%s' % (cmd, value))
                    self.params[cmd] = value
                else:
                    raise Exception()
            except:
                self.output.put(b'Error\r\n')

    def parse_cmd_addr(self, args):
        if not args:
            addrs = self.params['addr']
            if isinstance(addrs, int):
                addrs = [addrs, None]
            addrs = ' '.join(str(x) for x in addrs if x is not None)
            self.output.put(('%s\r\n' % addrs).encode())
        else:
            try:
                if len(args) in (1, 2):
                    values = [int(x) for x in args]
                    if len(values) == 1:
                        values.append(None)
                    pad, sad = values
                    assert 0 <= pad <= 30
                    if sad:
                        assert 96 <= sad <= 126
                    logger.info('Set addr=%s' % (values))
                    if sad:
                        self.params['addr'] = values
                    else:
                        self.params['addr'] = pad
                else:
                    raise Exception()
            except:
                self.output.put(b'Error\r\n')

    def parse_cmd_ifc(self, args):
        pass


class HPGLPlotingDevice:
    def __init__(self, emulator, address=5):
        self.plotter_address = address
        self.filenames = glob.glob('examples/hpgl_plots/*.plt')
        self.emulator = emulator

    def plot(self, fname=None):
        if fname is None:
            fname = random.choice(self.filenames)
        with open(fname) as fobj:
            ret = fobj.read().strip()
            if ret:
                self.emulator.send(ret.encode())


def main():
    emul = PrologixEmulator()
    device = HPGLPlotingDevice(emul)
    emul.attach_device(10, device)
    from pygpibtoolkit.plotter.gpib_plotter import GPIBplotter
    plotter = GPIBplotter(
        device=emul.serialurl,
        address=5,
        timeout=0.06)
    device.plot()

    while True:
        plot = plotter.load_plot()
        if plot:
            print('PLOT')
            break

    emul.close()


if __name__ == '__main__':
    logger.setLevel(logging.DEBUG)
    main()

mercurial