mirror of
				https://github.com/python/cpython.git
				synced 2025-10-25 15:58:57 +00:00 
			
		
		
		
	 2539e6744b
			
		
	
	
		2539e6744b
		
	
	
	
	
		
			
			Patch by Milan Oberkirch, developed as part of his 2014 GSOC project.
Note that this also fixes a bug in mock_socket ('getpeername' was returning a
simple string instead of the tuple required for IPvX protocols), a bug in
DebugServer with respect to handling binary data (should have been fixed when
decode_data was introduced, but wasn't found until this patch was written),
and a long-standing bug in DebugServer (it was printing an extra blank line at
the end of the displayed message text).
		
	
			
		
			
				
	
	
		
			159 lines
		
	
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			159 lines
		
	
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| """Mock socket module used by the smtpd and smtplib tests.
 | |
| """
 | |
| 
 | |
| # imported for _GLOBAL_DEFAULT_TIMEOUT
 | |
| import socket as socket_module
 | |
| 
 | |
| # Mock socket module
 | |
| _defaulttimeout = None
 | |
| _reply_data = None
 | |
| 
 | |
| # This is used to queue up data to be read through socket.makefile, typically
 | |
| # *before* the socket object is even created. It is intended to handle a single
 | |
| # line which the socket will feed on recv() or makefile().
 | |
| def reply_with(line):
 | |
|     global _reply_data
 | |
|     _reply_data = line
 | |
| 
 | |
| 
 | |
| class MockFile:
 | |
|     """Mock file object returned by MockSocket.makefile().
 | |
|     """
 | |
|     def __init__(self, lines):
 | |
|         self.lines = lines
 | |
|     def readline(self, limit=-1):
 | |
|         result = self.lines.pop(0) + b'\r\n'
 | |
|         if limit >= 0:
 | |
|             # Re-insert the line, removing the \r\n we added.
 | |
|             self.lines.insert(0, result[limit:-2])
 | |
|             result = result[:limit]
 | |
|         return result
 | |
|     def close(self):
 | |
|         pass
 | |
| 
 | |
| 
 | |
| class MockSocket:
 | |
|     """Mock socket object used by smtpd and smtplib tests.
 | |
|     """
 | |
|     def __init__(self, family=None):
 | |
|         global _reply_data
 | |
|         self.family = family
 | |
|         self.output = []
 | |
|         self.lines = []
 | |
|         if _reply_data:
 | |
|             self.lines.append(_reply_data)
 | |
|             _reply_data = None
 | |
|         self.conn = None
 | |
|         self.timeout = None
 | |
| 
 | |
|     def queue_recv(self, line):
 | |
|         self.lines.append(line)
 | |
| 
 | |
|     def recv(self, bufsize, flags=None):
 | |
|         data = self.lines.pop(0) + b'\r\n'
 | |
|         return data
 | |
| 
 | |
|     def fileno(self):
 | |
|         return 0
 | |
| 
 | |
|     def settimeout(self, timeout):
 | |
|         if timeout is None:
 | |
|             self.timeout = _defaulttimeout
 | |
|         else:
 | |
|             self.timeout = timeout
 | |
| 
 | |
|     def gettimeout(self):
 | |
|         return self.timeout
 | |
| 
 | |
|     def setsockopt(self, level, optname, value):
 | |
|         pass
 | |
| 
 | |
|     def getsockopt(self, level, optname, buflen=None):
 | |
|         return 0
 | |
| 
 | |
|     def bind(self, address):
 | |
|         pass
 | |
| 
 | |
|     def accept(self):
 | |
|         self.conn = MockSocket()
 | |
|         return self.conn, 'c'
 | |
| 
 | |
|     def getsockname(self):
 | |
|         return ('0.0.0.0', 0)
 | |
| 
 | |
|     def setblocking(self, flag):
 | |
|         pass
 | |
| 
 | |
|     def listen(self, backlog):
 | |
|         pass
 | |
| 
 | |
|     def makefile(self, mode='r', bufsize=-1):
 | |
|         handle = MockFile(self.lines)
 | |
|         return handle
 | |
| 
 | |
|     def sendall(self, buffer, flags=None):
 | |
|         self.last = data
 | |
|         self.output.append(data)
 | |
|         return len(data)
 | |
| 
 | |
|     def send(self, data, flags=None):
 | |
|         self.last = data
 | |
|         self.output.append(data)
 | |
|         return len(data)
 | |
| 
 | |
|     def getpeername(self):
 | |
|         return ('peer-address', 'peer-port')
 | |
| 
 | |
|     def close(self):
 | |
|         pass
 | |
| 
 | |
| 
 | |
| def socket(family=None, type=None, proto=None):
 | |
|     return MockSocket(family)
 | |
| 
 | |
| def create_connection(address, timeout=socket_module._GLOBAL_DEFAULT_TIMEOUT,
 | |
|                       source_address=None):
 | |
|     try:
 | |
|         int_port = int(address[1])
 | |
|     except ValueError:
 | |
|         raise error
 | |
|     ms = MockSocket()
 | |
|     if timeout is socket_module._GLOBAL_DEFAULT_TIMEOUT:
 | |
|         timeout = getdefaulttimeout()
 | |
|     ms.settimeout(timeout)
 | |
|     return ms
 | |
| 
 | |
| 
 | |
| def setdefaulttimeout(timeout):
 | |
|     global _defaulttimeout
 | |
|     _defaulttimeout = timeout
 | |
| 
 | |
| 
 | |
| def getdefaulttimeout():
 | |
|     return _defaulttimeout
 | |
| 
 | |
| 
 | |
| def getfqdn():
 | |
|     return ""
 | |
| 
 | |
| 
 | |
| def gethostname():
 | |
|     pass
 | |
| 
 | |
| 
 | |
| def gethostbyname(name):
 | |
|     return ""
 | |
| 
 | |
| def getaddrinfo(*args, **kw):
 | |
|     return socket_module.getaddrinfo(*args, **kw)
 | |
| 
 | |
| gaierror = socket_module.gaierror
 | |
| error = socket_module.error
 | |
| 
 | |
| 
 | |
| # Constants
 | |
| AF_INET = socket_module.AF_INET
 | |
| AF_INET6 = socket_module.AF_INET6
 | |
| SOCK_STREAM = socket_module.SOCK_STREAM
 | |
| SOL_SOCKET = None
 | |
| SO_REUSEADDR = None
 |