Create xmlrpc package. Issue #2886.

This commit is contained in:
Georg Brandl 2008-05-26 11:14:17 +00:00
parent 7f986acb01
commit 38eceaaf0c
17 changed files with 462 additions and 668 deletions

View file

@ -1,147 +0,0 @@
#! /usr/bin/env python
"""Test script for the dbm.open function based on testdumbdbm.py"""
import os
import unittest
import dbm
import glob
import test.support
_fname = test.support.TESTFN
#
# Iterates over every database module supported by dbm currently available,
# setting dbm to use each in turn, and yielding that module
#
def dbm_iterator():
old_default = dbm._defaultmod
for module in dbm._modules.values():
dbm._defaultmod = module
yield module
dbm._defaultmod = old_default
#
# Clean up all scratch databases we might have created during testing
#
def delete_files():
# we don't know the precise name the underlying database uses
# so we use glob to locate all names
for f in glob.glob(_fname + "*"):
test.support.unlink(f)
class AnyDBMTestCase(unittest.TestCase):
_dict = {'0': b'',
'a': b'Python:',
'b': b'Programming',
'c': b'the',
'd': b'way',
'f': b'Guido',
'g': b'intended',
}
def __init__(self, *args):
unittest.TestCase.__init__(self, *args)
def test_anydbm_creation(self):
f = dbm.open(_fname, 'c')
self.assertEqual(list(f.keys()), [])
for key in self._dict:
f[key.encode("ascii")] = self._dict[key]
self.read_helper(f)
f.close()
def test_anydbm_modification(self):
self.init_db()
f = dbm.open(_fname, 'c')
self._dict['g'] = f[b'g'] = b"indented"
self.read_helper(f)
f.close()
def test_anydbm_read(self):
self.init_db()
f = dbm.open(_fname, 'r')
self.read_helper(f)
f.close()
def test_anydbm_keys(self):
self.init_db()
f = dbm.open(_fname, 'r')
keys = self.keys_helper(f)
f.close()
def test_anydbm_access(self):
self.init_db()
f = dbm.open(_fname, 'r')
key = "a".encode("ascii")
assert(key in f)
assert(f[key] == b"Python:")
f.close()
def read_helper(self, f):
keys = self.keys_helper(f)
for key in self._dict:
self.assertEqual(self._dict[key], f[key.encode("ascii")])
def init_db(self):
f = dbm.open(_fname, 'n')
for k in self._dict:
f[k.encode("ascii")] = self._dict[k]
f.close()
def keys_helper(self, f):
keys = sorted(k.decode("ascii") for k in f.keys())
dkeys = sorted(self._dict.keys())
self.assertEqual(keys, dkeys)
return keys
def tearDown(self):
delete_files()
def setUp(self):
delete_files()
class WhichDBTestCase(unittest.TestCase):
# Actual test methods are added to namespace after class definition.
def __init__(self, *args):
unittest.TestCase.__init__(self, *args)
def test_whichdb(self):
for module in dbm_iterator():
# Check whether whichdb correctly guesses module name
# for databases opened with "module" module.
# Try with empty files first
name = module.__name__
if name == 'dbm.dumb':
continue # whichdb can't support dbm.dumb
test.support.unlink(_fname)
f = module.open(_fname, 'c')
f.close()
self.assertEqual(name, dbm.whichdb(_fname))
# Now add a key
f = module.open(_fname, 'w')
f[b"1"] = b"1"
# and test that we can find it
self.assertTrue(b"1" in f)
# and read it
self.assertTrue(f[b"1"] == b"1")
f.close()
self.assertEqual(name, dbm.whichdb(_fname))
def tearDown(self):
delete_files()
def setUp(self):
delete_files()
def test_main():
try:
for module in dbm_iterator():
test.support.run_unittest(AnyDBMTestCase, WhichDBTestCase)
finally:
delete_files()
if __name__ == "__main__":
test_main()

View file

@ -1,10 +1,9 @@
from DocXMLRPCServer import DocXMLRPCServer
from xmlrpc.server import DocXMLRPCServer
import httplib
from test import support
import threading
import time
import unittest
import xmlrpclib
PORT = None

