bpo-46364: Use sockets for stdin of asyncio only on AIX (GH-30596)

Signed-off-by: Christoph Hamsen <hamsen.christoph@posteo.de>
Co-authored-by: July Tikhonov <july.tikh@gmail.com>
(cherry picked from commit c9ed0327a9)

Co-authored-by: Christoph Hamsen <37963496+xopham@users.noreply.github.com>
This commit is contained in:
Miss Islington (bot) 2022-10-13 10:27:31 -07:00 committed by GitHub
parent c7761bbc32
commit 595ef03c7c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 4 deletions

View file

@ -427,6 +427,26 @@ class SubprocessMixin:
self.assertEqual(output, None)
self.assertEqual(exitcode, 0)
@unittest.skipIf(sys.platform != 'linux', "Don't have /dev/stdin")
def test_devstdin_input(self):
async def devstdin_input(message):
code = 'file = open("/dev/stdin"); data = file.read(); print(len(data))'
proc = await asyncio.create_subprocess_exec(
sys.executable, '-c', code,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
close_fds=False,
)
stdout, stderr = await proc.communicate(message)
exitcode = await proc.wait()
return (stdout, exitcode)
output, exitcode = self.loop.run_until_complete(devstdin_input(b'abc'))
self.assertEqual(output.rstrip(), b'3')
self.assertEqual(exitcode, 0)
def test_cancel_process_wait(self):
# Issue #23140: cancel Process.wait()