pygpibtoolkit/prologix_emulator.py

Sat, 19 May 2018 00:07:15 +0200

author
David Douard <david.douard@logilab.fr>
date
Sat, 19 May 2018 00:07:15 +0200
changeset 101
975576e55563
child 102
91713944ebb0
permissions
-rw-r--r--

[prologix] beginning of a proper prologix dongle emulator

# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
"""Copyright (c) 2007-2018 David Douard (Paris, FRANCE).
https://bitbucket.org/dddouard/pygpibtoolkit -- mailto:david.douard@sdfa3.org

prologix emulator
=================

Base class for writing an emulator of a Prologix dongle.

It is intended for writing tests and virtual devices against which tools can
be checked.

"""

import os
import serial
import time
import subprocess
import tempfile
import threading
from queue import Queue
import logging
from logging import debug, info, warning, error  # noqa


# Configure the root logger at import time and create this module's logger,
# which the emulator uses to report received and sent traffic.
logging.basicConfig()
logger = logging.getLogger(__name__)

# Absolute path of the socat executable launched by BasePrologixEmulator to
# create the pair of linked pseudo-terminals.
SOCAT = '/usr/bin/socat'


class BasePrologixEmulator:
    """Minimal emulator of a Prologix dongle, reachable through a PTY pair.

    Creating an instance starts a ``socat`` process that links two
    pseudo-terminals inside a fresh temporary directory.  The emulator opens
    the ``bus`` end; clients open the other end, whose path is published as
    ``serialurl``.  A background thread (``run``) reads lines from the bus
    end: lines starting with ``++`` are dispatched to ``parse_cmd_<name>``
    methods, everything else goes to ``parse_data``.  Replies are queued on
    ``output`` and written back by that same thread.

    Subclasses add commands by defining ``parse_cmd_<name>(self, args)``
    methods, where ``args`` is the list of whitespace-separated words that
    followed the command name, and may override ``parse_data``.

    ``close()`` must be called when done; otherwise the worker thread and
    the socat process stay alive.
    """

    def __init__(self):
        """Start socat, open the bus side of the link and start the worker.

        Raises whatever ``subprocess.Popen`` or ``serial.Serial`` raise when
        socat cannot be started or the PTY cannot be opened.
        """
        self.tmpdir = tempfile.mkdtemp()
        busside = os.path.join(self.tmpdir, 'bus')
        serialside = os.path.join(self.tmpdir, 'serial')
        self.socatp = subprocess.Popen([
            SOCAT,
            'PTY,raw,echo=0,link={}'.format(busside),
            'PTY,raw,echo=0,link={}'.format(serialside)])
        # Path that clients must open to talk to the emulator.
        self.serialurl = serialside

        time.sleep(0.1)  # wait a bit for socat to be started
        try:
            # timeout=0 makes reads non-blocking so run() can poll.
            self.cnx = serial.Serial(busside, timeout=0)
        except Exception:
            # Do not leave socat and the temporary directory behind when
            # the emulator cannot be brought up.
            self._stop_socat()
            raise

        self.running = True
        self.output = Queue()
        self.mainloop = threading.Thread(target=self.run)
        self.reset()

        self.mainloop.start()

    def reset(self):
        """Drop every pending reply and restore the default mode and address."""
        # The worker thread may be consuming the queue concurrently, so the
        # underlying deque is only touched while holding the queue's lock.
        with self.output.mutex:
            self.output.queue.clear()
        self.mode = 1
        self.addr = 5

    def close(self):
        """Stop the worker thread, then release the port, socat and tmpdir."""
        self.running = False
        # Join before tearing down the link: killing socat while the worker
        # is still polling the port would make its read fail in the thread.
        self.mainloop.join()
        self.cnx.close()
        self._stop_socat()

    def _stop_socat(self):
        """Kill the socat process and remove the temporary directory."""
        import shutil  # local import: only needed on teardown

        self.socatp.kill()
        self.socatp.wait()
        # A killed socat cannot remove its own PTY links; remove them along
        # with the directory created by mkdtemp().
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def run(self):
        """Worker loop: handle one incoming line, else send one queued reply.

        Runs until ``running`` is set to False.  When there is neither
        input nor pending output the loop sleeps 10 ms to avoid spinning.
        """
        while self.running:
            data = self.cnx.readline().strip()
            if data:
                if data.startswith(b'++'):
                    self.parse_cmd(data)
                else:
                    self.parse_data(data)
            elif not self.output.empty():
                data = self.output.get()
                logger.info('Sending %r', data)
                self.cnx.write(data)
            else:
                time.sleep(0.01)

    def send(self, data):
        """Queue ``data`` for sending, one queue entry per line.

        Lines are split with ``splitlines()``, so each entry is queued
        without its line terminator.
        NOTE(review): no terminator is re-appended before the row is
        written to the port -- confirm this is what clients expect.
        """
        for row in data.splitlines():
            self.output.put(row)

    def parse_data(self, row):
        """Handle a non-command line: log it and acknowledge with ``OK``."""
        logger.info('Received %r', row)
        self.output.put(b'OK\r\n')

    def parse_cmd(self, row):
        """Dispatch a ``++<name> [args...]`` line to ``parse_cmd_<name>``.

        ``row`` is the raw line as bytes, already stripped.  A line that
        cannot be decoded, carries no command name, or names a command with
        no ``parse_cmd_<name>`` method is answered with
        ``Unrecognized command`` instead of raising, so that a bad request
        cannot terminate the worker thread.
        """
        logger.info('Received CMD %r', row)
        try:
            words = row.decode().split()
        except UnicodeDecodeError:
            words = []
        # The first word is '++<name>'; drop the two-character prefix.
        name = words[0][2:] if words else ''
        handler = None
        if name:
            handler = getattr(self, 'parse_cmd_{}'.format(name), None)
        if not callable(handler):
            logger.warning('Unknown CMD %r', row)
            # NOTE(review): reply text chosen to match what Prologix
            # firmware is reported to print -- confirm against the manual.
            self.output.put(b'Unrecognized command\r\n')
            return
        handler(words[1:])

    def _set_int_option(self, attr, args, low, high):
        """Query or set the integer attribute ``attr`` from command ``args``.

        With no argument the current value is queued as a reply.  With
        exactly one argument that parses as an integer in ``[low, high]``
        the attribute is updated and nothing is replied.  Any other input
        queues ``Error`` and leaves the attribute untouched.
        """
        if not args:
            self.output.put(('%s\r\n' % getattr(self, attr)).encode())
            return
        try:
            if len(args) != 1:
                raise ValueError('expected exactly one argument')
            value = int(args[0])
            # Explicit check rather than assert, which vanishes under -O.
            if not low <= value <= high:
                raise ValueError('value out of range')
        except (TypeError, ValueError):
            self.output.put(b'Error\r\n')
            return
        logger.info('Set %s=%s', attr, value)
        setattr(self, attr, value)

    def parse_cmd_mode(self, args):
        """Handle ``++mode``: report the mode, or set it to 0 or 1."""
        self._set_int_option('mode', args, 0, 1)

    def parse_cmd_addr(self, args):
        """Handle ``++addr``: report the address, or set it within 0..31."""
        self._set_int_option('addr', args, 0, 31)


def main():
    """Smoke-test the emulator from the client side of the PTY link.

    Starts a BasePrologixEmulator, opens its ``serialurl`` with a
    non-blocking serial connection, then sends ten ``++cmd <i>`` lines,
    printing each request and whatever bytes were readable 0.1 s later.

    The client port and the emulator are released even when an error
    occurs: the emulator's worker thread is not a daemon, so skipping
    ``close()`` would keep the process alive.
    """
    emul = BasePrologixEmulator()
    try:
        with serial.Serial(emul.serialurl, timeout=0) as cnx:
            for i in range(10):
                data = '++cmd %s' % i
                print('SEND', data)
                cnx.write(data.encode() + b'\r\n')
                time.sleep(0.1)  # leave the emulator time to answer
                print('GOT %r' % cnx.readall())
    finally:
        emul.close()


# Script entry point: enable DEBUG-level records on this module's logger,
# then run the demo session.
if __name__ == '__main__':
    logger.setLevel(logging.DEBUG)
    main()

mercurial