Merge pull request #5 from ianjosephwilson/master

Refactoring and testing.
This commit is contained in:
Tristan Hearn
2013-06-01 21:19:39 -07:00
4 changed files with 350 additions and 131 deletions

View File

@@ -1,4 +1,5 @@
#!/usr/bin/env python
import logging
import itertools
import platform
import serial
@@ -10,6 +11,9 @@ else:
import glob
log = logging.getLogger(__name__)
def enumerate_serial_ports():
"""
Uses the Win32 registry to return a iterator of serial
@@ -29,62 +33,86 @@ def enumerate_serial_ports():
break
def build_cmd_str(cmd, args=None):
"""
Build a command string that can be sent to the arduino.
Input:
cmd (str): the command to send to the arduino, must not
contain a % character
args (iterable): the arguments to send to the command
@TODO: a strategy is needed to escape % characters in the args
"""
if args:
args = '%'.join(map(str, args))
else:
args = ''
return "@{cmd}%{args}$!".format(cmd=cmd, args=args)
def find_port(baud, timeout):
"""
Find the first port that is connected to an arduino with a compatible
sketch installed.
"""
if platform.system() == 'Windows':
ports = enumerate_serial_ports()
elif platform.system() == 'Darwin':
ports = [i[0] for i in list_ports.comports()]
else:
ports = glob.glob("/dev/ttyUSB*") + glob.glob("/dev/ttyACM*")
for p in ports:
log.debug('Found {0}, testing...'.format(p))
try:
sr = serial.Serial(p, baud, timeout=timeout)
except serial.serialutil.SerialException, e:
log.debug(str(e))
continue
time.sleep(2)
version = get_version(sr)
if version != 'version':
log.debug('Bad version {0}. This is not a Shrimp/Arduino!'.format(
version))
sr.close()
continue
log.info('Using port {0}.'.format(p))
if sr:
return sr
return None
def get_version(sr):
cmd_str = build_cmd_str("version")
try:
sr.write(cmd_str)
sr.flush()
except Exception:
return None
return sr.readline().replace("\r\n", "")
class Arduino(object):
def __init__(self, baud=9600, port=None, timeout=2):
def __init__(self, baud=9600, port=None, timeout=2, sr=None):
"""
Initializes serial communication with Arduino.
Initializes serial communication with Arduino if no connection is given.
Attempts to self-select COM port, if not specified.
"""
self.baud = baud
self.timeout = timeout
self.ss_connected = False
self.port = port
if not self.port:
self.findPort()
else:
self.sr = serial.Serial(self.port, self.baud,
timeout = self.timeout)
if not sr:
if not port:
sr = find_port(baud, timeout)
if not sr:
raise ValueError("Could not find port.")
else:
sr = serial.Serial(port, baud, timeout=timeout)
sr.flush()
self.sr = sr
self.SoftwareSerial = SoftwareSerial(self)
self.Servos = Servos(self)
self.sr.flush()
def version(self):
cmd_str = ''.join(["@version%$!"])
try:
self.sr.write(cmd_str)
self.sr.flush()
except:
pass
version = self.sr.readline().replace("\r\n", "")
return version
def findPort(self):
"""
Sets port to the first Arduino found
in system's device list
"""
if platform.system() == 'Windows':
ports = enumerate_serial_ports()
elif platform.system() == 'Darwin':
ports = [i[0] for i in list_ports.comports()]
else:
ports = glob.glob("/dev/ttyUSB*")
for p in ports:
print 'Found ', p
version = None
try:
print 'Testing ', p
self.sr = serial.Serial(p, self.baud, timeout=self.timeout)
time.sleep(2)
version = self.version()
if version != 'version':
raise Exception('This is not a Shrimp/Arduino!')
self.port = p
print p, 'passed'
break
except Exception, e:
print "Exception: ", e
pass
return get_version(self.sr)
def digitalWrite(self, pin, val):
"""
@@ -99,7 +127,7 @@ class Arduino(object):
pin_ = -pin
else:
pin_ = pin
cmd_str = ''.join(["@dw%", str(pin_), "$!"])
cmd_str = build_cmd_str("dw", (pin_,))
try:
self.sr.write(cmd_str)
self.sr.flush()
@@ -119,7 +147,7 @@ class Arduino(object):
val = 255
elif val < 0:
val = 0
cmd_str = ''.join(["@aw%", str(pin), "%", str(val), "$!"])
cmd_str = build_cmd_str("aw", (pin, val))
try:
self.sr.write(cmd_str)
self.sr.flush()
@@ -135,7 +163,7 @@ class Arduino(object):
returns:
value: integer from 1 to 1023
"""
cmd_str = ''.join(["@ar%", str(pin), "$!"])
cmd_str = build_cmd_str("ar", (pin,))
try:
self.sr.write(cmd_str)
self.sr.flush()
@@ -158,7 +186,7 @@ class Arduino(object):
pin_ = -pin
else:
pin_ = pin
cmd_str = ''.join(["@pm%", str(pin_), "$!"])
cmd_str = build_cmd_str("pm", (pin_,))
try:
self.sr.write(cmd_str)
self.sr.flush()
@@ -178,7 +206,7 @@ class Arduino(object):
pin_ = -pin
else:
pin_ = pin
cmd_str = ''.join(["@pi%", str(pin_), "$!"])
cmd_str = build_cmd_str("pi", (pin_,))
try:
self.sr.write(cmd_str)
self.sr.flush()
@@ -223,7 +251,7 @@ class Arduino(object):
pin_ = -pin
else:
pin_ = pin
cmd_str = ''.join(["@ps%", str(pin_), "$!"])
cmd_str = build_cmd_str("ps", (pin_,))
durations = []
for s in range(numTrials):
try:
@@ -246,8 +274,9 @@ class Arduino(object):
return -1
def close(self):
self.sr.flush()
self.sr.close()
if self.sr.isOpen():
self.sr.flush()
self.sr.close()
def digitalRead(self, pin):
"""
@@ -258,7 +287,7 @@ class Arduino(object):
returns:
value: 0 for "LOW", 1 for "HIGH"
"""
cmd_str = ''.join(["@dr%", str(pin), "$!"])
cmd_str = build_cmd_str("dr", (pin,))
try:
self.sr.write(cmd_str)
self.sr.flush()
@@ -300,22 +329,17 @@ class Arduino(object):
,GS7=3322,A7=3520,AS7=3729,B7=3951,C8=4186,CS8=4435,D8=4699,DS8=4978)
if (type(melody) == list) and (type(durations) == list):
length = len(melody)
cmd_str = "@to%"+str(length)+"%"+str(pin)+"%"
d = ""
cmd_args = [length, pin]
if length == len(durations):
for note in range(length):
n = NOTES.get(melody[note])
cmd_str = cmd_str+str(n)+"%"
for duration in range(len(durations)):
d = str(durations[duration])
cmd_str = cmd_str+d+"%"
cmd_str = cmd_str[:-1]+"$!"
cmd_args.extend([NOTES.get(melody[note]) for note in range(length)])
cmd_args.extend([durations[duration] for duration in range(len(durations))])
cmd_str = build_cmd_str("to", cmd_args)
try:
self.sr.write(cmd_str)
self.sr.flush()
except:
pass
cmd_str=''.join(["@nto%",str(pin),"$!"])
cmd_str = build_cmd_str("nto", [pin])
try:
self.sr.write(cmd_str)
self.sr.flush()
@@ -326,7 +350,6 @@ class Arduino(object):
else:
return -1
def capacitivePin(self, pin):
'''
Input:
@@ -338,9 +361,9 @@ class Arduino(object):
the pin is toggled to output mode to discharge the port,
and if connected to a voltage source,
will short circuit the pin, potentially damaging
the Arduino/Shrimp and any hardware attached to the pin.
the Arduino/Shrimp and any hardware attached to the pin.
'''
cmd_str="@cap%"+str(pin)+"$!"
cmd_str = build_cmd_str("cap", (pin,))
self.sr.write(cmd_str)
rd = self.sr.readline().replace("\r\n","")
if rd.isdigit() == True:
@@ -356,7 +379,7 @@ class Arduino(object):
pinOrder (String): either 'MSBFIRST' or 'LSBFIRST'
value (int): an integer from 0 and 255
"""
cmd_str = self._buildCmdStr("so",
cmd_str = build_cmd_str("so",
(dataPin, clockPin, pinOrder, value))
self.sr.write(cmd_str)
self.sr.flush()
@@ -372,31 +395,20 @@ class Arduino(object):
Output:
(int) an integer from 0 to 255
"""
cmd_str = self._buildCmdStr("si", (dataPin, clockPin, pinOrder))
cmd_str = build_cmd_str("si", (dataPin, clockPin, pinOrder))
self.sr.write(cmd_str)
self.sr.flush()
rd = self.sr.readline().replace("\r\n","")
if rd.isdigit() == True:
return int(rd)
def _buildCmdStr(self, cmd, args):
"""
Build a command string that can be sent to the arduino.
Input:
cmd (str): the command to send to the arduino
args (iterable): the arguments to send to the command
"""
args = '%'.join(map(str, args))
return "@{cmd}%{args}$!".format(cmd=cmd, args=args)
class Shrimp(Arduino):
def __init__(self):
Arduino.__init__(self)
class Wires(object):
class Wires(object):
"""
Class for Arduino wire (i2c) support
"""
@@ -404,7 +416,7 @@ class Wires(object):
self.board = board
self.sr = board.sr
class Servos(object):
"""
Class for Arduino servo support
@@ -415,26 +427,24 @@ class Servos(object):
self.sr = board.sr
self.servo_pos = {}
def attach(self,pin,min = 544, max = 2400):
cmd_str=''.join(["@sva%",str(pin),"%",str(min),"%",str(max),"$!"])
def attach(self, pin, min=544, max=2400):
cmd_str = build_cmd_str("sva", (pin, min, max))
while True:
self.sr.write(cmd_str)
self.sr.flush()
rd = self.sr.readline().replace("\r\n","")
if rd:
break
else:
print "trying to attach servo to pin",pin
log.debug("trying to attach servo to pin {0}".format(pin))
position = int(rd)
self.servo_pos[pin] = position
return 1
def detach(self,pin):
cmd_str=''.join(["@svd%",str(position),"$!"])
def detach(self, pin):
cmd_str = build_cmd_str("svd", (position,))
try:
self.sr.write(cmd_str)
self.sr.flush()
@@ -442,27 +452,25 @@ class Servos(object):
pass
del self.servo_pos[pin]
def write(self,pin,angle):
def write(self, pin, angle):
position = self.servo_pos[pin]
cmd_str=''.join(["@svw%",str(position),"%",str(angle),"$!"])
cmd_str = build_cmd_str("svw" (position, angle))
self.sr.write(cmd_str)
self.sr.flush()
def writeMicroseconds(self,pin,uS):
cmd_str=''.join(["@svw%",str(position),"%",str(uS),"$!"])
def writeMicroseconds(self, pin, uS):
position = self.servo_pos[pin]
cmd_str = build_cmd_str("svw", (position, uS))
self.sr.write(cmd_str)
self.sr.flush()
def read(self,pin):
def read(self, pin):
if pin not in self.servo_pos.keys():
self.attach(pin)
self.attach(pin)
position = self.servo_pos[pin]
cmd_str=''.join(["@svr%",str(position),"$!"])
cmd_str = build_cmd_str("svr", (position,))
try:
self.sr.write(cmd_str)
self.sr.flush()
@@ -480,24 +488,23 @@ class SoftwareSerial(object):
"""
Class for Arduino software serial functionality
"""
def __init__(self,board):
self.board=board
def __init__(self, board):
self.board = board
self.sr = board.sr
self.connected = False
def begin(self,p1,p2,baud):
def begin(self, p1, p2, baud):
"""
Create software serial instance on
Create software serial instance on
specified tx,rx pins, at specified baud
"""
cmd_str=''.join(["@ss%",str(p1),"%",str(p2),"%",str(baud),"$!"])
cmd_str = build_cmd_str("ss", (p1, p2, baud))
try:
self.sr.write(cmd_str)
self.sr.flush()
except:
pass
response= self.sr.readline().replace("\r\n","")
response = self.sr.readline().replace("\r\n","")
if response == "ss OK":
self.connected = True
return True
@@ -505,37 +512,35 @@ class SoftwareSerial(object):
self.connected = False
return False
def write(self,data):
def write(self, data):
"""
sends data to existing software serial instance
sends data to existing software serial instance
using Arduino's 'write' function
"""
if self.connected:
cmd_str=''.join(["@sw%",str(data),"$!"])
cmd_str = build_cmd_str("sw", (data,))
try:
self.sr.write(cmd_str)
self.sr.flush()
except:
pass
response= self.sr.readline().replace("\r\n","")
response = self.sr.readline().replace("\r\n","")
if response == "ss OK":
return True
else:
return False
def read(self):
"""
returns first character read from
existing software serial instance
"""
if self.connected:
cmd_str=''.join(["@sr%$!"])
cmd_str = build_cmd_str("sr")
self.sr.write(cmd_str)
self.sr.flush()
response= self.sr.readline().replace("\r\n","")
response = self.sr.readline().replace("\r\n","")
if response:
return response
else:
return False
return False

View File

@@ -55,10 +55,16 @@ connecting and issuing commands to a live Arduino, hosting any hardware
required to test a particular function. But a core of basic communication tests
should at least be maintained here and used before merging into the `master` branch.
After installation, the tests can be run from the source directory:
After installation, the interactive tests can be run from the source directory:
```bash
$ python tests/test_main.py
```
Automated tests can be run from the source directory with:
```bash
$ python tests/test_arduino.py
```
## Classes
- `Arduino(baud)` - Set up communication with currently connected and powered
Arduino.
@@ -123,6 +129,13 @@ val = val / 4 # scale to 0 - 255
board.analogWrite(11) #Set analog value (PWM) based on analog measurement
```
**Shift Register**
- `Arduino.shiftIn(dataPin, clockPin, bitOrder)` shift a byte in and returns it
- `Arduino.shiftOut(dataPin, clockPin, bitOrder, value)` shift the given byte out
`bitOrder` should be either `"MSBFIRST"` or `"LSBFIRST"`
**Servo Library Functionality**
Support is included for up to 8 servos.

176
tests/test_arduino.py Normal file
View File

@@ -0,0 +1,176 @@
import logging
import unittest
logging.basicConfig(level=logging.DEBUG)
class MockSerial(object):
def __init__(self, baud, port, timeout=None):
self.port = port
self.baud = baud
self.timeout = timeout
self.output = []
self.input = []
def flush(self):
pass
def write(self, line):
self.output.append(line)
def readline(self):
"""
@TODO: This does not take timeout into account at all.
"""
return self.input.pop(0)
def reset_mock(self):
self.output = []
self.input = []
def push_line(self, line, term='\r\n'):
self.input.append(str(line) + term)
INPUT = "INPUT"
OUTPUT = "OUTPUT"
LOW = "LOW"
HIGH = "HIGH"
READ_LOW = 0
READ_HIGH = 1
MSBFIRST = "MSBFIRST"
LSBFIRST = "LSBFIRST"
class TestArduino(unittest.TestCase):
def parse_cmd_sr(self, cmd_str):
assert cmd_str[0] == '@'
first_index = cmd_str.find('%')
assert first_index != -1
assert cmd_str[-2:] == '$!'
# Skip over the @ and read up to but not including the %.
cmd = cmd_str[1:first_index]
# Skip over the first % and ignore the trailing $!.
args_str = cmd_str[first_index+1:-2]
args = args_str.split('%')
return cmd, args
def setUp(self):
from Arduino.arduino import Arduino
self.mock_serial = MockSerial(9600, '/dev/ttyACM0')
self.board = Arduino(sr=self.mock_serial)
def test_version(self):
from Arduino.arduino import build_cmd_str
expected_version = "version"
self.mock_serial.push_line(expected_version)
self.assertEquals(self.board.version(), expected_version)
self.assertEquals(self.mock_serial.output[0], build_cmd_str('version'))
def test_pinMode_input(self):
from Arduino.arduino import build_cmd_str
pin = 9
self.board.pinMode(pin, INPUT)
self.assertEquals(self.mock_serial.output[0],
build_cmd_str('pm', (-pin,)))
def test_pinMode_output(self):
from Arduino.arduino import build_cmd_str
pin = 9
self.board.pinMode(pin, OUTPUT)
self.assertEquals(self.mock_serial.output[0],
build_cmd_str('pm', (pin,)))
def test_pulseIn_low(self):
from Arduino.arduino import build_cmd_str
expected_duration = 230
self.mock_serial.push_line(expected_duration)
pin = 9
self.assertEquals(self.board.pulseIn(pin, LOW), expected_duration)
self.assertEquals(self.mock_serial.output[0],
build_cmd_str('pi', (-pin,)))
def test_pulseIn_high(self):
from Arduino.arduino import build_cmd_str
expected_duration = 230
pin = 9
self.mock_serial.push_line(expected_duration)
self.assertEquals(self.board.pulseIn(pin, HIGH), expected_duration)
self.assertEquals(self.mock_serial.output[0], build_cmd_str('pi', (pin,)))
def test_digitalRead(self):
from Arduino.arduino import build_cmd_str
pin = 9
self.mock_serial.push_line(READ_LOW)
self.assertEquals(self.board.digitalRead(pin), READ_LOW)
self.assertEquals(self.mock_serial.output[0], build_cmd_str('dr', (pin,)))
def test_digitalWrite_low(self):
from Arduino.arduino import build_cmd_str
pin = 9
self.board.digitalWrite(pin, LOW)
self.assertEquals(self.mock_serial.output[0], build_cmd_str('dw', (-pin,)))
def test_digitalWrite_high(self):
from Arduino.arduino import build_cmd_str
pin = 9
self.board.digitalWrite(pin, HIGH)
self.assertEquals(self.mock_serial.output[0], build_cmd_str('dw', (pin,)))
def test_melody(self):
from Arduino.arduino import build_cmd_str
pin = 9
notes = ["C4"]
duration = 4
C4_NOTE = 262
self.board.Melody(pin, notes, [duration])
self.assertEquals(self.mock_serial.output[0],
build_cmd_str('to', (len(notes), pin, C4_NOTE, duration)))
self.assertEquals(self.mock_serial.output[1],
build_cmd_str('nto', (pin,)))
def test_shiftIn(self):
from Arduino.arduino import build_cmd_str
dataPin = 2
clockPin = 3
pinOrder = MSBFIRST
expected = 0xff
self.mock_serial.push_line(expected)
self.assertEquals(self.board.shiftIn(dataPin, clockPin, pinOrder),
expected)
self.assertEquals(self.mock_serial.output[0],
build_cmd_str('si', (dataPin, clockPin, pinOrder,)))
def test_shiftOut(self):
from Arduino.arduino import build_cmd_str
dataPin = 2
clockPin = 3
pinOrder = MSBFIRST
value = 0xff
self.board.shiftOut(dataPin, clockPin, pinOrder, value)
self.assertEquals(self.mock_serial.output[0],
build_cmd_str('so', (dataPin, clockPin, pinOrder, value)))
def test_analogRead(self):
from Arduino.arduino import build_cmd_str
pin = 9
expected = 1023
self.mock_serial.push_line(expected)
self.assertEquals(self.board.analogRead(pin), expected)
self.assertEquals(self.mock_serial.output[0],
build_cmd_str('ar', (pin,)))
def test_analogWrite(self):
from Arduino.arduino import build_cmd_str
pin = 9
value = 255
self.board.analogWrite(pin, value)
self.assertEquals(self.mock_serial.output[0],
build_cmd_str('aw', (pin, value)))
if __name__ == '__main__':
unittest.main()

View File

@@ -1,6 +1,6 @@
import logging
import unittest
import time
from Arduino import Arduino
"""
A collection of some basic tests for the Arduino library.
@@ -12,15 +12,40 @@ should at least be maintained here.
"""
logging.basicConfig(level=logging.DEBUG)
class TestBasics(unittest.TestCase):
_ = raw_input('Plug in Arduino board w/LED at pin 13, reset, then press enter')
board = Arduino('9600')
def test_find(self):
"""
Tests auto-connection/board detection
"""
self.assertIsNotNone(self.board.port)
""" Tests auto-connection/board detection. """
raw_input(
'Plug in Arduino board w/LED at pin 13, reset, then press enter')
from Arduino import Arduino
board = None
try:
# This will trigger automatic port resolution.
board = Arduino(9600)
finally:
if board:
board.close()
def test_open(self):
""" Tests connecting to an explicit port. """
port = None
while not port:
port = raw_input(
'Plug in Arduino board w/LED at pin 13, reset.\n'\
'Enter the port where the Arduino is connected, then press enter:')
if not port:
print 'You must enter a port.'
from Arduino import Arduino
board = None
try:
board = Arduino(9600, port=port)
finally:
if board:
board.close()
if __name__ == '__main__':
unittest.main()