From bf563fefba20d7238b415f2acf97b02316937114 Mon Sep 17 00:00:00 2001 From: Sjoerd Dirk Meijer Date: Tue, 30 Apr 2013 16:52:47 +0200 Subject: [PATCH] My arduino.py --- Arduino/arduino.py | 781 ++++++++++++++++++++++++--------------------- 1 file changed, 426 insertions(+), 355 deletions(-) diff --git a/Arduino/arduino.py b/Arduino/arduino.py index 973444e..a76e321 100644 --- a/Arduino/arduino.py +++ b/Arduino/arduino.py @@ -1,355 +1,426 @@ -#!/usr/bin/env python -from serial.tools import list_ports -import serial, time - -class Arduino(object): - def __init__(self,baud,port="",timeout=2): - """ - Initializes serial communication with Arduino. - Attempts to self-select COM port, if not specified. - """ - self.baud = baud - self.timeout = timeout - self.ss_connected=False - if port == "": - self.findPort() - self.sr = serial.Serial(self.port, self.baud,timeout =self.timeout) - time.sleep(2) - self.SoftwareSerial = SoftwareSerial(self) - self.Servos = Servos(self) - - def findPort(self): - """ - Sets port to the first Arduino found - in system's device list - """ - for pt in list_ports.comports(): - if ("FTDIBUS" in pt[-1]) or ("usbserial" in pt[-1]): - self.port = pt[0] - return - - def digitalWrite(self,pin,val): - """ - Sends digitalWrite command - to digital pin on Arduino - ------------- - inputs: - pin : digital pin number - val : either "HIGH" or "LOW" - """ - if val=="LOW": - pin_ = -pin - else: - pin_ = pin - cmd_str=''.join(["@dw%",str(pin_),"$!"]) - try: - self.sr.write(cmd_str) - self.sr.flush() - except: - pass - - def analogWrite(self,pin,val): - """ - Sends analogWrite pwm command - to pin on Arduino - ------------- - inputs: - pin : pin number - val : integer 0 (off) to 255 (always on) - """ - if val>255: - val=255 - elif val<0: - val=0 - cmd_str=''.join(["@aw%",str(pin),"%",str(val),"$!"]) - try: - self.sr.write(cmd_str) - self.sr.flush() - except: - pass - - def analogRead(self,pin): - """ - Returns the value of a specified - analog pin. - inputs: - pin : analog pin number for measurement - returns: - value: integer from 1 to 1023 - """ - cmd_str=''.join(["@ar%",str(pin),"$!"]) - try: - self.sr.write(cmd_str) - self.sr.flush() - except: - pass - rd = self.sr.readline().replace("\r\n","") - try: - return int(rd) - except: - return 0 - - - def pinMode(self,pin,val): - """ - Sets I/O mode of pin - inputs: - pin: pin number to toggle - val: "INPUT" or "OUTPUT" - """ - if val=="INPUT": - pin_ = -pin - else: - pin_ = pin - cmd_str=''.join(["@pm%",str(pin_),"$!"]) - try: - self.sr.write(cmd_str) - self.sr.flush() - except: - pass - - def pulseIn(self,pin,val): - """ - Reads a pulse from a pin - - inputs: - pin: pin number for pulse measurement - returns: - duration : pulse length measurement - - """ - if val=="LOW": - pin_ = -pin - else: - pin_ = pin - cmd_str=''.join(["@pi%",str(pin_),"$!"]) - try: - self.sr.write(cmd_str) - self.sr.flush() - except: - pass - rd = self.sr.readline().replace("\r\n","") - try: - return float(rd) - except: - return -1 - - def pulseIn_set(self,pin,val): - """ - Sets a digital pin value, then reads the response - as a pulse width. - Useful for some ultrasonic rangefinders, etc. - - inputs: - pin: pin number for pulse measurement - val: "HIGH" or "LOW". Pulse is measured - when this state is detected - returns: - duration : pulse length measurement - - This method will automatically toggle - I/O modes on the pin and precondition the - measurment with a clean LOW/HIGH pulse. - Arduino.pulseIn_set(pin,"HIGH") is - equivalent to the Arduino sketch code: - - pinMode(pin, OUTPUT); - digitalWrite(pin, LOW); - delayMicroseconds(2); - digitalWrite(pin, HIGH); - delayMicroseconds(5); - digitalWrite(pin, LOW); - pinMode(pin, INPUT); - long duration = pulseIn(pin, HIGH); - """ - if val=="LOW": - pin_ = -pin - else: - pin_ = pin - cmd_str=''.join(["@ps%",str(pin_),"$!"]) - try: - self.sr.write(cmd_str) - self.sr.flush() - except: - pass - rd = self.sr.readline().replace("\r\n","") - try: - return float(rd) - except: - return -1 - - def close(self): - self.sr.close() - - def digitalRead(self,pin): - """ - Returns the value of a specified - digital pin. - inputs: - pin : digital pin number for measurement - returns: - value: 0 for "LOW", 1 for "HIGH" - """ - cmd_str=''.join(["@dr%",str(pin),"$!"]) - try: - self.sr.write(cmd_str) - self.sr.flush() - except: - pass - rd = self.sr.readline().replace("\r\n","") - try: - return 1 - int(rd) - except: - return 0 - -class Wires(object): - """ - Class for Arduino wire (i2c) support - """ - def __init__(self, board): - self.board = board - self.sr = board.sr - -class Servos(object): - """ - Class for Arduino servo support - 0.03 second delay noted - """ - def __init__(self, board): - self.board = board - self.sr = board.sr - self.servo_pos = {} - - def attach(self,pin,min = 544, max = 2400): - cmd_str=''.join(["@sva%",str(pin),"%",str(min),"%",str(max),"$!"]) - try: - self.sr.write(cmd_str) - self.sr.flush() - except: - pass - rd = self.sr.readline().replace("\r\n","") - try: - position = int(rd) - self.servo_pos[pin] = position - return 1 - except: - return 0 - - def detach(self,pin): - cmd_str=''.join(["@svd%",str(position),"$!"]) - try: - self.sr.write(cmd_str) - self.sr.flush() - except: - pass - del self.servo_pos[pin] - - def write(self,pin,angle): - position = self.servo_pos[pin] - cmd_str=''.join(["@svw%",str(position),"%",str(angle),"$!"]) - try: - self.sr.write(cmd_str) - self.sr.flush() - except: - pass - - def writeMicroseconds(self,pin,uS): - cmd_str=''.join(["@svw%",str(position),"%",str(uS),"$!"]) - try: - self.sr.write(cmd_str) - self.sr.flush() - except: - pass - - def read(self,pin): - if pin not in self.servo_pos.keys(): - self.attach(pin) - position = self.servo_pos[pin] - cmd_str=''.join(["@svr%",str(position),"$!"]) - try: - self.sr.write(cmd_str) - self.sr.flush() - except: - pass - rd = self.sr.readline().replace("\r\n","") - try: - angle = int(rd) - return angle - except: - return None - -class SoftwareSerial(object): - """ - Class for Arduino software serial functionality - """ - def __init__(self,board): - self.board=board - self.sr = board.sr - self.connected = False - - def begin(self,p1,p2,baud): - """ - Create software serial instance on - specified tx,rx pins, at specified baud - """ - cmd_str=''.join(["@ss%",str(p1),"%",str(p2),"%",str(baud),"$!"]) - try: - self.sr.write(cmd_str) - self.sr.flush() - except: - pass - response= self.sr.readline().replace("\r\n","") - if response == "ss OK": - self.connected = True - return True - else: - self.connected = False - return False - - def write(self,data): - """ - sends data to existing software serial instance - using Arduino's 'write' function - """ - if self.connected: - cmd_str=''.join(["@sw%",str(data),"$!"]) - try: - self.sr.write(cmd_str) - self.sr.flush() - except: - pass - response= self.sr.readline().replace("\r\n","") - if response == "ss OK": - return True - else: - return False - - def read(self): - """ - returns first character read from - existing software serial instance - """ - if self.connected: - cmd_str=''.join(["@sr%$!"]) - self.sr.write(cmd_str) - self.sr.flush() - response= self.sr.readline().replace("\r\n","") - if response: - return response - else: - return False - -if __name__=="__main__": - # quick test - board=Arduino(9600) - board.Servos.attach(9) - board.Servos.write(9,90) - time.sleep(1) - print board.Servos.read(9) - t=time.time() - board.Servos.write(9,1) - while True: - a=board.Servos.read(9) - if a == 1: - print "time",time.time() - t - break \ No newline at end of file +#!/usr/bin/env python +from serial.tools import list_ports +import serial, time +##SDM +import platform +if platform.system() == 'Windows': + import _winreg as winreg +if platform.system() == 'Linux': + import glob +import itertools + +def enumerate_serial_ports(): + """ Uses the Win32 registry to return a iterator of serial + (COM) ports existing on this computer. + """ + path = 'HARDWARE\\DEVICEMAP\\SERIALCOMM' + try: + key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, path) + except WindowsError: + raise IterationError + + for i in itertools.count(): + try: + val = winreg.EnumValue(key, i) + yield (str(val[1]))#, str(val[0])) + except EnvironmentError: + break +##SDM + +class Arduino(object): + def __init__(self,baud=9600,port="",timeout=2): + """ + Initializes serial communication with Arduino. + Attempts to self-select COM port, if not specified. + """ + self.baud = baud + self.timeout = timeout + self.ss_connected=False +##SDM + self.port = port + if self.port == "": + if platform.system() == 'Windows': + ports = enumerate_serial_ports() + if platform.system() == 'Linux': + ports = glob.glob("/dev/ttyUSB*") + for p in ports: + print 'Found ', p + try: + print 'Testing ', p + self.sr = serial.Serial(p, self.baud,timeout=self.timeout) + time.sleep(2) + cmd_str=''.join(["@version%$!"]) + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + version = self.sr.readline().replace("\r\n","") + #print version + if version != 'version': + raise Exception('This is not a Shrimp/Arduino!') + self.port = p + print p, 'passed' + break + except Exception, e: + print "Exception: ", e + pass +##SDM + #--self.sr = serial.Serial(self.port, self.baud,timeout =self.timeout) + #--time.sleep(2) + self.SoftwareSerial = SoftwareSerial(self) + self.Servos = Servos(self) + + def findPort(self): + """ + Sets port to the first Arduino found + in system's device list + """ + for pt in list_ports.comports(): + if ("FTDIBUS" in pt[-1]) or ("usbserial" in pt[-1]): + self.port = pt[0] + return + + def digitalWrite(self,pin,val): + """ + Sends digitalWrite command + to digital pin on Arduino + ------------- + inputs: + pin : digital pin number + val : either "HIGH" or "LOW" + """ + if val=="LOW": + pin_ = -pin + else: + pin_ = pin + cmd_str=''.join(["@dw%",str(pin_),"$!"]) + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + + def analogWrite(self,pin,val): + """ + Sends analogWrite pwm command + to pin on Arduino + ------------- + inputs: + pin : pin number + val : integer 0 (off) to 255 (always on) + """ + if val>255: + val=255 + elif val<0: + val=0 + cmd_str=''.join(["@aw%",str(pin),"%",str(val),"$!"]) + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + + def analogRead(self,pin): + """ + Returns the value of a specified + analog pin. + inputs: + pin : analog pin number for measurement + returns: + value: integer from 1 to 1023 + """ + cmd_str=''.join(["@ar%",str(pin),"$!"]) + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + rd = self.sr.readline().replace("\r\n","") + try: + return int(rd) + except: + return 0 + + + def pinMode(self,pin,val): + """ + Sets I/O mode of pin + inputs: + pin: pin number to toggle + val: "INPUT" or "OUTPUT" + """ + if val=="INPUT": + pin_ = -pin + else: + pin_ = pin + cmd_str=''.join(["@pm%",str(pin_),"$!"]) + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + + def pulseIn(self,pin,val): + """ + Reads a pulse from a pin + + inputs: + pin: pin number for pulse measurement + returns: + duration : pulse length measurement + + """ + if val=="LOW": + pin_ = -pin + else: + pin_ = pin + cmd_str=''.join(["@pi%",str(pin_),"$!"]) + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + rd = self.sr.readline().replace("\r\n","") + try: + return float(rd) + except: + return -1 + + def pulseIn_set(self,pin,val,numTrials=5): + """ + Sets a digital pin value, then reads the response + as a pulse width. + Useful for some ultrasonic rangefinders, etc. + + inputs: + pin: pin number for pulse measurement + val: "HIGH" or "LOW". Pulse is measured + when this state is detected + numTrials: number of trials (for an average) + returns: + duration : an average of pulse length measurements + + This method will automatically toggle + I/O modes on the pin and precondition the + measurment with a clean LOW/HIGH pulse. + Arduino.pulseIn_set(pin,"HIGH") is + equivalent to the Arduino sketch code: + + pinMode(pin, OUTPUT); + digitalWrite(pin, LOW); + delayMicroseconds(2); + digitalWrite(pin, HIGH); + delayMicroseconds(5); + digitalWrite(pin, LOW); + pinMode(pin, INPUT); + long duration = pulseIn(pin, HIGH); + """ + if val=="LOW": + pin_ = -pin + else: + pin_ = pin + cmd_str=''.join(["@ps%",str(pin_),"$!"]) + durations = [] + for s in range(numTrials): + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + rd = self.sr.readline().replace("\r\n","") + if rd.isdigit() == True: + if (int(rd) > 1) == True: + durations.append(int(rd)) + #print durations + if len(durations) > 0: + duration = int(sum(durations)) / int(len(durations)) + else: + duration = None + + try: + return float(duration) + except: + return -1 + + def close(self): + self.sr.close() + + def digitalRead(self,pin): + """ + Returns the value of a specified + digital pin. + inputs: + pin : digital pin number for measurement + returns: + value: 0 for "LOW", 1 for "HIGH" + """ + cmd_str=''.join(["@dr%",str(pin),"$!"]) + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + rd = self.sr.readline().replace("\r\n","") + try: + return 1 - int(rd) + except: + return 0 + +##SDM +class Shrimp(Arduino): + def __init__(self): + Arduino.__init__(self) +##SDM + +class Wires(object): + """ + Class for Arduino wire (i2c) support + """ + def __init__(self, board): + self.board = board + self.sr = board.sr + +class Servos(object): + """ + Class for Arduino servo support + 0.03 second delay noted + """ + def __init__(self, board): + self.board = board + self.sr = board.sr + self.servo_pos = {} + + def attach(self,pin,min = 544, max = 2400): + cmd_str=''.join(["@sva%",str(pin),"%",str(min),"%",str(max),"$!"]) + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + rd = self.sr.readline().replace("\r\n","") + try: + position = int(rd) + self.servo_pos[pin] = position + return 1 + except: + return 0 + + def detach(self,pin): + cmd_str=''.join(["@svd%",str(position),"$!"]) + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + del self.servo_pos[pin] + + def write(self,pin,angle): + position = self.servo_pos[pin] + cmd_str=''.join(["@svw%",str(position),"%",str(angle),"$!"]) + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + + def writeMicroseconds(self,pin,uS): + cmd_str=''.join(["@svw%",str(position),"%",str(uS),"$!"]) + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + + def read(self,pin): + if pin not in self.servo_pos.keys(): + self.attach(pin) + position = self.servo_pos[pin] + cmd_str=''.join(["@svr%",str(position),"$!"]) + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + rd = self.sr.readline().replace("\r\n","") + try: + angle = int(rd) + return angle + except: + return None + +class SoftwareSerial(object): + """ + Class for Arduino software serial functionality + """ + def __init__(self,board): + self.board=board + self.sr = board.sr + self.connected = False + + def begin(self,p1,p2,baud): + """ + Create software serial instance on + specified tx,rx pins, at specified baud + """ + cmd_str=''.join(["@ss%",str(p1),"%",str(p2),"%",str(baud),"$!"]) + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + response= self.sr.readline().replace("\r\n","") + if response == "ss OK": + self.connected = True + return True + else: + self.connected = False + return False + + def write(self,data): + """ + sends data to existing software serial instance + using Arduino's 'write' function + """ + if self.connected: + cmd_str=''.join(["@sw%",str(data),"$!"]) + try: + self.sr.write(cmd_str) + self.sr.flush() + except: + pass + response= self.sr.readline().replace("\r\n","") + if response == "ss OK": + return True + else: + return False + + def read(self): + """ + returns first character read from + existing software serial instance + """ + if self.connected: + cmd_str=''.join(["@sr%$!"]) + self.sr.write(cmd_str) + self.sr.flush() + response= self.sr.readline().replace("\r\n","") + if response: + return response + else: + return False + +##if __name__=="__main__": + # quick test +## board=Arduino(9600, 'COM9') +## board.Servos.attach(9) +## board.Servos.write(9,90) +## time.sleep(1) +## print board.Servos.read(9) +## t=time.time() +## board.Servos.write(9,1) +## while True: +## a=board.Servos.read(9) +## if a == 1: +## print "time",time.time() - t +## break