diff --git a/ptvsd/messaging.py b/ptvsd/messaging.py
index 8f6c7b0a..af24e2f5 100644
--- a/ptvsd/messaging.py
+++ b/ptvsd/messaging.py
@@ -135,6 +135,14 @@ class RequestFailure(Exception):
def __init__(self, message):
self.message = message
+ def __eq__(self, other):
+ if isinstance(other, RequestFailure) and other.message == self.message:
+ return True
+ return NotImplemented
+
+ def __ne__(self, other):
+ return not self == other
+
class Request(object):
"""Represents a request that was sent to the other party, and is awaiting or has
diff --git a/pytest.ini b/pytest.ini
index c96a6b67..bab96ff4 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -1,3 +1,3 @@
[pytest]
testpaths=pytests
-timeout=5
+timeout=15
diff --git a/pytests/conftest.py b/pytests/conftest.py
index e4e9324e..0d5f90b2 100644
--- a/pytests/conftest.py
+++ b/pytests/conftest.py
@@ -9,11 +9,34 @@ import pytest
import threading
import types
+from . import helpers
+from .helpers import print
from .helpers.session import DebugSession
+@pytest.hookimpl(hookwrapper=True, tryfirst=True)
+def pytest_runtest_makereport(item, call):
+ # Adds attributes such as setup_result, call_result etc to the item after the
+ # corresponding scope finished running its tests. This can be used in function-level
+ # fixtures to detect failures, e.g.:
+ #
+ # if request.node.call_result.failed: ...
+
+ outcome = yield
+ result = outcome.get_result()
+ setattr(item, result.when + '_result', result)
+
+
+@pytest.hookimpl(hookwrapper=True, tryfirst=True)
+def pytest_pyfunc_call(pyfuncitem):
+ # Resets the timestamp zero for every new test.
+ print('Timestamp clock reset to zero.', timestamped=False)
+ helpers.timestamp_zero = helpers.timestamp()
+ yield
+
+
@pytest.fixture
-def daemon():
+def daemon(request):
"""Provides a factory function for daemon threads. The returned thread is
started immediately, and it must not be alive by the time the test returns.
"""
@@ -30,8 +53,14 @@ def daemon():
yield factory
- for thread in daemons:
- assert not thread.is_alive()
+ try:
+ failed = request.node.call_result.failed
+ except AttributeError:
+ pass
+ else:
+ if not failed:
+ for thread in daemons:
+ assert not thread.is_alive()
@pytest.fixture
@@ -101,6 +130,12 @@ def debug_session(request):
session = DebugSession(request.param)
yield session
try:
- session.wait_for_exit()
+ try:
+ failed = request.node.call_result.failed
+ except AttributeError:
+ pass
+ else:
+ if not failed:
+ session.wait_for_exit()
finally:
session.stop()
diff --git a/pytests/func/test_multiproc.py b/pytests/func/test_multiproc.py
index cdb35b24..80bb0a07 100644
--- a/pytests/func/test_multiproc.py
+++ b/pytests/func/test_multiproc.py
@@ -9,16 +9,18 @@ import pytest
from ..helpers.pattern import ANY, Pattern
from ..helpers.session import DebugSession
-from ..helpers.timeline import Event
+from ..helpers.timeline import Event, Request
-@pytest.mark.timeout(20)
+@pytest.mark.timeout(40)
@pytest.mark.skipif(platform.system() != 'Windows',
reason='Debugging multiprocessing module only works on Windows')
def test_multiprocessing(debug_session, pyfile):
@pyfile
def code_to_debug():
import multiprocessing
+ import platform
+ import sys
def child_of_child(q):
print('entering child of child')
@@ -38,7 +40,11 @@ def test_multiprocessing(debug_session, pyfile):
if __name__ == '__main__':
import pytests.helpers.backchannel as backchannel
- multiprocessing.set_start_method('spawn')
+ if sys.version_info >= (3, 4):
+ multiprocessing.set_start_method('spawn')
+ else:
+ assert platform.system() == 'Windows'
+
q = multiprocessing.Queue()
p = multiprocessing.Process(target=child, args=(q,))
p.start()
@@ -50,37 +56,53 @@ def test_multiprocessing(debug_session, pyfile):
assert q.get() == 4
q.close()
- debug_session.setup_backchannel()
debug_session.multiprocess = True
- debug_session.prepare_to_run(filename=code_to_debug)
- debug_session.start_debugging()
+ debug_session.prepare_to_run(filename=code_to_debug, backchannel=True)
+ start = debug_session.start_debugging()
+
+ with debug_session.timeline.frozen():
+ initial_request, = debug_session.timeline.all_occurrences_of(Request('launch') | Request('attach'))
+ initial_process = (start >> Event('process')).wait()
+ initial_pid = int(initial_process.body['systemProcessId'])
child_pid = debug_session.backchannel.read_json()
- child_subprocess = debug_session.wait_until(Event('ptvsd_subprocess'))
- assert child_subprocess.body in Pattern({
+ child_subprocess = (start >> Event('ptvsd_subprocess')).wait()
+ assert child_subprocess.body == Pattern({
+ 'initialProcessId': initial_pid,
+ 'parentProcessId': initial_pid,
'processId': child_pid,
'port': ANY.int,
+ 'initialRequest': {
+ 'command': initial_request.command,
+ 'arguments': initial_request.arguments,
+ }
})
child_port = child_subprocess.body['port']
child_session = DebugSession(method='attach_socket', ptvsd_port=child_port)
child_session.connect()
child_session.handshake()
- child_session.start_debugging()
+ child_start = child_session.start_debugging()
- child_child_subprocess = child_session.wait_until(Event('ptvsd_subprocess'))
- assert child_child_subprocess.body in Pattern({
+ child_child_subprocess = (child_start >> Event('ptvsd_subprocess')).wait()
+ assert child_child_subprocess.body == Pattern({
+ 'initialProcessId': initial_pid,
+ 'parentProcessId': child_pid,
'processId': ANY.int,
'port': ANY.int,
+ 'initialRequest': {
+ 'command': initial_request.command,
+ 'arguments': initial_request.arguments,
+ }
})
child_child_port = child_child_subprocess.body['port']
child_child_session = DebugSession(method='attach_socket', ptvsd_port=child_child_port)
child_child_session.connect()
child_child_session.handshake()
- child_child_session.start_debugging()
- child_child_session.wait_until(Event('process'))
+ child_child_start = child_child_session.start_debugging()
+ (child_child_start >> Event('process')).wait()
debug_session.backchannel.write_json('continue')
diff --git a/pytests/func/test_run.py b/pytests/func/test_run.py
index a9d4f2be..e25a6eb3 100644
--- a/pytests/func/test_run.py
+++ b/pytests/func/test_run.py
@@ -11,17 +11,20 @@ from ..helpers.timeline import Event
def test_run(debug_session, pyfile):
@pyfile
def code_to_debug():
- print('waiting for input')
- input()
- print('got input!')
+ from pytests.helpers import backchannel
+ print('before')
+ assert backchannel.read_json() == 1
+ print('after')
- debug_session.prepare_to_run(filename=code_to_debug)
- debug_session.start_debugging()
+ timeline = debug_session.timeline
+ debug_session.prepare_to_run(filename=code_to_debug, backchannel=True)
+ start = debug_session.start_debugging()
- t = debug_session.wait_until(Event('process') & Event('thread'))
- assert (
- Event('thread', {'reason': 'started', 'threadId': ANY})
- & (
+ first_thread = (start >> Event('thread', {'reason': 'started', 'threadId': ANY})).wait()
+ with timeline.frozen():
+ assert (
+ timeline.beginning
+ >>
Event('initialized', {})
>>
Event('process', {
@@ -30,10 +33,11 @@ def test_run(debug_session, pyfile):
'startMethod': 'launch' if debug_session.method == 'launch' else 'attach',
'systemProcessId': debug_session.process.pid,
})
- )
- ).has_occurred_by(t)
+ >>
+ first_thread
+ ) in timeline
- with debug_session.causing(Event('terminated', {})):
- debug_session.process.communicate(b'0\n')
+ t = debug_session.write_json(1)
+ (t >> Event('terminated', {})).wait()
debug_session.wait_for_exit()
diff --git a/pytests/helpers/__init__.py b/pytests/helpers/__init__.py
index 7a29c745..6771e1dc 100644
--- a/pytests/helpers/__init__.py
+++ b/pytests/helpers/__init__.py
@@ -11,13 +11,29 @@ import time
import traceback
+if sys.version_info >= (3, 3):
+ clock = time.perf_counter
+else:
+ clock = time.clock
+
+
+timestamp_zero = clock()
+
+def timestamp():
+ return clock() - timestamp_zero
+
+
print_lock = threading.Lock()
real_print = print
def print(*args, **kwargs):
- """Like builtin print(), but synchronized using a global lock.
+ """Like builtin print(), but synchronized using a global lock,
+ and adds a timestamp
"""
+ timestamped = kwargs.pop('timestamped', True)
with print_lock:
+ if timestamped:
+ real_print('@%09.6f: ' % timestamp(), end='')
real_print(*args, **kwargs)
diff --git a/pytests/helpers/backchannel.py b/pytests/helpers/backchannel.py
index d85a539e..892b7cb9 100644
--- a/pytests/helpers/backchannel.py
+++ b/pytests/helpers/backchannel.py
@@ -14,6 +14,7 @@ import socket
from ptvsd.messaging import JsonIOStream
port = int(os.getenv('PTVSD_BACKCHANNEL_PORT'))
+# print('Connecting to bchan#%d' % port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', port))
stream = JsonIOStream.from_socket(sock)
diff --git a/pytests/helpers/pattern.py b/pytests/helpers/pattern.py
index e6a3d170..e8daa0d3 100644
--- a/pytests/helpers/pattern.py
+++ b/pytests/helpers/pattern.py
@@ -116,12 +116,14 @@ class Maybe(BasePattern):
"""A pattern that matches if condition is True.
"""
+ name = None
+
def __init__(self, pattern, condition):
self.pattern = pattern
self.condition = condition
def __repr__(self):
- return 'Maybe(%r)' % self.pattern
+ return self.name or 'Maybe(%r)' % self.pattern
def __eq__(self, value):
return self.condition(value) and value == self.pattern
@@ -159,7 +161,15 @@ SUCCESS = Success(True)
FAILURE = Success(False)
ANY = Any()
+
ANY.bool = ANY.such_that(lambda x: x is True or x is False)
+ANY.bool.name = 'ANY.bool'
+
ANY.str = ANY.such_that(lambda x: isinstance(x, unicode))
+ANY.str.name = 'ANY.str'
+
ANY.num = ANY.such_that(lambda x: isinstance(x, numbers.Real))
+ANY.num.name = 'ANY.num'
+
ANY.int = ANY.such_that(lambda x: isinstance(x, numbers.Integral))
+ANY.int.name = 'ANY.int'
diff --git a/pytests/helpers/session.py b/pytests/helpers/session.py
index 1eaf4aba..2cf44d85 100644
--- a/pytests/helpers/session.py
+++ b/pytests/helpers/session.py
@@ -4,7 +4,6 @@
from __future__ import print_function, with_statement, absolute_import
-import contextlib
import os
import socket
import subprocess
@@ -16,7 +15,8 @@ import ptvsd
from ptvsd.messaging import JsonIOStream, JsonMessageChannel, MessageHandlers, RequestFailure
from . import print, watchdog
from .messaging import LoggingJsonStream
-from .timeline import Timeline, Request, Response, Event
+from .pattern import Pattern
+from .timeline import Timeline, Request, Event
# ptvsd.__file__ will be
/ptvsd/__main__.py - we want .
@@ -70,10 +70,12 @@ class DebugSession(object):
except:
self.backchannel_socket = None
- def prepare_to_run(self, perform_handshake=True, filename=None, module=None):
+ def prepare_to_run(self, perform_handshake=True, filename=None, module=None, backchannel=False):
"""Spawns ptvsd using the configured method, telling it to execute the
provided Python file or module, and establishes a message channel to it.
+ If backchannel is True, calls self.setup_backchannel() before returning.
+
If perform_handshake is True, calls self.handshake() before returning.
"""
@@ -102,6 +104,9 @@ class DebugSession(object):
env = os.environ.copy()
env.update({'PYTHONPATH': PTVSD_SYS_PATH})
+
+ if backchannel:
+ self.setup_backchannel()
if self.backchannel_port:
env['PTVSD_BACKCHANNEL_PORT'] = str(self.backchannel_port)
@@ -115,10 +120,16 @@ class DebugSession(object):
self.connected.wait()
assert self.ptvsd_port
assert self.socket
- print('ptvsd@%d has pid=%d' % (self.ptvsd_port, self.process.pid))
+ print('ptvsd#%d has pid=%d' % (self.ptvsd_port, self.process.pid))
+
+ self.timeline.beginning.await_following(Event('output', Pattern({
+ 'category': 'telemetry',
+ 'output': 'ptvsd',
+ 'data': {'version': ptvsd.__version__}
+ })))
if perform_handshake:
- self.handshake()
+ return self.handshake()
def wait_for_exit(self, expected_returncode=0):
"""Waits for the spawned ptvsd process to exit. If it doesn't exit within
@@ -128,10 +139,10 @@ class DebugSession(object):
def kill():
time.sleep(self.WAIT_FOR_EXIT_TIMEOUT)
- print('ptvsd process %d timed out, killing it' % self.process.pid)
+ print('ptvsd#%r (pid=%d) timed out, killing it' % (self.ptvsd_port, self.process.pid))
if self.is_running:
self.process.kill()
- kill_thread = threading.Thread(target=kill)
+ kill_thread = threading.Thread(target=kill, name='ptvsd#%r watchdog (pid=%d)' % (self.ptvsd_port, self.process.pid))
kill_thread.daemon = True
kill_thread.start()
@@ -151,12 +162,12 @@ class DebugSession(object):
self.server_socket.listen(0)
def accept_worker():
- print('Listening for incoming connection from ptvsd@%d' % self.ptvsd_port)
+ print('Listening for incoming connection from ptvsd#%d' % self.ptvsd_port)
self.socket, _ = self.server_socket.accept()
- print('Incoming ptvsd@%d connection accepted' % self.ptvsd_port)
+ print('Incoming ptvsd#%d connection accepted' % self.ptvsd_port)
self._setup_channel()
- accept_thread = threading.Thread(target=accept_worker)
+ accept_thread = threading.Thread(target=accept_worker, name='ptvsd#%d listener' % self.ptvsd_port)
accept_thread.daemon = True
accept_thread.start()
@@ -171,15 +182,15 @@ class DebugSession(object):
time.sleep(0.1)
def _try_connect(self):
- print('Trying to connect to ptvsd@%d' % self.ptvsd_port)
+ print('Trying to connect to ptvsd#%d' % self.ptvsd_port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', self.ptvsd_port))
- print('Successfully connected to ptvsd@%d' % self.ptvsd_port)
+ print('Successfully connected to ptvsd#%d' % self.ptvsd_port)
self.socket = sock
self._setup_channel()
def _setup_channel(self):
- self.stream = LoggingJsonStream(JsonIOStream.from_socket(self.socket), 'ptvsd@%d' % self.ptvsd_port)
+ self.stream = LoggingJsonStream(JsonIOStream.from_socket(self.socket), 'ptvsd#%d' % self.ptvsd_port)
handlers = MessageHandlers(request=self._process_request, event=self._process_event)
self.channel = JsonMessageChannel(self.stream, handlers)
self.channel.start()
@@ -189,20 +200,13 @@ class DebugSession(object):
request = self.timeline.record_request(command, arguments)
request.sent = self.channel.send_request(command, arguments)
request.sent.on_response(lambda response: self._process_response(request, response))
- assert Request(command, arguments) in self.timeline
+ request.causing = lambda expectation: request.await_following(expectation) and request
+ assert Request(command, arguments).is_realized_by(request)
return request
def mark(self, id):
return self.timeline.mark(id)
- @contextlib.contextmanager
- def causing(self, expectation):
- assert expectation not in self.timeline
- promised_occurrence = ['ONLY VALID AFTER END OF BLOCK!']
- yield promised_occurrence
- occ = self.wait_until(expectation)
- promised_occurrence[:] = (occ,)
-
def handshake(self):
"""Performs the handshake that establishes the debug session ('initialized'
and 'launch' or 'attach').
@@ -213,8 +217,9 @@ class DebugSession(object):
to finalize the configuration stage, and start running code.
"""
- with self.causing(Event('initialized', {})):
- self.send_request('initialize', {'adapterID': 'test'}).wait_for_response()
+ (self.send_request('initialize', {'adapterID': 'test'})
+ .causing(Event('initialized', {}))
+ .wait_for_response())
request = 'launch' if self.method == 'launch' else 'attach'
self.send_request(request, {'debugOptions': self.debug_options}).wait_for_response()
@@ -226,7 +231,7 @@ class DebugSession(object):
After this method returns, ptvsd is running the code in the script file or module
that was specified in prepare_to_run().
"""
- self.send_request('configurationDone').wait_for_response()
+ return self.send_request('configurationDone').wait_for_response()
def _process_event(self, channel, event, body):
self.timeline.record_event(event, body)
@@ -236,30 +241,24 @@ class DebugSession(object):
def _process_response(self, request, response):
body = response.body if response.success else RequestFailure(response.error_message)
self.timeline.record_response(request, body)
- assert Response(request, body) in self.timeline
def _process_request(self, channel, command, arguments):
assert False, 'ptvsd should not be sending requests.'
- def wait_until(self, expectation):
- return self.timeline.wait_until(expectation)
-
- def history(self):
- return self.timeline.history
-
def setup_backchannel(self):
- assert self.process is None, 'setup_backchannel() must be called before prepare_to_run()'
self.backchannel_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.backchannel_socket.bind(('localhost', 0))
_, self.backchannel_port = self.backchannel_socket.getsockname()
self.backchannel_socket.listen(0)
- backchannel_thread = threading.Thread(target=self._backchannel_worker)
+ backchannel_thread = threading.Thread(target=self._backchannel_worker, name='ptvsd#%d backchannel' % self.ptvsd_port)
backchannel_thread.daemon = True
backchannel_thread.start()
def _backchannel_worker(self):
+ print('Listening for incoming backchannel connection for bchan#%d' % self.ptvsd_port)
sock, _ = self.backchannel_socket.accept()
- self._backchannel_stream = LoggingJsonStream(JsonIOStream.from_socket(sock), 'bchan@%d' % self.ptvsd_port)
+ print('Incoming bchan#%d backchannel connection accepted' % self.ptvsd_port)
+ self._backchannel_stream = LoggingJsonStream(JsonIOStream.from_socket(sock), 'bchan#%d' % self.ptvsd_port)
self.backchannel_established.set()
@property
@@ -267,3 +266,11 @@ class DebugSession(object):
assert self.backchannel_port, 'backchannel() must be called after setup_backchannel()'
self.backchannel_established.wait()
return self._backchannel_stream
+
+ def read_json(self):
+ return self.backchannel.read_json()
+
+ def write_json(self, value):
+ t = self.timeline.mark(('sending', value))
+ self.backchannel.write_json(value)
+ return t
diff --git a/pytests/helpers/test_timeline.py b/pytests/helpers/test_timeline.py
index 2ec6276d..887e68da 100644
--- a/pytests/helpers/test_timeline.py
+++ b/pytests/helpers/test_timeline.py
@@ -10,7 +10,7 @@ import time
from ptvsd.messaging import RequestFailure
-from .pattern import Pattern, ANY, SUCCESS, FAILURE
+from .pattern import Pattern, ANY, SUCCESS, FAILURE, Is
from .timeline import Timeline, Mark, Event, Request, Response
@@ -25,87 +25,111 @@ def make_timeline():
def factory():
timeline = Timeline()
- assert Mark(Pattern('begin')) in timeline
- assert timeline.history() in Pattern([('Mark', 'begin',)])
timelines.append(timeline)
return timeline
yield factory
for timeline in timelines:
- timeline.freeze()
- history = list(timeline.history())
+ timeline.finalize()
+ history = timeline.history()
history.sort(key=lambda occ: occ.timestamp)
assert history == timeline.history()
def history_data(timeline):
- return [occ.__data__() for occ in timeline.history()]
+ with timeline.frozen():
+ return [occ.__data__() for occ in timeline.history()]
def test_simple_1thread(make_timeline):
timeline = make_timeline()
expected_history = history_data(timeline)
- expectations = []
+
+ with timeline.frozen():
+ assert timeline.all_occurrences_of(Mark('tada')) == ()
+ assert not Mark('tada').has_been_realized_in(timeline)
mark = timeline.mark('tada')
+ expected_history += [('Mark', 'tada')]
+
assert mark.circumstances == ('Mark', 'tada')
assert mark.id == 'tada'
- expectations += [Mark('tada')]
- assert expectations in timeline
- expected_history += [('Mark', 'tada')]
- assert timeline.history() in Pattern(expected_history)
+ with timeline.frozen():
+ assert timeline.history() == Pattern(expected_history)
+ assert timeline.all_occurrences_of(Mark('tada')) == Pattern((Is(mark),))
+ assert timeline.last is mark
+ assert Mark('tada').has_been_realized_in(timeline)
request = timeline.record_request('next', {'threadId': 3})
+ expected_history += [('Request', 'next', {'threadId': 3})]
+
assert request.circumstances == ('Request', 'next', {'threadId': 3})
assert request.command == 'next'
assert request.arguments == {'threadId': 3}
- expectations += [Request('next', {'threadId': 3})]
- assert expectations in timeline
- expected_history += [('Request', 'next', {'threadId': 3})]
- assert timeline.history() in Pattern(expected_history)
+ with timeline.frozen():
+ assert timeline.history() == Pattern(expected_history)
+ assert timeline.last is request
+ assert Request('next', {'threadId': 3}).has_been_realized_in(timeline)
+ assert timeline.all_occurrences_of(Request('next', {'threadId': 3})) == Pattern((Is(request),))
response = timeline.record_response(request, {})
- assert response.circumstances == ('Response', request, {})
+ expected_history += [('Response', Is(request), {})]
+
+ assert response.circumstances == Pattern(('Response', Is(request), {}))
assert response.request is request
assert response.body == {}
assert response.success
- expectations += [
+ expectations = [
Response(request, {}),
Response(request, SUCCESS),
+ Response(request),
Response(Request('next', {'threadId': 3}), {}),
Response(Request('next', {'threadId': 3}), SUCCESS),
+ Response(Request('next', {'threadId': 3})),
]
- assert expectations in timeline
- expected_history += [('Response', request, {})]
- assert timeline.history() in Pattern(expected_history)
+ with timeline.frozen():
+ assert timeline.history() == Pattern(expected_history)
+ assert timeline.last is response
+ for exp in expectations:
+ print(exp)
+ assert exp.has_been_realized_in(timeline)
+ print(timeline.all_occurrences_of(exp))
+ assert timeline.all_occurrences_of(exp) == Pattern((Is(response),))
event = timeline.record_event('stopped', {'reason': 'pause'})
- assert event.circumstances == ('Event', 'stopped', {'reason': 'pause'})
- expectations += [Event('stopped', {'reason': 'pause'})]
- assert expectations in timeline
expected_history += [('Event', 'stopped', {'reason': 'pause'})]
- assert timeline.history() in Pattern(expected_history)
- request = timeline.record_request('next', {'threadId': 6})
+ assert event.circumstances == ('Event', 'stopped', {'reason': 'pause'})
+ with timeline.frozen():
+ assert timeline.last is event
+ assert timeline.history() == Pattern(expected_history)
+ assert Event('stopped', {'reason': 'pause'}).has_been_realized_in(timeline)
+ assert timeline.all_occurrences_of(Event('stopped', {'reason': 'pause'})) == Pattern((Is(event),))
+
+ request2 = timeline.record_request('next', {'threadId': 6})
expected_history += [('Request', 'next', {'threadId': 6})]
+ response2 = timeline.record_response(request2, RequestFailure('error!'))
+ expected_history += [('Response', Is(request2), FAILURE)]
- response = timeline.record_response(request, RequestFailure('error!'))
- assert response.circumstances in Pattern((
- 'Response',
- request,
- ANY.such_that(lambda err: isinstance(err, RequestFailure) and err.message == 'error!')
- ))
- assert response.request is request
- assert isinstance(response.body, RequestFailure) and response.body.message == 'error!'
- assert not response.success
- expectations += [
- Response(request, FAILURE),
+ assert response2.circumstances == Pattern(('Response', Is(request2), FAILURE))
+ assert response2.request is request2
+ assert isinstance(response2.body, RequestFailure) and response2.body.message == 'error!'
+ assert not response2.success
+ expectations = [
+ Response(request2, RequestFailure('error!')),
+ Response(request2, FAILURE),
+ Response(request2),
+ Response(Request('next', {'threadId': 6}), RequestFailure('error!')),
Response(Request('next', {'threadId': 6}), FAILURE),
+ Response(Request('next', {'threadId': 6})),
]
- assert expectations in timeline
- expected_history += [('Response', request, FAILURE)]
- assert timeline.history() in Pattern(expected_history)
+ with timeline.frozen():
+ for exp in expectations:
+ print(exp)
+ assert exp.has_been_realized_in(timeline)
+ assert timeline.all_occurrences_of(exp) == Pattern((Is(response2),))
+ assert timeline.all_occurrences_of(Response(Request('next'))) == Pattern((Is(response), Is(response2)))
@pytest.mark.parametrize('occurs_before_wait', (True, False))
@@ -146,34 +170,93 @@ def test_simple_mthread(make_timeline, daemon, occurs_before_wait):
thev.set()
threading.Thread(target=set_later).start()
+ t = timeline.beginning
+
advance_worker()
expectation = Mark('tada')
- timeline.wait_until(expectation)
- assert expectation in timeline
expected_history += [('Mark', 'tada')]
- assert timeline.history() in Pattern(expected_history)
+ t = t.await_following(expectation)
+
+ with timeline.frozen():
+ assert expectation.has_been_realized_in(timeline)
+ assert timeline.history() == Pattern(expected_history)
advance_worker()
- expectation = Request('next', {'threadId': 3})
- timeline.wait_until(expectation)
- assert expectation in timeline
expected_history += [('Request', 'next', {'threadId': 3})]
- assert timeline.history() in Pattern(expected_history)
- request = occurrences[-1]
+ expectation = Request('next', {'threadId': 3})
+ t = t.await_following(expectation)
+
+ with timeline.frozen():
+ assert expectation.has_been_realized_in(timeline)
+ assert timeline.history() == Pattern(expected_history)
+ request = occurrences[-1]
+ assert request is t
advance_worker()
- expectation = Response(request, {}) & Response(Request('next', {'threadId': 3}), {})
- timeline.wait_until(expectation)
- assert expectation in timeline
expected_history += [('Response', request, {})]
- assert timeline.history() in Pattern(expected_history)
+ expectation = Response(request, {}) & Response(Request('next', {'threadId': 3}), {})
+ t = t.await_following(expectation)
+
+ with timeline.frozen():
+ assert expectation.has_been_realized_in(timeline)
+ assert timeline.history() == Pattern(expected_history)
advance_worker()
- expectation = Event('stopped', {'reason': 'pause'})
- timeline.wait_until(expectation)
- assert expectation in timeline
expected_history += [('Event', 'stopped', {'reason': 'pause'})]
- assert timeline.history() in Pattern(expected_history)
+ expectation = Event('stopped', {'reason': 'pause'})
+
+ with timeline.frozen():
+ t = t.await_following(expectation)
+ assert expectation.has_been_realized_in(timeline)
+ assert timeline.history() == Pattern(expected_history)
+
+
+def test_after(make_timeline):
+ timeline = make_timeline()
+ first = timeline.mark('first')
+
+ second_exp = first >> Mark('second')
+ with timeline.frozen():
+ assert second_exp not in timeline
+
+ timeline.mark('second')
+ with timeline.frozen():
+ assert second_exp in timeline
+
+
+def test_before(make_timeline):
+ timeline = make_timeline()
+ t = timeline.beginning
+
+ first = timeline.mark('first')
+ timeline.mark('second')
+
+ with timeline.frozen():
+ assert t >> Mark('second') >> Mark('first') not in timeline
+ assert Mark('second') >> first not in timeline
+
+ third = timeline.mark('third')
+
+ with timeline.frozen():
+ assert t >> Mark('second') >> Mark('first') not in timeline
+ assert Mark('second') >> first not in timeline
+ assert t >> Mark('second') >> Mark('third') in timeline
+ assert Mark('second') >> third in timeline
+
+
+def test_not(make_timeline):
+ timeline = make_timeline()
+ timeline.mark('other')
+
+ with timeline.frozen():
+ assert timeline.beginning >> ~Mark('something') in timeline
+ t = timeline.last
+
+ timeline.mark('something')
+
+ with timeline.frozen():
+ assert timeline.beginning >> ~Mark('something') in timeline
+ assert t >> ~Mark('something') not in timeline
def test_and(make_timeline):
@@ -182,24 +265,30 @@ def test_and(make_timeline):
cheese_exp = Mark('cheese')
timeline = make_timeline()
- assert eggs_exp & ham_exp not in timeline
- assert ham_exp & eggs_exp not in timeline
- assert cheese_exp & ham_exp & eggs_exp not in timeline
+ t = timeline.beginning
+
+ with timeline.frozen():
+ assert t >> (eggs_exp & ham_exp) not in timeline
+ assert t >> (ham_exp & eggs_exp) not in timeline
+ assert t >> (cheese_exp & ham_exp & eggs_exp) not in timeline
timeline.mark('eggs')
- assert eggs_exp & ham_exp not in timeline
- assert ham_exp & eggs_exp not in timeline
- assert cheese_exp & ham_exp & eggs_exp not in timeline
+ with timeline.frozen():
+ assert t >> (eggs_exp & ham_exp) not in timeline
+ assert t >> (ham_exp & eggs_exp) not in timeline
+ assert t >> (cheese_exp & ham_exp & eggs_exp) not in timeline
timeline.mark('ham')
- assert eggs_exp & ham_exp in timeline
- assert ham_exp & eggs_exp in timeline
- assert cheese_exp & ham_exp & eggs_exp not in timeline
+ with timeline.frozen():
+ assert t >> (eggs_exp & ham_exp) in timeline
+ assert t >> (ham_exp & eggs_exp) in timeline
+ assert t >> (cheese_exp & ham_exp & eggs_exp) not in timeline
timeline.mark('cheese')
- assert eggs_exp & ham_exp in timeline
- assert ham_exp & eggs_exp in timeline
- assert cheese_exp & ham_exp & eggs_exp in timeline
+ with timeline.frozen():
+ assert t >> (eggs_exp & ham_exp) in timeline
+ assert t >> (ham_exp & eggs_exp) in timeline
+ assert t >> (cheese_exp & ham_exp & eggs_exp) in timeline
def test_or(make_timeline):
@@ -208,33 +297,37 @@ def test_or(make_timeline):
cheese_exp = Mark('cheese')
timeline = make_timeline()
- assert eggs_exp | ham_exp not in timeline
- assert ham_exp | eggs_exp not in timeline
- assert cheese_exp | ham_exp | eggs_exp not in timeline
+ t = timeline.beginning
+
+ with timeline.frozen():
+ assert t >> (eggs_exp | ham_exp) not in timeline
+ assert t >> (ham_exp | eggs_exp) not in timeline
+ assert t >> (cheese_exp | ham_exp | eggs_exp) not in timeline
timeline.mark('eggs')
- assert eggs_exp | ham_exp in timeline
- assert ham_exp | eggs_exp in timeline
- assert cheese_exp | ham_exp | eggs_exp in timeline
+ with timeline.frozen():
+ assert t >> (eggs_exp | ham_exp) in timeline
+ assert t >> (ham_exp | eggs_exp) in timeline
+ assert t >> (cheese_exp | ham_exp | eggs_exp) in timeline
timeline.mark('cheese')
- assert eggs_exp | ham_exp in timeline
- assert ham_exp | eggs_exp in timeline
- assert cheese_exp | ham_exp | eggs_exp in timeline
-
- timeline = make_timeline()
+ with timeline.frozen():
+ assert t >> (eggs_exp | ham_exp) in timeline
+ assert t >> (ham_exp | eggs_exp) in timeline
+ assert t >> (cheese_exp | ham_exp | eggs_exp) in timeline
timeline.mark('ham')
- assert eggs_exp | ham_exp in timeline
- assert ham_exp | eggs_exp in timeline
- assert cheese_exp | ham_exp | eggs_exp in timeline
-
- timeline = make_timeline()
+ with timeline.frozen():
+ assert t >> (eggs_exp | ham_exp) in timeline
+ assert t >> (ham_exp | eggs_exp) in timeline
+ assert t >> (cheese_exp | ham_exp | eggs_exp) in timeline
+ t = timeline.last
timeline.mark('cheese')
- assert eggs_exp | ham_exp not in timeline
- assert ham_exp | eggs_exp not in timeline
- assert cheese_exp | ham_exp | eggs_exp in timeline
+ with timeline.frozen():
+ assert t >> (eggs_exp | ham_exp) not in timeline
+ assert t >> (ham_exp | eggs_exp) not in timeline
+ assert t >> (cheese_exp | ham_exp | eggs_exp) in timeline
def test_xor(make_timeline):
@@ -243,60 +336,44 @@ def test_xor(make_timeline):
cheese_exp = Mark('cheese')
timeline = make_timeline()
- assert eggs_exp ^ ham_exp not in timeline
- assert ham_exp ^ eggs_exp not in timeline
- assert cheese_exp ^ ham_exp ^ eggs_exp not in timeline
+ t1 = timeline.beginning
+
+ with timeline.frozen():
+ assert t1 >> (eggs_exp ^ ham_exp) not in timeline
+ assert t1 >> (ham_exp ^ eggs_exp) not in timeline
+ assert t1 >> (cheese_exp ^ ham_exp ^ eggs_exp) not in timeline
timeline.mark('eggs')
- assert eggs_exp ^ ham_exp in timeline
- assert ham_exp ^ eggs_exp in timeline
- assert cheese_exp ^ ham_exp ^ eggs_exp in timeline
+ with timeline.frozen():
+ assert t1 >> (eggs_exp ^ ham_exp) in timeline
+ assert t1 >> (ham_exp ^ eggs_exp) in timeline
+ assert t1 >> (cheese_exp ^ ham_exp ^ eggs_exp) in timeline
+ t2 = timeline.last
timeline.mark('ham')
- assert eggs_exp ^ ham_exp not in timeline
- assert ham_exp ^ eggs_exp not in timeline
- assert cheese_exp ^ ham_exp ^ eggs_exp not in timeline
+ with timeline.frozen():
+ assert t1 >> (eggs_exp ^ ham_exp) in timeline
+ assert t2 >> (eggs_exp ^ ham_exp) not in timeline
+ assert t1 >> (ham_exp ^ eggs_exp) in timeline
+ assert t2 >> (ham_exp ^ eggs_exp) not in timeline
+ assert t1 >> (cheese_exp ^ ham_exp ^ eggs_exp) in timeline
+ assert t2 >> (cheese_exp ^ ham_exp ^ eggs_exp) not in timeline
def test_conditional(make_timeline):
def is_exciting(occ):
- return occ.circumstances in Pattern(('Event', ANY, 'exciting'))
+ return occ.circumstances == Pattern(('Event', ANY, 'exciting'))
something = Event('something', ANY)
something_exciting = something.when(is_exciting)
timeline = make_timeline()
+ t = timeline.beginning
timeline.record_event('something', 'boring')
- assert something in timeline
- assert something_exciting not in timeline
+ with timeline.frozen():
+ assert t >> something in timeline
+ assert t >> something_exciting not in timeline
timeline.record_event('something', 'exciting')
- assert something_exciting in timeline
-
-
-def test_after(make_timeline):
- timeline = make_timeline()
- first = timeline.mark('first')
-
- second_exp = first >> Mark('second')
- assert second_exp not in timeline
-
- timeline.mark('second')
- assert second_exp in timeline
-
-
-def test_before(make_timeline):
- timeline = make_timeline()
- first = timeline.mark('first')
-
- timeline.mark('second')
- assert Mark('second') >> Mark('first') not in timeline
- assert Mark('second') >> first not in timeline
-
- third = timeline.mark('third')
- assert Mark('second') >> Mark('first') not in timeline
- assert Mark('second') >> first not in timeline
- assert Mark('second') >> Mark('third') in timeline
- assert Mark('second') >> third in timeline
-
-
+ with timeline.frozen():
+ assert t >> something_exciting in timeline
diff --git a/pytests/helpers/timeline.py b/pytests/helpers/timeline.py
index 86b88efc..5fec47b7 100644
--- a/pytests/helpers/timeline.py
+++ b/pytests/helpers/timeline.py
@@ -4,68 +4,132 @@
from __future__ import print_function, with_statement, absolute_import
+import contextlib
+import itertools
import threading
-import time
+
+# This is only imported to ensure that the module is actually installed and the
+# timeout setting in pytest.ini is active, since otherwise most timeline-based
+# tests will hang indefinitely.
import pytest_timeout # noqa
+from pytests.helpers import print, timestamp
import pytests.helpers.pattern as pattern
class Timeline(object):
def __init__(self):
self._cvar = threading.Condition()
- self._is_frozen = False
+ self.index_iter = itertools.count(1)
self._last = None
+ self._is_frozen = False
+ self._is_final = False
+ self.beginning = None # needed for mark() below
self.beginning = self.mark('begin')
+ def assert_frozen(self):
+ assert self.is_frozen, 'Timeline can only be inspected while frozen()'
+
+ @property
def last(self):
with self._cvar:
+ self.assert_frozen()
return self._last
def history(self):
- result = list(self.last().backtrack())
- result.reverse()
- return result
+ self.assert_frozen()
+ return list(self.beginning.and_following())
- def __contains__(self, expectations):
- try:
- iter(expectations)
- except TypeError:
- expectations = (expectations,)
- assert all(isinstance(exp, Expectation) for exp in expectations)
- last = self.last()
- return all(exp.has_occurred_by(last) for exp in expectations)
+ def all_occurrences_of(self, expectation):
+ occs = [occ for occ in self.history() if occ.realizes(expectation)]
+ return tuple(occs)
- def _record(self, occurrence):
- assert isinstance(occurrence, Occurrence)
- assert occurrence.timeline is self
- assert occurrence.preceding is None
+ def __contains__(self, expectation):
+ assert expectation.has_lower_bound, (
+ 'Expectation must have a lower time bound to be used with "in"! '
+ 'Use >> to sequence an expectation against an occurrence to establish a lower bound, '
+ 'or use has_been_realized_in() to test for unbounded expectations in the timeline.'
+ )
+ return expectation.has_been_realized_in(self)
+
+ def __getitem__(self, index):
+ assert index is slice
+ assert index.step is None
+ start = index.start or self.beginning
+ stop = index.stop
+ if stop is None:
+ assert self._is_frozen
+ stop = self._last
+ return self.Interval(self, start, stop)
+
+ def wait_until(self, condition):
with self._cvar:
- assert not self._is_frozen
- occurrence.timestamp = time.clock()
- occurrence.preceding = self._last
- self._last = occurrence
- self._cvar.notify_all()
-
- def freeze(self):
- with self._cvar:
- self._is_frozen = True
-
- def wait_until(self, expectation):
- assert isinstance(expectation, Expectation)
- with self._cvar:
- while expectation not in self:
+ while True:
+ with self.frozen():
+ if condition():
+ break
+ assert not self._is_final
self._cvar.wait()
return self._last
+ def wait_for(self, expectation):
+ assert expectation.has_lower_bound, (
+ 'Expectation must have a lower time bound to be used with wait_for()!'
+ 'Use >> to sequence an expectation against an occurrence to establish a lower bound.'
+ )
+ print('Waiting for %r' % expectation)
+ return self.wait_until(lambda: expectation in self)
+
+ def _record(self, occurrence):
+ t = timestamp()
+ assert isinstance(occurrence, Occurrence)
+ assert occurrence.timeline is self
+ assert occurrence.timestamp is None
+ with self._cvar:
+ assert not self._is_final
+ occurrence.timestamp = t
+ occurrence.index = next(self.index_iter)
+ if self._last is None:
+ self.beginning = occurrence
+ self._last = occurrence
+ else:
+ occurrence.previous = self._last
+ self._last._next = occurrence
+ self._last = occurrence
+ self._cvar.notify_all()
+
+ @contextlib.contextmanager
+ def frozen(self):
+ with self._cvar:
+ was_frozen = self._is_frozen
+ self._is_frozen = True
+ yield
+ self._is_frozen = was_frozen
+
+ @property
+ def is_frozen(self):
+ return self._is_frozen
+
+ def finalize(self):
+ with self._cvar:
+ self._is_final = True
+ self._is_frozen = True
+
+ @property
+ def is_finalized(self):
+ return self._is_finalized
+
def __repr__(self):
- return '|' + ' >> '.join(repr(occ) for occ in self.history()) + '|'
+ with self.frozen():
+ return '|' + ' >> '.join(repr(occ) for occ in self.history()) + '|'
def __str__(self):
- return '\n'.join(repr(occ) for occ in self.history())
+ with self.frozen():
+ return '\n'.join(repr(occ) for occ in self.history())
def __data__(self):
- return self.history()
+ with self.frozen():
+ return self.history()
def mark(self, id):
occ = Occurrence(self, 'Mark', id)
@@ -76,11 +140,7 @@ class Timeline(object):
occ = Occurrence(self, 'Request', command, arguments)
occ.command = command
occ.arguments = arguments
-
- def wait_for_response():
- self.wait_until(Response(occ, pattern.ANY))
-
- occ.wait_for_response = wait_for_response
+ occ.wait_for_response = lambda: Response(occ, pattern.ANY).wait()
return occ
def record_response(self, request, body):
@@ -97,76 +157,79 @@ class Timeline(object):
occ.body = body
return occ
+ class Interval(tuple):
+ def __init__(self, timeline, start, stop):
+ assert isinstance(start, Occurrence)
+ assert isinstance(stop, Occurrence)
-class Occurrence(object):
- def __init__(self, timeline, *circumstances):
- assert circumstances
- self.timeline = timeline
- self.preceding = None
- self.timestamp = None
- self.circumstances = circumstances
- timeline._record(self)
- assert self.timestamp is not None
+ if start is stop:
+ occs = ()
+ else:
+ assert start < stop
+ occs = tuple(self.start.and_following(until=self.stop))
+ super(Timeline.Interval, self).__init__(occs)
- def backtrack(self, until=None):
- assert until is None or isinstance(until, Occurrence)
- occ = self
- while occ is not until:
- yield occ
- occ = occ.preceding
+ self.timeline = timeline
+ self.start = start
+ self.stop = stop
- def precedes(self, occurrence):
- assert isinstance(occurrence, Occurrence)
- preceding = occurrence.backtrack()
- next(preceding)
- return any(occ is self for occ in preceding)
-
- def follows(self, occurrence):
- assert isinstance(occurrence, Occurrence)
- return occurrence.precedes(self)
-
- def __lt__(self, occurrence):
- return self.precedes(occurrence)
-
- def __gt__(self, occurrence):
- return self.follows(occurrence)
-
- def __le__(self, occurrence):
- return self == occurrence or self < occurrence
-
- def __ge__(self, occurrence):
- return self == occurrence or self > occurrence
-
- def __rshift__(self, expectation):
- assert isinstance(expectation, Expectation)
- return expectation.after(self)
-
- def __hash__(self):
- return hash(id(self))
-
- def __data__(self):
- return self.circumstances
-
- def __repr__(self):
- timestamp = int(self.timestamp * 100000)
- return '@%06d:%s%r' % (timestamp, self.circumstances[0], self.circumstances[1:])
+ def __contains__(self, expectation):
+ occs = [occ for occ in self if expectation.is_realized_by(occ)]
+ occs.reverse()
+ return tuple(occs)
class Expectation(object):
- def has_occurred_by(self, occurrence):
+ timeline = None
+ has_lower_bound = False
+
+ def is_realized_by(self, occurrence):
raise NotImplementedError()
- def after(self, other):
- return BoundedExpectation(self, must_follow=other)
+ def is_realized_by_any_of(self, occurrences):
+ return any(self.is_realized_by(occ) for occ in occurrences)
- def before(self, other):
- return BoundedExpectation(self, must_precede=other)
+ def has_been_realized_before(self, occurrence):
+ return self.is_realized_by_any_of(occurrence.preceding())
+
+ def has_been_realized_after(self, occurrence):
+ return self.is_realized_by_any_of(occurrence.following())
+
+ def has_been_realized_at_or_before(self, occurrence):
+ return self.is_realized_by_any_of(occurrence.and_preceding())
+
+ def has_been_realized_at_or_after(self, occurrence):
+ return self.is_realized_by_any_of(occurrence.and_following())
+
+ def has_been_realized_in(self, timeline):
+ return timeline.all_occurrences_of(self) != ()
+
+ def wait(self):
+ assert self.timeline and self.has_lower_bound, (
+ 'Expectation must belong to a timeline and have a lower time bound to be used wait()! '
+ 'Use >> to sequence an expectation against an occurrence to establish a lower bound.'
+ )
+ return self.timeline.wait_for(self)
+
+ def __eq__(self, other):
+ if self is other:
+ return True
+ elif isinstance(other, Occurrence) and self.is_realized_by(other):
+ return True
+ else:
+ return NotImplemented
+
+ def __ne__(self, other):
+ return not self == other
+
+ def after(self, other):
+ return SequencedExpectation(self, only_after=other)
def when(self, condition):
return ConditionalExpectation(self, condition)
def __rshift__(self, other):
- return self.before(other)
+ return self if other is None else other.after(self)
def __and__(self, other):
assert isinstance(other, Expectation)
@@ -180,80 +243,86 @@ class Expectation(object):
assert isinstance(other, Expectation)
return XorExpectation(self, other)
+ def __invert__(self):
+ return NotExpectation(self)
+
def __repr__(self):
raise NotImplementedError()
-class BoundedExpectation(Expectation):
- def __init__(self, expectation, must_follow=None, must_precede=None):
- self.expectation = expectation
- self.must_follow = Occurred(must_follow) if isinstance(must_follow, Occurrence) else must_follow
- self.must_precede = Occurred(must_precede) if isinstance(must_precede, Occurrence) else must_precede
- assert isinstance(self.expectation, Expectation)
- assert self.must_follow is None or isinstance(self.must_follow, Expectation)
- assert self.must_precede is None or isinstance(self.must_precede, Expectation)
-
- def has_occurred_by(self, occurrence):
- assert isinstance(occurrence, Occurrence)
-
- expectation = self.expectation
- must_follow = self.must_follow
- must_precede = self.must_precede
- occ = occurrence
-
- if must_precede is not None:
- for occ in occ.backtrack():
- if not must_precede.has_occurred_by(occ):
- break
- else:
- return False
-
- for occ in occ.backtrack():
- if expectation.has_occurred_by(occ):
- break
- else:
- return False
-
- return must_follow is None or must_follow.has_occurred_by(occ)
-
- def __repr__(self):
- s = '('
- if self.must_follow is not None:
- s += repr(self.must_follow) + ' >> '
- s += repr(self.expectation)
- if self.must_precede is not None:
- s += ' >> ' + repr(self.must_precede)
- s += ')'
- return s
-
-
-class AndExpectation(Expectation):
+class DerivativeExpectation(Expectation):
def __init__(self, *expectations):
+ self.expectations = expectations
assert len(expectations) > 0
assert all(isinstance(exp, Expectation) for exp in expectations)
- self.expectations = expectations
- def has_occurred_by(self, occurrence):
+ timelines = {id(exp.timeline): exp.timeline for exp in expectations}
+ timelines.pop(id(None), None)
+ if len(timelines) > 1:
+ print('Cannot mix expectations from multiple timelines:')
+ for tl_id, tl in timelines.items():
+ print('\n %d: %r' % (tl_id, tl))
+ print()
+ raise ValueError('Cannot mix expectations from multiple timelines')
+ for tl in timelines.values():
+ self.timeline = tl
+
+ @property
+ def has_lower_bound(self):
+ return all(exp.has_lower_bound for exp in self.expectations)
+
+
+class SequencedExpectation(DerivativeExpectation):
+ def __init__(self, expectation, only_after):
+ super(SequencedExpectation, self).__init__(expectation, only_after)
+
+ @property
+ def expectation(self):
+ return self.expectations[0]
+
+ @property
+ def only_after(self):
+ return self.expectations[1]
+
+ def is_realized_by(self, occurrence):
assert isinstance(occurrence, Occurrence)
- return all(exp.has_occurred_by(occurrence) for exp in self.expectations)
+ return (
+ occurrence.realizes(self.expectation) and
+ self.only_after.has_been_realized_before(occurrence)
+ )
- def __and__(self, other):
- assert isinstance(other, Expectation)
- return AndExpectation(*(self.expectations + (other,)))
+ @property
+ def has_lower_bound(self):
+ return self.expectation.has_lower_bound or self.only_after.has_lower_bound
def __repr__(self):
- return '(' + ' & '.join(repr(exp) for exp in self.expectations) + ')'
+ return '(%r >> %r)' % (self.only_after, self.expectation)
-class OrExpectation(Expectation):
- def __init__(self, *expectations):
- assert len(expectations) > 0
- assert all(isinstance(exp, Expectation) for exp in expectations)
- self.expectations = expectations
+class NotExpectation(DerivativeExpectation):
+ def __init__(self, expectation):
+ super(NotExpectation, self).__init__(expectation)
- def has_occurred_by(self, occurrence):
+ @property
+ def expectation(self):
+ return self.expectations[0]
+
+ def is_realized_by(self, occurrence):
assert isinstance(occurrence, Occurrence)
- return any(exp.has_occurred_by(occurrence) for exp in self.expectations)
+ return not occurrence.realizes(self.expectation)
+
+ @property
+ def has_lower_bound(self):
+ return self.expectation.has_lower_bound
+
+ def __repr__(self):
+ return '~%r' % self.expectation
+
+
+class OrExpectation(DerivativeExpectation):
+ def is_realized_by(self, occurrence):
+ assert isinstance(occurrence, Occurrence)
+ return any(occurrence.realizes(exp) for exp in self.expectations)
def __or__(self, other):
assert isinstance(other, Expectation)
@@ -263,15 +332,55 @@ class OrExpectation(Expectation):
return '(' + ' | '.join(repr(exp) for exp in self.expectations) + ')'
-class XorExpectation(Expectation):
- def __init__(self, *expectations):
- assert len(expectations) > 0
- assert all(isinstance(exp, Expectation) for exp in expectations)
- self.expectations = expectations
-
- def has_occurred_by(self, occurrence):
+class AndExpectation(DerivativeExpectation):
+ def is_realized_by(self, occurrence):
assert isinstance(occurrence, Occurrence)
- return sum(exp.has_occurred_by(occurrence) for exp in self.expectations) == 1
+
+ # At least one expectation must be realized by the occurrence.
+ expectations = list(self.expectations)
+ for exp in expectations:
+ if occurrence.realizes(exp):
+ break
+ else:
+ return False
+
+ # And then all of the remaining expectations must have been realized
+ # at or sometime before that occurrence.
+ expectations.remove(exp)
+ return all(exp.has_been_realized_at_or_before(occurrence) for exp in expectations)
+
+ @property
+ def has_lower_bound(self):
+ return any(exp.has_lower_bound for exp in self.expectations)
+
+ def __and__(self, other):
+ assert isinstance(other, Expectation)
+ return AndExpectation(*(self.expectations + (other,)))
+
+ def __repr__(self):
+ return '(' + ' & '.join(repr(exp) for exp in self.expectations) + ')'
+
+
+class XorExpectation(DerivativeExpectation):
+ def is_realized_by(self, occurrence):
+ assert isinstance(occurrence, Occurrence)
+
+ # At least one expectation must be realized by the occurrence.
+ expectations = list(self.expectations)
+ for exp in expectations:
+ if occurrence.realizes(exp):
+ break
+ else:
+ return False
+
+ # And then none of the remaining expectations must have been realized
+ # at or sometime before that occurrence.
+ expectations.remove(exp)
+ return not any(exp.has_been_realized_at_or_before(occurrence) for exp in expectations)
+
+ @property
+ def has_lower_bound(self):
+ return all(exp.has_lower_bound for exp in self.expectations)
def __xor__(self, other):
assert isinstance(other, Expectation)
@@ -281,62 +390,153 @@ class XorExpectation(Expectation):
return '(' + ' ^ '.join(repr(exp) for exp in self.expectations) + ')'
-class ConditionalExpectation(Expectation):
+class ConditionalExpectation(DerivativeExpectation):
def __init__(self, expectation, condition):
- assert isinstance(expectation, Expectation)
- self.expectation = expectation
self.condition = condition
+ super(ConditionalExpectation, self).__init__(expectation)
- def has_occurred_by(self, occurrence):
+ @property
+ def expectation(self):
+ return self.expectations[0]
+
+ def is_realized_by(self, occurrence):
assert isinstance(occurrence, Occurrence)
- return self.condition(occurrence) and self.expectation.has_occurred_by(occurrence)
+ return self.condition(occurrence) and occurrence.realizes(self.expectation)
def __repr__(self):
return '%r?' % self.expectation
-class BasicExpectation(Expectation):
+class PatternExpectation(Expectation):
def __init__(self, *circumstances):
self.circumstances = pattern.Pattern(circumstances)
- def has_occurred_by(self, occurrence):
+ def is_realized_by(self, occurrence):
assert isinstance(occurrence, Occurrence)
- return any(
- occ.circumstances
- in self.circumstances
- for occ in occurrence.backtrack()
- )
+ return occurrence.circumstances == self.circumstances
def __repr__(self):
circumstances = self.circumstances.pattern
return '%s%r' % (circumstances[0], circumstances[1:])
-class Occurred(BasicExpectation):
- def __init__(self, occurrence):
- assert isinstance(occurrence, Occurrence)
- self.occurrence = occurrence
-
- def has_occurred_by(self, occurrence):
- assert isinstance(occurrence, Occurrence)
- return any(occ is self.occurrence for occ in occurrence.backtrack())
-
- def __repr__(self):
- return 'Occurred(%r)' % self.occurrence
-
-
def Mark(id):
- return BasicExpectation('Mark', id)
+ return PatternExpectation('Mark', id)
def Request(command, arguments=pattern.ANY):
- return BasicExpectation('Request', command, arguments)
+ return PatternExpectation('Request', command, arguments)
def Response(request, body=pattern.ANY):
assert isinstance(request, Expectation) or isinstance(request, Occurrence)
- return BasicExpectation('Response', request, body)
+ exp = PatternExpectation('Response', request, body)
+ exp.timeline = request.timeline
+ exp.has_lower_bound = request.has_lower_bound
+ return exp
def Event(event, body=pattern.ANY):
- return BasicExpectation('Event', event, body)
+ return PatternExpectation('Event', event, body)
+
+
+class Occurrence(Expectation):
+ has_lower_bound = True
+
+ def __init__(self, timeline, *circumstances):
+ assert circumstances
+ self.timeline = timeline
+ self.previous = None
+ self._next = None
+ self.timestamp = None
+ self.index = None
+ self.circumstances = circumstances
+ timeline._record(self)
+ assert self.timestamp is not None
+
+ @property
+ def next(self):
+ with self.timeline.frozen():
+ occ = self._next
+ was_last = occ is self.timeline.last
+ if was_last:
+ # The .next property of the last occurrence in a timeline can change
+ # at any moment when timeline isn't frozen. So if it wasn't frozen by
+ # the caller, this was an unsafe operation, and we should complain.
+ self.timeline.assert_frozen()
+ return occ
+
+ def preceding(self):
+ it = self.and_preceding()
+ next(it)
+ return it
+
+ def and_preceding(self, up_to=None):
+ assert up_to is None or isinstance(up_to, Expectation)
+ if isinstance(up_to, Occurrence):
+ assert self > up_to
+ occ = self
+ while occ != up_to:
+ yield occ
+ occ = occ.previous
+
+ def following(self):
+ self.timeline.assert_frozen()
+ it = self.and_following()
+ next(it)
+ return it
+
+ def and_following(self, up_to=None):
+ assert up_to is None or isinstance(up_to, Expectation)
+ self.timeline.assert_frozen()
+ if isinstance(up_to, Occurrence):
+ assert self < up_to
+ occ = self
+ while occ != up_to:
+ yield occ
+ occ = occ.next
+
+ def precedes(self, occurrence):
+ assert isinstance(occurrence, Occurrence)
+ return any(occ is self for occ in occurrence.preceding())
+
+ def follows(self, occurrence):
+ assert isinstance(occurrence, Occurrence)
+ return any(occ is self for occ in occurrence.following())
+
+ def realizes(self, expectation):
+ assert isinstance(expectation, Expectation)
+ return expectation.is_realized_by(self)
+
+ def await_following(self, expectation):
+ assert isinstance(expectation, Expectation)
+ expectation = self >> expectation
+ return self.timeline.wait_for(expectation)
+
+ def is_realized_by(self, other):
+ return self is other
+
+ def __lt__(self, occurrence):
+ return self.precedes(occurrence)
+
+ def __gt__(self, occurrence):
+ return self.follows(occurrence)
+
+ def __le__(self, occurrence):
+ return self is occurrence or self < occurrence
+
+ def __ge__(self, occurrence):
+ return self is occurrence or self > occurrence
+
+ def __rshift__(self, expectation):
+ assert isinstance(expectation, Expectation)
+ return expectation.after(self)
+
+ def __hash__(self):
+ return hash(id(self))
+
+ def __data__(self):
+ return self.circumstances
+
+ def __repr__(self):
+ return '%d!%s%r' % (self.index, self.circumstances[0], self.circumstances[1:])
\ No newline at end of file