mirror of
https://github.com/python/cpython.git
synced 2025-10-23 23:22:11 +00:00

This function used to live as pipes.quote, where it was undocumented but used anyway. (An alias still exists for backward compatibility.) The tests have been moved as is, but the code of the function was changed to use a regex instead of a loop with string comparisons (at Ian Bicking’s suggestion). I’m terrible at regexes, so any feedback is welcome.
196 lines
6.3 KiB
Python
196 lines
6.3 KiB
Python
import pipes
|
|
import os
|
|
import string
|
|
import unittest
|
|
from test.support import TESTFN, run_unittest, unlink, reap_children
|
|
|
|
# Skip this whole test module on anything that is not a posix platform.
if os.name != 'posix':
    raise unittest.SkipTest('pipes module only works on posix')

# Second scratch file name, derived from the primary one.
TESTFN2 = TESTFN + "2"

# tr a-z A-Z is not portable, so make the ranges explicit
s_command = 'tr %s %s' % (
    string.ascii_lowercase,
    string.ascii_uppercase,
)
|
|
|
|
class SimplePipeTests(unittest.TestCase):
    """Tests for pipes.Template.

    Covers running one-step and empty pipelines through open() and copy(),
    the repr() and debug() helpers, argument validation in append(),
    prepend() and open(), and clone().

    Files opened through Template.open() are always managed with a ``with``
    statement so the underlying file or pipe is closed even when a write,
    read or assertion fails.
    """

    def tearDown(self):
        # remove both scratch files after every test
        for f in (TESTFN, TESTFN2):
            unlink(f)

    def testSimplePipe1(self):
        # write through a one-step pipeline
        t = pipes.Template()
        t.append(s_command, pipes.STDIN_STDOUT)
        with t.open(TESTFN, 'w') as f:
            f.write('hello world #1')
        with open(TESTFN) as f:
            self.assertEqual(f.read(), 'HELLO WORLD #1')

    def testSimplePipe2(self):
        # copy from one file to another through a file-in/file-out step
        with open(TESTFN, 'w') as f:
            f.write('hello world #2')
        t = pipes.Template()
        t.append(s_command + ' < $IN > $OUT', pipes.FILEIN_FILEOUT)
        t.copy(TESTFN, TESTFN2)
        with open(TESTFN2) as f:
            self.assertEqual(f.read(), 'HELLO WORLD #2')

    def testSimplePipe3(self):
        # read through a one-step pipeline that takes its input from a file
        with open(TESTFN, 'w') as f:
            f.write('hello world #2')
        t = pipes.Template()
        t.append(s_command + ' < $IN', pipes.FILEIN_STDOUT)
        with t.open(TESTFN, 'r') as f:
            self.assertEqual(f.read(), 'HELLO WORLD #2')

    def testEmptyPipeline1(self):
        # copy through empty pipe
        d = 'empty pipeline test COPY'
        with open(TESTFN, 'w') as f:
            f.write(d)
        with open(TESTFN2, 'w') as f:
            f.write('')
        t = pipes.Template()
        t.copy(TESTFN, TESTFN2)
        with open(TESTFN2) as f:
            self.assertEqual(f.read(), d)

    def testEmptyPipeline2(self):
        # read through empty pipe
        d = 'empty pipeline test READ'
        with open(TESTFN, 'w') as f:
            f.write(d)
        t = pipes.Template()
        with t.open(TESTFN, 'r') as f:
            self.assertEqual(f.read(), d)

    def testEmptyPipeline3(self):
        # write through empty pipe
        d = 'empty pipeline test WRITE'
        t = pipes.Template()
        with t.open(TESTFN, 'w') as f:
            f.write(d)
        with open(TESTFN) as f:
            self.assertEqual(f.read(), d)

    def testRepr(self):
        # repr() lists the steps as (command, kind) pairs
        t = pipes.Template()
        self.assertEqual(repr(t), "<Template instance, steps=[]>")
        t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
        self.assertEqual(repr(t),
                         "<Template instance, steps=[('tr a-z A-Z', '--')]>")

    def testSetDebug(self):
        # debug() stores its argument in the debugging attribute
        t = pipes.Template()
        t.debug(False)
        self.assertEqual(t.debugging, False)
        t.debug(True)
        self.assertEqual(t.debugging, True)

    def testReadOpenSink(self):
        # check calling open('r') on a pipe ending with
        # a sink raises ValueError
        t = pipes.Template()
        t.append('boguscmd', pipes.SINK)
        self.assertRaises(ValueError, t.open, 'bogusfile', 'r')

    def testWriteOpenSource(self):
        # check calling open('w') on a pipe ending with
        # a source raises ValueError
        t = pipes.Template()
        t.prepend('boguscmd', pipes.SOURCE)
        self.assertRaises(ValueError, t.open, 'bogusfile', 'w')

    def testBadAppendOptions(self):
        t = pipes.Template()

        # try a non-string command
        self.assertRaises(TypeError, t.append, 7, pipes.STDIN_STDOUT)

        # try a type that isn't recognized
        self.assertRaises(ValueError, t.append, 'boguscmd', 'xx')

        # shouldn't be able to append a source
        self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SOURCE)

        # check appending two sinks
        t = pipes.Template()
        t.append('boguscmd', pipes.SINK)
        self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SINK)

        # command needing file input but with no $IN
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd $OUT',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd',
                          pipes.FILEIN_STDOUT)

        # command needing file output but with no $OUT
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd $IN',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd',
                          pipes.STDIN_FILEOUT)

    def testBadPrependOptions(self):
        t = pipes.Template()

        # try a non-string command
        self.assertRaises(TypeError, t.prepend, 7, pipes.STDIN_STDOUT)

        # try a type that isn't recognized
        self.assertRaises(ValueError, t.prepend, 'tr a-z A-Z', 'xx')

        # shouldn't be able to prepend a sink
        self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SINK)

        # check prepending two sources
        t = pipes.Template()
        t.prepend('boguscmd', pipes.SOURCE)
        self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SOURCE)

        # command needing file input but with no $IN
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd $OUT',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd',
                          pipes.FILEIN_STDOUT)

        # command needing file output but with no $OUT
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd $IN',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd',
                          pipes.STDIN_FILEOUT)

    def testBadOpenMode(self):
        # an unrecognized mode string raises ValueError
        t = pipes.Template()
        self.assertRaises(ValueError, t.open, 'bogusfile', 'x')

    def testClone(self):
        t = pipes.Template()
        t.append('tr a-z A-Z', pipes.STDIN_STDOUT)

        # the clone and its step list must be distinct objects that
        # nevertheless compare equal to the originals
        u = t.clone()
        self.assertIsNot(t, u)
        self.assertEqual(t.steps, u.steps)
        self.assertIsNot(t.steps, u.steps)
        self.assertEqual(t.debugging, u.debugging)
|
|
|
|
def test_main():
    """Run SimplePipeTests via run_unittest(), then call reap_children()."""
    run_unittest(SimplePipeTests)
    reap_children()
|
|
|
|
# Run the suite only when this file is executed as a script.
if __name__ == "__main__":
    test_main()
|