mirror of
https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
1. Eliminate putrequest(): only used in asynccall(), merge it there.
2. Add additional debugging statements and enhance others. 3. Clarify comments. 4. Move SocketIO.nextseq class attribute to beginning of class.
This commit is contained in:
parent
8d81a012ef
commit
0a0e6c3b5e
1 changed files with 36 additions and 19 deletions
|
@ -90,6 +90,8 @@ objecttable = {}
|
||||||
|
|
||||||
class SocketIO:
|
class SocketIO:
|
||||||
|
|
||||||
|
nextseq = 0
|
||||||
|
|
||||||
def __init__(self, sock, objtable=None, debugging=None):
|
def __init__(self, sock, objtable=None, debugging=None):
|
||||||
self.mainthread = threading.currentThread()
|
self.mainthread = threading.currentThread()
|
||||||
if debugging is not None:
|
if debugging is not None:
|
||||||
|
@ -174,19 +176,21 @@ class SocketIO:
|
||||||
return ("EXCEPTION", (mod, name, args, tb))
|
return ("EXCEPTION", (mod, name, args, tb))
|
||||||
|
|
||||||
def remotecall(self, oid, methodname, args, kwargs):
|
def remotecall(self, oid, methodname, args, kwargs):
|
||||||
self.debug("remotecall:")
|
self.debug("calling asynccall via remotecall")
|
||||||
seq = self.asynccall(oid, methodname, args, kwargs)
|
seq = self.asynccall(oid, methodname, args, kwargs)
|
||||||
return self.asyncreturn(seq)
|
return self.asyncreturn(seq)
|
||||||
|
|
||||||
def asynccall(self, oid, methodname, args, kwargs):
|
def asynccall(self, oid, methodname, args, kwargs):
|
||||||
request = ("call", (oid, methodname, args, kwargs))
|
request = ("call", (oid, methodname, args, kwargs))
|
||||||
seq = self.putrequest(request)
|
seq = self.newseq()
|
||||||
self.debug(("asyncall:%d:" % seq), oid, methodname, args, kwargs)
|
self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
|
||||||
|
self.putmessage((seq, request))
|
||||||
return seq
|
return seq
|
||||||
|
|
||||||
def asyncreturn(self, seq):
|
def asyncreturn(self, seq):
|
||||||
|
self.debug("asyncreturn:%d:call getresponse(): " % seq)
|
||||||
response = self.getresponse(seq)
|
response = self.getresponse(seq)
|
||||||
self.debug(("asyncreturn:%d:" % seq), response)
|
self.debug(("asyncreturn:%d:response: " % seq), response)
|
||||||
return self.decoderesponse(response)
|
return self.decoderesponse(response)
|
||||||
|
|
||||||
def decoderesponse(self, response):
|
def decoderesponse(self, response):
|
||||||
|
@ -194,7 +198,7 @@ class SocketIO:
|
||||||
if how == "OK":
|
if how == "OK":
|
||||||
return what
|
return what
|
||||||
if how == "EXCEPTION":
|
if how == "EXCEPTION":
|
||||||
self.debug("decoderesponse: Internal EXCEPTION:", what)
|
self.debug("decoderesponse: EXCEPTION:", what)
|
||||||
mod, name, args, tb = what
|
mod, name, args, tb = what
|
||||||
self.traceback = tb
|
self.traceback = tb
|
||||||
if mod: # not string exception
|
if mod: # not string exception
|
||||||
|
@ -220,6 +224,12 @@ class SocketIO:
|
||||||
raise SystemError, (how, what)
|
raise SystemError, (how, what)
|
||||||
|
|
||||||
def mainloop(self):
|
def mainloop(self):
|
||||||
|
"""Listen on socket until I/O not ready or EOF
|
||||||
|
|
||||||
|
pollpacket() will loop looking for seq number None, which never
|
||||||
|
comes. The loop will exit when self.ioready() returns 0.
|
||||||
|
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
self.getresponse(None)
|
self.getresponse(None)
|
||||||
except EOFError:
|
except EOFError:
|
||||||
|
@ -242,8 +252,10 @@ class SocketIO:
|
||||||
return obj
|
return obj
|
||||||
|
|
||||||
def _getresponse(self, myseq):
|
def _getresponse(self, myseq):
|
||||||
|
self.debug("_getresponse:myseq:", myseq)
|
||||||
if threading.currentThread() is self.mainthread:
|
if threading.currentThread() is self.mainthread:
|
||||||
# Main thread: does all reading of requests and responses
|
# Main thread: does all reading of requests or responses
|
||||||
|
# Loop here until there is message traffic on the socket
|
||||||
while 1:
|
while 1:
|
||||||
response = self.pollresponse(myseq, None)
|
response = self.pollresponse(myseq, None)
|
||||||
if response is not None:
|
if response is not None:
|
||||||
|
@ -259,21 +271,14 @@ class SocketIO:
|
||||||
del self.responses[myseq]
|
del self.responses[myseq]
|
||||||
del self.cvars[myseq]
|
del self.cvars[myseq]
|
||||||
self.statelock.release()
|
self.statelock.release()
|
||||||
return response
|
return response # might be None
|
||||||
|
|
||||||
def putrequest(self, request):
|
|
||||||
seq = self.newseq()
|
|
||||||
self.putmessage((seq, request))
|
|
||||||
return seq
|
|
||||||
|
|
||||||
nextseq = 0
|
|
||||||
|
|
||||||
def newseq(self):
|
def newseq(self):
|
||||||
self.nextseq = seq = self.nextseq + 2
|
self.nextseq = seq = self.nextseq + 2
|
||||||
return seq
|
return seq
|
||||||
|
|
||||||
def putmessage(self, message):
|
def putmessage(self, message):
|
||||||
##self.debug("putmessage: ", message)
|
self.debug("putmessage:%d:" % message[0])
|
||||||
try:
|
try:
|
||||||
s = pickle.dumps(message)
|
s = pickle.dumps(message)
|
||||||
except:
|
except:
|
||||||
|
@ -337,16 +342,28 @@ class SocketIO:
|
||||||
return message
|
return message
|
||||||
|
|
||||||
def pollresponse(self, myseq, wait=0.0):
|
def pollresponse(self, myseq, wait=0.0):
|
||||||
# Loop while there's no more buffered input or until specific response
|
"""Handle messages received on the socket.
|
||||||
|
|
||||||
|
Some messages received may be asynchronous 'call' commands, and
|
||||||
|
some may be responses intended for other threads.
|
||||||
|
|
||||||
|
Loop until message with myseq sequence number is received. Save others
|
||||||
|
in self.responses and notify the owning thread, except that 'call'
|
||||||
|
commands are handed off to localcall() and the response sent back
|
||||||
|
across the link with the appropriate sequence number.
|
||||||
|
|
||||||
|
"""
|
||||||
while 1:
|
while 1:
|
||||||
message = self.pollmessage(wait)
|
message = self.pollmessage(wait)
|
||||||
if message is None:
|
if message is None: # socket not ready
|
||||||
return None
|
return None
|
||||||
wait = 0.0
|
wait = 0.0
|
||||||
seq, resq = message
|
seq, resq = message
|
||||||
|
self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
|
||||||
if resq[0] == "call":
|
if resq[0] == "call":
|
||||||
self.debug("call_localcall:%d:" % seq)
|
self.debug("pollresponse:%d:call_localcall" % seq)
|
||||||
response = self.localcall(resq)
|
response = self.localcall(resq)
|
||||||
|
self.debug("pollresponse:%d:response:%s" % (seq, response))
|
||||||
self.putmessage((seq, response))
|
self.putmessage((seq, response))
|
||||||
continue
|
continue
|
||||||
elif seq == myseq:
|
elif seq == myseq:
|
||||||
|
@ -410,7 +427,7 @@ class RPCClient(SocketIO):
|
||||||
def accept(self):
|
def accept(self):
|
||||||
working_sock, address = self.listening_sock.accept()
|
working_sock, address = self.listening_sock.accept()
|
||||||
if self.debugging:
|
if self.debugging:
|
||||||
print>>sys.__stderr__, "** Connection request from ", address
|
print>>sys.__stderr__, "****** Connection request from ", address
|
||||||
if address[0] == '127.0.0.1':
|
if address[0] == '127.0.0.1':
|
||||||
SocketIO.__init__(self, working_sock)
|
SocketIO.__init__(self, working_sock)
|
||||||
else:
|
else:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue