mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 16:27:06 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			91 lines
		
	
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			91 lines
		
	
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import array
 | |
| import unittest
 | |
| from test.support import run_unittest, import_module, get_attribute
 | |
| import os, struct
 | |
| fcntl = import_module('fcntl')
 | |
| termios = import_module('termios')
 | |
| get_attribute(termios, 'TIOCGPGRP') #Can't run tests without this feature
 | |
| 
 | |
| try:
 | |
|     tty = open("/dev/tty", "rb")
 | |
| except OSError:
 | |
|     raise unittest.SkipTest("Unable to open /dev/tty")
 | |
| else:
 | |
|     # Skip if another process is in foreground
 | |
|     r = fcntl.ioctl(tty, termios.TIOCGPGRP, "    ")
 | |
|     tty.close()
 | |
|     rpgrp = struct.unpack("i", r)[0]
 | |
|     if rpgrp not in (os.getpgrp(), os.getsid(0)):
 | |
|         raise unittest.SkipTest("Neither the process group nor the session "
 | |
|                                 "are attached to /dev/tty")
 | |
|     del tty, r, rpgrp
 | |
| 
 | |
| try:
 | |
|     import pty
 | |
| except ImportError:
 | |
|     pty = None
 | |
| 
 | |
| class IoctlTests(unittest.TestCase):
 | |
|     def test_ioctl(self):
 | |
|         # If this process has been put into the background, TIOCGPGRP returns
 | |
|         # the session ID instead of the process group id.
 | |
|         ids = (os.getpgrp(), os.getsid(0))
 | |
|         with open("/dev/tty", "rb") as tty:
 | |
|             r = fcntl.ioctl(tty, termios.TIOCGPGRP, "    ")
 | |
|             rpgrp = struct.unpack("i", r)[0]
 | |
|             self.assertIn(rpgrp, ids)
 | |
| 
 | |
|     def _check_ioctl_mutate_len(self, nbytes=None):
 | |
|         buf = array.array('i')
 | |
|         intsize = buf.itemsize
 | |
|         ids = (os.getpgrp(), os.getsid(0))
 | |
|         # A fill value unlikely to be in `ids`
 | |
|         fill = -12345
 | |
|         if nbytes is not None:
 | |
|             # Extend the buffer so that it is exactly `nbytes` bytes long
 | |
|             buf.extend([fill] * (nbytes // intsize))
 | |
|             self.assertEqual(len(buf) * intsize, nbytes)   # sanity check
 | |
|         else:
 | |
|             buf.append(fill)
 | |
|         with open("/dev/tty", "rb") as tty:
 | |
|             r = fcntl.ioctl(tty, termios.TIOCGPGRP, buf, 1)
 | |
|         rpgrp = buf[0]
 | |
|         self.assertEqual(r, 0)
 | |
|         self.assertIn(rpgrp, ids)
 | |
| 
 | |
|     def test_ioctl_mutate(self):
 | |
|         self._check_ioctl_mutate_len()
 | |
| 
 | |
|     def test_ioctl_mutate_1024(self):
 | |
|         # Issue #9758: a mutable buffer of exactly 1024 bytes wouldn't be
 | |
|         # copied back after the system call.
 | |
|         self._check_ioctl_mutate_len(1024)
 | |
| 
 | |
|     def test_ioctl_mutate_2048(self):
 | |
|         # Test with a larger buffer, just for the record.
 | |
|         self._check_ioctl_mutate_len(2048)
 | |
| 
 | |
|     def test_ioctl_signed_unsigned_code_param(self):
 | |
|         if not pty:
 | |
|             raise unittest.SkipTest('pty module required')
 | |
|         mfd, sfd = pty.openpty()
 | |
|         try:
 | |
|             if termios.TIOCSWINSZ < 0:
 | |
|                 set_winsz_opcode_maybe_neg = termios.TIOCSWINSZ
 | |
|                 set_winsz_opcode_pos = termios.TIOCSWINSZ & 0xffffffff
 | |
|             else:
 | |
|                 set_winsz_opcode_pos = termios.TIOCSWINSZ
 | |
|                 set_winsz_opcode_maybe_neg, = struct.unpack("i",
 | |
|                         struct.pack("I", termios.TIOCSWINSZ))
 | |
| 
 | |
|             our_winsz = struct.pack("HHHH",80,25,0,0)
 | |
|             # test both with a positive and potentially negative ioctl code
 | |
|             new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_pos, our_winsz)
 | |
|             new_winsz = fcntl.ioctl(mfd, set_winsz_opcode_maybe_neg, our_winsz)
 | |
|         finally:
 | |
|             os.close(mfd)
 | |
|             os.close(sfd)
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     unittest.main()
 | 
