Source code for dlab.hardware.wrappers.pfeiffer_vacuum

#!/usr/bin/env python3
# -*- encoding: UTF-8 -*-

# Author: Philipp Klaus, philipp.l.klaus AT web.de

# This file is part of PfeifferVacuum.py.
#
# PfeifferVacuum.py is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PfeifferVacuum.py is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PfeifferVacuum.py. If not, see <http://www.gnu.org/licenses/>.

import serial
import time
import signal
import math
from threading import Thread, Event


[docs] class MaxiGauge: def __init__(self, serialPort, baud=9600, debug=False): self.debug = debug try: self.connection = serial.Serial(serialPort, baudrate=baud, timeout=0.2) except serial.serialutil.SerialException as se: raise MaxiGaugeError(se) self.logfilename = 'measurement-data.txt'
[docs] def checkDevice(self): message = "The Display Contrast is currently set to %d (out of 20).\n" % self.displayContrast() message += "Keys since MaxiGauge was switched on: %s (out of 1,2,3,4,5).\n" % ", ".join(map(str, self.pressedKeys())) return message
[docs] def pressedKeys(self): keys = int(self.send('TKB', 1)[0]) pressedKeys = [] for i in [4, 3, 2, 1, 0]: # It's got 5 keys if keys // 2 ** i == 1: pressedKeys.append(i + 1) keys = keys % 2 ** i pressedKeys.reverse() return pressedKeys
[docs] def displayContrast(self, newContrast=-1): if newContrast == -1: return int(self.send('DCC', 1)[0]) else: return int(self.send('DCC,%d' % (newContrast,), 1)[0])
[docs] def pressures(self): return [self.pressure(i + 1) for i in range(6)]
[docs] def pressure(self, sensor): if sensor < 1 or sensor > 6: raise MaxiGaugeError(f"Sensor can only be between 1 and 6. You chose {sensor}") reading = self.send(f'PR{sensor}', 1) # Reading will have the form x,x.xxxEsx <CR><LF> (see p.88) try: r = reading[0].split(',') status = int(r[0]) pressure = float(r[-1]) except: raise MaxiGaugeError(f"Problem interpreting the returned line:\n{reading}") return PressureReading(sensor, status, pressure)
[docs] def signal_handler(self, sig, frame): self.stopping_continuous_update.set() signal.signal(signal.SIGINT, signal.SIG_DFL)
[docs] def start_continuous_pressure_updates(self, update_time, log_every=0): self.stopping_continuous_update = Event() signal.signal(signal.SIGINT, self.signal_handler) self.update_time = update_time self.log_every = log_every self.update_counter = 1 self.t = Thread(target=self.continuous_pressure_updates) self.t.daemon = True self.t.start()
[docs] def continuous_pressure_updates(self): cache = [] while not self.stopping_continuous_update.is_set(): startTime = time.time() self.update_counter += 1 self.cached_pressures = self.pressures() cache.append([time.time()] + [sensor.pressure if sensor.status in [0, 1, 2] else float('nan') for sensor in self.cached_pressures]) if self.log_every > 0 and (self.update_counter % self.log_every == 0): logtime = cache[self.log_every // 2][0] cache = list(zip(*cache)) # Transpose cache cache = cache[1:] # Remove the first element avgs = [sum(vals) / self.log_every for vals in cache] self.log_to_file(logtime=logtime, logvalues=avgs) cache = [] time.sleep(0.1) # We want a minimum pause of 0.1 s while not self.stopping_continuous_update.is_set() and (self.update_time - (time.time() - startTime) > .2): time.sleep(.2) time.sleep(max([0., self.update_time - (time.time() - startTime)])) if self.log_every > 0 and (self.update_counter % self.log_every == 0): self.flush_logfile()
[docs] def log_to_file(self, logtime=None, logvalues=None): if not hasattr(self, 'logfile'): self.logfile = open(self.logfilename, 'a') if not logtime: logtime = time.time() if not logvalues: logvalues = [sensor.pressure if sensor.status in [0, 1, 2] else float('nan') for sensor in self.cached_pressures] line = "%d, " % logtime + ', '.join(["%.3E" % val if not math.isnan(val) else '' for val in logvalues]) self.logfile.write(line + '\n')
[docs] def flush_logfile(self): try: self.logfile.flush() except: pass
[docs] def debugMessage(self, message): if self.debug: print(repr(message))
[docs] def send(self, mnemonic, numEnquiries=0): self.connection.flushInput() self.write(mnemonic + LINE_TERMINATION) self.getACQorNAK() response = [] for i in range(numEnquiries): self.enquire() response.append(self.read()) return response
[docs] def write(self, what): self.debugMessage(what) self.connection.write(what.encode()) # Encode string for serial communication
[docs] def enquire(self): self.write(C['ENQ'])
[docs] def read(self): data = "" while True: x = self.connection.read().decode() # Decode bytes to string self.debugMessage(x) data += x if len(data) > 1 and data[-2:] == LINE_TERMINATION: break return data[:-len(LINE_TERMINATION)]
[docs] def getACQorNAK(self): returncode = self.connection.readline().decode() self.debugMessage(returncode) if len(returncode) > 2 and returncode[-3] == C['NAK']: self.enquire() returnedError = self.read() error = str(returnedError).split(',', 1) errmsg = {'System Error': ERR_CODES[0][int(error[0])], 'Gauge Error': ERR_CODES[1][int(error[1])]} raise MaxiGaugeNAK(errmsg)
[docs] def disconnect(self): try: self.stopping_continuous_update.set() except: pass
def __del__(self): self.disconnect() if hasattr(self, 'connection') and self.connection: self.connection.close()
[docs] class PressureReading: def __init__(self, id, status, pressure): if int(id) not in range(1, 7): raise MaxiGaugeError('Pressure Gauge ID must be between 1-6') self.id = int(id) if int(status) not in PRESSURE_READING_STATUS.keys(): raise MaxiGaugeError(f'The Pressure Status must be in the range {list(PRESSURE_READING_STATUS.keys())}') self.status = int(status) self.pressure = float(pressure)
[docs] def statusMsg(self): return PRESSURE_READING_STATUS[self.status]
def __repr__(self): return f"Gauge #{self.id}: Status {self.status} ({self.statusMsg()}), Pressure: {self.pressure} mbar\n"
### ------ Exceptions ------
[docs] class MaxiGaugeError(Exception): pass
[docs] class MaxiGaugeNAK(MaxiGaugeError): pass
### ------- Control Symbols ----------- C = { 'ETX': "\x03", # End of Text (Ctrl-C) Reset the interface 'CR': "\x0D", # Carriage Return Go to the beginning of line 'LF': "\x0A", # Line Feed Advance by one line 'ENQ': "\x05", # Enquiry Request for data transmission 'ACQ': "\x06", # Acknowledge Positive report signal 'NAK': "\x15", # Negative Acknowledge Negative report signal 'ESC': "\x1b", # Escape } LINE_TERMINATION = C['CR'] + C['LF'] ### ------- Error codes ----------- ERR_CODES = [ { 0: 'No error', 1: 'Watchdog has responded', 2: 'Task fail error', 4: 'IDCX idle error', 8: 'Stack overflow error', 16: 'EPROM error', 32: 'RAM error', 64: 'EEPROM error', 128: 'Key error', 4096: 'Syntax error', 8192: 'Inadmissible parameter', 16384: 'No hardware', 32768: 'Fatal error' }, { 0: 'No error', 1: 'Sensor 1: Measurement error', 2: 'Sensor 2: Measurement error', 4: 'Sensor 3: Measurement error', 8: 'Sensor 4: Measurement error', 16: 'Sensor 5: Measurement error', 32: 'Sensor 6: Measurement error', 512: 'Sensor 1: Identification error', 1024: 'Sensor 2: Identification error', 2048: 'Sensor 3: Identification error', 4096: 'Sensor 4: Identification error', 8192: 'Sensor 5: Identification error', 16384: 'Sensor 6: Identification error', } ] ### ------ Pressure reading status ------ PRESSURE_READING_STATUS = { 0: 'Measurement data okay', 1: 'Underrange', 2: 'Overrange', 3: 'Sensor error', 4: 'Sensor off', 5: 'No sensor', 6: 'Identification error' }