pygpibtoolkit/prologix.py

Tue, 01 May 2018 00:10:23 +0200

author
David Douard <david.douard@logilab.fr>
date
Tue, 01 May 2018 00:10:23 +0200
changeset 86
96e30b092f70
parent 79
b8eec4f9db52
child 87
59a0946aa3d1
permissions
-rw-r--r--

[py3k] beginning to port to py3k

also write a proper setuptools based setup.py and convert some bin/* scripts
as entry_points.

# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
""" Copyright (c) 2007-2018 David Douard (Paris, FRANCE).
http://www.logilab.org/project/pygpibtoolkit -- mailto:david.douard@sdfa3.org

prologix
========

Module defining a communication object to talk to Prologix USB-GPIB controler.
"""

import sys
import serial
import time

from pygpibtoolkit.pygpib import ConnectionError

GPIB_CONTROLLER = 1
GPIB_DEVICE = 0


class GPIB:
    _retries = 15

    def __init__(self, device="/dev/ttyUSB0", baudrate=115200, timeout=0.1,
                 address=0, mode=1):
        """
        Create a new GPIB controller for the Prologix USB-GPIB device
        located on serial device 'device'.
        """
        self._cnx = serial.Serial(port=device, baudrate=baudrate,
                                  timeout=timeout)
        self._timeout = timeout

        try:
            res = self._cnx.readlines()
            if res:  # empty evetual read buffer
                print("there where pending stuffs in buffer %r" % res)
            self.set_mode(1)
            self.set_address(address)
            self._set_cmd('auto', 0)
            self.set_mode(mode)
        except Exception as e:
            print("Humm, something went wrong: %s" % e)

    def set_address(self, address, check=True):
        """
        Set the address of the GPIB device:

        - if the device is the Controller In Charge, this is the
          address of the device commands are sent to,

        - if the device is in GPIB_DEVICE mode, this is its address.
        """
        self._set_cmd('addr', address, check)
        self._address = address
        # self._set_cmd('auto', 0)

    def set_mode(self, mode):
        """
        Set GPIB device mode to 'mode':

        - GPIB_CONTROLLER: set the device as the Controller In Charge
          on the GPIB bus

        - GPIB_DEVICE: set the device as a standard GPIB device on the
          bus.
        """
        self._set_cmd('mode', mode)
        self._mode = mode
        if self._mode:
            self._cnx.write('++ifc\r')

    def set_controller(self):
        """
        Set GPIB device the Controller In Charge on the GPIB bus.
        """
        self.set_mode(1)

    def set_device(self):
        """
        Set the GPIB device as a simple device on the GPIB bus.
        """
        self.set_mode(0)

    def send_command(self, cmd, address=None):
        """
        Send the specified GPIB command on the bus (must be the CIC),
        and read the answer.
        Eventually, set the addressed device first.
        """
        assert self._mode == 1
        if address is not None:
            self.set_address(address)
        self._cnx.write(cmd+';\r')
        time.sleep(self._timeout)  # required?
        return self.read_eoi()

    def read_eoi(self, address=None):
        """
        Read the HPIB buffer from device, till EOI is performed, or timeout.
        """
        if address is not None:
            self.set_address(address, check=False)
        self._cnx.write('++read eoi\r')  # idem
        ret = ""
        i = 0
        while not ret.endswith('\r\n') and i<3:
            ret += ''.join(self._cnx.readlines())
            time.sleep(self._timeout)  # required?
            i += 1
        return ''.join(ret)

    def check_srq(self):
        """
        Check the SRQ line
        """
        assert self._mode == 1, "must be the Controller In Charge"
        ret = self._cnx.readline().strip()
        if ret:
            print("garbage: %s" % ret)
        self._cnx.write('++srq\r')
        ret = self._cnx.readline().strip()
        if ret in "01":
            return bool(int(ret))
        return None

    def trigger(self, address=None):
        """
        Trigger device at 'address'
        """
        if address is not None:
            self.set_address(address, check=False)
        self._cnx.write('++trg\r')  # idem

    def poll(self, addresses=None, verbose=False):
        """
        Poll every address, and return a dictionnary
         {add: status, ...}
        """
        assert self._mode == 1, "must be the Controller In Charge"
        only_one = False
        if addresses is None:
            addresses = range(31)
        if not isinstance(addresses, (list, tuple)):
            addresses = [addresses]
            only_one = True
        if not addresses:
            return None

        if verbose:
            sys.stderr.write('polling ')
        dico = {}
        for add in addresses:
            self._cnx.write('++spoll %d\r' % add)
            time.sleep(0.1)
            ret = self._cnx.readline().strip()
            if ret:
                if verbose:
                    sys.stderr.write('X')
                dico[add] = int(ret)
            else:
                if verbose:
                    sys.stderr.write('.')

            # need to wait at least 150ms (not enough on prologix)
            time.sleep(0.30)

        if verbose:
            sys.stderr.write('\n')
        self.set_address(self._address)
        if only_one and dico:
            return dico.values()[0]
        return dico

    def _read(self):
        for i in range(self._retries):
            rdata = self._cnx.readline()
            if rdata.strip() != "":
                break
            time.sleep(self._timeout)
        return rdata

    def _set_cmd(self, cmd, value, check=True):
        self._cnx.write('++%s %d\r' % (cmd, value))
        if check:
            self._cnx.write('++%s\r' % (cmd))
            rval = self._read().strip()
            if not rval.isdigit() or int(rval) != value:
                raise ConnectionError("Can't set GPIB %s to %s [ret=%s]" % (
                    cmd, value, repr(rval)))

    def reset(self):
        """
        Perform a reset of the USB device

        """
        print("Resetting GPIB controller")
        self._cnx.write('++rst\r')
        print("Must wait for 5 seconds")
        time.sleep(5)

mercurial