View file

@ -3,8 +3,8 @@ import datetime
import sys
import time
import unittest
import xmlrpclib
import SimpleXMLRPCServer
import xmlrpc.client as xmlrpclib
import xmlrpc.server
import threading
import mimetools
import httplib
@ -160,9 +160,9 @@ class FaultTestCase(unittest.TestCase):
# this will raise AttirebuteError because code don't want us to use
# private methods
self.assertRaises(AttributeError,
SimpleXMLRPCServer.resolve_dotted_attribute, str, '__add')
xmlrpc.server.resolve_dotted_attribute, str, '__add')
self.assert_(SimpleXMLRPCServer.resolve_dotted_attribute(str, 'title'))
self.assert_(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
class DateTimeTestCase(unittest.TestCase):
def test_default(self):
@ -249,7 +249,7 @@ def http_server(evt, numrequests):
'''This is my function'''
return True
class MyXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
class MyXMLRPCServer(xmlrpc.server.SimpleXMLRPCServer):
def get_request(self):
# Ensure the socket is always non-blocking. On Linux, socket
# attributes are not inherited like they are on *BSD and Windows.
@ -306,7 +306,7 @@ def is_unavailable_exception(e):
class SimpleServerTestCase(unittest.TestCase):
def setUp(self):
# enable traceback reporting
SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
self.evt = threading.Event()
# start server thread to handle requests
@ -326,7 +326,7 @@ class SimpleServerTestCase(unittest.TestCase):
raise RuntimeError("timeout reached, test has failed")
# disable traceback reporting
SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
def test_simple1(self):
try:
@ -443,9 +443,9 @@ class SimpleServerTestCase(unittest.TestCase):
def test_dotted_attribute(self):
# Raises an AttributeError because private methods are not allowed.
self.assertRaises(AttributeError,
SimpleXMLRPCServer.resolve_dotted_attribute, str, '__add')
xmlrpc.server.resolve_dotted_attribute, str, '__add')
self.assert_(SimpleXMLRPCServer.resolve_dotted_attribute(str, 'title'))
self.assert_(xmlrpc.server.resolve_dotted_attribute(str, 'title'))
# Get the test to run faster by sending a request with test_simple1.
# This avoids waiting for the socket timeout.
self.test_simple1()
@ -475,17 +475,17 @@ class FailingServerTestCase(unittest.TestCase):
# wait on the server thread to terminate
self.evt.wait()
# reset flag
SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = False
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = False
# reset message class
SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = mimetools.Message
xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = mimetools.Message
def test_basic(self):
# check that flag is false by default
flagval = SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header
flagval = xmlrpc.server.SimpleXMLRPCServer._send_traceback_header
self.assertEqual(flagval, False)
# enable traceback reporting
SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
# test a call that shouldn't fail just as a smoke test
try:
@ -499,7 +499,7 @@ class FailingServerTestCase(unittest.TestCase):
def test_fail_no_info(self):
# use the broken message class
SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
try:
p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
@ -515,11 +515,11 @@ class FailingServerTestCase(unittest.TestCase):
def test_fail_with_info(self):
# use the broken message class
SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
xmlrpc.server.SimpleXMLRPCRequestHandler.MessageClass = FailingMessageClass
# Check that errors in the server send back exception/traceback
# info when flag is set
SimpleXMLRPCServer.SimpleXMLRPCServer._send_traceback_header = True
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
try:
p = xmlrpclib.ServerProxy('http://localhost:%d' % PORT)
@ -536,7 +536,7 @@ class FailingServerTestCase(unittest.TestCase):
class CGIHandlerTestCase(unittest.TestCase):
def setUp(self):
self.cgi = SimpleXMLRPCServer.CGIXMLRPCRequestHandler()
self.cgi = xmlrpc.server.CGIXMLRPCRequestHandler()
def tearDown(self):
self.cgi = None

View file

@ -6,7 +6,7 @@ import sys
import unittest
from test import support
import xmlrpclib
import xmlrpclib.client as xmlrpclib
class CurrentTimeTest(unittest.TestCase):