mirror of
https://github.com/python/cpython.git
synced 2025-09-28 19:25:27 +00:00
commit merge
This commit is contained in:
commit
b61efd0a68
126 changed files with 2332 additions and 441 deletions
|
@ -1,6 +1,6 @@
|
|||
import sqlite3
|
||||
|
||||
# The shared cache is only available in SQLite versions 3.3.3 or later
|
||||
# See the SQLite documentaton for details.
|
||||
# See the SQLite documentation for details.
|
||||
|
||||
sqlite3.enable_shared_cache(True)
|
||||
|
|
|
@ -743,6 +743,17 @@ as internal buffering of data.
|
|||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: fexecve(fd, args, env)
|
||||
|
||||
Execute the program specified by a file descriptor *fd* with arguments given
|
||||
by *args* and environment given by *env*, replacing the current process.
|
||||
*args* and *env* are given as in :func:`execve`.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: fpathconf(fd, name)
|
||||
|
||||
Return system configuration information relevant to an open file. *name*
|
||||
|
@ -819,6 +830,45 @@ as internal buffering of data.
|
|||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: futimens(fd, (atime_sec, atime_nsec), (mtime_sec, mtime_nsec))
|
||||
futimens(fd, None, None)
|
||||
|
||||
Updates the timestamps of a file specified by the file descriptor *fd*, with
|
||||
nanosecond precision.
|
||||
The second form sets *atime* and *mtime* to the current time.
|
||||
If *atime_nsec* or *mtime_nsec* is specified as :data:`UTIME_NOW`, the corresponding
|
||||
timestamp is updated to the current time.
|
||||
If *atime_nsec* or *mtime_nsec* is specified as :data:`UTIME_OMIT`, the corresponding
|
||||
timestamp is not updated.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. data:: UTIME_NOW
|
||||
UTIME_OMIT
|
||||
|
||||
Flags used with :func:`futimens` to specify that the timestamp must be
|
||||
updated either to the current time or not updated at all.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: futimes(fd, (atime, mtime))
|
||||
futimes(fd, None)
|
||||
|
||||
Set the access and modified time of the file specified by the file
|
||||
descriptor *fd* to the given values. If the second form is used, set the
|
||||
access and modified times to the current time.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: isatty(fd)
|
||||
|
||||
Return ``True`` if the file descriptor *fd* is open and connected to a
|
||||
|
@ -841,6 +891,30 @@ as internal buffering of data.
|
|||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: lockf(fd, cmd, len)
|
||||
|
||||
Apply, test or remove a POSIX lock on an open file descriptor.
|
||||
*fd* is an open file descriptor.
|
||||
*cmd* specifies the command to use - one of :data:`F_LOCK`, :data:`F_TLOCK`,
|
||||
:data:`F_ULOCK` or :data:`F_TEST`.
|
||||
*len* specifies the section of the file to lock.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. data:: F_LOCK
|
||||
F_TLOCK
|
||||
F_ULOCK
|
||||
F_TEST
|
||||
|
||||
Flags that specify what action :func:`lockf` will take.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
.. function:: lseek(fd, pos, how)
|
||||
|
||||
Set the current position of file descriptor *fd* to position *pos*, modified
|
||||
|
@ -945,6 +1019,66 @@ as internal buffering of data.
|
|||
Availability: Unix, Windows.
|
||||
|
||||
|
||||
.. function:: posix_fallocate(fd, offset, len)
|
||||
|
||||
Ensures that enough disk space is allocated for the file specified by *fd*
|
||||
starting from *offset* and continuing for *len* bytes.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: posix_fadvise(fd, offset, len, advice)
|
||||
|
||||
Announces an intention to access data in a specific pattern thus allowing
|
||||
the kernel to make optimizations.
|
||||
The advice applies to the region of the file specified by *fd* starting at
|
||||
*offset* and continuing for *len* bytes.
|
||||
*advice* is one of :data:`POSIX_FADV_NORMAL`, :data:`POSIX_FADV_SEQUENTIAL`,
|
||||
:data:`POSIX_FADV_RANDOM`, :data:`POSIX_FADV_NOREUSE`,
|
||||
:data:`POSIX_FADV_WILLNEED` or :data:`POSIX_FADV_DONTNEED`.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. data:: POSIX_FADV_NORMAL
|
||||
POSIX_FADV_SEQUENTIAL
|
||||
POSIX_FADV_RANDOM
|
||||
POSIX_FADV_NOREUSE
|
||||
POSIX_FADV_WILLNEED
|
||||
POSIX_FADV_DONTNEED
|
||||
|
||||
Flags that can be used in *advice* in :func:`posix_fadvise` that specify
|
||||
the access pattern that is likely to be used.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: pread(fd, buffersize, offset)
|
||||
|
||||
Read from a file descriptor, *fd*, at a position of *offset*. It will read up
|
||||
to *buffersize* number of bytes. The file offset remains unchanged.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: pwrite(fd, string, offset)
|
||||
|
||||
Write *string* to a file descriptor, *fd*, from *offset*, leaving the file
|
||||
offset unchanged.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: read(fd, n)
|
||||
|
||||
Read at most *n* bytes from file descriptor *fd*. Return a bytestring containing the
|
||||
|
@ -1038,6 +1172,17 @@ as internal buffering of data.
|
|||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: readv(fd, buffers)
|
||||
|
||||
Read from a file descriptor into a number of writable buffers. *buffers* is
|
||||
an arbitrary sequence of writable buffers. Returns the total number of bytes
|
||||
read.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: tcgetpgrp(fd)
|
||||
|
||||
Return the process group associated with the terminal given by *fd* (an open
|
||||
|
@ -1111,6 +1256,17 @@ as internal buffering of data.
|
|||
:meth:`~file.write` method.
|
||||
|
||||
|
||||
.. function:: writev(fd, buffers)
|
||||
|
||||
Write the the contents of *buffers* to file descriptor *fd*, where *buffers*
|
||||
is an arbitrary sequence of buffers.
|
||||
Returns the total number of bytes written.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. _open-constants:
|
||||
|
||||
``open()`` flag constants
|
||||
|
@ -1384,6 +1540,17 @@ Files and Directories
|
|||
Added support for Windows 6.0 (Vista) symbolic links.
|
||||
|
||||
|
||||
.. function:: lutimes(path, (atime, mtime))
|
||||
lutimes(path, None)
|
||||
|
||||
Like :func:`utime`, but if *path* is a symbolic link, it is not
|
||||
dereferenced.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: mkfifo(path[, mode])
|
||||
|
||||
Create a FIFO (a named pipe) named *path* with numeric mode *mode*. The
|
||||
|
@ -1727,6 +1894,25 @@ Files and Directories
|
|||
Added support for Windows 6.0 (Vista) symbolic links.
|
||||
|
||||
|
||||
.. function:: sync()
|
||||
|
||||
Force write of everything to disk.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: truncate(path, length)
|
||||
|
||||
Truncate the file corresponding to *path*, so that it is at most
|
||||
*length* bytes in size.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: unlink(path)
|
||||
|
||||
Remove (delete) the file *path*. This is the same function as
|
||||
|
@ -2306,6 +2492,58 @@ written in Python, such as a mail server's external command delivery program.
|
|||
|
||||
Availability: Unix.
|
||||
|
||||
.. function:: waitid(idtype, id, options)
|
||||
|
||||
Wait for the completion of one or more child processes.
|
||||
*idtype* can be :data:`P_PID`, :data:`P_PGID` or :data:`P_ALL`.
|
||||
*id* specifies the pid to wait on.
|
||||
*options* is constructed from the ORing of one or more of :data:`WEXITED`,
|
||||
:data:`WSTOPPED` or :data:`WCONTINUED` and additionally may be ORed with
|
||||
:data:`WNOHANG` or :data:`WNOWAIT`. The return value is an object
|
||||
representing the data contained in the :c:type:`siginfo_t` structure, namely:
|
||||
:attr:`si_pid`, :attr:`si_uid`, :attr:`si_signo`, :attr:`si_status`,
|
||||
:attr:`si_code` or ``None`` if :data:`WNOHANG` is specified and there are no
|
||||
children in a waitable state.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
.. data:: P_PID
|
||||
P_PGID
|
||||
P_ALL
|
||||
|
||||
These are the possible values for *idtype* in :func:`waitid`. They affect
|
||||
how *id* is interpreted.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
.. data:: WEXITED
|
||||
WSTOPPED
|
||||
WNOWAIT
|
||||
|
||||
Flags that can be used in *options* in :func:`waitid` that specify what
|
||||
child signal to wait for.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. data:: CLD_EXITED
|
||||
CLD_DUMPED
|
||||
CLD_TRAPPED
|
||||
CLD_CONTINUED
|
||||
|
||||
These are the possible values for :attr:`si_code` in the result returned by
|
||||
:func:`waitid`.
|
||||
|
||||
Availability: Unix.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. function:: waitpid(pid, options)
|
||||
|
||||
|
|
|
@ -125,12 +125,14 @@ This module defines one class called :class:`Popen`:
|
|||
|
||||
*stdin*, *stdout* and *stderr* specify the executed programs' standard input,
|
||||
standard output and standard error file handles, respectively. Valid values
|
||||
are :data:`PIPE`, an existing file descriptor (a positive integer), an
|
||||
existing :term:`file object`, and ``None``. :data:`PIPE` indicates that a
|
||||
new pipe to the child should be created. With ``None``, no redirection will
|
||||
occur; the child's file handles will be inherited from the parent. Additionally,
|
||||
*stderr* can be :data:`STDOUT`, which indicates that the stderr data from the
|
||||
applications should be captured into the same file handle as for stdout.
|
||||
are :data:`PIPE`, :data:`DEVNULL`, an existing file descriptor (a positive
|
||||
integer), an existing :term:`file object`, and ``None``. :data:`PIPE`
|
||||
indicates that a new pipe to the child should be created. :data:`DEVNULL`
|
||||
indicates that the special file :data:`os.devnull` will be used. With ``None``,
|
||||
no redirection will occur; the child's file handles will be inherited from
|
||||
the parent. Additionally, *stderr* can be :data:`STDOUT`, which indicates
|
||||
that the stderr data from the applications should be captured into the same
|
||||
file handle as for stdout.
|
||||
|
||||
If *preexec_fn* is set to a callable object, this object will be called in the
|
||||
child process just before the child is executed.
|
||||
|
@ -229,6 +231,15 @@ This module defines one class called :class:`Popen`:
|
|||
Added context manager support.
|
||||
|
||||
|
||||
.. data:: DEVNULL
|
||||
|
||||
Special value that can be used as the *stdin*, *stdout* or *stderr* argument
|
||||
to :class:`Popen` and indicates that the special file :data:`os.devnull`
|
||||
will be used.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
|
||||
|
||||
.. data:: PIPE
|
||||
|
||||
Special value that can be used as the *stdin*, *stdout* or *stderr* argument
|
||||
|
@ -387,7 +398,7 @@ All of the functions and methods that accept a *timeout* parameter, such as
|
|||
:func:`call` and :meth:`Popen.communicate` will raise :exc:`TimeoutExpired` if
|
||||
the timeout expires before the process exits.
|
||||
|
||||
Exceptions defined in this module all inherit from :ext:`SubprocessError`.
|
||||
Exceptions defined in this module all inherit from :exc:`SubprocessError`.
|
||||
|
||||
.. versionadded:: 3.3
|
||||
The :exc:`SubprocessError` base class was added.
|
||||
|
|
|
@ -468,7 +468,7 @@ xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx*/
|
|||
arbitrary data.
|
||||
|
||||
0 is returned on success. buffer and buffer_len are only
|
||||
set in case no error occurrs. Otherwise, -1 is returned and
|
||||
set in case no error occurs. Otherwise, -1 is returned and
|
||||
an exception set.
|
||||
*/
|
||||
|
||||
|
@ -482,7 +482,7 @@ xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx*/
|
|||
writable memory location in buffer of size buffer_len.
|
||||
|
||||
0 is returned on success. buffer and buffer_len are only
|
||||
set in case no error occurrs. Otherwise, -1 is returned and
|
||||
set in case no error occurs. Otherwise, -1 is returned and
|
||||
an exception set.
|
||||
*/
|
||||
|
||||
|
|
|
@ -61,7 +61,7 @@
|
|||
# endif
|
||||
|
||||
# if defined(__LP64__)
|
||||
/* MacOSX 10.4 (the first release to suppport 64-bit code
|
||||
/* MacOSX 10.4 (the first release to support 64-bit code
|
||||
* at all) only supports 64-bit in the UNIX layer.
|
||||
* Therefore surpress the toolbox-glue in 64-bit mode.
|
||||
*/
|
||||
|
|
|
@ -52,14 +52,13 @@ class FInfo:
|
|||
|
||||
def getfileinfo(name):
|
||||
finfo = FInfo()
|
||||
fp = io.open(name, 'rb')
|
||||
# Quick check for textfile
|
||||
data = fp.read(512)
|
||||
if 0 not in data:
|
||||
finfo.Type = 'TEXT'
|
||||
fp.seek(0, 2)
|
||||
dsize = fp.tell()
|
||||
fp.close()
|
||||
with io.open(name, 'rb') as fp:
|
||||
# Quick check for textfile
|
||||
data = fp.read(512)
|
||||
if 0 not in data:
|
||||
finfo.Type = 'TEXT'
|
||||
fp.seek(0, 2)
|
||||
dsize = fp.tell()
|
||||
dir, file = os.path.split(name)
|
||||
file = file.replace(':', '-', 1)
|
||||
return file, finfo, dsize, 0
|
||||
|
@ -140,19 +139,26 @@ class _Rlecoderengine:
|
|||
class BinHex:
|
||||
def __init__(self, name_finfo_dlen_rlen, ofp):
|
||||
name, finfo, dlen, rlen = name_finfo_dlen_rlen
|
||||
close_on_error = False
|
||||
if isinstance(ofp, str):
|
||||
ofname = ofp
|
||||
ofp = io.open(ofname, 'wb')
|
||||
ofp.write(b'(This file must be converted with BinHex 4.0)\r\r:')
|
||||
hqxer = _Hqxcoderengine(ofp)
|
||||
self.ofp = _Rlecoderengine(hqxer)
|
||||
self.crc = 0
|
||||
if finfo is None:
|
||||
finfo = FInfo()
|
||||
self.dlen = dlen
|
||||
self.rlen = rlen
|
||||
self._writeinfo(name, finfo)
|
||||
self.state = _DID_HEADER
|
||||
close_on_error = True
|
||||
try:
|
||||
ofp.write(b'(This file must be converted with BinHex 4.0)\r\r:')
|
||||
hqxer = _Hqxcoderengine(ofp)
|
||||
self.ofp = _Rlecoderengine(hqxer)
|
||||
self.crc = 0
|
||||
if finfo is None:
|
||||
finfo = FInfo()
|
||||
self.dlen = dlen
|
||||
self.rlen = rlen
|
||||
self._writeinfo(name, finfo)
|
||||
self.state = _DID_HEADER
|
||||
except:
|
||||
if close_on_error:
|
||||
ofp.close()
|
||||
raise
|
||||
|
||||
def _writeinfo(self, name, finfo):
|
||||
nl = len(name)
|
||||
|
|
|
@ -284,7 +284,7 @@ class Sniffer:
|
|||
an all or nothing approach, so we allow for small variations in this
|
||||
number.
|
||||
1) build a table of the frequency of each character on every line.
|
||||
2) build a table of freqencies of this frequency (meta-frequency?),
|
||||
2) build a table of frequencies of this frequency (meta-frequency?),
|
||||
e.g. 'x occurred 5 times in 10 rows, 6 times in 1000 rows,
|
||||
7 times in 2 rows'
|
||||
3) use the mode of the meta-frequency to determine the /expected/
|
||||
|
|
|
@ -37,7 +37,7 @@ class ArrayTestCase(unittest.TestCase):
|
|||
values = [ia[i] for i in range(len(init))]
|
||||
self.assertEqual(values, [0] * len(init))
|
||||
|
||||
# Too many in itializers should be caught
|
||||
# Too many initializers should be caught
|
||||
self.assertRaises(IndexError, int_array, *range(alen*2))
|
||||
|
||||
CharArray = ARRAY(c_char, 3)
|
||||
|
|
|
@ -27,7 +27,7 @@ class InitTest(unittest.TestCase):
|
|||
self.assertEqual((y.x.a, y.x.b), (0, 0))
|
||||
self.assertEqual(y.x.new_was_called, False)
|
||||
|
||||
# But explicitely creating an X structure calls __new__ and __init__, of course.
|
||||
# But explicitly creating an X structure calls __new__ and __init__, of course.
|
||||
x = X()
|
||||
self.assertEqual((x.a, x.b), (9, 12))
|
||||
self.assertEqual(x.new_was_called, True)
|
||||
|
|
|
@ -157,7 +157,7 @@ class NumberTestCase(unittest.TestCase):
|
|||
def test_int_from_address(self):
|
||||
from array import array
|
||||
for t in signed_types + unsigned_types:
|
||||
# the array module doesn't suppport all format codes
|
||||
# the array module doesn't support all format codes
|
||||
# (no 'q' or 'Q')
|
||||
try:
|
||||
array(t._type_)
|
||||
|
|
|
@ -17,7 +17,7 @@ if sys.platform == "win32" and sizeof(c_void_p) == sizeof(c_int):
|
|||
# ValueError: Procedure probably called with not enough arguments (4 bytes missing)
|
||||
self.assertRaises(ValueError, IsWindow)
|
||||
|
||||
# This one should succeeed...
|
||||
# This one should succeed...
|
||||
self.assertEqual(0, IsWindow(0))
|
||||
|
||||
# ValueError: Procedure probably called with too many arguments (8 bytes in excess)
|
||||
|
|
|
@ -1719,7 +1719,7 @@ class HtmlDiff(object):
|
|||
line = line.replace(' ','\0')
|
||||
# expand tabs into spaces
|
||||
line = line.expandtabs(self._tabsize)
|
||||
# relace spaces from expanded tabs back into tab characters
|
||||
# replace spaces from expanded tabs back into tab characters
|
||||
# (we'll replace them with markup after we do differencing)
|
||||
line = line.replace(' ','\t')
|
||||
return line.replace('\0',' ').rstrip('\n')
|
||||
|
|
|
@ -359,7 +359,7 @@ class Command:
|
|||
not self.force, dry_run=self.dry_run)
|
||||
|
||||
def move_file (self, src, dst, level=1):
|
||||
"""Move a file respectin dry-run flag."""
|
||||
"""Move a file respecting dry-run flag."""
|
||||
return file_util.move_file(src, dst, dry_run=self.dry_run)
|
||||
|
||||
def spawn(self, cmd, search_path=1, level=1):
|
||||
|
|
|
@ -155,7 +155,7 @@ class CygwinCCompiler(UnixCCompiler):
|
|||
self.dll_libraries = get_msvcr()
|
||||
|
||||
def _compile(self, obj, src, ext, cc_args, extra_postargs, pp_opts):
|
||||
"""Compiles the source by spawing GCC and windres if needed."""
|
||||
"""Compiles the source by spawning GCC and windres if needed."""
|
||||
if ext == '.rc' or ext == '.res':
|
||||
# gcc needs '.res' and '.rc' compiled to object files !!!
|
||||
try:
|
||||
|
|
|
@ -39,7 +39,7 @@ class cleanTestCase(support.TempdirManager,
|
|||
self.assertTrue(not os.path.exists(path),
|
||||
'%s was not removed' % path)
|
||||
|
||||
# let's run the command again (should spit warnings but suceed)
|
||||
# let's run the command again (should spit warnings but succeed)
|
||||
cmd.all = 1
|
||||
cmd.ensure_finalized()
|
||||
cmd.run()
|
||||
|
|
|
@ -62,7 +62,7 @@ class InstallTestCase(support.TempdirManager,
|
|||
if sys.version < '2.6':
|
||||
return
|
||||
|
||||
# preparing the environement for the test
|
||||
# preparing the environment for the test
|
||||
self.old_user_base = site.USER_BASE
|
||||
self.old_user_site = site.USER_SITE
|
||||
self.tmpdir = self.mkdtemp()
|
||||
|
|
|
@ -311,7 +311,7 @@ class SDistTestCase(PyPIRCCommandTestCase):
|
|||
# adding a file
|
||||
self.write_file((self.tmp_dir, 'somecode', 'doc2.txt'), '#')
|
||||
|
||||
# make sure build_py is reinitinialized, like a fresh run
|
||||
# make sure build_py is reinitialized, like a fresh run
|
||||
build_py = dist.get_command_obj('build_py')
|
||||
build_py.finalized = False
|
||||
build_py.ensure_finalized()
|
||||
|
|
|
@ -1211,7 +1211,7 @@ class DocTestRunner:
|
|||
# Process each example.
|
||||
for examplenum, example in enumerate(test.examples):
|
||||
|
||||
# If REPORT_ONLY_FIRST_FAILURE is set, then supress
|
||||
# If REPORT_ONLY_FIRST_FAILURE is set, then suppress
|
||||
# reporting after the first failure.
|
||||
quiet = (self.optionflags & REPORT_ONLY_FIRST_FAILURE and
|
||||
failures > 0)
|
||||
|
@ -2135,7 +2135,7 @@ class DocTestCase(unittest.TestCase):
|
|||
caller can catch the errors and initiate post-mortem debugging.
|
||||
|
||||
The DocTestCase provides a debug method that raises
|
||||
UnexpectedException errors if there is an unexepcted
|
||||
UnexpectedException errors if there is an unexpected
|
||||
exception:
|
||||
|
||||
>>> test = DocTestParser().get_doctest('>>> raise KeyError\n42',
|
||||
|
|
|
@ -12,7 +12,7 @@ __all__ = [
|
|||
]
|
||||
|
||||
|
||||
from base64 import b64encode as _bencode
|
||||
from base64 import encodebytes as _bencode
|
||||
from quopri import encodestring as _encodestring
|
||||
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ ecre = re.compile(r'''
|
|||
# For use with .match()
|
||||
fcre = re.compile(r'[\041-\176]+:$')
|
||||
|
||||
# Find a header embeded in a putative header value. Used to check for
|
||||
# Find a header embedded in a putative header value. Used to check for
|
||||
# header injection attack.
|
||||
_embeded_header = re.compile(r'\n[^ \t]+:')
|
||||
|
||||
|
@ -314,7 +314,7 @@ class Header:
|
|||
self._continuation_ws, splitchars)
|
||||
for string, charset in self._chunks:
|
||||
lines = string.splitlines()
|
||||
formatter.feed(lines[0], charset)
|
||||
formatter.feed(lines[0] if lines else '', charset)
|
||||
for line in lines[1:]:
|
||||
formatter.newline()
|
||||
if charset.header_encoding is not None:
|
||||
|
|
|
@ -48,9 +48,9 @@ def _sanitize_header(name, value):
|
|||
def _splitparam(param):
|
||||
# Split header parameters. BAW: this may be too simple. It isn't
|
||||
# strictly RFC 2045 (section 5.1) compliant, but it catches most headers
|
||||
# found in the wild. We may eventually need a full fledged parser
|
||||
# eventually.
|
||||
a, sep, b = param.partition(';')
|
||||
# found in the wild. We may eventually need a full fledged parser.
|
||||
# RDM: we might have a Header here; for now just stringify it.
|
||||
a, sep, b = str(param).partition(';')
|
||||
if not sep:
|
||||
return a.strip(), None
|
||||
return a.strip(), b.strip()
|
||||
|
@ -90,6 +90,8 @@ def _formatparam(param, value=None, quote=True):
|
|||
return param
|
||||
|
||||
def _parseparam(s):
|
||||
# RDM This might be a Header, so for now stringify it.
|
||||
s = ';' + str(s)
|
||||
plist = []
|
||||
while s[:1] == ';':
|
||||
s = s[1:]
|
||||
|
@ -240,7 +242,8 @@ class Message:
|
|||
if i is not None and not isinstance(self._payload, list):
|
||||
raise TypeError('Expected list, got %s' % type(self._payload))
|
||||
payload = self._payload
|
||||
cte = self.get('content-transfer-encoding', '').lower()
|
||||
# cte might be a Header, so for now stringify it.
|
||||
cte = str(self.get('content-transfer-encoding', '')).lower()
|
||||
# payload may be bytes here.
|
||||
if isinstance(payload, str):
|
||||
if _has_surrogates(payload):
|
||||
|
@ -561,7 +564,7 @@ class Message:
|
|||
if value is missing:
|
||||
return failobj
|
||||
params = []
|
||||
for p in _parseparam(';' + value):
|
||||
for p in _parseparam(value):
|
||||
try:
|
||||
name, val = p.split('=', 1)
|
||||
name = name.strip()
|
||||
|
|
|
@ -573,9 +573,18 @@ class TestMessageAPI(TestEmailBase):
|
|||
msg['Dummy'] = 'dummy\nX-Injected-Header: test'
|
||||
self.assertRaises(errors.HeaderParseError, msg.as_string)
|
||||
|
||||
|
||||
# Test the email.encoders module
|
||||
class TestEncoders(unittest.TestCase):
|
||||
|
||||
def test_EncodersEncode_base64(self):
|
||||
with openfile('PyBanner048.gif', 'rb') as fp:
|
||||
bindata = fp.read()
|
||||
mimed = email.mime.image.MIMEImage(bindata)
|
||||
base64ed = mimed.get_payload()
|
||||
# the transfer-encoded body lines should all be <=76 characters
|
||||
lines = base64ed.split('\n')
|
||||
self.assertLessEqual(max([ len(x) for x in lines ]), 76)
|
||||
|
||||
def test_encode_empty_payload(self):
|
||||
eq = self.assertEqual
|
||||
msg = Message()
|
||||
|
@ -1141,10 +1150,11 @@ class TestMIMEApplication(unittest.TestCase):
|
|||
|
||||
def test_body(self):
|
||||
eq = self.assertEqual
|
||||
bytes = b'\xfa\xfb\xfc\xfd\xfe\xff'
|
||||
msg = MIMEApplication(bytes)
|
||||
eq(msg.get_payload(), '+vv8/f7/')
|
||||
eq(msg.get_payload(decode=True), bytes)
|
||||
bytesdata = b'\xfa\xfb\xfc\xfd\xfe\xff'
|
||||
msg = MIMEApplication(bytesdata)
|
||||
# whitespace in the cte encoded block is RFC-irrelevant.
|
||||
eq(msg.get_payload().strip(), '+vv8/f7/')
|
||||
eq(msg.get_payload(decode=True), bytesdata)
|
||||
|
||||
|
||||
|
||||
|
@ -2992,6 +3002,58 @@ class Test8BitBytesHandling(unittest.TestCase):
|
|||
['foo@bar.com',
|
||||
'g\uFFFD\uFFFDst'])
|
||||
|
||||
def test_get_content_type_with_8bit(self):
|
||||
msg = email.message_from_bytes(textwrap.dedent("""\
|
||||
Content-Type: text/pl\xA7in; charset=utf-8
|
||||
""").encode('latin-1'))
|
||||
self.assertEqual(msg.get_content_type(), "text/pl\uFFFDin")
|
||||
self.assertEqual(msg.get_content_maintype(), "text")
|
||||
self.assertEqual(msg.get_content_subtype(), "pl\uFFFDin")
|
||||
|
||||
def test_get_params_with_8bit(self):
|
||||
msg = email.message_from_bytes(
|
||||
'X-Header: foo=\xa7ne; b\xa7r=two; baz=three\n'.encode('latin-1'))
|
||||
self.assertEqual(msg.get_params(header='x-header'),
|
||||
[('foo', '\uFFFDne'), ('b\uFFFDr', 'two'), ('baz', 'three')])
|
||||
self.assertEqual(msg.get_param('Foo', header='x-header'), '\uFFFdne')
|
||||
# XXX: someday you might be able to get 'b\xa7r', for now you can't.
|
||||
self.assertEqual(msg.get_param('b\xa7r', header='x-header'), None)
|
||||
|
||||
def test_get_rfc2231_params_with_8bit(self):
|
||||
msg = email.message_from_bytes(textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset=us-ascii;
|
||||
title*=us-ascii'en'This%20is%20not%20f\xa7n"""
|
||||
).encode('latin-1'))
|
||||
self.assertEqual(msg.get_param('title'),
|
||||
('us-ascii', 'en', 'This is not f\uFFFDn'))
|
||||
|
||||
def test_set_rfc2231_params_with_8bit(self):
|
||||
msg = email.message_from_bytes(textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset=us-ascii;
|
||||
title*=us-ascii'en'This%20is%20not%20f\xa7n"""
|
||||
).encode('latin-1'))
|
||||
msg.set_param('title', 'test')
|
||||
self.assertEqual(msg.get_param('title'), 'test')
|
||||
|
||||
def test_del_rfc2231_params_with_8bit(self):
|
||||
msg = email.message_from_bytes(textwrap.dedent("""\
|
||||
Content-Type: text/plain; charset=us-ascii;
|
||||
title*=us-ascii'en'This%20is%20not%20f\xa7n"""
|
||||
).encode('latin-1'))
|
||||
msg.del_param('title')
|
||||
self.assertEqual(msg.get_param('title'), None)
|
||||
self.assertEqual(msg.get_content_maintype(), 'text')
|
||||
|
||||
def test_get_payload_with_8bit_cte_header(self):
|
||||
msg = email.message_from_bytes(textwrap.dedent("""\
|
||||
Content-Transfer-Encoding: b\xa7se64
|
||||
Content-Type: text/plain; charset=latin-1
|
||||
|
||||
payload
|
||||
""").encode('latin-1'))
|
||||
self.assertEqual(msg.get_payload(), 'payload\n')
|
||||
self.assertEqual(msg.get_payload(decode=True), b'payload\n')
|
||||
|
||||
non_latin_bin_msg = textwrap.dedent("""\
|
||||
From: foo@bar.com
|
||||
To: báz
|
||||
|
@ -3695,6 +3757,13 @@ A very long line that must get split to something other than at the
|
|||
h = Header('文', charset='shift_jis')
|
||||
self.assertEqual(h.encode(), '=?iso-2022-jp?b?GyRCSjgbKEI=?=')
|
||||
|
||||
def test_flatten_header_with_no_value(self):
|
||||
# Issue 11401 (regression from email 4.x) Note that the space after
|
||||
# the header doesn't reflect the input, but this is also the way
|
||||
# email 4.x behaved. At some point it would be nice to fix that.
|
||||
msg = email.message_from_string("EmptyHeader:")
|
||||
self.assertEqual(str(msg), "EmptyHeader: \n\n")
|
||||
|
||||
|
||||
|
||||
# Test RFC 2231 header parameters (en/de)coding
|
||||
|
|
|
@ -140,7 +140,7 @@ def lru_cache(maxsize=100):
|
|||
tuple=tuple, sorted=sorted, len=len, KeyError=KeyError):
|
||||
|
||||
hits = misses = 0
|
||||
kwd_mark = object() # separates positional and keyword args
|
||||
kwd_mark = (object(),) # separates positional and keyword args
|
||||
lock = Lock() # needed because ordereddicts aren't threadsafe
|
||||
|
||||
if maxsize is None:
|
||||
|
@ -151,7 +151,7 @@ def lru_cache(maxsize=100):
|
|||
nonlocal hits, misses
|
||||
key = args
|
||||
if kwds:
|
||||
key += (kwd_mark,) + tuple(sorted(kwds.items()))
|
||||
key += kwd_mark + tuple(sorted(kwds.items()))
|
||||
try:
|
||||
result = cache[key]
|
||||
hits += 1
|
||||
|
@ -170,7 +170,7 @@ def lru_cache(maxsize=100):
|
|||
nonlocal hits, misses
|
||||
key = args
|
||||
if kwds:
|
||||
key += (kwd_mark,) + tuple(sorted(kwds.items()))
|
||||
key += kwd_mark + tuple(sorted(kwds.items()))
|
||||
try:
|
||||
with lock:
|
||||
result = cache[key]
|
||||
|
|
|
@ -103,15 +103,20 @@ import copy
|
|||
|
||||
# Default error message template
|
||||
DEFAULT_ERROR_MESSAGE = """\
|
||||
<head>
|
||||
<title>Error response</title>
|
||||
</head>
|
||||
<body>
|
||||
<h1>Error response</h1>
|
||||
<p>Error code %(code)d.
|
||||
<p>Message: %(message)s.
|
||||
<p>Error code explanation: %(code)s = %(explain)s.
|
||||
</body>
|
||||
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
|
||||
"http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
|
||||
<html xmlns="http://www.w3.org/1999/xhtml">
|
||||
<head>
|
||||
<meta http-equiv="Content-Type" content="text/html;charset=utf-8" />
|
||||
<title>Error response</title>
|
||||
</head>
|
||||
<body>
|
||||
<h1>Error response</h1>
|
||||
<p>Error code: %(code)d</p>
|
||||
<p>Message: %(message)s.</p>
|
||||
<p>Error code explanation: %(code)s - %(explain)s.</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
DEFAULT_ERROR_CONTENT_TYPE = "text/html;charset=utf-8"
|
||||
|
|
|
@ -54,7 +54,7 @@ class FormatParagraph:
|
|||
# If the block ends in a \n, we dont want the comment
|
||||
# prefix inserted after it. (Im not sure it makes sense to
|
||||
# reformat a comment block that isnt made of complete
|
||||
# lines, but whatever!) Can't think of a clean soltution,
|
||||
# lines, but whatever!) Can't think of a clean solution,
|
||||
# so we hack away
|
||||
block_suffix = ""
|
||||
if not newdata[-1]:
|
||||
|
|
|
@ -18,7 +18,7 @@ window.
|
|||
|
||||
An IDLE extension class is instantiated with a single argument,
|
||||
`editwin', an EditorWindow instance. The extension cannot assume much
|
||||
about this argument, but it is guarateed to have the following instance
|
||||
about this argument, but it is guaranteed to have the following instance
|
||||
variables:
|
||||
|
||||
text a Text instance (a widget)
|
||||
|
|
|
@ -53,8 +53,8 @@ def tkVersionWarning(root):
|
|||
|
||||
def addOpenEventSupport(root, flist):
|
||||
"""
|
||||
This ensures that the application will respont to open AppleEvents, which
|
||||
makes is feaseable to use IDLE as the default application for python files.
|
||||
This ensures that the application will respond to open AppleEvents, which
|
||||
makes is feasible to use IDLE as the default application for python files.
|
||||
"""
|
||||
def doOpenFile(*args):
|
||||
for fn in args:
|
||||
|
|
|
@ -48,7 +48,7 @@ def fixup_parse_tree(cls_node):
|
|||
"""
|
||||
for node in cls_node.children:
|
||||
if node.type == syms.suite:
|
||||
# already in the prefered format, do nothing
|
||||
# already in the preferred format, do nothing
|
||||
return
|
||||
|
||||
# !%@#! oneliners have no suite node, we have to fake one up
|
||||
|
|
|
@ -51,7 +51,7 @@ class Converter(grammar.Grammar):
|
|||
self.finish_off()
|
||||
|
||||
def parse_graminit_h(self, filename):
|
||||
"""Parse the .h file writen by pgen. (Internal)
|
||||
"""Parse the .h file written by pgen. (Internal)
|
||||
|
||||
This file is a sequence of #define statements defining the
|
||||
nonterminals of the grammar as numbers. We build two tables
|
||||
|
@ -82,7 +82,7 @@ class Converter(grammar.Grammar):
|
|||
return True
|
||||
|
||||
def parse_graminit_c(self, filename):
|
||||
"""Parse the .c file writen by pgen. (Internal)
|
||||
"""Parse the .c file written by pgen. (Internal)
|
||||
|
||||
The file looks as follows. The first two lines are always this:
|
||||
|
||||
|
|
|
@ -658,8 +658,8 @@ class WildcardPattern(BasePattern):
|
|||
content: optional sequence of subsequences of patterns;
|
||||
if absent, matches one node;
|
||||
if present, each subsequence is an alternative [*]
|
||||
min: optinal minumum number of times to match, default 0
|
||||
max: optional maximum number of times tro match, default HUGE
|
||||
min: optional minimum number of times to match, default 0
|
||||
max: optional maximum number of times to match, default HUGE
|
||||
name: optional name assigned to this match
|
||||
|
||||
[*] Thus, if content is [[a, b, c], [d, e], [f, g, h]] this is
|
||||
|
|
|
@ -316,7 +316,7 @@ class GrammarTests(unittest.TestCase):
|
|||
### simple_stmt: small_stmt (';' small_stmt)* [';']
|
||||
x = 1; pass; del x
|
||||
def foo():
|
||||
# verify statments that end with semi-colons
|
||||
# verify statements that end with semi-colons
|
||||
x = 1; pass; del x;
|
||||
foo()
|
||||
|
||||
|
|
|
@ -356,7 +356,7 @@ class GrammarTests(unittest.TestCase):
|
|||
### simple_stmt: small_stmt (';' small_stmt)* [';']
|
||||
x = 1; pass; del x
|
||||
def foo():
|
||||
# verify statments that end with semi-colons
|
||||
# verify statements that end with semi-colons
|
||||
x = 1; pass; del x;
|
||||
foo()
|
||||
|
||||
|
|
|
@ -115,8 +115,11 @@ def cpu_count():
|
|||
except (ValueError, KeyError):
|
||||
num = 0
|
||||
elif 'bsd' in sys.platform or sys.platform == 'darwin':
|
||||
comm = '/sbin/sysctl -n hw.ncpu'
|
||||
if sys.platform == 'darwin':
|
||||
comm = '/usr' + comm
|
||||
try:
|
||||
with os.popen('sysctl -n hw.ncpu') as p:
|
||||
with os.popen(comm) as p:
|
||||
num = int(p.read())
|
||||
except ValueError:
|
||||
num = 0
|
||||
|
|
|
@ -405,7 +405,7 @@ def expanduser(path):
|
|||
# - $varname is accepted.
|
||||
# - %varname% is accepted.
|
||||
# - varnames can be made out of letters, digits and the characters '_-'
|
||||
# (though is not verifed in the ${varname} and %varname% cases)
|
||||
# (though is not verified in the ${varname} and %varname% cases)
|
||||
# XXX With COMMAND.COM you can use any characters in a variable name,
|
||||
# XXX except '^|<>='.
|
||||
|
||||
|
|
|
@ -1406,7 +1406,7 @@ opcodes = [
|
|||
proto=0,
|
||||
doc="""Read an object from the memo and push it on the stack.
|
||||
|
||||
The index of the memo object to push is given by the newline-teriminated
|
||||
The index of the memo object to push is given by the newline-terminated
|
||||
decimal string following. BINGET and LONG_BINGET are space-optimized
|
||||
versions.
|
||||
"""),
|
||||
|
|
|
@ -417,7 +417,7 @@ def _syscmd_ver(system='', release='', version='',
|
|||
info = pipe.read()
|
||||
if pipe.close():
|
||||
raise os.error('command failed')
|
||||
# XXX How can I supress shell errors from being written
|
||||
# XXX How can I suppress shell errors from being written
|
||||
# to stderr ?
|
||||
except os.error as why:
|
||||
#print 'Command %s failed: %s' % (cmd,why)
|
||||
|
|
|
@ -168,11 +168,11 @@ def _split_list(s, predicate):
|
|||
def visiblename(name, all=None):
|
||||
"""Decide whether to show documentation on a variable."""
|
||||
# Certain special names are redundant.
|
||||
_hidden_names = ('__builtins__', '__doc__', '__file__', '__path__',
|
||||
if name in {'__builtins__', '__doc__', '__file__', '__path__',
|
||||
'__module__', '__name__', '__slots__', '__package__',
|
||||
'__cached__', '__author__', '__credits__', '__date__',
|
||||
'__version__')
|
||||
if name in _hidden_names: return 0
|
||||
'__version__'}:
|
||||
return 0
|
||||
# Private names are hidden, but special names are displayed.
|
||||
if name.startswith('__') and name.endswith('__'): return 1
|
||||
if all is not None:
|
||||
|
|
|
@ -737,8 +737,8 @@ def unpack_archive(filename, extract_dir=None, format=None):
|
|||
except KeyError:
|
||||
raise ValueError("Unknown unpack format '{0}'".format(format))
|
||||
|
||||
func = format_info[0]
|
||||
func(filename, extract_dir, **dict(format_info[1]))
|
||||
func = format_info[1]
|
||||
func(filename, extract_dir, **dict(format_info[2]))
|
||||
else:
|
||||
# we need to look at the registered unpackers supported extensions
|
||||
format = _find_unpack_format(filename)
|
||||
|
|
|
@ -371,8 +371,9 @@ class TimeoutExpired(SubprocessError):
|
|||
"""This exception is raised when the timeout expires while waiting for a
|
||||
child process.
|
||||
"""
|
||||
def __init__(self, cmd, output=None):
|
||||
def __init__(self, cmd, timeout, output=None):
|
||||
self.cmd = cmd
|
||||
self.timeout = timeout
|
||||
self.output = output
|
||||
|
||||
def __str__(self):
|
||||
|
@ -431,7 +432,7 @@ else:
|
|||
return fds
|
||||
|
||||
__all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput",
|
||||
"getoutput", "check_output", "CalledProcessError"]
|
||||
"getoutput", "check_output", "CalledProcessError", "DEVNULL"]
|
||||
|
||||
if mswindows:
|
||||
from _subprocess import CREATE_NEW_CONSOLE, CREATE_NEW_PROCESS_GROUP
|
||||
|
@ -456,6 +457,7 @@ def _cleanup():
|
|||
|
||||
PIPE = -1
|
||||
STDOUT = -2
|
||||
DEVNULL = -3
|
||||
|
||||
|
||||
def _eintr_retry_call(func, *args):
|
||||
|
@ -532,7 +534,7 @@ def check_output(*popenargs, timeout=None, **kwargs):
|
|||
except TimeoutExpired:
|
||||
process.kill()
|
||||
output, unused_err = process.communicate()
|
||||
raise TimeoutExpired(process.args, output=output)
|
||||
raise TimeoutExpired(process.args, timeout, output=output)
|
||||
retcode = process.poll()
|
||||
if retcode:
|
||||
raise CalledProcessError(retcode, process.args, output=output)
|
||||
|
@ -800,6 +802,10 @@ class Popen(object):
|
|||
# Child is still running, keep us alive until we can wait on it.
|
||||
_active.append(self)
|
||||
|
||||
def _get_devnull(self):
|
||||
if not hasattr(self, '_devnull'):
|
||||
self._devnull = os.open(os.devnull, os.O_RDWR)
|
||||
return self._devnull
|
||||
|
||||
def communicate(self, input=None, timeout=None):
|
||||
"""Interact with process: Send data to stdin. Read data from
|
||||
|
@ -839,7 +845,7 @@ class Popen(object):
|
|||
return (stdout, stderr)
|
||||
|
||||
try:
|
||||
stdout, stderr = self._communicate(input, endtime)
|
||||
stdout, stderr = self._communicate(input, endtime, timeout)
|
||||
finally:
|
||||
self._communication_started = True
|
||||
|
||||
|
@ -860,12 +866,12 @@ class Popen(object):
|
|||
return endtime - time.time()
|
||||
|
||||
|
||||
def _check_timeout(self, endtime):
|
||||
def _check_timeout(self, endtime, orig_timeout):
|
||||
"""Convenience for checking if a timeout has expired."""
|
||||
if endtime is None:
|
||||
return
|
||||
if time.time() > endtime:
|
||||
raise TimeoutExpired(self.args)
|
||||
raise TimeoutExpired(self.args, orig_timeout)
|
||||
|
||||
|
||||
if mswindows:
|
||||
|
@ -889,6 +895,8 @@ class Popen(object):
|
|||
p2cread, _ = _subprocess.CreatePipe(None, 0)
|
||||
elif stdin == PIPE:
|
||||
p2cread, p2cwrite = _subprocess.CreatePipe(None, 0)
|
||||
elif stdin == DEVNULL:
|
||||
p2cread = msvcrt.get_osfhandle(self._get_devnull())
|
||||
elif isinstance(stdin, int):
|
||||
p2cread = msvcrt.get_osfhandle(stdin)
|
||||
else:
|
||||
|
@ -902,6 +910,8 @@ class Popen(object):
|
|||
_, c2pwrite = _subprocess.CreatePipe(None, 0)
|
||||
elif stdout == PIPE:
|
||||
c2pread, c2pwrite = _subprocess.CreatePipe(None, 0)
|
||||
elif stdout == DEVNULL:
|
||||
c2pwrite = msvcrt.get_osfhandle(self._get_devnull())
|
||||
elif isinstance(stdout, int):
|
||||
c2pwrite = msvcrt.get_osfhandle(stdout)
|
||||
else:
|
||||
|
@ -917,6 +927,8 @@ class Popen(object):
|
|||
errread, errwrite = _subprocess.CreatePipe(None, 0)
|
||||
elif stderr == STDOUT:
|
||||
errwrite = c2pwrite
|
||||
elif stderr == DEVNULL:
|
||||
errwrite = msvcrt.get_osfhandle(self._get_devnull())
|
||||
elif isinstance(stderr, int):
|
||||
errwrite = msvcrt.get_osfhandle(stderr)
|
||||
else:
|
||||
|
@ -1010,7 +1022,7 @@ class Popen(object):
|
|||
except pywintypes.error as e:
|
||||
# Translate pywintypes.error to WindowsError, which is
|
||||
# a subclass of OSError. FIXME: We should really
|
||||
# translate errno using _sys_errlist (or simliar), but
|
||||
# translate errno using _sys_errlist (or similar), but
|
||||
# how can this be done from Python?
|
||||
raise WindowsError(*e.args)
|
||||
finally:
|
||||
|
@ -1026,6 +1038,8 @@ class Popen(object):
|
|||
c2pwrite.Close()
|
||||
if errwrite != -1:
|
||||
errwrite.Close()
|
||||
if hasattr(self, '_devnull'):
|
||||
os.close(self._devnull)
|
||||
|
||||
# Retain the process handle, but close the thread handle
|
||||
self._child_created = True
|
||||
|
@ -1050,9 +1064,11 @@ class Popen(object):
|
|||
return self.returncode
|
||||
|
||||
|
||||
def wait(self, timeout=None):
|
||||
def wait(self, timeout=None, endtime=None):
|
||||
"""Wait for child process to terminate. Returns returncode
|
||||
attribute."""
|
||||
if endtime is not None:
|
||||
timeout = self._remaining_time(endtime)
|
||||
if timeout is None:
|
||||
timeout = _subprocess.INFINITE
|
||||
else:
|
||||
|
@ -1060,7 +1076,7 @@ class Popen(object):
|
|||
if self.returncode is None:
|
||||
result = _subprocess.WaitForSingleObject(self._handle, timeout)
|
||||
if result == _subprocess.WAIT_TIMEOUT:
|
||||
raise TimeoutExpired(self.args)
|
||||
raise TimeoutExpired(self.args, timeout)
|
||||
self.returncode = _subprocess.GetExitCodeProcess(self._handle)
|
||||
return self.returncode
|
||||
|
||||
|
@ -1070,7 +1086,7 @@ class Popen(object):
|
|||
fh.close()
|
||||
|
||||
|
||||
def _communicate(self, input, endtime):
|
||||
def _communicate(self, input, endtime, orig_timeout):
|
||||
# Start reader threads feeding into a list hanging off of this
|
||||
# object, unless they've already been started.
|
||||
if self.stdout and not hasattr(self, "_stdout_buff"):
|
||||
|
@ -1159,6 +1175,8 @@ class Popen(object):
|
|||
pass
|
||||
elif stdin == PIPE:
|
||||
p2cread, p2cwrite = _create_pipe()
|
||||
elif stdin == DEVNULL:
|
||||
p2cread = self._get_devnull()
|
||||
elif isinstance(stdin, int):
|
||||
p2cread = stdin
|
||||
else:
|
||||
|
@ -1169,6 +1187,8 @@ class Popen(object):
|
|||
pass
|
||||
elif stdout == PIPE:
|
||||
c2pread, c2pwrite = _create_pipe()
|
||||
elif stdout == DEVNULL:
|
||||
c2pwrite = self._get_devnull()
|
||||
elif isinstance(stdout, int):
|
||||
c2pwrite = stdout
|
||||
else:
|
||||
|
@ -1181,6 +1201,8 @@ class Popen(object):
|
|||
errread, errwrite = _create_pipe()
|
||||
elif stderr == STDOUT:
|
||||
errwrite = c2pwrite
|
||||
elif stderr == DEVNULL:
|
||||
errwrite = self._get_devnull()
|
||||
elif isinstance(stderr, int):
|
||||
errwrite = stderr
|
||||
else:
|
||||
|
@ -1374,6 +1396,8 @@ class Popen(object):
|
|||
os.close(c2pwrite)
|
||||
if errwrite != -1 and errread != -1:
|
||||
os.close(errwrite)
|
||||
if hasattr(self, '_devnull'):
|
||||
os.close(self._devnull)
|
||||
|
||||
# Wait for exec to fail or succeed; possibly raising an
|
||||
# exception (limited in size)
|
||||
|
@ -1468,13 +1492,18 @@ class Popen(object):
|
|||
def wait(self, timeout=None, endtime=None):
|
||||
"""Wait for child process to terminate. Returns returncode
|
||||
attribute."""
|
||||
# If timeout was passed but not endtime, compute endtime in terms of
|
||||
# timeout.
|
||||
if endtime is None and timeout is not None:
|
||||
endtime = time.time() + timeout
|
||||
if self.returncode is not None:
|
||||
return self.returncode
|
||||
elif endtime is not None:
|
||||
|
||||
# endtime is preferred to timeout. timeout is only used for
|
||||
# printing.
|
||||
if endtime is not None or timeout is not None:
|
||||
if endtime is None:
|
||||
endtime = time.time() + timeout
|
||||
elif timeout is None:
|
||||
timeout = self._remaining_time(endtime)
|
||||
|
||||
if endtime is not None:
|
||||
# Enter a busy loop if we have a timeout. This busy loop was
|
||||
# cribbed from Lib/threading.py in Thread.wait() at r71065.
|
||||
delay = 0.0005 # 500 us -> initial delay of 1 ms
|
||||
|
@ -1486,7 +1515,7 @@ class Popen(object):
|
|||
break
|
||||
remaining = self._remaining_time(endtime)
|
||||
if remaining <= 0:
|
||||
raise TimeoutExpired(self.args)
|
||||
raise TimeoutExpired(self.args, timeout)
|
||||
delay = min(delay * 2, remaining, .05)
|
||||
time.sleep(delay)
|
||||
elif self.returncode is None:
|
||||
|
@ -1495,7 +1524,7 @@ class Popen(object):
|
|||
return self.returncode
|
||||
|
||||
|
||||
def _communicate(self, input, endtime):
|
||||
def _communicate(self, input, endtime, orig_timeout):
|
||||
if self.stdin and not self._communication_started:
|
||||
# Flush stdio buffer. This might block, if the user has
|
||||
# been writing to .stdin in an uncontrolled fashion.
|
||||
|
@ -1504,9 +1533,11 @@ class Popen(object):
|
|||
self.stdin.close()
|
||||
|
||||
if _has_poll:
|
||||
stdout, stderr = self._communicate_with_poll(input, endtime)
|
||||
stdout, stderr = self._communicate_with_poll(input, endtime,
|
||||
orig_timeout)
|
||||
else:
|
||||
stdout, stderr = self._communicate_with_select(input, endtime)
|
||||
stdout, stderr = self._communicate_with_select(input, endtime,
|
||||
orig_timeout)
|
||||
|
||||
self.wait(timeout=self._remaining_time(endtime))
|
||||
|
||||
|
@ -1529,7 +1560,7 @@ class Popen(object):
|
|||
return (stdout, stderr)
|
||||
|
||||
|
||||
def _communicate_with_poll(self, input, endtime):
|
||||
def _communicate_with_poll(self, input, endtime, orig_timeout):
|
||||
stdout = None # Return
|
||||
stderr = None # Return
|
||||
|
||||
|
@ -1580,7 +1611,7 @@ class Popen(object):
|
|||
if e.args[0] == errno.EINTR:
|
||||
continue
|
||||
raise
|
||||
self._check_timeout(endtime)
|
||||
self._check_timeout(endtime, orig_timeout)
|
||||
|
||||
# XXX Rewrite these to use non-blocking I/O on the
|
||||
# file objects; they are no longer using C stdio!
|
||||
|
@ -1604,7 +1635,7 @@ class Popen(object):
|
|||
return (stdout, stderr)
|
||||
|
||||
|
||||
def _communicate_with_select(self, input, endtime):
|
||||
def _communicate_with_select(self, input, endtime, orig_timeout):
|
||||
if not self._communication_started:
|
||||
self._read_set = []
|
||||
self._write_set = []
|
||||
|
@ -1646,9 +1677,9 @@ class Popen(object):
|
|||
# According to the docs, returning three empty lists indicates
|
||||
# that the timeout expired.
|
||||
if not (rlist or wlist or xlist):
|
||||
raise TimeoutExpired(self.args)
|
||||
raise TimeoutExpired(self.args, orig_timeout)
|
||||
# We also check what time it is ourselves for good measure.
|
||||
self._check_timeout(endtime)
|
||||
self._check_timeout(endtime, orig_timeout)
|
||||
|
||||
# XXX Rewrite these to use non-blocking I/O on the
|
||||
# file objects; they are no longer using C stdio!
|
||||
|
|
|
@ -14,3 +14,7 @@ note if the cause is system or environment dependent and what the variables are.
|
|||
Once the crash is fixed, the test case should be moved into an appropriate test
|
||||
(even if it was originally from the test suite). This ensures the regression
|
||||
doesn't happen again. And if it does, it should be easier to track down.
|
||||
|
||||
Also see Lib/test_crashers.py which exercises the crashers in this directory.
|
||||
In particular, make sure to add any new infinite loop crashers to the black
|
||||
list so it doesn't try to run them.
|
||||
|
|
|
@ -9,5 +9,5 @@ Recorded on the tracker as http://bugs.python.org/issue11383
|
|||
# e.g. '1*'*10**5+'1' will die in compiler_visit_expr
|
||||
|
||||
# The exact limit to destroy the stack will vary by platform
|
||||
# but 100k should do the trick most places
|
||||
compile('()'*10**5, '?', 'exec')
|
||||
# but 10M should do the trick even with huge stack allocations
|
||||
compile('()'*10**7, '?', 'exec')
|
||||
|
|
|
@ -3414,7 +3414,7 @@ class TestTimezoneConversions(unittest.TestCase):
|
|||
self.assertEqual(dt, there_and_back)
|
||||
|
||||
# Because we have a redundant spelling when DST begins, there is
|
||||
# (unforunately) an hour when DST ends that can't be spelled at all in
|
||||
# (unfortunately) an hour when DST ends that can't be spelled at all in
|
||||
# local time. When DST ends, the clock jumps from 1:59 back to 1:00
|
||||
# again. The hour 1:MM DST has no spelling then: 1:MM is taken to be
|
||||
# standard time. 1:MM DST == 0:MM EST, but 0:MM is taken to be
|
||||
|
|
|
@ -19,7 +19,7 @@ class C (B):
|
|||
|
||||
# XXX: This causes test_pyclbr.py to fail, but only because the
|
||||
# introspection-based is_method() code in the test can't
|
||||
# distinguish between this and a geniune method function like m().
|
||||
# distinguish between this and a genuine method function like m().
|
||||
# The pyclbr.py module gets this right as it parses the text.
|
||||
#
|
||||
#f = f
|
||||
|
|
|
@ -15,10 +15,12 @@ class BinHexTestCase(unittest.TestCase):
|
|||
def setUp(self):
|
||||
self.fname1 = support.TESTFN + "1"
|
||||
self.fname2 = support.TESTFN + "2"
|
||||
self.fname3 = support.TESTFN + "very_long_filename__very_long_filename__very_long_filename__very_long_filename__"
|
||||
|
||||
def tearDown(self):
|
||||
support.unlink(self.fname1)
|
||||
support.unlink(self.fname2)
|
||||
support.unlink(self.fname3)
|
||||
|
||||
DATA = b'Jack is my hero'
|
||||
|
||||
|
@ -37,6 +39,15 @@ class BinHexTestCase(unittest.TestCase):
|
|||
|
||||
self.assertEqual(self.DATA, finish)
|
||||
|
||||
def test_binhex_error_on_long_filename(self):
|
||||
"""
|
||||
The testcase fails if no exception is raised when a filename parameter provided to binhex.binhex()
|
||||
is too long, or if the exception raised in binhex.binhex() is not an instance of binhex.Error.
|
||||
"""
|
||||
f3 = open(self.fname3, 'wb')
|
||||
f3.close()
|
||||
|
||||
self.assertRaises(binhex.Error, binhex.binhex, self.fname3, self.fname2)
|
||||
|
||||
def test_main():
|
||||
support.run_unittest(BinHexTestCase)
|
||||
|
|
|
@ -127,7 +127,7 @@ class TestPendingCalls(unittest.TestCase):
|
|||
context.event.set()
|
||||
|
||||
def test_pendingcalls_non_threaded(self):
|
||||
#again, just using the main thread, likely they will all be dispathced at
|
||||
#again, just using the main thread, likely they will all be dispatched at
|
||||
#once. It is ok to ask for too many, because we loop until we find a slot.
|
||||
#the loop can be interrupted to dispatch.
|
||||
#there are only 32 dispatch slots, so we go for twice that!
|
||||
|
|
37
Lib/test/test_crashers.py
Normal file
37
Lib/test/test_crashers.py
Normal file
|
@ -0,0 +1,37 @@
|
|||
# Tests that the crashers in the Lib/test/crashers directory actually
|
||||
# do crash the interpreter as expected
|
||||
#
|
||||
# If a crasher is fixed, it should be moved elsewhere in the test suite to
|
||||
# ensure it continues to work correctly.
|
||||
|
||||
import unittest
|
||||
import glob
|
||||
import os.path
|
||||
import test.support
|
||||
from test.script_helper import assert_python_failure
|
||||
|
||||
CRASHER_DIR = os.path.join(os.path.dirname(__file__), "crashers")
|
||||
CRASHER_FILES = os.path.join(CRASHER_DIR, "*.py")
|
||||
|
||||
infinite_loops = ["infinite_loop_re.py", "nasty_eq_vs_dict.py"]
|
||||
|
||||
class CrasherTest(unittest.TestCase):
|
||||
|
||||
@test.support.cpython_only
|
||||
def test_crashers_crash(self):
|
||||
for fname in glob.glob(CRASHER_FILES):
|
||||
if os.path.basename(fname) in infinite_loops:
|
||||
continue
|
||||
# Some "crashers" only trigger an exception rather than a
|
||||
# segfault. Consider that an acceptable outcome.
|
||||
if test.support.verbose:
|
||||
print("Checking crasher:", fname)
|
||||
assert_python_failure(fname)
|
||||
|
||||
|
||||
def test_main():
|
||||
test.support.run_unittest(CrasherTest)
|
||||
test.support.reap_children()
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_main()
|
|
@ -228,7 +228,7 @@ class DecimalTest(unittest.TestCase):
|
|||
try:
|
||||
t = self.eval_line(line)
|
||||
except DecimalException as exception:
|
||||
#Exception raised where there shoudn't have been one.
|
||||
#Exception raised where there shouldn't have been one.
|
||||
self.fail('Exception "'+exception.__class__.__name__ + '" raised on line '+line)
|
||||
|
||||
return
|
||||
|
|
|
@ -3967,7 +3967,7 @@ order (MRO) for bases """
|
|||
except TypeError:
|
||||
pass
|
||||
else:
|
||||
self.fail("Carlo Verre __setattr__ suceeded!")
|
||||
self.fail("Carlo Verre __setattr__ succeeded!")
|
||||
try:
|
||||
object.__delattr__(str, "lower")
|
||||
except TypeError:
|
||||
|
|
|
@ -1297,7 +1297,7 @@ marking, as well as interline differences.
|
|||
? + ++ ^
|
||||
TestResults(failed=1, attempted=1)
|
||||
|
||||
The REPORT_ONLY_FIRST_FAILURE supresses result output after the first
|
||||
The REPORT_ONLY_FIRST_FAILURE suppresses result output after the first
|
||||
failing example:
|
||||
|
||||
>>> def f(x):
|
||||
|
@ -1327,7 +1327,7 @@ failing example:
|
|||
2
|
||||
TestResults(failed=3, attempted=5)
|
||||
|
||||
However, output from `report_start` is not supressed:
|
||||
However, output from `report_start` is not suppressed:
|
||||
|
||||
>>> doctest.DocTestRunner(verbose=True, optionflags=flags).run(test)
|
||||
... # doctest: +ELLIPSIS
|
||||
|
@ -2278,7 +2278,7 @@ We don't want `-v` in sys.argv for these tests.
|
|||
>>> doctest.master = None # Reset master.
|
||||
|
||||
(Note: we'll be clearing doctest.master after each call to
|
||||
`doctest.testfile`, to supress warnings about multiple tests with the
|
||||
`doctest.testfile`, to suppress warnings about multiple tests with the
|
||||
same name.)
|
||||
|
||||
Globals may be specified with the `globs` and `extraglobs` parameters:
|
||||
|
@ -2314,7 +2314,7 @@ optional `module_relative` parameter:
|
|||
TestResults(failed=0, attempted=2)
|
||||
>>> doctest.master = None # Reset master.
|
||||
|
||||
Verbosity can be increased with the optional `verbose` paremter:
|
||||
Verbosity can be increased with the optional `verbose` parameter:
|
||||
|
||||
>>> doctest.testfile('test_doctest.txt', globs=globs, verbose=True)
|
||||
Trying:
|
||||
|
@ -2351,7 +2351,7 @@ parameter:
|
|||
TestResults(failed=1, attempted=2)
|
||||
>>> doctest.master = None # Reset master.
|
||||
|
||||
The summary report may be supressed with the optional `report`
|
||||
The summary report may be suppressed with the optional `report`
|
||||
parameter:
|
||||
|
||||
>>> doctest.testfile('test_doctest.txt', report=False)
|
||||
|
|
|
@ -228,7 +228,7 @@ Another helper function
|
|||
>>> Foo.method(1, *[2, 3])
|
||||
5
|
||||
|
||||
A PyCFunction that takes only positional parameters shoud allow an
|
||||
A PyCFunction that takes only positional parameters should allow an
|
||||
empty keyword dictionary to pass without a complaint, but raise a
|
||||
TypeError if te dictionary is not empty
|
||||
|
||||
|
|
|
@ -8,11 +8,15 @@ import re
|
|||
import fileinput
|
||||
import collections
|
||||
import gzip
|
||||
import bz2
|
||||
import types
|
||||
import codecs
|
||||
import unittest
|
||||
|
||||
try:
|
||||
import bz2
|
||||
except ImportError:
|
||||
bz2 = None
|
||||
|
||||
from io import StringIO
|
||||
from fileinput import FileInput, hook_encoded
|
||||
|
||||
|
@ -765,6 +769,7 @@ class Test_hook_compressed(unittest.TestCase):
|
|||
self.assertEqual(self.fake_open.invocation_count, 1)
|
||||
self.assertEqual(self.fake_open.last_invocation, (("test.gz", 3), {}))
|
||||
|
||||
@unittest.skipUnless(bz2, "Requires bz2")
|
||||
def test_bz2_ext_fake(self):
|
||||
original_open = bz2.BZ2File
|
||||
bz2.BZ2File = self.fake_open
|
||||
|
|
|
@ -67,7 +67,7 @@ class GeneralFloatCases(unittest.TestCase):
|
|||
def test_float_with_comma(self):
|
||||
# set locale to something that doesn't use '.' for the decimal point
|
||||
# float must not accept the locale specific decimal point but
|
||||
# it still has to accept the normal python syntac
|
||||
# it still has to accept the normal python syntax
|
||||
import locale
|
||||
if not locale.localeconv()['decimal_point'] == ',':
|
||||
return
|
||||
|
@ -189,7 +189,7 @@ class GeneralFloatCases(unittest.TestCase):
|
|||
def assertEqualAndEqualSign(self, a, b):
|
||||
# fail unless a == b and a and b have the same sign bit;
|
||||
# the only difference from assertEqual is that this test
|
||||
# distingishes -0.0 and 0.0.
|
||||
# distinguishes -0.0 and 0.0.
|
||||
self.assertEqual((a, copysign(1.0, a)), (b, copysign(1.0, b)))
|
||||
|
||||
@support.requires_IEEE_754
|
||||
|
|
|
@ -350,7 +350,7 @@ class GrammarTests(unittest.TestCase):
|
|||
### simple_stmt: small_stmt (';' small_stmt)* [';']
|
||||
x = 1; pass; del x
|
||||
def foo():
|
||||
# verify statments that end with semi-colons
|
||||
# verify statements that end with semi-colons
|
||||
x = 1; pass; del x;
|
||||
foo()
|
||||
|
||||
|
|
|
@ -462,7 +462,7 @@ class RejectingSocketlessRequestHandler(SocketlessRequestHandler):
|
|||
return False
|
||||
|
||||
class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
|
||||
"""Test the functionaility of the BaseHTTPServer.
|
||||
"""Test the functionality of the BaseHTTPServer.
|
||||
|
||||
Test the support for the Expect 100-continue header.
|
||||
"""
|
||||
|
|
|
@ -283,8 +283,6 @@ class ImportTests(unittest.TestCase):
|
|||
self.skipTest('path is not encodable to {}'.format(encoding))
|
||||
with self.assertRaises(ImportError) as c:
|
||||
__import__(path)
|
||||
self.assertEqual("Import by filename is not supported.",
|
||||
c.exception.args[0])
|
||||
|
||||
def test_import_in_del_does_not_crash(self):
|
||||
# Issue 4236
|
||||
|
|
|
@ -20,11 +20,11 @@ This is the case for tuples, range objects, and itertools.repeat().
|
|||
|
||||
Some containers become temporarily immutable during iteration. This includes
|
||||
dicts, sets, and collections.deque. Their implementation is equally simple
|
||||
though they need to permantently set their length to zero whenever there is
|
||||
though they need to permanently set their length to zero whenever there is
|
||||
an attempt to iterate after a length mutation.
|
||||
|
||||
The situation slightly more involved whenever an object allows length mutation
|
||||
during iteration. Lists and sequence iterators are dynanamically updatable.
|
||||
during iteration. Lists and sequence iterators are dynamically updatable.
|
||||
So, if a list is extended during iteration, the iterator will continue through
|
||||
the new items. If it shrinks to a point before the most recent iteration,
|
||||
then no further items are available and the length is reported at zero.
|
||||
|
|
|
@ -1526,7 +1526,7 @@ Samuele
|
|||
... return chain(iterable, repeat(None))
|
||||
|
||||
>>> def ncycles(iterable, n):
|
||||
... "Returns the seqeuence elements n times"
|
||||
... "Returns the sequence elements n times"
|
||||
... return chain(*repeat(iterable, n))
|
||||
|
||||
>>> def dotproduct(vec1, vec2):
|
||||
|
|
|
@ -194,7 +194,7 @@ class BugsTestCase(unittest.TestCase):
|
|||
# >>> type(loads(dumps(Int())))
|
||||
# <type 'int'>
|
||||
for typ in (int, float, complex, tuple, list, dict, set, frozenset):
|
||||
# Note: str sublclasses are not tested because they get handled
|
||||
# Note: str subclasses are not tested because they get handled
|
||||
# by marshal's routines for objects supporting the buffer API.
|
||||
subtyp = type('subtyp', (typ,), {})
|
||||
self.assertRaises(ValueError, marshal.dumps, subtyp())
|
||||
|
|
|
@ -820,7 +820,7 @@ class MathTests(unittest.TestCase):
|
|||
|
||||
# the following tests have been commented out since they don't
|
||||
# really belong here: the implementation of ** for floats is
|
||||
# independent of the implemention of math.pow
|
||||
# independent of the implementation of math.pow
|
||||
#self.assertEqual(1**NAN, 1)
|
||||
#self.assertEqual(1**INF, 1)
|
||||
#self.assertEqual(1**NINF, 1)
|
||||
|
|
|
@ -594,7 +594,7 @@ class MmapTests(unittest.TestCase):
|
|||
m2.close()
|
||||
m1.close()
|
||||
|
||||
# Test differnt tag
|
||||
# Test different tag
|
||||
m1 = mmap.mmap(-1, len(data1), tagname="foo")
|
||||
m1[:] = data1
|
||||
m2 = mmap.mmap(-1, len(data2), tagname="boo")
|
||||
|
|
|
@ -795,7 +795,7 @@ class _TestEvent(BaseTestCase):
|
|||
event = self.Event()
|
||||
wait = TimingWrapper(event.wait)
|
||||
|
||||
# Removed temporaily, due to API shear, this does not
|
||||
# Removed temporarily, due to API shear, this does not
|
||||
# work with threading._Event objects. is_set == isSet
|
||||
self.assertEqual(event.is_set(), False)
|
||||
|
||||
|
@ -1765,7 +1765,7 @@ class _TestFinalize(BaseTestCase):
|
|||
|
||||
util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
|
||||
|
||||
# call mutliprocessing's cleanup function then exit process without
|
||||
# call multiprocessing's cleanup function then exit process without
|
||||
# garbage collecting locals
|
||||
util._exit_function()
|
||||
conn.close()
|
||||
|
|
|
@ -56,7 +56,7 @@ class TestPkg(unittest.TestCase):
|
|||
if self.root: # Only clean if the test was actually run
|
||||
cleanout(self.root)
|
||||
|
||||
# delete all modules concerning the tested hiearchy
|
||||
# delete all modules concerning the tested hierarchy
|
||||
if self.pkgname:
|
||||
modules = [name for name in sys.modules
|
||||
if self.pkgname in name.split('.')]
|
||||
|
|
|
@ -37,7 +37,7 @@ class PosixTester(unittest.TestCase):
|
|||
NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname",
|
||||
"times", "getloadavg",
|
||||
"getegid", "geteuid", "getgid", "getgroups",
|
||||
"getpid", "getpgrp", "getppid", "getuid",
|
||||
"getpid", "getpgrp", "getppid", "getuid", "sync",
|
||||
]
|
||||
|
||||
for name in NO_ARG_FUNCTIONS:
|
||||
|
@ -132,6 +132,156 @@ class PosixTester(unittest.TestCase):
|
|||
finally:
|
||||
fp.close()
|
||||
|
||||
@unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")
|
||||
def test_truncate(self):
|
||||
with open(support.TESTFN, 'w') as fp:
|
||||
fp.write('test')
|
||||
fp.flush()
|
||||
posix.truncate(support.TESTFN, 0)
|
||||
|
||||
@unittest.skipUnless(hasattr(posix, 'fexecve'), "test needs posix.fexecve()")
|
||||
@unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
|
||||
@unittest.skipUnless(hasattr(os, 'wait'), "test needs os.wait()")
|
||||
def test_fexecve(self):
|
||||
fp = os.open(sys.executable, os.O_RDONLY)
|
||||
try:
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
os.chdir(os.path.split(sys.executable)[0])
|
||||
posix.fexecve(fp, [sys.executable, '-c', 'pass'], os.environ)
|
||||
else:
|
||||
self.assertEqual(os.wait(), (pid, 0))
|
||||
finally:
|
||||
os.close(fp)
|
||||
|
||||
@unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()")
|
||||
@unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
|
||||
def test_waitid(self):
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
os.chdir(os.path.split(sys.executable)[0])
|
||||
posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ)
|
||||
else:
|
||||
res = posix.waitid(posix.P_PID, pid, posix.WEXITED)
|
||||
self.assertEqual(pid, res.si_pid)
|
||||
|
||||
@unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()")
|
||||
def test_lockf(self):
|
||||
fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
|
||||
try:
|
||||
os.write(fd, b'test')
|
||||
os.lseek(fd, 0, os.SEEK_SET)
|
||||
posix.lockf(fd, posix.F_LOCK, 4)
|
||||
# section is locked
|
||||
posix.lockf(fd, posix.F_ULOCK, 4)
|
||||
finally:
|
||||
os.close(fd)
|
||||
|
||||
@unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()")
|
||||
def test_pread(self):
|
||||
fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
|
||||
try:
|
||||
os.write(fd, b'test')
|
||||
os.lseek(fd, 0, os.SEEK_SET)
|
||||
self.assertEqual(b'es', posix.pread(fd, 2, 1))
|
||||
# the first pread() shoudn't disturb the file offset
|
||||
self.assertEqual(b'te', posix.read(fd, 2))
|
||||
finally:
|
||||
os.close(fd)
|
||||
|
||||
@unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()")
|
||||
def test_pwrite(self):
|
||||
fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
|
||||
try:
|
||||
os.write(fd, b'test')
|
||||
os.lseek(fd, 0, os.SEEK_SET)
|
||||
posix.pwrite(fd, b'xx', 1)
|
||||
self.assertEqual(b'txxt', posix.read(fd, 4))
|
||||
finally:
|
||||
os.close(fd)
|
||||
|
||||
@unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
|
||||
"test needs posix.posix_fallocate()")
|
||||
def test_posix_fallocate(self):
|
||||
fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
|
||||
try:
|
||||
posix.posix_fallocate(fd, 0, 10)
|
||||
except OSError as inst:
|
||||
# issue10812, ZFS doesn't appear to support posix_fallocate,
|
||||
# so skip Solaris-based since they are likely to have ZFS.
|
||||
if inst.errno != errno.EINVAL or not sys.platform.startswith("sunos"):
|
||||
raise
|
||||
finally:
|
||||
os.close(fd)
|
||||
|
||||
@unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
|
||||
"test needs posix.posix_fadvise()")
|
||||
def test_posix_fadvise(self):
|
||||
fd = os.open(support.TESTFN, os.O_RDONLY)
|
||||
try:
|
||||
posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED)
|
||||
finally:
|
||||
os.close(fd)
|
||||
|
||||
@unittest.skipUnless(hasattr(posix, 'futimes'), "test needs posix.futimes()")
|
||||
def test_futimes(self):
|
||||
now = time.time()
|
||||
fd = os.open(support.TESTFN, os.O_RDONLY)
|
||||
try:
|
||||
posix.futimes(fd, None)
|
||||
self.assertRaises(TypeError, posix.futimes, fd, (None, None))
|
||||
self.assertRaises(TypeError, posix.futimes, fd, (now, None))
|
||||
self.assertRaises(TypeError, posix.futimes, fd, (None, now))
|
||||
posix.futimes(fd, (int(now), int(now)))
|
||||
posix.futimes(fd, (now, now))
|
||||
finally:
|
||||
os.close(fd)
|
||||
|
||||
@unittest.skipUnless(hasattr(posix, 'lutimes'), "test needs posix.lutimes()")
|
||||
def test_lutimes(self):
|
||||
now = time.time()
|
||||
posix.lutimes(support.TESTFN, None)
|
||||
self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, None))
|
||||
self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (now, None))
|
||||
self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, now))
|
||||
posix.lutimes(support.TESTFN, (int(now), int(now)))
|
||||
posix.lutimes(support.TESTFN, (now, now))
|
||||
|
||||
@unittest.skipUnless(hasattr(posix, 'futimens'), "test needs posix.futimens()")
|
||||
def test_futimens(self):
|
||||
now = time.time()
|
||||
fd = os.open(support.TESTFN, os.O_RDONLY)
|
||||
try:
|
||||
self.assertRaises(TypeError, posix.futimens, fd, (None, None), (None, None))
|
||||
self.assertRaises(TypeError, posix.futimens, fd, (now, 0), None)
|
||||
self.assertRaises(TypeError, posix.futimens, fd, None, (now, 0))
|
||||
posix.futimens(fd, (int(now), int((now - int(now)) * 1e9)),
|
||||
(int(now), int((now - int(now)) * 1e9)))
|
||||
finally:
|
||||
os.close(fd)
|
||||
|
||||
@unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
|
||||
def test_writev(self):
|
||||
fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
|
||||
try:
|
||||
os.writev(fd, (b'test1', b'tt2', b't3'))
|
||||
os.lseek(fd, 0, os.SEEK_SET)
|
||||
self.assertEqual(b'test1tt2t3', posix.read(fd, 10))
|
||||
finally:
|
||||
os.close(fd)
|
||||
|
||||
@unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
|
||||
def test_readv(self):
|
||||
fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
|
||||
try:
|
||||
os.write(fd, b'test1tt2t3')
|
||||
os.lseek(fd, 0, os.SEEK_SET)
|
||||
buf = [bytearray(i) for i in [5, 3, 2]]
|
||||
self.assertEqual(posix.readv(fd, buf), 10)
|
||||
self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf])
|
||||
finally:
|
||||
os.close(fd)
|
||||
|
||||
def test_dup(self):
|
||||
if hasattr(posix, 'dup'):
|
||||
fp = open(support.TESTFN)
|
||||
|
|
|
@ -6,6 +6,11 @@ import os
|
|||
import sys
|
||||
from posixpath import realpath, abspath, dirname, basename
|
||||
|
||||
try:
|
||||
import posix
|
||||
except ImportError:
|
||||
posix = None
|
||||
|
||||
# An absolute path to a temporary filename for testing. We can't rely on TESTFN
|
||||
# being an absolute path, so we need this.
|
||||
|
||||
|
@ -150,6 +155,7 @@ class PosixPathTest(unittest.TestCase):
|
|||
|
||||
def test_islink(self):
|
||||
self.assertIs(posixpath.islink(support.TESTFN + "1"), False)
|
||||
self.assertIs(posixpath.lexists(support.TESTFN + "2"), False)
|
||||
f = open(support.TESTFN + "1", "wb")
|
||||
try:
|
||||
f.write(b"foo")
|
||||
|
@ -225,6 +231,44 @@ class PosixPathTest(unittest.TestCase):
|
|||
|
||||
def test_ismount(self):
|
||||
self.assertIs(posixpath.ismount("/"), True)
|
||||
self.assertIs(posixpath.ismount(b"/"), True)
|
||||
|
||||
def test_ismount_non_existent(self):
|
||||
# Non-existent mountpoint.
|
||||
self.assertIs(posixpath.ismount(ABSTFN), False)
|
||||
try:
|
||||
os.mkdir(ABSTFN)
|
||||
self.assertIs(posixpath.ismount(ABSTFN), False)
|
||||
finally:
|
||||
safe_rmdir(ABSTFN)
|
||||
|
||||
@unittest.skipUnless(support.can_symlink(),
|
||||
"Test requires symlink support")
|
||||
def test_ismount_symlinks(self):
|
||||
# Symlinks are never mountpoints.
|
||||
try:
|
||||
os.symlink("/", ABSTFN)
|
||||
self.assertIs(posixpath.ismount(ABSTFN), False)
|
||||
finally:
|
||||
os.unlink(ABSTFN)
|
||||
|
||||
@unittest.skipIf(posix is None, "Test requires posix module")
|
||||
def test_ismount_different_device(self):
|
||||
# Simulate the path being on a different device from its parent by
|
||||
# mocking out st_dev.
|
||||
save_lstat = os.lstat
|
||||
def fake_lstat(path):
|
||||
st_ino = 0
|
||||
st_dev = 0
|
||||
if path == ABSTFN:
|
||||
st_dev = 1
|
||||
st_ino = 1
|
||||
return posix.stat_result((0, st_ino, st_dev, 0, 0, 0, 0, 0, 0, 0))
|
||||
try:
|
||||
os.lstat = fake_lstat
|
||||
self.assertIs(posixpath.ismount(ABSTFN), True)
|
||||
finally:
|
||||
os.lstat = save_lstat
|
||||
|
||||
def test_expanduser(self):
|
||||
self.assertEqual(posixpath.expanduser("foo"), "foo")
|
||||
|
@ -254,6 +298,10 @@ class PosixPathTest(unittest.TestCase):
|
|||
with support.EnvironmentVarGuard() as env:
|
||||
env['HOME'] = '/'
|
||||
self.assertEqual(posixpath.expanduser("~"), "/")
|
||||
# expanduser should fall back to using the password database
|
||||
del env['HOME']
|
||||
home = pwd.getpwuid(os.getuid()).pw_dir
|
||||
self.assertEqual(posixpath.expanduser("~"), home)
|
||||
|
||||
def test_normpath(self):
|
||||
self.assertEqual(posixpath.normpath(""), ".")
|
||||
|
@ -286,6 +334,16 @@ class PosixPathTest(unittest.TestCase):
|
|||
finally:
|
||||
support.unlink(ABSTFN)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, "symlink"),
|
||||
"Missing symlink implementation")
|
||||
@skip_if_ABSTFN_contains_backslash
|
||||
def test_realpath_relative(self):
|
||||
try:
|
||||
os.symlink(posixpath.relpath(ABSTFN+"1"), ABSTFN)
|
||||
self.assertEqual(realpath(ABSTFN), ABSTFN+"1")
|
||||
finally:
|
||||
support.unlink(ABSTFN)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, "symlink"),
|
||||
"Missing symlink implementation")
|
||||
@skip_if_ABSTFN_contains_backslash
|
||||
|
@ -443,6 +501,11 @@ class PosixPathTest(unittest.TestCase):
|
|||
finally:
|
||||
os.getcwdb = real_getcwdb
|
||||
|
||||
def test_sameopenfile(self):
|
||||
fname = support.TESTFN + "1"
|
||||
with open(fname, "wb") as a, open(fname, "wb") as b:
|
||||
self.assertTrue(posixpath.sameopenfile(a.fileno(), b.fileno()))
|
||||
|
||||
|
||||
class PosixCommonTest(test_genericpath.CommonTest):
|
||||
pathmodule = posixpath
|
||||
|
|
|
@ -20,7 +20,7 @@ NotDefined = object()
|
|||
# A dispatch table all 8 combinations of providing
|
||||
# sep, end, and file
|
||||
# I use this machinery so that I'm not just passing default
|
||||
# values to print, I'm eiher passing or not passing in the
|
||||
# values to print, I'm either passing or not passing in the
|
||||
# arguments
|
||||
dispatch = {
|
||||
(False, False, False):
|
||||
|
|
|
@ -7,6 +7,7 @@ import sys
|
|||
import stat
|
||||
import os
|
||||
import os.path
|
||||
import functools
|
||||
from test import support
|
||||
from test.support import TESTFN
|
||||
from os.path import splitdrive
|
||||
|
@ -48,6 +49,21 @@ try:
|
|||
except ImportError:
|
||||
ZIP_SUPPORT = find_executable('zip')
|
||||
|
||||
def _fake_rename(*args, **kwargs):
|
||||
# Pretend the destination path is on a different filesystem.
|
||||
raise OSError()
|
||||
|
||||
def mock_rename(func):
|
||||
@functools.wraps(func)
|
||||
def wrap(*args, **kwargs):
|
||||
try:
|
||||
builtin_rename = os.rename
|
||||
os.rename = _fake_rename
|
||||
return func(*args, **kwargs)
|
||||
finally:
|
||||
os.rename = builtin_rename
|
||||
return wrap
|
||||
|
||||
class TestShutil(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
@ -393,6 +409,41 @@ class TestShutil(unittest.TestCase):
|
|||
shutil.copytree(src_dir, dst_dir, symlinks=True)
|
||||
self.assertIn('test.txt', os.listdir(dst_dir))
|
||||
|
||||
def _copy_file(self, method):
|
||||
fname = 'test.txt'
|
||||
tmpdir = self.mkdtemp()
|
||||
self.write_file([tmpdir, fname])
|
||||
file1 = os.path.join(tmpdir, fname)
|
||||
tmpdir2 = self.mkdtemp()
|
||||
method(file1, tmpdir2)
|
||||
file2 = os.path.join(tmpdir2, fname)
|
||||
return (file1, file2)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
|
||||
def test_copy(self):
|
||||
# Ensure that the copied file exists and has the same mode bits.
|
||||
file1, file2 = self._copy_file(shutil.copy)
|
||||
self.assertTrue(os.path.exists(file2))
|
||||
self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode)
|
||||
|
||||
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod')
|
||||
@unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.utime')
|
||||
def test_copy2(self):
|
||||
# Ensure that the copied file exists and has the same mode and
|
||||
# modification time bits.
|
||||
file1, file2 = self._copy_file(shutil.copy2)
|
||||
self.assertTrue(os.path.exists(file2))
|
||||
file1_stat = os.stat(file1)
|
||||
file2_stat = os.stat(file2)
|
||||
self.assertEqual(file1_stat.st_mode, file2_stat.st_mode)
|
||||
for attr in 'st_atime', 'st_mtime':
|
||||
# The modification times may be truncated in the new file.
|
||||
self.assertLessEqual(getattr(file1_stat, attr),
|
||||
getattr(file2_stat, attr) + 1)
|
||||
if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'):
|
||||
self.assertEqual(getattr(file1_stat, 'st_flags'),
|
||||
getattr(file2_stat, 'st_flags'))
|
||||
|
||||
@unittest.skipUnless(zlib, "requires zlib")
|
||||
def test_make_tarball(self):
|
||||
# creating something to tar
|
||||
|
@ -403,6 +454,8 @@ class TestShutil(unittest.TestCase):
|
|||
self.write_file([tmpdir, 'sub', 'file3'], 'xxx')
|
||||
|
||||
tmpdir2 = self.mkdtemp()
|
||||
# force shutil to create the directory
|
||||
os.rmdir(tmpdir2)
|
||||
unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
|
||||
"source and target should be on same drive")
|
||||
|
||||
|
@ -518,6 +571,8 @@ class TestShutil(unittest.TestCase):
|
|||
self.write_file([tmpdir, 'file2'], 'xxx')
|
||||
|
||||
tmpdir2 = self.mkdtemp()
|
||||
# force shutil to create the directory
|
||||
os.rmdir(tmpdir2)
|
||||
base_name = os.path.join(tmpdir2, 'archive')
|
||||
_make_zipfile(base_name, tmpdir)
|
||||
|
||||
|
@ -645,6 +700,14 @@ class TestShutil(unittest.TestCase):
|
|||
diff = self._compare_dirs(tmpdir, tmpdir2)
|
||||
self.assertEqual(diff, [])
|
||||
|
||||
# and again, this time with the format specified
|
||||
tmpdir3 = self.mkdtemp()
|
||||
unpack_archive(filename, tmpdir3, format=format)
|
||||
diff = self._compare_dirs(tmpdir, tmpdir3)
|
||||
self.assertEqual(diff, [])
|
||||
self.assertRaises(shutil.ReadError, unpack_archive, TESTFN)
|
||||
self.assertRaises(ValueError, unpack_archive, TESTFN, format='xxx')
|
||||
|
||||
def test_unpack_registery(self):
|
||||
|
||||
formats = get_unpack_formats()
|
||||
|
@ -680,20 +743,11 @@ class TestMove(unittest.TestCase):
|
|||
self.dst_dir = tempfile.mkdtemp()
|
||||
self.src_file = os.path.join(self.src_dir, filename)
|
||||
self.dst_file = os.path.join(self.dst_dir, filename)
|
||||
# Try to create a dir in the current directory, hoping that it is
|
||||
# not located on the same filesystem as the system tmp dir.
|
||||
try:
|
||||
self.dir_other_fs = tempfile.mkdtemp(
|
||||
dir=os.path.dirname(__file__))
|
||||
self.file_other_fs = os.path.join(self.dir_other_fs,
|
||||
filename)
|
||||
except OSError:
|
||||
self.dir_other_fs = None
|
||||
with open(self.src_file, "wb") as f:
|
||||
f.write(b"spam")
|
||||
|
||||
def tearDown(self):
|
||||
for d in (self.src_dir, self.dst_dir, self.dir_other_fs):
|
||||
for d in (self.src_dir, self.dst_dir):
|
||||
try:
|
||||
if d:
|
||||
shutil.rmtree(d)
|
||||
|
@ -722,21 +776,15 @@ class TestMove(unittest.TestCase):
|
|||
# Move a file inside an existing dir on the same filesystem.
|
||||
self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
|
||||
|
||||
@mock_rename
|
||||
def test_move_file_other_fs(self):
|
||||
# Move a file to an existing dir on another filesystem.
|
||||
if not self.dir_other_fs:
|
||||
# skip
|
||||
return
|
||||
self._check_move_file(self.src_file, self.file_other_fs,
|
||||
self.file_other_fs)
|
||||
self.test_move_file()
|
||||
|
||||
@mock_rename
|
||||
def test_move_file_to_dir_other_fs(self):
|
||||
# Move a file to another location on another filesystem.
|
||||
if not self.dir_other_fs:
|
||||
# skip
|
||||
return
|
||||
self._check_move_file(self.src_file, self.dir_other_fs,
|
||||
self.file_other_fs)
|
||||
self.test_move_file_to_dir()
|
||||
|
||||
def test_move_dir(self):
|
||||
# Move a dir to another location on the same filesystem.
|
||||
|
@ -749,32 +797,20 @@ class TestMove(unittest.TestCase):
|
|||
except:
|
||||
pass
|
||||
|
||||
@mock_rename
|
||||
def test_move_dir_other_fs(self):
|
||||
# Move a dir to another location on another filesystem.
|
||||
if not self.dir_other_fs:
|
||||
# skip
|
||||
return
|
||||
dst_dir = tempfile.mktemp(dir=self.dir_other_fs)
|
||||
try:
|
||||
self._check_move_dir(self.src_dir, dst_dir, dst_dir)
|
||||
finally:
|
||||
try:
|
||||
shutil.rmtree(dst_dir)
|
||||
except:
|
||||
pass
|
||||
self.test_move_dir()
|
||||
|
||||
def test_move_dir_to_dir(self):
|
||||
# Move a dir inside an existing dir on the same filesystem.
|
||||
self._check_move_dir(self.src_dir, self.dst_dir,
|
||||
os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
|
||||
|
||||
@mock_rename
|
||||
def test_move_dir_to_dir_other_fs(self):
|
||||
# Move a dir inside an existing dir on another filesystem.
|
||||
if not self.dir_other_fs:
|
||||
# skip
|
||||
return
|
||||
self._check_move_dir(self.src_dir, self.dir_other_fs,
|
||||
os.path.join(self.dir_other_fs, os.path.basename(self.src_dir)))
|
||||
self.test_move_dir_to_dir()
|
||||
|
||||
def test_existing_file_inside_dest_dir(self):
|
||||
# A file with the same name inside the destination dir already exists.
|
||||
|
|
|
@ -22,7 +22,7 @@ We have to test this with various file encodings. We also test it with
|
|||
exec()/eval(), which uses a different code path.
|
||||
|
||||
This file is really about correct treatment of encodings and
|
||||
backslashes. It doens't concern itself with issues like single
|
||||
backslashes. It doesn't concern itself with issues like single
|
||||
vs. double quotes or singly- vs. triply-quoted strings: that's dealt
|
||||
with elsewhere (I assume).
|
||||
"""
|
||||
|
|
|
@ -536,7 +536,7 @@ class CacheTests(unittest.TestCase):
|
|||
self.assertIsNot(first_time_re, second_time_re)
|
||||
# Possible test locale is not supported while initial locale is.
|
||||
# If this is the case just suppress the exception and fall-through
|
||||
# to the reseting to the original locale.
|
||||
# to the resetting to the original locale.
|
||||
except locale.Error:
|
||||
pass
|
||||
# Make sure we don't trample on the locale setting once we leave the
|
||||
|
|
|
@ -463,7 +463,7 @@ class StructTest(unittest.TestCase):
|
|||
test_string)
|
||||
|
||||
def test_unpack_with_buffer(self):
|
||||
# SF bug 1563759: struct.unpack doens't support buffer protocol objects
|
||||
# SF bug 1563759: struct.unpack doesn't support buffer protocol objects
|
||||
data1 = array.array('B', b'\x12\x34\x56\x78')
|
||||
data2 = memoryview(b'\x12\x34\x56\x78') # XXX b'......XXXX......', 6, 4
|
||||
for data in [data1, data2]:
|
||||
|
|
|
@ -130,7 +130,9 @@ class ProcessTestCase(BaseTestCase):
|
|||
"import sys; sys.stdout.write('BDFL')\n"
|
||||
"sys.stdout.flush()\n"
|
||||
"while True: pass"],
|
||||
timeout=1.5)
|
||||
# Some heavily loaded buildbots (sparc Debian 3.x) require
|
||||
# this much time to start and print.
|
||||
timeout=3)
|
||||
self.fail("Expected TimeoutExpired.")
|
||||
self.assertEqual(c.exception.output, b'BDFL')
|
||||
|
||||
|
@ -323,6 +325,31 @@ class ProcessTestCase(BaseTestCase):
|
|||
rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)
|
||||
self.assertEqual(rc, 2)
|
||||
|
||||
def test_stdout_devnull(self):
|
||||
p = subprocess.Popen([sys.executable, "-c",
|
||||
'for i in range(10240):'
|
||||
'print("x" * 1024)'],
|
||||
stdout=subprocess.DEVNULL)
|
||||
p.wait()
|
||||
self.assertEqual(p.stdout, None)
|
||||
|
||||
def test_stderr_devnull(self):
|
||||
p = subprocess.Popen([sys.executable, "-c",
|
||||
'import sys\n'
|
||||
'for i in range(10240):'
|
||||
'sys.stderr.write("x" * 1024)'],
|
||||
stderr=subprocess.DEVNULL)
|
||||
p.wait()
|
||||
self.assertEqual(p.stderr, None)
|
||||
|
||||
def test_stdin_devnull(self):
|
||||
p = subprocess.Popen([sys.executable, "-c",
|
||||
'import sys;'
|
||||
'sys.stdin.read(1)'],
|
||||
stdin=subprocess.DEVNULL)
|
||||
p.wait()
|
||||
self.assertEqual(p.stdin, None)
|
||||
|
||||
def test_cwd(self):
|
||||
tmpdir = tempfile.gettempdir()
|
||||
# We cannot use os.path.realpath to canonicalize the path,
|
||||
|
@ -622,13 +649,15 @@ class ProcessTestCase(BaseTestCase):
|
|||
# Subsequent invocations should just return the returncode
|
||||
self.assertEqual(p.wait(), 0)
|
||||
|
||||
|
||||
def test_wait_timeout(self):
|
||||
p = subprocess.Popen([sys.executable,
|
||||
"-c", "import time; time.sleep(0.1)"])
|
||||
self.assertRaises(subprocess.TimeoutExpired, p.wait, timeout=0.01)
|
||||
self.assertEqual(p.wait(timeout=2), 0)
|
||||
|
||||
with self.assertRaises(subprocess.TimeoutExpired) as c:
|
||||
p.wait(timeout=0.01)
|
||||
self.assertIn("0.01", str(c.exception)) # For coverage of __str__.
|
||||
# Some heavily loaded buildbots (sparc Debian 3.x) require this much
|
||||
# time to start.
|
||||
self.assertEqual(p.wait(timeout=3), 0)
|
||||
|
||||
def test_invalid_bufsize(self):
|
||||
# an invalid type of the bufsize argument should raise
|
||||
|
|
|
@ -54,7 +54,6 @@ class TestUntestedModules(unittest.TestCase):
|
|||
import py_compile
|
||||
import sndhdr
|
||||
import tabnanny
|
||||
import timeit
|
||||
try:
|
||||
import tty # not available on Windows
|
||||
except ImportError:
|
||||
|
|
|
@ -237,7 +237,7 @@ SyntaxError: can't assign to function call
|
|||
|
||||
Test continue in finally in weird combinations.
|
||||
|
||||
continue in for loop under finally shouuld be ok.
|
||||
continue in for loop under finally should be ok.
|
||||
|
||||
>>> def test():
|
||||
... try:
|
||||
|
|
|
@ -492,7 +492,7 @@ class SysModuleTest(unittest.TestCase):
|
|||
# provide too much opportunity for insane things to happen.
|
||||
# We don't want them in the interned dict and if they aren't
|
||||
# actually interned, we don't want to create the appearance
|
||||
# that they are by allowing intern() to succeeed.
|
||||
# that they are by allowing intern() to succeed.
|
||||
class S(str):
|
||||
def __hash__(self):
|
||||
return 123
|
||||
|
|
|
@ -578,7 +578,7 @@ class ThreadJoinOnShutdown(BaseTestCase):
|
|||
# This acquires the lock and then waits until the child has forked
|
||||
# before returning, which will release the lock soon after. If
|
||||
# someone else tries to fix this test case by acquiring this lock
|
||||
# before forking instead of reseting it, the test case will
|
||||
# before forking instead of resetting it, the test case will
|
||||
# deadlock when it shouldn't.
|
||||
condition = w._block
|
||||
orig_acquire = condition.acquire
|
||||
|
|
305
Lib/test/test_timeit.py
Normal file
305
Lib/test/test_timeit.py
Normal file
|
@ -0,0 +1,305 @@
|
|||
import timeit
|
||||
import unittest
|
||||
import sys
|
||||
import io
|
||||
import time
|
||||
from textwrap import dedent
|
||||
|
||||
from test.support import run_unittest
|
||||
from test.support import captured_stdout
|
||||
from test.support import captured_stderr
|
||||
|
||||
# timeit's default number of iterations.
|
||||
DEFAULT_NUMBER = 1000000
|
||||
|
||||
# timeit's default number of repetitions.
|
||||
DEFAULT_REPEAT = 3
|
||||
|
||||
# XXX: some tests are commented out that would improve the coverage but take a
|
||||
# long time to run because they test the default number of loops, which is
|
||||
# large. The tests could be enabled if there was a way to override the default
|
||||
# number of loops during testing, but this would require changing the signature
|
||||
# of some functions that use the default as a default argument.
|
||||
|
||||
class FakeTimer:
|
||||
BASE_TIME = 42.0
|
||||
def __init__(self, seconds_per_increment=1.0):
|
||||
self.count = 0
|
||||
self.setup_calls = 0
|
||||
self.seconds_per_increment=seconds_per_increment
|
||||
timeit._fake_timer = self
|
||||
|
||||
def __call__(self):
|
||||
return self.BASE_TIME + self.count * self.seconds_per_increment
|
||||
|
||||
def inc(self):
|
||||
self.count += 1
|
||||
|
||||
def setup(self):
|
||||
self.setup_calls += 1
|
||||
|
||||
def wrap_timer(self, timer):
|
||||
"""Records 'timer' and returns self as callable timer."""
|
||||
self.saved_timer = timer
|
||||
return self
|
||||
|
||||
class TestTimeit(unittest.TestCase):
|
||||
|
||||
def tearDown(self):
|
||||
try:
|
||||
del timeit._fake_timer
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
def test_reindent_empty(self):
|
||||
self.assertEqual(timeit.reindent("", 0), "")
|
||||
self.assertEqual(timeit.reindent("", 4), "")
|
||||
|
||||
def test_reindent_single(self):
|
||||
self.assertEqual(timeit.reindent("pass", 0), "pass")
|
||||
self.assertEqual(timeit.reindent("pass", 4), "pass")
|
||||
|
||||
def test_reindent_multi_empty(self):
|
||||
self.assertEqual(timeit.reindent("\n\n", 0), "\n\n")
|
||||
self.assertEqual(timeit.reindent("\n\n", 4), "\n \n ")
|
||||
|
||||
def test_reindent_multi(self):
|
||||
self.assertEqual(timeit.reindent(
|
||||
"print()\npass\nbreak", 0),
|
||||
"print()\npass\nbreak")
|
||||
self.assertEqual(timeit.reindent(
|
||||
"print()\npass\nbreak", 4),
|
||||
"print()\n pass\n break")
|
||||
|
||||
def test_timer_invalid_stmt(self):
|
||||
self.assertRaises(ValueError, timeit.Timer, stmt=None)
|
||||
|
||||
def test_timer_invalid_setup(self):
|
||||
self.assertRaises(ValueError, timeit.Timer, setup=None)
|
||||
|
||||
fake_setup = "import timeit; timeit._fake_timer.setup()"
|
||||
fake_stmt = "import timeit; timeit._fake_timer.inc()"
|
||||
|
||||
def fake_callable_setup(self):
|
||||
self.fake_timer.setup()
|
||||
|
||||
def fake_callable_stmt(self):
|
||||
self.fake_timer.inc()
|
||||
|
||||
def timeit(self, stmt, setup, number=None):
|
||||
self.fake_timer = FakeTimer()
|
||||
t = timeit.Timer(stmt=stmt, setup=setup, timer=self.fake_timer)
|
||||
kwargs = {}
|
||||
if number is None:
|
||||
number = DEFAULT_NUMBER
|
||||
else:
|
||||
kwargs['number'] = number
|
||||
delta_time = t.timeit(**kwargs)
|
||||
self.assertEqual(self.fake_timer.setup_calls, 1)
|
||||
self.assertEqual(self.fake_timer.count, number)
|
||||
self.assertEqual(delta_time, number)
|
||||
|
||||
# Takes too long to run in debug build.
|
||||
#def test_timeit_default_iters(self):
|
||||
# self.timeit(self.fake_stmt, self.fake_setup)
|
||||
|
||||
def test_timeit_zero_iters(self):
|
||||
self.timeit(self.fake_stmt, self.fake_setup, number=0)
|
||||
|
||||
def test_timeit_few_iters(self):
|
||||
self.timeit(self.fake_stmt, self.fake_setup, number=3)
|
||||
|
||||
def test_timeit_callable_stmt(self):
|
||||
self.timeit(self.fake_callable_stmt, self.fake_setup, number=3)
|
||||
|
||||
def test_timeit_callable_stmt_and_setup(self):
|
||||
self.timeit(self.fake_callable_stmt,
|
||||
self.fake_callable_setup, number=3)
|
||||
|
||||
# Takes too long to run in debug build.
|
||||
#def test_timeit_function(self):
|
||||
# delta_time = timeit.timeit(self.fake_stmt, self.fake_setup,
|
||||
# timer=FakeTimer())
|
||||
# self.assertEqual(delta_time, DEFAULT_NUMBER)
|
||||
|
||||
def test_timeit_function_zero_iters(self):
|
||||
delta_time = timeit.timeit(self.fake_stmt, self.fake_setup, number=0,
|
||||
timer=FakeTimer())
|
||||
self.assertEqual(delta_time, 0)
|
||||
|
||||
def repeat(self, stmt, setup, repeat=None, number=None):
|
||||
self.fake_timer = FakeTimer()
|
||||
t = timeit.Timer(stmt=stmt, setup=setup, timer=self.fake_timer)
|
||||
kwargs = {}
|
||||
if repeat is None:
|
||||
repeat = DEFAULT_REPEAT
|
||||
else:
|
||||
kwargs['repeat'] = repeat
|
||||
if number is None:
|
||||
number = DEFAULT_NUMBER
|
||||
else:
|
||||
kwargs['number'] = number
|
||||
delta_times = t.repeat(**kwargs)
|
||||
self.assertEqual(self.fake_timer.setup_calls, repeat)
|
||||
self.assertEqual(self.fake_timer.count, repeat * number)
|
||||
self.assertEqual(delta_times, repeat * [float(number)])
|
||||
|
||||
# Takes too long to run in debug build.
|
||||
#def test_repeat_default(self):
|
||||
# self.repeat(self.fake_stmt, self.fake_setup)
|
||||
|
||||
def test_repeat_zero_reps(self):
|
||||
self.repeat(self.fake_stmt, self.fake_setup, repeat=0)
|
||||
|
||||
def test_repeat_zero_iters(self):
|
||||
self.repeat(self.fake_stmt, self.fake_setup, number=0)
|
||||
|
||||
def test_repeat_few_reps_and_iters(self):
|
||||
self.repeat(self.fake_stmt, self.fake_setup, repeat=3, number=5)
|
||||
|
||||
def test_repeat_callable_stmt(self):
|
||||
self.repeat(self.fake_callable_stmt, self.fake_setup,
|
||||
repeat=3, number=5)
|
||||
|
||||
def test_repeat_callable_stmt_and_setup(self):
|
||||
self.repeat(self.fake_callable_stmt, self.fake_callable_setup,
|
||||
repeat=3, number=5)
|
||||
|
||||
# Takes too long to run in debug build.
|
||||
#def test_repeat_function(self):
|
||||
# delta_times = timeit.repeat(self.fake_stmt, self.fake_setup,
|
||||
# timer=FakeTimer())
|
||||
# self.assertEqual(delta_times, DEFAULT_REPEAT * [float(DEFAULT_NUMBER)])
|
||||
|
||||
def test_repeat_function_zero_reps(self):
|
||||
delta_times = timeit.repeat(self.fake_stmt, self.fake_setup, repeat=0,
|
||||
timer=FakeTimer())
|
||||
self.assertEqual(delta_times, [])
|
||||
|
||||
def test_repeat_function_zero_iters(self):
|
||||
delta_times = timeit.repeat(self.fake_stmt, self.fake_setup, number=0,
|
||||
timer=FakeTimer())
|
||||
self.assertEqual(delta_times, DEFAULT_REPEAT * [0.0])
|
||||
|
||||
def assert_exc_string(self, exc_string, expected_exc_name):
|
||||
exc_lines = exc_string.splitlines()
|
||||
self.assertGreater(len(exc_lines), 2)
|
||||
self.assertTrue(exc_lines[0].startswith('Traceback'))
|
||||
self.assertTrue(exc_lines[-1].startswith(expected_exc_name))
|
||||
|
||||
def test_print_exc(self):
|
||||
s = io.StringIO()
|
||||
t = timeit.Timer("1/0")
|
||||
try:
|
||||
t.timeit()
|
||||
except:
|
||||
t.print_exc(s)
|
||||
self.assert_exc_string(s.getvalue(), 'ZeroDivisionError')
|
||||
|
||||
MAIN_DEFAULT_OUTPUT = "10 loops, best of 3: 1 sec per loop\n"
|
||||
|
||||
def run_main(self, seconds_per_increment=1.0, switches=None, timer=None):
|
||||
if timer is None:
|
||||
timer = FakeTimer(seconds_per_increment=seconds_per_increment)
|
||||
if switches is None:
|
||||
args = []
|
||||
else:
|
||||
args = switches[:]
|
||||
args.append(self.fake_stmt)
|
||||
# timeit.main() modifies sys.path, so save and restore it.
|
||||
orig_sys_path = sys.path[:]
|
||||
with captured_stdout() as s:
|
||||
timeit.main(args=args, _wrap_timer=timer.wrap_timer)
|
||||
sys.path[:] = orig_sys_path[:]
|
||||
return s.getvalue()
|
||||
|
||||
def test_main_bad_switch(self):
|
||||
s = self.run_main(switches=['--bad-switch'])
|
||||
self.assertEqual(s, dedent("""\
|
||||
option --bad-switch not recognized
|
||||
use -h/--help for command line help
|
||||
"""))
|
||||
|
||||
def test_main_seconds(self):
|
||||
s = self.run_main(seconds_per_increment=5.5)
|
||||
self.assertEqual(s, "10 loops, best of 3: 5.5 sec per loop\n")
|
||||
|
||||
def test_main_milliseconds(self):
|
||||
s = self.run_main(seconds_per_increment=0.0055)
|
||||
self.assertEqual(s, "100 loops, best of 3: 5.5 msec per loop\n")
|
||||
|
||||
def test_main_microseconds(self):
|
||||
s = self.run_main(seconds_per_increment=0.0000025, switches=['-n100'])
|
||||
self.assertEqual(s, "100 loops, best of 3: 2.5 usec per loop\n")
|
||||
|
||||
def test_main_fixed_iters(self):
|
||||
s = self.run_main(seconds_per_increment=2.0, switches=['-n35'])
|
||||
self.assertEqual(s, "35 loops, best of 3: 2 sec per loop\n")
|
||||
|
||||
def test_main_setup(self):
|
||||
s = self.run_main(seconds_per_increment=2.0,
|
||||
switches=['-n35', '-s', 'print("CustomSetup")'])
|
||||
self.assertEqual(s, "CustomSetup\n" * 3 +
|
||||
"35 loops, best of 3: 2 sec per loop\n")
|
||||
|
||||
def test_main_fixed_reps(self):
|
||||
s = self.run_main(seconds_per_increment=60.0, switches=['-r9'])
|
||||
self.assertEqual(s, "10 loops, best of 9: 60 sec per loop\n")
|
||||
|
||||
def test_main_negative_reps(self):
|
||||
s = self.run_main(seconds_per_increment=60.0, switches=['-r-5'])
|
||||
self.assertEqual(s, "10 loops, best of 1: 60 sec per loop\n")
|
||||
|
||||
def test_main_help(self):
|
||||
s = self.run_main(switches=['-h'])
|
||||
# Note: It's not clear that the trailing space was intended as part of
|
||||
# the help text, but since it's there, check for it.
|
||||
self.assertEqual(s, timeit.__doc__ + ' ')
|
||||
|
||||
def test_main_using_time(self):
|
||||
fake_timer = FakeTimer()
|
||||
s = self.run_main(switches=['-t'], timer=fake_timer)
|
||||
self.assertEqual(s, self.MAIN_DEFAULT_OUTPUT)
|
||||
self.assertIs(fake_timer.saved_timer, time.time)
|
||||
|
||||
def test_main_using_clock(self):
|
||||
fake_timer = FakeTimer()
|
||||
s = self.run_main(switches=['-c'], timer=fake_timer)
|
||||
self.assertEqual(s, self.MAIN_DEFAULT_OUTPUT)
|
||||
self.assertIs(fake_timer.saved_timer, time.clock)
|
||||
|
||||
def test_main_verbose(self):
|
||||
s = self.run_main(switches=['-v'])
|
||||
self.assertEqual(s, dedent("""\
|
||||
10 loops -> 10 secs
|
||||
raw times: 10 10 10
|
||||
10 loops, best of 3: 1 sec per loop
|
||||
"""))
|
||||
|
||||
def test_main_very_verbose(self):
|
||||
s = self.run_main(seconds_per_increment=0.000050, switches=['-vv'])
|
||||
self.assertEqual(s, dedent("""\
|
||||
10 loops -> 0.0005 secs
|
||||
100 loops -> 0.005 secs
|
||||
1000 loops -> 0.05 secs
|
||||
10000 loops -> 0.5 secs
|
||||
raw times: 0.5 0.5 0.5
|
||||
10000 loops, best of 3: 50 usec per loop
|
||||
"""))
|
||||
|
||||
def test_main_exception(self):
|
||||
with captured_stderr() as error_stringio:
|
||||
s = self.run_main(switches=['1/0'])
|
||||
self.assert_exc_string(error_stringio.getvalue(), 'ZeroDivisionError')
|
||||
|
||||
def test_main_exception_fixed_reps(self):
|
||||
with captured_stderr() as error_stringio:
|
||||
s = self.run_main(switches=['-n1', '1/0'])
|
||||
self.assert_exc_string(error_stringio.getvalue(), 'ZeroDivisionError')
|
||||
|
||||
|
||||
def test_main():
|
||||
run_unittest(TestTimeit)
|
||||
|
||||
if __name__ == '__main__':
|
||||
test_main()
|
|
@ -209,7 +209,7 @@ class TestRunExecCounts(unittest.TestCase):
|
|||
(self.my_py_filename, firstlineno + 4): 1,
|
||||
}
|
||||
|
||||
# When used through 'run', some other spurios counts are produced, like
|
||||
# When used through 'run', some other spurious counts are produced, like
|
||||
# the settrace of threading, which we ignore, just making sure that the
|
||||
# counts fo traced_func_loop were right.
|
||||
#
|
||||
|
|
|
@ -91,7 +91,7 @@ class urlopen_FileTests(unittest.TestCase):
|
|||
"did not return the expected text")
|
||||
|
||||
def test_close(self):
|
||||
# Test close() by calling it hear and then having it be called again
|
||||
# Test close() by calling it here and then having it be called again
|
||||
# by the tearDown() method for the test
|
||||
self.returned_obj.close()
|
||||
|
||||
|
@ -174,6 +174,11 @@ class urlopen_HttpTests(unittest.TestCase):
|
|||
finally:
|
||||
self.unfakehttp()
|
||||
|
||||
def test_willclose(self):
|
||||
self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!")
|
||||
resp = urlopen("http://www.python.org")
|
||||
self.assertTrue(resp.fp.will_close)
|
||||
|
||||
def test_read_0_9(self):
|
||||
# "0.9" response accepted (but not "simple responses" without
|
||||
# a status line)
|
||||
|
@ -1021,7 +1026,7 @@ class URLopener_Tests(unittest.TestCase):
|
|||
|
||||
# Just commented them out.
|
||||
# Can't really tell why keep failing in windows and sparc.
|
||||
# Everywhere else they work ok, but on those machines, someteimes
|
||||
# Everywhere else they work ok, but on those machines, sometimes
|
||||
# fail in one of the tests, sometimes in other. I have a linux, and
|
||||
# the tests go ok.
|
||||
# If anybody has one of the problematic enviroments, please help!
|
||||
|
|
|
@ -332,7 +332,7 @@ class WarnTests(unittest.TestCase):
|
|||
sys.argv = argv
|
||||
|
||||
def test_warn_explicit_type_errors(self):
|
||||
# warn_explicit() shoud error out gracefully if it is given objects
|
||||
# warn_explicit() should error out gracefully if it is given objects
|
||||
# of the wrong types.
|
||||
# lineno is expected to be an integer.
|
||||
self.assertRaises(TypeError, self.module.warn_explicit,
|
||||
|
|
|
@ -232,10 +232,10 @@ def repeat(stmt="pass", setup="pass", timer=default_timer,
|
|||
"""Convenience function to create Timer object and call repeat method."""
|
||||
return Timer(stmt, setup, timer).repeat(repeat, number)
|
||||
|
||||
def main(args=None):
|
||||
def main(args=None, *, _wrap_timer=None):
|
||||
"""Main program, used when run as a script.
|
||||
|
||||
The optional argument specifies the command line to be parsed,
|
||||
The optional 'args' argument specifies the command line to be parsed,
|
||||
defaulting to sys.argv[1:].
|
||||
|
||||
The return value is an exit code to be passed to sys.exit(); it
|
||||
|
@ -244,6 +244,10 @@ def main(args=None):
|
|||
When an exception happens during timing, a traceback is printed to
|
||||
stderr and the return value is 1. Exceptions at other times
|
||||
(including the template compilation) are not caught.
|
||||
|
||||
'_wrap_timer' is an internal interface used for unit testing. If it
|
||||
is not None, it must be a callable that accepts a timer function
|
||||
and returns another timer function (used for unit testing).
|
||||
"""
|
||||
if args is None:
|
||||
args = sys.argv[1:]
|
||||
|
@ -289,6 +293,8 @@ def main(args=None):
|
|||
# directory)
|
||||
import os
|
||||
sys.path.insert(0, os.curdir)
|
||||
if _wrap_timer is not None:
|
||||
timer = _wrap_timer(timer)
|
||||
t = Timer(stmt, setup, timer)
|
||||
if number == 0:
|
||||
# determine number so that 0.2 <= total time < 2.0
|
||||
|
|
|
@ -135,7 +135,7 @@ class InternalFunctionsTest(unittest.TestCase):
|
|||
# minimum acceptable for image type
|
||||
self.assertEqual(ttk._format_elemcreate('image', False, 'test'),
|
||||
("test ", ()))
|
||||
# specifiyng a state spec
|
||||
# specifying a state spec
|
||||
self.assertEqual(ttk._format_elemcreate('image', False, 'test',
|
||||
('', 'a')), ("test {} a", ()))
|
||||
# state spec with multiple states
|
||||
|
|
|
@ -171,7 +171,7 @@ class tixCommand:
|
|||
return self.tk.call('tix', 'getimage', name)
|
||||
|
||||
def tix_option_get(self, name):
|
||||
"""Gets the options manitained by the Tix
|
||||
"""Gets the options maintained by the Tix
|
||||
scheme mechanism. Available options include:
|
||||
|
||||
active_bg active_fg bg
|
||||
|
@ -576,7 +576,7 @@ class ButtonBox(TixWidget):
|
|||
|
||||
class ComboBox(TixWidget):
|
||||
"""ComboBox - an Entry field with a dropdown menu. The user can select a
|
||||
choice by either typing in the entry subwdget or selecting from the
|
||||
choice by either typing in the entry subwidget or selecting from the
|
||||
listbox subwidget.
|
||||
|
||||
Subwidget Class
|
||||
|
@ -869,7 +869,7 @@ class HList(TixWidget, XView, YView):
|
|||
"""HList - Hierarchy display widget can be used to display any data
|
||||
that have a hierarchical structure, for example, file system directory
|
||||
trees. The list entries are indented and connected by branch lines
|
||||
according to their places in the hierachy.
|
||||
according to their places in the hierarchy.
|
||||
|
||||
Subwidgets - None"""
|
||||
|
||||
|
@ -1519,7 +1519,7 @@ class TList(TixWidget, XView, YView):
|
|||
self.tk.call(self._w, 'selection', 'set', first, last)
|
||||
|
||||
class Tree(TixWidget):
|
||||
"""Tree - The tixTree widget can be used to display hierachical
|
||||
"""Tree - The tixTree widget can be used to display hierarchical
|
||||
data in a tree form. The user can adjust
|
||||
the view of the tree by opening or closing parts of the tree."""
|
||||
|
||||
|
|
|
@ -707,7 +707,7 @@ class Combobox(Entry):
|
|||
textvariable, values, width
|
||||
"""
|
||||
# The "values" option may need special formatting, so leave to
|
||||
# _format_optdict the responsability to format it
|
||||
# _format_optdict the responsibility to format it
|
||||
if "values" in kw:
|
||||
kw["values"] = _format_optdict({'v': kw["values"]})[1]
|
||||
|
||||
|
|
|
@ -1488,7 +1488,7 @@ class TurtleScreen(TurtleScreenBase):
|
|||
Optional arguments:
|
||||
canvwidth -- positive integer, new width of canvas in pixels
|
||||
canvheight -- positive integer, new height of canvas in pixels
|
||||
bg -- colorstring or color-tupel, new backgroundcolor
|
||||
bg -- colorstring or color-tuple, new backgroundcolor
|
||||
If no arguments are given, return current (canvaswidth, canvasheight)
|
||||
|
||||
Do not alter the drawing window. To observe hidden parts of
|
||||
|
@ -3242,9 +3242,9 @@ class RawTurtle(TPen, TNavigator):
|
|||
fill="", width=ps)
|
||||
# Turtle now at position old,
|
||||
self._position = old
|
||||
## if undo is done during crating a polygon, the last vertex
|
||||
## will be deleted. if the polygon is entirel deleted,
|
||||
## creatigPoly will be set to False.
|
||||
## if undo is done during creating a polygon, the last vertex
|
||||
## will be deleted. if the polygon is entirely deleted,
|
||||
## creatingPoly will be set to False.
|
||||
## Polygons created before the last one will not be affected by undo()
|
||||
if self._creatingPoly:
|
||||
if len(self._poly) > 0:
|
||||
|
@ -3796,7 +3796,7 @@ class _Screen(TurtleScreen):
|
|||
|
||||
|
||||
class Turtle(RawTurtle):
|
||||
"""RawTurtle auto-crating (scrolled) canvas.
|
||||
"""RawTurtle auto-creating (scrolled) canvas.
|
||||
|
||||
When a Turtle object is created or a function derived from some
|
||||
Turtle method is called a TurtleScreen object is automatically created.
|
||||
|
@ -3836,7 +3836,7 @@ def write_docstringdict(filename="turtle_docstringdict"):
|
|||
filename -- a string, used as filename
|
||||
default value is turtle_docstringdict
|
||||
|
||||
Has to be called explicitely, (not used by the turtle-graphics classes)
|
||||
Has to be called explicitly, (not used by the turtle-graphics classes)
|
||||
The docstring dictionary will be written to the Python script <filname>.py
|
||||
It is intended to serve as a template for translation of the docstrings
|
||||
into different languages.
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
tdemo_bytedesign.py
|
||||
|
||||
An example adapted from the example-suite
|
||||
of PythonCard's turtle graphcis.
|
||||
of PythonCard's turtle graphics.
|
||||
|
||||
It's based on an article in BYTE magazine
|
||||
Problem Solving with Logo: Using Turtle
|
||||
|
|
|
@ -59,6 +59,9 @@ class TestResult(object):
|
|||
"Called when the given test is about to be run"
|
||||
self.testsRun += 1
|
||||
self._mirrorOutput = False
|
||||
self._setupStdout()
|
||||
|
||||
def _setupStdout(self):
|
||||
if self.buffer:
|
||||
if self._stderr_buffer is None:
|
||||
self._stderr_buffer = io.StringIO()
|
||||
|
@ -74,6 +77,10 @@ class TestResult(object):
|
|||
|
||||
def stopTest(self, test):
|
||||
"""Called when the given test has been run"""
|
||||
self._restoreStdout()
|
||||
self._mirrorOutput = False
|
||||
|
||||
def _restoreStdout(self):
|
||||
if self.buffer:
|
||||
if self._mirrorOutput:
|
||||
output = sys.stdout.getvalue()
|
||||
|
@ -93,7 +100,6 @@ class TestResult(object):
|
|||
self._stdout_buffer.truncate()
|
||||
self._stderr_buffer.seek(0)
|
||||
self._stderr_buffer.truncate()
|
||||
self._mirrorOutput = False
|
||||
|
||||
def stopTestRun(self):
|
||||
"""Called once after all tests are executed.
|
||||
|
|
|
@ -8,6 +8,11 @@ from . import util
|
|||
__unittest = True
|
||||
|
||||
|
||||
def _call_if_exists(parent, attr):
|
||||
func = getattr(parent, attr, lambda: None)
|
||||
func()
|
||||
|
||||
|
||||
class BaseTestSuite(object):
|
||||
"""A simple test suite that doesn't provide class or module shared fixtures.
|
||||
"""
|
||||
|
@ -133,6 +138,7 @@ class TestSuite(BaseTestSuite):
|
|||
|
||||
setUpClass = getattr(currentClass, 'setUpClass', None)
|
||||
if setUpClass is not None:
|
||||
_call_if_exists(result, '_setupStdout')
|
||||
try:
|
||||
setUpClass()
|
||||
except Exception as e:
|
||||
|
@ -142,6 +148,8 @@ class TestSuite(BaseTestSuite):
|
|||
className = util.strclass(currentClass)
|
||||
errorName = 'setUpClass (%s)' % className
|
||||
self._addClassOrModuleLevelException(result, e, errorName)
|
||||
finally:
|
||||
_call_if_exists(result, '_restoreStdout')
|
||||
|
||||
def _get_previous_module(self, result):
|
||||
previousModule = None
|
||||
|
@ -167,6 +175,7 @@ class TestSuite(BaseTestSuite):
|
|||
return
|
||||
setUpModule = getattr(module, 'setUpModule', None)
|
||||
if setUpModule is not None:
|
||||
_call_if_exists(result, '_setupStdout')
|
||||
try:
|
||||
setUpModule()
|
||||
except Exception as e:
|
||||
|
@ -175,6 +184,8 @@ class TestSuite(BaseTestSuite):
|
|||
result._moduleSetUpFailed = True
|
||||
errorName = 'setUpModule (%s)' % currentModule
|
||||
self._addClassOrModuleLevelException(result, e, errorName)
|
||||
finally:
|
||||
_call_if_exists(result, '_restoreStdout')
|
||||
|
||||
def _addClassOrModuleLevelException(self, result, exception, errorName):
|
||||
error = _ErrorHolder(errorName)
|
||||
|
@ -198,6 +209,7 @@ class TestSuite(BaseTestSuite):
|
|||
|
||||
tearDownModule = getattr(module, 'tearDownModule', None)
|
||||
if tearDownModule is not None:
|
||||
_call_if_exists(result, '_setupStdout')
|
||||
try:
|
||||
tearDownModule()
|
||||
except Exception as e:
|
||||
|
@ -205,6 +217,8 @@ class TestSuite(BaseTestSuite):
|
|||
raise
|
||||
errorName = 'tearDownModule (%s)' % previousModule
|
||||
self._addClassOrModuleLevelException(result, e, errorName)
|
||||
finally:
|
||||
_call_if_exists(result, '_restoreStdout')
|
||||
|
||||
def _tearDownPreviousClass(self, test, result):
|
||||
previousClass = getattr(result, '_previousTestClass', None)
|
||||
|
@ -220,6 +234,7 @@ class TestSuite(BaseTestSuite):
|
|||
|
||||
tearDownClass = getattr(previousClass, 'tearDownClass', None)
|
||||
if tearDownClass is not None:
|
||||
_call_if_exists(result, '_setupStdout')
|
||||
try:
|
||||
tearDownClass()
|
||||
except Exception as e:
|
||||
|
@ -228,7 +243,8 @@ class TestSuite(BaseTestSuite):
|
|||
className = util.strclass(previousClass)
|
||||
errorName = 'tearDownClass (%s)' % className
|
||||
self._addClassOrModuleLevelException(result, e, errorName)
|
||||
|
||||
finally:
|
||||
_call_if_exists(result, '_restoreStdout')
|
||||
|
||||
|
||||
class _ErrorHolder(object):
|
||||
|
|
|
@ -497,5 +497,72 @@ class TestOutputBuffering(unittest.TestCase):
|
|||
self.assertEqual(result._original_stderr.getvalue(), expectedErrMessage)
|
||||
self.assertMultiLineEqual(message, expectedFullMessage)
|
||||
|
||||
def testBufferSetupClass(self):
|
||||
result = unittest.TestResult()
|
||||
result.buffer = True
|
||||
|
||||
class Foo(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
1/0
|
||||
def test_foo(self):
|
||||
pass
|
||||
suite = unittest.TestSuite([Foo('test_foo')])
|
||||
suite(result)
|
||||
self.assertEqual(len(result.errors), 1)
|
||||
|
||||
def testBufferTearDownClass(self):
|
||||
result = unittest.TestResult()
|
||||
result.buffer = True
|
||||
|
||||
class Foo(unittest.TestCase):
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
1/0
|
||||
def test_foo(self):
|
||||
pass
|
||||
suite = unittest.TestSuite([Foo('test_foo')])
|
||||
suite(result)
|
||||
self.assertEqual(len(result.errors), 1)
|
||||
|
||||
def testBufferSetUpModule(self):
|
||||
result = unittest.TestResult()
|
||||
result.buffer = True
|
||||
|
||||
class Foo(unittest.TestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
class Module(object):
|
||||
@staticmethod
|
||||
def setUpModule():
|
||||
1/0
|
||||
|
||||
Foo.__module__ = 'Module'
|
||||
sys.modules['Module'] = Module
|
||||
self.addCleanup(sys.modules.pop, 'Module')
|
||||
suite = unittest.TestSuite([Foo('test_foo')])
|
||||
suite(result)
|
||||
self.assertEqual(len(result.errors), 1)
|
||||
|
||||
def testBufferTearDownModule(self):
|
||||
result = unittest.TestResult()
|
||||
result.buffer = True
|
||||
|
||||
class Foo(unittest.TestCase):
|
||||
def test_foo(self):
|
||||
pass
|
||||
class Module(object):
|
||||
@staticmethod
|
||||
def tearDownModule():
|
||||
1/0
|
||||
|
||||
Foo.__module__ = 'Module'
|
||||
sys.modules['Module'] = Module
|
||||
self.addCleanup(sys.modules.pop, 'Module')
|
||||
suite = unittest.TestSuite([Foo('test_foo')])
|
||||
suite(result)
|
||||
self.assertEqual(len(result.errors), 1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -1657,6 +1657,12 @@ class URLopener:
|
|||
headers["Authorization"] = "Basic %s" % auth
|
||||
if realhost:
|
||||
headers["Host"] = realhost
|
||||
|
||||
# Add Connection:close as we don't support persistent connections yet.
|
||||
# This helps in closing the socket and avoiding ResourceWarning
|
||||
|
||||
headers["Connection"] = "close"
|
||||
|
||||
for header, value in self.addheaders:
|
||||
headers[header] = value
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
#
|
||||
# NodeList -- lightest possible NodeList implementation
|
||||
#
|
||||
# EmptyNodeList -- lightest possible NodeList that is guarateed to
|
||||
# EmptyNodeList -- lightest possible NodeList that is guaranteed to
|
||||
# remain empty (immutable)
|
||||
#
|
||||
# StringTypes -- tuple of defined string types
|
||||
|
|
|
@ -1905,7 +1905,7 @@ def _clone_node(node, deep, newOwnerDocument):
|
|||
e._call_user_data_handler(operation, n, entity)
|
||||
else:
|
||||
# Note the cloning of Document and DocumentType nodes is
|
||||
# implemenetation specific. minidom handles those cases
|
||||
# implementation specific. minidom handles those cases
|
||||
# directly in the cloneNode() methods.
|
||||
raise xml.dom.NotSupportedErr("Cannot clone node %s" % repr(node))
|
||||
|
||||
|
|
|
@ -240,7 +240,7 @@ class SimpleXMLRPCDispatcher:
|
|||
marshalled data. For backwards compatibility, a dispatch
|
||||
function can be provided as an argument (see comment in
|
||||
SimpleXMLRPCRequestHandler.do_POST) but overriding the
|
||||
existing method through subclassing is the prefered means
|
||||
existing method through subclassing is the preferred means
|
||||
of changing method dispatch behavior.
|
||||
"""
|
||||
|
||||
|
|
|
@ -362,7 +362,7 @@ def fileContents(fn):
|
|||
|
||||
def runCommand(commandline):
|
||||
"""
|
||||
Run a command and raise RuntimeError if it fails. Output is surpressed
|
||||
Run a command and raise RuntimeError if it fails. Output is suppressed
|
||||
unless the command fails.
|
||||
"""
|
||||
fd = os.popen(commandline, 'r')
|
||||
|
|
|
@ -227,6 +227,7 @@ Jaromir Dolecek
|
|||
Ismail Donmez
|
||||
Marcos Donolo
|
||||
Dima Dorfman
|
||||
Yves Dorfsman
|
||||
Cesar Douady
|
||||
Dean Draayer
|
||||
Fred L. Drake, Jr.
|
||||
|
@ -450,6 +451,7 @@ Tamito Kajiyama
|
|||
Peter van Kampen
|
||||
Rafe Kaplan
|
||||
Jacob Kaplan-Moss
|
||||
Arkady Koplyarov
|
||||
Lou Kates
|
||||
Hiroaki Kawai
|
||||
Sebastien Keim
|
||||
|
|
52
Misc/NEWS
52
Misc/NEWS
|
@ -10,6 +10,9 @@ What's New in Python 3.3 Alpha 1?
|
|||
Core and Builtins
|
||||
-----------------
|
||||
|
||||
- Issue #11320: fix bogus memory management in Modules/getpath.c, leading to
|
||||
a possible crash when calling Py_SetPath().
|
||||
|
||||
- _ast.__version__ is now a Mercurial integer and hex revision.
|
||||
|
||||
- Issue #11432: A bug was introduced in subprocess.Popen on posix systems with
|
||||
|
@ -72,8 +75,37 @@ Core and Builtins
|
|||
Library
|
||||
-------
|
||||
|
||||
- Issue #5421: Fix misleading error message when one of socket.sendto()'s
|
||||
arguments has the wrong type. Patch by Nikita Vetoshkin.
|
||||
|
||||
- Issue #10812: Add some extra posix functions to the os module.
|
||||
|
||||
- Issue #10979: unittest stdout buffering now works with class and module
|
||||
setup and teardown.
|
||||
|
||||
- Issue #11577: fix ResourceWarning triggered by improved binhex test coverage
|
||||
|
||||
- Issue #11243: fix the parameter querying methods of Message to work if
|
||||
the headers contain un-encoded non-ASCII data.
|
||||
|
||||
- Issue #11401: fix handling of headers with no value; this fixes a regression
|
||||
relative to Python2 and the result is now the same as it was in Python2.
|
||||
|
||||
- Issue #9298: base64 bodies weren't being folded to line lengths less than 78,
|
||||
which was a regression relative to Python2. Unlike Python2, the last line
|
||||
of the folded body now ends with a carriage return.
|
||||
|
||||
- Issue #11560: shutil.unpack_archive now correctly handles the format
|
||||
parameter. Patch by Evan Dandrea.
|
||||
|
||||
- Issue #5870: Add `subprocess.DEVNULL` constant.
|
||||
|
||||
- Issue #11133: fix two cases where inspect.getattr_static can trigger code
|
||||
execution. Patch by Daniel Urban.
|
||||
execution. Patch by Andreas Stührk.
|
||||
|
||||
- Issue #11569: use absolute path to the sysctl command in multiprocessing to
|
||||
ensure that it will be found regardless of the shell PATH. This ensures
|
||||
that multiprocessing.cpu_count works on default installs of MacOSX.
|
||||
|
||||
- Issue #11501: disutils.archive_utils.make_zipfile no longer fails if zlib is
|
||||
not installed. Instead, the zipfile.ZIP_STORED compression is used to create
|
||||
|
@ -223,9 +255,25 @@ Tools/Demos
|
|||
Tests
|
||||
-----
|
||||
|
||||
- Issue #11577: improve test coverage of binhex.py. Patch by Arkady Koplyarov.
|
||||
|
||||
- New test_crashers added to exercise the scripts in the Lib/test/crashers
|
||||
directory and confirm they fail as expected
|
||||
|
||||
- Issue #11578: added test for the timeit module. Patch Michael Henry.
|
||||
|
||||
- Issue #11503: improve test coverage of posixpath.py. Patch by Evan Dandrea.
|
||||
|
||||
- Issue #11505: improves test coverage of string.py. Patch by Alicia
|
||||
Arlen.
|
||||
|
||||
- Issue #11548: Improve test coverage of the shutil module. Patch by
|
||||
Evan Dandrea.
|
||||
|
||||
- Issue #11554: Reactivated test_email_codecs.
|
||||
|
||||
- Issue #11505: improves test coverage of string.py
|
||||
- Issue #11505: improves test coverage of string.py. Patch by Alicia
|
||||
Arlen
|
||||
|
||||
- Issue #11490: test_subprocess:test_leaking_fds_on_error no longer gives a
|
||||
false positive if the last directory in the path is inaccessible.
|
||||
|
|
|
@ -3317,7 +3317,7 @@ PyCFuncPtr_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
|
|||
/* XXX XXX This would allow to pass additional options. For COM
|
||||
method *implementations*, we would probably want different
|
||||
behaviour than in 'normal' callback functions: return a HRESULT if
|
||||
an exception occurrs in the callback, and print the traceback not
|
||||
an exception occurs in the callback, and print the traceback not
|
||||
only on the console, but also to OutputDebugString() or something
|
||||
like that.
|
||||
*/
|
||||
|
|
|
@ -202,7 +202,7 @@ static void _CallPythonObject(void *mem,
|
|||
/* XXX XXX XX
|
||||
We have the problem that c_byte or c_short have dict->size of
|
||||
1 resp. 4, but these parameters are pushed as sizeof(int) bytes.
|
||||
BTW, the same problem occurrs when they are pushed as parameters
|
||||
BTW, the same problem occurs when they are pushed as parameters
|
||||
*/
|
||||
} else if (dict) {
|
||||
/* Hm, shouldn't we use PyCData_AtAddress() or something like that instead? */
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
|
||||
4. _ctypes_callproc is then called with the 'callargs' tuple. _ctypes_callproc first
|
||||
allocates two arrays. The first is an array of 'struct argument' items, the
|
||||
second array has 'void *' entried.
|
||||
second array has 'void *' entries.
|
||||
|
||||
5. If 'converters' are present (converters is a sequence of argtypes'
|
||||
from_param methods), for each item in 'callargs' converter is called and the
|
||||
|
|
|
@ -242,7 +242,7 @@ partial_repr(partialobject *pto)
|
|||
__reduce__ by itself doesn't support getting kwargs in the unpickle
|
||||
operation so we define a __setstate__ that replaces all the information
|
||||
about the partial. If we only replaced part of it someone would use
|
||||
it as a hook to do stange things.
|
||||
it as a hook to do strange things.
|
||||
*/
|
||||
|
||||
static PyObject *
|
||||
|
|
|
@ -50,7 +50,7 @@ PyDoc_STRVAR(iobase_doc,
|
|||
"stream.\n"
|
||||
"\n"
|
||||
"IOBase also supports the :keyword:`with` statement. In this example,\n"
|
||||
"fp is closed after the suite of the with statment is complete:\n"
|
||||
"fp is closed after the suite of the with statement is complete:\n"
|
||||
"\n"
|
||||
"with open('spam.txt', 'r') as fp:\n"
|
||||
" fp.write('Spam and eggs!')\n");
|
||||
|
|
|
@ -157,7 +157,7 @@ write_str(stringio *self, PyObject *obj)
|
|||
0 lo string_size hi
|
||||
| |<---used--->|<----------available----------->|
|
||||
| | <--to pad-->|<---to write---> |
|
||||
0 buf positon
|
||||
0 buf position
|
||||
|
||||
*/
|
||||
memset(self->buf + self->string_size, '\0',
|
||||
|
|
Some files were not shown because too many files have changed in this diff Show more
Loading…
Add table
Add a link
Reference in a new issue