1. Implement processing of user code in subprocess MainThread. Pass loop

is now interruptable on Windows.
2. Tweak signal.signal() wait parameters as called by various methods
   to improve I/O response, especially on Windows.
3. Debugger is disabled at this check-in pending further development.

M NEWS.txt
M PyShell.py
M rpc.py
M run.py
This commit is contained in:
Kurt B. Kaiser 2003-05-08 20:26:55 +00:00
parent c4607dadce
commit a00050f209
4 changed files with 267 additions and 193 deletions

View file

@ -28,17 +28,21 @@ accomplished in Idle.
"""
import sys
import os
import socket
import select
import SocketServer
import struct
import cPickle as pickle
import threading
import Queue
import traceback
import copy_reg
import types
import marshal
import interrupt
def unpickle_code(ms):
co = marshal.loads(ms)
assert isinstance(co, types.CodeType)
@ -98,8 +102,6 @@ class RPCServer(SocketServer.TCPServer):
raise
except SystemExit:
raise
except EOFError:
pass
except:
erf = sys.__stderr__
print>>erf, '\n' + '-'*40
@ -110,28 +112,29 @@ class RPCServer(SocketServer.TCPServer):
traceback.print_exc(file=erf)
print>>erf, '\n*** Unrecoverable, server exiting!'
print>>erf, '-'*40
import os
os._exit(0)
#----------------- end class RPCServer --------------------
objecttable = {}
request_queue = Queue.Queue(0)
response_queue = Queue.Queue(0)
class SocketIO:
nextseq = 0
def __init__(self, sock, objtable=None, debugging=None):
self.mainthread = threading.currentThread()
self.sockthread = threading.currentThread()
if debugging is not None:
self.debugging = debugging
self.sock = sock
if objtable is None:
objtable = objecttable
self.objtable = objtable
self.cvar = threading.Condition()
self.responses = {}
self.cvars = {}
self.interrupted = False
def close(self):
sock = self.sock
@ -139,6 +142,10 @@ class SocketIO:
if sock is not None:
sock.close()
def exithook(self):
"override for specific exit action"
os._exit()
def debug(self, *args):
if not self.debugging:
return
@ -156,13 +163,12 @@ class SocketIO:
except KeyError:
pass
def localcall(self, request):
def localcall(self, seq, request):
self.debug("localcall:", request)
try:
how, (oid, methodname, args, kwargs) = request
except TypeError:
return ("ERROR", "Bad request format")
assert how == "call"
if not self.objtable.has_key(oid):
return ("ERROR", "Unknown object id: %s" % `oid`)
obj = self.objtable[oid]
@ -178,14 +184,20 @@ class SocketIO:
return ("ERROR", "Unsupported method name: %s" % `methodname`)
method = getattr(obj, methodname)
try:
ret = method(*args, **kwargs)
if isinstance(ret, RemoteObject):
ret = remoteref(ret)
return ("OK", ret)
if how == 'CALL':
ret = method(*args, **kwargs)
if isinstance(ret, RemoteObject):
ret = remoteref(ret)
return ("OK", ret)
elif how == 'QUEUE':
request_queue.put((seq, (method, args, kwargs)))
return("QUEUED", None)
else:
return ("ERROR", "Unsupported message type: %s" % how)
except SystemExit:
raise
except socket.error:
pass
raise
except:
self.debug("localcall:EXCEPTION")
traceback.print_exc(file=sys.__stderr__)
@ -193,24 +205,37 @@ class SocketIO:
def remotecall(self, oid, methodname, args, kwargs):
self.debug("remotecall:asynccall: ", oid, methodname)
# XXX KBK 06Feb03 self.interrupted logic may not be necessary if
# subprocess is threaded.
if self.interrupted:
self.interrupted = False
raise KeyboardInterrupt
seq = self.asynccall(oid, methodname, args, kwargs)
return self.asyncreturn(seq)
def remotequeue(self, oid, methodname, args, kwargs):
self.debug("remotequeue:asyncqueue: ", oid, methodname)
seq = self.asyncqueue(oid, methodname, args, kwargs)
return self.asyncreturn(seq)
def asynccall(self, oid, methodname, args, kwargs):
request = ("call", (oid, methodname, args, kwargs))
request = ("CALL", (oid, methodname, args, kwargs))
seq = self.newseq()
if threading.currentThread() != self.sockthread:
cvar = threading.Condition()
self.cvars[seq] = cvar
self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
self.putmessage((seq, request))
return seq
def asyncqueue(self, oid, methodname, args, kwargs):
request = ("QUEUE", (oid, methodname, args, kwargs))
seq = self.newseq()
if threading.currentThread() != self.sockthread:
cvar = threading.Condition()
self.cvars[seq] = cvar
self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
self.putmessage((seq, request))
return seq
def asyncreturn(self, seq):
self.debug("asyncreturn:%d:call getresponse(): " % seq)
response = self.getresponse(seq, wait=None)
response = self.getresponse(seq, wait=0.05)
self.debug(("asyncreturn:%d:response: " % seq), response)
return self.decoderesponse(response)
@ -218,25 +243,36 @@ class SocketIO:
how, what = response
if how == "OK":
return what
if how == "QUEUED":
return None
if how == "EXCEPTION":
self.debug("decoderesponse: EXCEPTION")
return None
if how == "EOF":
self.debug("decoderesponse: EOF")
self.decode_interrupthook()
return None
if how == "ERROR":
self.debug("decoderesponse: Internal ERROR:", what)
raise RuntimeError, what
raise SystemError, (how, what)
def decode_interrupthook(self):
""
raise EOFError
def mainloop(self):
"""Listen on socket until I/O not ready or EOF
Main thread pollresponse() will loop looking for seq number None, which
pollresponse() will loop looking for seq number None, which
never comes, and exit on EOFError.
"""
try:
self.getresponse(myseq=None, wait=None)
self.getresponse(myseq=None, wait=0.05)
except EOFError:
pass
self.debug("mainloop:return")
return
def getresponse(self, myseq, wait):
response = self._getresponse(myseq, wait)
@ -256,23 +292,24 @@ class SocketIO:
def _getresponse(self, myseq, wait):
self.debug("_getresponse:myseq:", myseq)
if threading.currentThread() is self.mainthread:
# Main thread: does all reading of requests or responses
# Loop here, blocking each time until socket is ready.
if threading.currentThread() is self.sockthread:
# this thread does all reading of requests or responses
while 1:
response = self.pollresponse(myseq, wait)
if response is not None:
return response
else:
# Auxiliary thread: wait for notification from main thread
self.cvar.acquire()
self.cvars[myseq] = self.cvar
# wait for notification from socket handling thread
cvar = self.cvars[myseq]
cvar.acquire()
while not self.responses.has_key(myseq):
self.cvar.wait()
cvar.wait()
response = self.responses[myseq]
self.debug("_getresponse:%s: thread woke up: response: %s" %
(myseq, response))
del self.responses[myseq]
del self.cvars[myseq]
self.cvar.release()
cvar.release()
return response
def newseq(self):
@ -283,7 +320,7 @@ class SocketIO:
self.debug("putmessage:%d:" % message[0])
try:
s = pickle.dumps(message)
except:
except pickle.UnpicklingError:
print >>sys.__stderr__, "Cannot pickle:", `message`
raise
s = struct.pack("<i", len(s)) + s
@ -293,10 +330,13 @@ class SocketIO:
except AttributeError:
# socket was closed
raise IOError
except socket.error:
self.debug("putmessage:socketerror:pid:%s" % os.getpid())
os._exit(0)
else:
s = s[n:]
def ioready(self, wait=0.0):
def ioready(self, wait):
r, w, x = select.select([self.sock.fileno()], [], [], wait)
return len(r)
@ -304,7 +344,7 @@ class SocketIO:
bufneed = 4
bufstate = 0 # meaning: 0 => reading count; 1 => reading data
def pollpacket(self, wait=0.0):
def pollpacket(self, wait):
self._stage0()
if len(self.buffer) < self.bufneed:
if not self.ioready(wait):
@ -334,7 +374,7 @@ class SocketIO:
self.bufstate = 0
return packet
def pollmessage(self, wait=0.0):
def pollmessage(self, wait):
packet = self.pollpacket(wait)
if packet is None:
return None
@ -348,45 +388,97 @@ class SocketIO:
raise
return message
def pollresponse(self, myseq, wait=0.0):
def pollresponse(self, myseq, wait):
"""Handle messages received on the socket.
Some messages received may be asynchronous 'call' commands, and
some may be responses intended for other threads.
Some messages received may be asynchronous 'call' or 'queue' requests,
and some may be responses for other threads.
Loop until message with myseq sequence number is received. Save others
in self.responses and notify the owning thread, except that 'call'
commands are handed off to localcall() and the response sent back
across the link with the appropriate sequence number.
'call' requests are passed to self.localcall() with the expectation of
immediate execution, during which time the socket is not serviced.
'queue' requests are used for tasks (which may block or hang) to be
processed in a different thread. These requests are fed into
request_queue by self.localcall(). Responses to queued requests are
taken from response_queue and sent across the link with the associated
sequence numbers. Messages in the queues are (sequence_number,
request/response) tuples and code using this module removing messages
from the request_queue is responsible for returning the correct
sequence number in the response_queue.
pollresponse() will loop until a response message with the myseq
sequence number is received, and will save other responses in
self.responses and notify the owning thread.
"""
while 1:
message = self.pollmessage(wait)
if message is None: # socket not ready
# send queued response if there is one available
try:
qmsg = response_queue.get(0)
except Queue.Empty:
pass
else:
seq, response = qmsg
message = (seq, ('OK', response))
self.putmessage(message)
# poll for message on link
try:
message = self.pollmessage(wait)
if message is None: # socket not ready
return None
except EOFError:
self.handle_EOF()
return None
except AttributeError:
return None
#wait = 0.0 # poll on subsequent passes instead of blocking
seq, resq = message
how = resq[0]
self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
if resq[0] == "call":
# process or queue a request
if how in ("CALL", "QUEUE"):
self.debug("pollresponse:%d:localcall:call:" % seq)
response = self.localcall(resq)
response = self.localcall(seq, resq)
self.debug("pollresponse:%d:localcall:response:%s"
% (seq, response))
self.putmessage((seq, response))
if how == "CALL":
self.putmessage((seq, response))
elif how == "QUEUE":
# don't acknowledge the 'queue' request!
pass
continue
# return if completed message transaction
elif seq == myseq:
return resq
# must be a response for a different thread:
else:
self.cvar.acquire()
cv = self.cvars.get(seq)
cv = self.cvars.get(seq, None)
# response involving unknown sequence number is discarded,
# probably intended for prior incarnation
# probably intended for prior incarnation of server
if cv is not None:
cv.acquire()
self.responses[seq] = resq
cv.notify()
self.cvar.release()
cv.release()
continue
def handle_EOF(self):
"action taken upon link being closed by peer"
self.EOFhook()
self.debug("handle_EOF")
for key in self.cvars:
cv = self.cvars[key]
cv.acquire()
self.responses[key] = ('EOF', None)
cv.notify()
cv.release()
interrupt.interrupt_main()
# call our (possibly overridden) exit function
self.exithook()
def EOFhook(self):
"Classes using rpc client/server can override to augment EOF action"
pass
#----------------- end class SocketIO --------------------
class RemoteObject:
@ -465,7 +557,8 @@ class RPCProxy:
self.__getattributes()
if not self.__attributes.has_key(name):
raise AttributeError, name
__getattr__.DebuggerStepThrough=1
__getattr__.DebuggerStepThrough = 1
def __getattributes(self):
self.__attributes = self.sockio.remotecall(self.oid,