#!/usr/bin/env python import logging import itertools import platform import serial import time from serial.tools import list_ports if platform.system() == 'Windows': import _winreg as winreg else: import glob log = logging.getLogger(__name__) def enumerate_serial_ports(): """ Uses the Win32 registry to return a iterator of serial (COM) ports existing on this computer. """ path = 'HARDWARE\\DEVICEMAP\\SERIALCOMM' try: key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, path) except WindowsError: raise Exception for i in itertools.count(): try: val = winreg.EnumValue(key, i) yield (str(val[1])) # , str(val[0])) except EnvironmentError: break def build_cmd_str(cmd, args=None): """ Build a command string that can be sent to the arduino. Input: cmd (str): the command to send to the arduino, must not contain a % character args (iterable): the arguments to send to the command @TODO: a strategy is needed to escape % characters in the args """ if args: args = '%'.join(map(str, args)) else: args = '' return "@{cmd}%{args}$!".format(cmd=cmd, args=args) def find_port(baud, timeout): """ Find the first port that is connected to an arduino with a compatible sketch installed. """ if platform.system() == 'Windows': ports = enumerate_serial_ports() elif platform.system() == 'Darwin': ports = [i[0] for i in list_ports.comports()] else: ports = glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*") for p in ports: log.debug('Found {0}, testing...'.format(p)) try: sr = serial.Serial(p, baud, timeout=timeout) except serial.serialutil.SerialException as e: log.debug(str(e)) continue time.sleep(2) version = get_version(sr) if version != 'version': log.debug('Bad version {0}. This is not a Shrimp/Arduino!'.format( version)) sr.close() continue log.info('Using port {0}.'.format(p)) if sr: return sr return None def get_version(sr): cmd_str = build_cmd_str("version") try: sr.write(cmd_str) sr.flush() except Exception: return None return sr.readline().replace("\r\n", "") class Arduino(object): def __init__(self, baud=9600, port=None, timeout=2, sr=None): """ Initializes serial communication with Arduino if no connection is given. Attempts to self-select COM port, if not specified. """ if not sr: if not port: sr = find_port(baud, timeout) if not sr: raise ValueError("Could not find port.") else: sr = serial.Serial(port, baud, timeout=timeout) sr.flush() self.sr = sr self.SoftwareSerial = SoftwareSerial(self) self.Servos = Servos(self) def version(self): return get_version(self.sr) def digitalWrite(self, pin, val): """ Sends digitalWrite command to digital pin on Arduino ------------- inputs: pin : digital pin number val : either "HIGH" or "LOW" """ if val == "LOW": pin_ = -pin else: pin_ = pin cmd_str = build_cmd_str("dw", (pin_,)) try: self.sr.write(cmd_str) self.sr.flush() except: pass def analogWrite(self, pin, val): """ Sends analogWrite pwm command to pin on Arduino ------------- inputs: pin : pin number val : integer 0 (off) to 255 (always on) """ if val > 255: val = 255 elif val < 0: val = 0 cmd_str = build_cmd_str("aw", (pin, val)) try: self.sr.write(cmd_str) self.sr.flush() except: pass def analogRead(self, pin): """ Returns the value of a specified analog pin. inputs: pin : analog pin number for measurement returns: value: integer from 1 to 1023 """ cmd_str = build_cmd_str("ar", (pin,)) try: self.sr.write(cmd_str) self.sr.flush() except: pass rd = self.sr.readline().replace("\r\n", "") try: return int(rd) except: return 0 def pinMode(self, pin, val): """ Sets I/O mode of pin inputs: pin: pin number to toggle val: "INPUT" or "OUTPUT" """ if val == "INPUT": pin_ = -pin else: pin_ = pin cmd_str = build_cmd_str("pm", (pin_,)) try: self.sr.write(cmd_str) self.sr.flush() except: pass def pulseIn(self, pin, val): """ Reads a pulse from a pin inputs: pin: pin number for pulse measurement returns: duration : pulse length measurement """ if val == "LOW": pin_ = -pin else: pin_ = pin cmd_str = build_cmd_str("pi", (pin_,)) try: self.sr.write(cmd_str) self.sr.flush() except: pass rd = self.sr.readline().replace("\r\n", "") try: return float(rd) except: return -1 def pulseIn_set(self, pin, val, numTrials=5): """ Sets a digital pin value, then reads the response as a pulse width. Useful for some ultrasonic rangefinders, etc. inputs: pin: pin number for pulse measurement val: "HIGH" or "LOW". Pulse is measured when this state is detected numTrials: number of trials (for an average) returns: duration : an average of pulse length measurements This method will automatically toggle I/O modes on the pin and precondition the measurment with a clean LOW/HIGH pulse. Arduino.pulseIn_set(pin,"HIGH") is equivalent to the Arduino sketch code: pinMode(pin, OUTPUT); digitalWrite(pin, LOW); delayMicroseconds(2); digitalWrite(pin, HIGH); delayMicroseconds(5); digitalWrite(pin, LOW); pinMode(pin, INPUT); long duration = pulseIn(pin, HIGH); """ if val == "LOW": pin_ = -pin else: pin_ = pin cmd_str = build_cmd_str("ps", (pin_,)) durations = [] for s in range(numTrials): try: self.sr.write(cmd_str) self.sr.flush() except: pass rd = self.sr.readline().replace("\r\n", "") if rd.isdigit(): if (int(rd) > 1): durations.append(int(rd)) if len(durations) > 0: duration = int(sum(durations)) / int(len(durations)) else: duration = None try: return float(duration) except: return -1 def close(self): if self.sr.isOpen(): self.sr.flush() self.sr.close() def digitalRead(self, pin): """ Returns the value of a specified digital pin. inputs: pin : digital pin number for measurement returns: value: 0 for "LOW", 1 for "HIGH" """ cmd_str = build_cmd_str("dr", (pin,)) try: self.sr.write(cmd_str) self.sr.flush() except: pass rd = self.sr.readline().replace("\r\n", "") try: return int(rd) except: return 0 def Melody(self, pin, melody, durations): """ Plays a melody. inputs: pin: digital pin number for playback melody: list of tones durations: list of duration (4=quarter note, 8=eighth note, etc.) length of melody should be of same length as length of duration Melodies of the following lenght, can cause trouble when playing it multiple times. board.Melody(9,["C4","G3","G3","A3","G3",0,"B3","C4"], [4,8,8,4,4,4,4,4]) Playing short melodies (1 or 2 tones) didn't cause trouble during testing """ NOTES = dict( B0=31, C1=33, CS1=35, D1=37, DS1=39, E1=41, F1=44, FS1=46, G1=49, GS1=52, A1=55, AS1=58, B1=62, C2=65, CS2=69, D2=73, DS2=78, E2=82, F2=87, FS2=93, G2=98, GS2=104, A2=110, AS2=117, B2=123, C3=131, CS3=139, D3=147, DS3=156, E3=165, F3=175, FS3=185, G3=196, GS3=208, A3=220, AS3=233, B3=247, C4=262, CS4=277, D4=294, DS4=311, E4=330, F4=349, FS4=370, G4=392, GS4=415, A4=440, AS4=466, B4=494, C5=523, CS5=554, D5=587, DS5=622, E5=659, F5=698, FS5=740, G5=784, GS5=831, A5=880, AS5=932, B5=988, C6=1047, CS6=1109, D6=1175, DS6=1245, E6=1319, F6=1397, FS6=1480, G6=1568, GS6=1661, A6=1760, AS6=1865, B6=1976, C7=2093, CS7=2217, D7=2349, DS7=2489, E7=2637, F7=2794, FS7=2960, G7=3136, GS7=3322, A7=3520, AS7=3729, B7=3951, C8=4186, CS8=4435, D8=4699, DS8=4978) if (isinstance(melody, list)) and (isinstance(durations, list)): length = len(melody) cmd_args = [length, pin] if length == len(durations): cmd_args.extend([NOTES.get(melody[note]) for note in range(length)]) cmd_args.extend([durations[duration] for duration in range(len(durations))]) cmd_str = build_cmd_str("to", cmd_args) try: self.sr.write(cmd_str) self.sr.flush() except: pass cmd_str = build_cmd_str("nto", [pin]) try: self.sr.write(cmd_str) self.sr.flush() except: pass else: return -1 else: return -1 def capacitivePin(self, pin): ''' Input: pin (int): pin to use as capacitive sensor Use it in a loop! DO NOT CONNECT ANY ACTIVE DRIVER TO THE USED PIN ! the pin is toggled to output mode to discharge the port, and if connected to a voltage source, will short circuit the pin, potentially damaging the Arduino/Shrimp and any hardware attached to the pin. ''' cmd_str = build_cmd_str("cap", (pin,)) self.sr.write(cmd_str) rd = self.sr.readline().replace("\r\n", "") if rd.isdigit(): return int(rd) def shiftOut(self, dataPin, clockPin, pinOrder, value): """ Shift a byte out on the datapin using Arduino's shiftOut() Input: dataPin (int): pin for data clockPin (int): pin for clock pinOrder (String): either 'MSBFIRST' or 'LSBFIRST' value (int): an integer from 0 and 255 """ cmd_str = build_cmd_str("so", (dataPin, clockPin, pinOrder, value)) self.sr.write(cmd_str) self.sr.flush() def shiftIn(self, dataPin, clockPin, pinOrder): """ Shift a byte in from the datapin using Arduino's shiftIn(). Input: dataPin (int): pin for data clockPin (int): pin for clock pinOrder (String): either 'MSBFIRST' or 'LSBFIRST' Output: (int) an integer from 0 to 255 """ cmd_str = build_cmd_str("si", (dataPin, clockPin, pinOrder)) self.sr.write(cmd_str) self.sr.flush() rd = self.sr.readline().replace("\r\n", "") if rd.isdigit(): return int(rd) class Shrimp(Arduino): def __init__(self): Arduino.__init__(self) class Wires(object): """ Class for Arduino wire (i2c) support """ def __init__(self, board): self.board = board self.sr = board.sr class Servos(object): """ Class for Arduino servo support 0.03 second delay noted """ def __init__(self, board): self.board = board self.sr = board.sr self.servo_pos = {} def attach(self, pin, min=544, max=2400): cmd_str = build_cmd_str("sva", (pin, min, max)) while True: self.sr.write(cmd_str) self.sr.flush() rd = self.sr.readline().replace("\r\n", "") if rd: break else: log.debug("trying to attach servo to pin {0}".format(pin)) position = int(rd) self.servo_pos[pin] = position return 1 def detach(self, pin): position = self.servo_pos[pin] cmd_str = build_cmd_str("svd", (position,)) try: self.sr.write(cmd_str) self.sr.flush() except: pass del self.servo_pos[pin] def write(self, pin, angle): position = self.servo_pos[pin] cmd_str = build_cmd_str("svw", (position, angle)) self.sr.write(cmd_str) self.sr.flush() def writeMicroseconds(self, pin, uS): position = self.servo_pos[pin] cmd_str = build_cmd_str("svwm", (position, uS)) self.sr.write(cmd_str) self.sr.flush() def read(self, pin): if pin not in self.servo_pos.keys(): self.attach(pin) position = self.servo_pos[pin] cmd_str = build_cmd_str("svr", (position,)) try: self.sr.write(cmd_str) self.sr.flush() except: pass rd = self.sr.readline().replace("\r\n", "") try: angle = int(rd) return angle except: return None class SoftwareSerial(object): """ Class for Arduino software serial functionality """ def __init__(self, board): self.board = board self.sr = board.sr self.connected = False def begin(self, p1, p2, baud): """ Create software serial instance on specified tx,rx pins, at specified baud """ cmd_str = build_cmd_str("ss", (p1, p2, baud)) try: self.sr.write(cmd_str) self.sr.flush() except: pass response = self.sr.readline().replace("\r\n", "") if response == "ss OK": self.connected = True return True else: self.connected = False return False def write(self, data): """ sends data to existing software serial instance using Arduino's 'write' function """ if self.connected: cmd_str = build_cmd_str("sw", (data,)) try: self.sr.write(cmd_str) self.sr.flush() except: pass response = self.sr.readline().replace("\r\n", "") if response == "ss OK": return True else: return False def read(self): """ returns first character read from existing software serial instance """ if self.connected: cmd_str = build_cmd_str("sr") self.sr.write(cmd_str) self.sr.flush() response = self.sr.readline().replace("\r\n", "") if response: return response else: return False