bpo-40275: Adding threading_helper submodule in test.support (GH-20263)

This commit is contained in:
Hai Shi 2020-05-28 06:10:27 +08:00 committed by GitHub
parent 7d80b35af1
commit e80697d687
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
46 changed files with 483 additions and 428 deletions

View file

@ -22,7 +22,7 @@ import unittest
from test import support, mock_socket
from test.support import hashlib_helper
from test.support import socket_helper
from test.support import threading_setup, threading_cleanup, join_thread
from test.support import threading_helper
from unittest.mock import Mock
HOST = socket_helper.HOST
@ -217,7 +217,7 @@ class DebuggingServerTests(unittest.TestCase):
maxDiff = None
def setUp(self):
self.thread_key = threading_setup()
self.thread_key = threading_helper.threading_setup()
self.real_getfqdn = socket.getfqdn
socket.getfqdn = mock_socket.getfqdn
# temporarily replace sys.stdout to capture DebuggingServer output
@ -249,7 +249,7 @@ class DebuggingServerTests(unittest.TestCase):
self.client_evt.set()
# wait for the server thread to terminate
self.serv_evt.wait()
join_thread(self.thread)
threading_helper.join_thread(self.thread)
# restore sys.stdout
sys.stdout = self.old_stdout
# restore DEBUGSTREAM
@ -257,7 +257,7 @@ class DebuggingServerTests(unittest.TestCase):
smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
del self.thread
self.doCleanups()
threading_cleanup(*self.thread_key)
threading_helper.threading_cleanup(*self.thread_key)
def get_output_without_xpeer(self):
test_output = self.output.getvalue()
@ -704,7 +704,7 @@ class TooLongLineTests(unittest.TestCase):
respdata = b'250 OK' + (b'.' * smtplib._MAXLINE * 2) + b'\n'
def setUp(self):
self.thread_key = threading_setup()
self.thread_key = threading_helper.threading_setup()
self.old_stdout = sys.stdout
self.output = io.StringIO()
sys.stdout = self.output
@ -722,10 +722,10 @@ class TooLongLineTests(unittest.TestCase):
def tearDown(self):
self.evt.wait()
sys.stdout = self.old_stdout
join_thread(self.thread)
threading_helper.join_thread(self.thread)
del self.thread
self.doCleanups()
threading_cleanup(*self.thread_key)
threading_helper.threading_cleanup(*self.thread_key)
def testLineTooLong(self):
self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
@ -955,7 +955,7 @@ class SimSMTPServer(smtpd.SMTPServer):
class SMTPSimTests(unittest.TestCase):
def setUp(self):
self.thread_key = threading_setup()
self.thread_key = threading_helper.threading_setup()
self.real_getfqdn = socket.getfqdn
socket.getfqdn = mock_socket.getfqdn
self.serv_evt = threading.Event()
@ -978,10 +978,10 @@ class SMTPSimTests(unittest.TestCase):
self.client_evt.set()
# wait for the server thread to terminate
self.serv_evt.wait()
join_thread(self.thread)
threading_helper.join_thread(self.thread)
del self.thread
self.doCleanups()
threading_cleanup(*self.thread_key)
threading_helper.threading_cleanup(*self.thread_key)
def testBasic(self):
# smoke test
@ -1268,7 +1268,7 @@ class SMTPUTF8SimTests(unittest.TestCase):
maxDiff = None
def setUp(self):
self.thread_key = threading_setup()
self.thread_key = threading_helper.threading_setup()
self.real_getfqdn = socket.getfqdn
socket.getfqdn = mock_socket.getfqdn
self.serv_evt = threading.Event()
@ -1293,10 +1293,10 @@ class SMTPUTF8SimTests(unittest.TestCase):
self.client_evt.set()
# wait for the server thread to terminate
self.serv_evt.wait()
join_thread(self.thread)
threading_helper.join_thread(self.thread)
del self.thread
self.doCleanups()
threading_cleanup(*self.thread_key)
threading_helper.threading_cleanup(*self.thread_key)
def test_test_server_supports_extensions(self):
smtp = smtplib.SMTP(
@ -1397,7 +1397,7 @@ class SimSMTPAUTHInitialResponseServer(SimSMTPServer):
class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
def setUp(self):
self.thread_key = threading_setup()
self.thread_key = threading_helper.threading_setup()
self.real_getfqdn = socket.getfqdn
socket.getfqdn = mock_socket.getfqdn
self.serv_evt = threading.Event()
@ -1421,10 +1421,10 @@ class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
self.client_evt.set()
# wait for the server thread to terminate
self.serv_evt.wait()
join_thread(self.thread)
threading_helper.join_thread(self.thread)
del self.thread
self.doCleanups()
threading_cleanup(*self.thread_key)
threading_helper.threading_cleanup(*self.thread_key)
def testAUTH_PLAIN_initial_response_login(self):
self.serv.add_feature('AUTH PLAIN')