mirror of
https://github.com/python/cpython.git
synced 2025-08-28 12:45:07 +00:00
gh-59705: Add _thread.set_name() function (#127338)
On Linux, threading.Thread now sets the thread name to the operating system. * configure now checks if pthread_getname_np() and pthread_setname_np() functions are available. * Add PYTHREAD_NAME_MAXLEN macro. * Add _thread._NAME_MAXLEN constant for test_threading. Co-authored-by: Serhiy Storchaka <storchaka@gmail.com>
This commit is contained in:
parent
12680ec5bd
commit
67b18a18b6
8 changed files with 342 additions and 2 deletions
|
@ -2104,6 +2104,66 @@ class MiscTestCase(unittest.TestCase):
|
|||
support.check__all__(self, threading, ('threading', '_thread'),
|
||||
extra=extra, not_exported=not_exported)
|
||||
|
||||
@unittest.skipUnless(hasattr(_thread, 'set_name'), "missing _thread.set_name")
|
||||
@unittest.skipUnless(hasattr(_thread, '_get_name'), "missing _thread._get_name")
|
||||
def test_set_name(self):
|
||||
# set_name() limit in bytes
|
||||
truncate = getattr(_thread, "_NAME_MAXLEN", None)
|
||||
limit = truncate or 100
|
||||
|
||||
tests = [
|
||||
# test short ASCII name
|
||||
"CustomName",
|
||||
|
||||
# test short non-ASCII name
|
||||
"namé€",
|
||||
|
||||
# embedded null character: name is truncated
|
||||
# at the first null character
|
||||
"embed\0null",
|
||||
|
||||
# Test long ASCII names (not truncated)
|
||||
"x" * limit,
|
||||
|
||||
# Test long ASCII names (truncated)
|
||||
"x" * (limit + 10),
|
||||
|
||||
# Test long non-ASCII name (truncated)
|
||||
"x" * (limit - 1) + "é€",
|
||||
]
|
||||
if os_helper.FS_NONASCII:
|
||||
tests.append(f"nonascii:{os_helper.FS_NONASCII}")
|
||||
if os_helper.TESTFN_UNENCODABLE:
|
||||
tests.append(os_helper.TESTFN_UNENCODABLE)
|
||||
|
||||
if sys.platform.startswith("solaris"):
|
||||
encoding = "utf-8"
|
||||
else:
|
||||
encoding = sys.getfilesystemencoding()
|
||||
|
||||
def work():
|
||||
nonlocal work_name
|
||||
work_name = _thread._get_name()
|
||||
|
||||
for name in tests:
|
||||
encoded = name.encode(encoding, "replace")
|
||||
if b'\0' in encoded:
|
||||
encoded = encoded.split(b'\0', 1)[0]
|
||||
if truncate is not None:
|
||||
encoded = encoded[:truncate]
|
||||
if sys.platform.startswith("solaris"):
|
||||
expected = encoded.decode("utf-8", "surrogateescape")
|
||||
else:
|
||||
expected = os.fsdecode(encoded)
|
||||
|
||||
with self.subTest(name=name, expected=expected):
|
||||
work_name = None
|
||||
thread = threading.Thread(target=work, name=name)
|
||||
thread.start()
|
||||
thread.join()
|
||||
self.assertEqual(work_name, expected,
|
||||
f"{len(work_name)=} and {len(expected)=}")
|
||||
|
||||
|
||||
class InterruptMainTests(unittest.TestCase):
|
||||
def check_interrupt_main_with_signal_handler(self, signum):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue