This commit is contained in:
Jeremiah Paige 2025-12-23 01:07:50 -05:00 committed by GitHub
commit 97b2972adc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 102 additions and 2 deletions

View file

@ -4,6 +4,7 @@ import os
import sys
import _collections_abc
from collections import deque
from errno import ENAMETOOLONG
from functools import wraps
from types import MethodType, GenericAlias
@ -802,8 +803,9 @@ class nullcontext(AbstractContextManager, AbstractAsyncContextManager):
class chdir(AbstractContextManager):
"""Non thread-safe context manager to change the current working directory."""
def __init__(self, path):
def __init__(self, path, *, ignore_errors=False):
self.path = path
self.ignore_errors = ignore_errors
self._old_cwd = []
def __enter__(self):
@ -811,4 +813,19 @@ class chdir(AbstractContextManager):
os.chdir(self.path)
def __exit__(self, *excinfo):
os.chdir(self._old_cwd.pop())
abs_return = self._old_cwd.pop()
try:
os.chdir(abs_return)
except OSError as exc:
if exc.errno == ENAMETOOLONG:
try:
cwd = os.getcwd()
if os.path.commonpath([abs_return, cwd]):
os.chdir(os.path.relpath(abs_return, cwd))
else:
raise
except OSError:
if not self.ignore_errors:
raise
elif not self.ignore_errors:
raise

View file

@ -1,7 +1,9 @@
"""Unit tests for contextlib.py, and other context managers."""
import io
import errno
import os
import shutil
import sys
import tempfile
import threading
@ -1361,6 +1363,87 @@ class TestChdir(unittest.TestCase):
self.assertEqual(str(re), "boom")
self.assertEqual(os.getcwd(), old_cwd)
def test_with_os_chdir(self):
old_cwd = os.getcwd()
target1 = os.path.join(os.path.dirname(__file__), 'data')
target2 = os.path.join(os.path.dirname(__file__), 'ziptestdata')
with chdir(target1):
self.assertEqual(os.getcwd(), target1)
os.chdir(target2)
self.assertEqual(os.getcwd(), target2)
self.assertEqual(os.getcwd(), old_cwd)
def test_long_path(self):
if not (hasattr(os, 'pathconf_names')
and 'PC_NAME_MAX' in os.pathconf_names
and 'PC_PATH_MAX' in os.pathconf_names):
self.skipTest('Cannot determine a path max to test against.')
nmax = os.pathconf(os.getcwd(), 'PC_NAME_MAX')
pmax = os.pathconf(os.getcwd(), 'PC_PATH_MAX')
long_path = old_cwd = os.getcwd()
dirname = '_' * nmax
try:
for _ in range(pmax // (nmax + 1)):
long_path = os.path.join(long_path, dirname)
os.mkdir(dirname)
os.chdir(dirname)
os.mkdir(dirname)
try:
os.getcwd()
except OSError as exc:
if exc.errno == errno.ENOENT:
# This will rais an exception in __enter__ when we are
# testing for an exception in __exit__
self.skipTest('Cannot retrieve cwd that is longer than '
'PC_PATH_MAX')
raise
with chdir(dirname):
self.assertEqual(os.getcwd(), os.path.join(long_path, dirname))
self.assertEqual(os.getcwd(), long_path)
finally:
os.chdir(old_cwd)
shutil.rmtree(os.path.join(old_cwd, dirname), ignore_errors=True)
def test_deleted_dir_without_ignore(self):
old_cwd = os.getcwd()
os.mkdir('dead')
os.mkdir('alive')
try:
with self.assertRaises(FileNotFoundError):
with chdir('dead'):
with chdir(os.path.join(old_cwd, 'alive')):
os.rmdir(os.path.join(old_cwd, 'dead'))
self.assertEqual(os.getcwd(), old_cwd)
finally:
os.chdir(old_cwd)
if os.path.isdir('dead'):
os.rmdir('dead')
if os.path.isdir('alive'):
os.rmdir('alive')
def test_deleted_dir_with_ignore(self):
old_cwd = os.getcwd()
os.mkdir('dead')
os.mkdir('alive')
try:
with chdir('dead', ignore_errors=True):
with chdir(os.path.join(old_cwd, 'alive'), ignore_errors=True):
os.rmdir(os.path.join(old_cwd, 'dead'))
self.assertEqual(os.getcwd(), old_cwd)
finally:
os.chdir(old_cwd)
if os.path.isdir('dead'):
os.rmdir('dead')
if os.path.isdir('alive'):
os.rmdir('alive')
if __name__ == "__main__":
unittest.main()