bpo-41001: Add os.eventfd() (#20930)

Co-authored-by: Kyle Stanley <aeros167@gmail.com>
This commit is contained in:
Christian Heimes 2020-11-13 19:48:52 +01:00 committed by GitHub
parent bbeb2d266d
commit cd9fed6afb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 471 additions and 3 deletions

View file

@ -15,10 +15,12 @@ import locale
import mmap
import os
import pickle
import select
import shutil
import signal
import socket
import stat
import struct
import subprocess
import sys
import sysconfig
@ -59,6 +61,7 @@ try:
except ImportError:
INT_MAX = PY_SSIZE_T_MAX = sys.maxsize
from test.support.script_helper import assert_python_ok
from test.support import unix_shell
from test.support.os_helper import FakePath
@ -3528,6 +3531,89 @@ class MemfdCreateTests(unittest.TestCase):
self.assertFalse(os.get_inheritable(fd2))
@unittest.skipUnless(hasattr(os, 'eventfd'), 'requires os.eventfd')
@support.requires_linux_version(2, 6, 30)
class EventfdTests(unittest.TestCase):
def test_eventfd_initval(self):
def pack(value):
"""Pack as native uint64_t
"""
return struct.pack("@Q", value)
size = 8 # read/write 8 bytes
initval = 42
fd = os.eventfd(initval)
self.assertNotEqual(fd, -1)
self.addCleanup(os.close, fd)
self.assertFalse(os.get_inheritable(fd))
# test with raw read/write
res = os.read(fd, size)
self.assertEqual(res, pack(initval))
os.write(fd, pack(23))
res = os.read(fd, size)
self.assertEqual(res, pack(23))
os.write(fd, pack(40))
os.write(fd, pack(2))
res = os.read(fd, size)
self.assertEqual(res, pack(42))
# test with eventfd_read/eventfd_write
os.eventfd_write(fd, 20)
os.eventfd_write(fd, 3)
res = os.eventfd_read(fd)
self.assertEqual(res, 23)
def test_eventfd_semaphore(self):
initval = 2
flags = os.EFD_CLOEXEC | os.EFD_SEMAPHORE | os.EFD_NONBLOCK
fd = os.eventfd(initval, flags)
self.assertNotEqual(fd, -1)
self.addCleanup(os.close, fd)
# semaphore starts has initval 2, two reads return '1'
res = os.eventfd_read(fd)
self.assertEqual(res, 1)
res = os.eventfd_read(fd)
self.assertEqual(res, 1)
# third read would block
with self.assertRaises(BlockingIOError):
os.eventfd_read(fd)
with self.assertRaises(BlockingIOError):
os.read(fd, 8)
# increase semaphore counter, read one
os.eventfd_write(fd, 1)
res = os.eventfd_read(fd)
self.assertEqual(res, 1)
# next read would block, too
with self.assertRaises(BlockingIOError):
os.eventfd_read(fd)
def test_eventfd_select(self):
flags = os.EFD_CLOEXEC | os.EFD_NONBLOCK
fd = os.eventfd(0, flags)
self.assertNotEqual(fd, -1)
self.addCleanup(os.close, fd)
# counter is zero, only writeable
rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
self.assertEqual((rfd, wfd, xfd), ([], [fd], []))
# counter is non-zero, read and writeable
os.eventfd_write(fd, 23)
rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
self.assertEqual((rfd, wfd, xfd), ([fd], [fd], []))
self.assertEqual(os.eventfd_read(fd), 23)
# counter at max, only readable
os.eventfd_write(fd, (2**64) - 2)
rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
self.assertEqual((rfd, wfd, xfd), ([fd], [], []))
os.eventfd_read(fd)
class OSErrorTests(unittest.TestCase):
def setUp(self):
class Str(str):