bpo-28134: Auto-detect socket values from file descriptor (#1349)

Fix socket(fileno=fd) by auto-detecting the socket's family, type,
and proto from the file descriptor. The auto-detection can be overruled
by passing in family, type, and proto explicitly.

Without the fix, all socket except for TCP/IP over IPv4 are basically broken:

>>> s = socket.create_connection(('www.python.org', 443))
>>> s
<socket.socket fd=3, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6, laddr=('2003:58:bc4a:3b00:56ee:75ff:fe47:ca7b', 59730, 0, 0), raddr=('2a04:4e42:1b::223', 443, 0, 0)>
>>> socket.socket(fileno=s.fileno())
<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('2003:58:bc4a:3b00::%2550471192', 59730, 0, 2550471192), raddr=('2a04:4e42:1b:0:700c:e70b:ff7f:0%2550471192', 443, 0, 2550471192)>

Signed-off-by: Christian Heimes <christian@python.org>
This commit is contained in:
Christian Heimes 2018-01-29 22:37:58 +01:00 committed by GitHub
parent 72a0d218dc
commit b6e43af669
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 133 additions and 9 deletions

View file

@ -20,6 +20,7 @@ import math
import pickle
import struct
import random
import shutil
import string
import _thread as thread
import threading
@ -1284,6 +1285,7 @@ class GeneralModuleTests(unittest.TestCase):
def testCloseException(self):
sock = socket.socket()
sock.bind((socket._LOCALHOST, 0))
socket.socket(fileno=sock.fileno()).close()
try:
sock.close()
@ -1636,9 +1638,11 @@ class GeneralModuleTests(unittest.TestCase):
) + 1
with socket.socket(
family=unknown_family, type=unknown_type, fileno=fd) as s:
family=unknown_family, type=unknown_type, proto=23,
fileno=fd) as s:
self.assertEqual(s.family, unknown_family)
self.assertEqual(s.type, unknown_type)
self.assertEqual(s.proto, 23)
@unittest.skipUnless(hasattr(os, 'sendfile'), 'test needs os.sendfile()')
def test__sendfile_use_sendfile(self):
@ -1658,6 +1662,45 @@ class GeneralModuleTests(unittest.TestCase):
with self.assertRaises(TypeError):
sock._sendfile_use_sendfile(File(None))
def _test_socket_fileno(self, s, family, stype):
self.assertEqual(s.family, family)
self.assertEqual(s.type, stype)
fd = s.fileno()
s2 = socket.socket(fileno=fd)
self.addCleanup(s2.close)
# detach old fd to avoid double close
s.detach()
self.assertEqual(s2.family, family)
self.assertEqual(s2.type, stype)
self.assertEqual(s2.fileno(), fd)
def test_socket_fileno(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(s.close)
s.bind((support.HOST, 0))
self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_STREAM)
if hasattr(socket, "SOCK_DGRAM"):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.addCleanup(s.close)
s.bind((support.HOST, 0))
self._test_socket_fileno(s, socket.AF_INET, socket.SOCK_DGRAM)
if support.IPV6_ENABLED:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.addCleanup(s.close)
s.bind((support.HOSTv6, 0, 0, 0))
self._test_socket_fileno(s, socket.AF_INET6, socket.SOCK_STREAM)
if hasattr(socket, "AF_UNIX"):
tmpdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmpdir)
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.addCleanup(s.close)
s.bind(os.path.join(tmpdir, 'socket'))
self._test_socket_fileno(s, socket.AF_UNIX, socket.SOCK_STREAM)
@unittest.skipUnless(HAVE_SOCKET_CAN, 'SocketCan required for this test.')
class BasicCANTest(unittest.TestCase):